get_peak_sample #73

Closed
RESP3CT88 opened this issue Mar 23, 2022 · 17 comments

Comments

@RESP3CT88

RESP3CT88 commented Mar 23, 2022

I am using get_peak_sample() to display the sound level of a playing sample.
But I notice that every time it runs, it floods the PulseAudio event stream with:

Pulse event: facility=<EnumValue event-facility=source>, index=23, t=<EnumValue event-type=change>
Pulse event: facility=<EnumValue event-facility=source_output>, index=1933, t=<EnumValue event-type=remove>
Pulse event: facility=<EnumValue event-facility=client>, index=403, t=<EnumValue event-type=remove>

When an actual sink/source port is changed, it also sends similar events, like so:

Pulse event: facility=<EnumValue event-facility=sink>, index=23, t=<EnumValue event-type=change>
Pulse event: facility=<EnumValue event-facility=sink_input>, index=43, t=<EnumValue event-type=remove>
Pulse event: facility=<EnumValue event-facility=client>, index=413, t=<EnumValue event-type=remove>

I need a reliable way to determine when a sink/source port or card profile is updated.

@mk-fg
Owner

mk-fg commented Mar 23, 2022

I'd try something like this:

  • List sinks/sources/cards and find indexes of the things you want to track there, as well as current port/profile value.
  • Upon receiving change event, check facility and index, if they don't match objects you want, discard.
  • If event matches the thing you want, fetch info of that thing, check port/profile for changes.

Two issues exhibited here:

  • Pulseaudio doesn't tell you what changed beyond object type and index, hence the last step where you have to manually poll for change if object is of interest.

  • Reentrancy - you can't run pulse calls from a callback with this blocking module.
    You can work around this with threads or some kind of delayed checks, or maybe a better idea would be to use the asyncio module linked in the README.
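
The steps above could be sketched roughly like this. It is only a minimal sketch and not pulsectl API: `ChangeTracker` and the `fetch` callable are hypothetical names, with `fetch(facility, index)` standing in for whatever re-reads the active port or profile name of that object. Facility and event type are plain strings here for simplicity. `is_relevant()` makes no pulse calls, so it can run inside the event callback, while `check()` does the polling step and, because of the reentrancy issue, is meant to run outside of it.

```python
from typing import Callable, Dict, Hashable, Optional, Tuple

Key = Tuple[str, int]  # (facility, index), e.g. ('sink', 23)


class ChangeTracker:
    """Remembers the last seen port/profile value of selected objects."""

    def __init__(self, fetch: Callable[[str, int], Hashable]):
        # fetch(facility, index) -> current value; hypothetical helper,
        # to be backed by real pulse calls made outside of the event callback
        self._fetch = fetch
        self._known: Dict[Key, Hashable] = {}

    def track(self, facility: str, index: int) -> None:
        """Step 1: record the current value of an object of interest."""
        self._known[facility, index] = self._fetch(facility, index)

    def is_relevant(self, facility: str, index: int, ev_type: str) -> bool:
        """Step 2: cheap filter, no pulse calls, usable inside the callback."""
        return ev_type == 'change' and (facility, index) in self._known

    def check(self, facility: str, index: int) -> Optional[Tuple[Hashable, Hashable]]:
        """Step 3: re-fetch and return (old, new) if the value changed, else None."""
        old = self._known[facility, index]
        new = self._fetch(facility, index)
        if new == old:
            return None
        self._known[facility, index] = new
        return old, new


# Usage with a fake in-memory state standing in for the pulse server
state = {('sink', 23): 'analog-output-speaker'}
tracker = ChangeTracker(lambda facility, index: state[facility, index])
tracker.track('sink', 23)
state['sink', 23] = 'analog-output-headphones'
if tracker.is_relevant('sink', 23, 'change'):
    print(tracker.check('sink', 23))
```

Events caused by get_peak_sample() still pass the filter when they hit a tracked index, but `check()` returns None for them, since the port/profile value is unchanged.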

Not sure if I understand the problem correctly though.

@RESP3CT88
Author

RESP3CT88 commented Mar 23, 2022

Yes, I am using multiple threads. One for reading pulse events, one for reading sink sound level with get_subscribe_peak, and another one for reading source sound level with get_subscribe_peak.

Every time I run get_subscribe_peak it emits many sink and source change events on the index values I'm interested in. I need to run get_subscribe_peak multiple times a second to display a smooth sound meter.

So unfortunately, with your solution I would be checking port/profile for changes multiple times a second, which isn't efficient. I would be better off polling for changes every one second instead.

I wish the pulseaudio client name came across in the events, then I could use the client name to ignore the events generated by get_subscribe_peak.

@mk-fg
Owner

mk-fg commented Mar 23, 2022

You can debounce these events easily - i.e. upon receiving event, check it once, delay next check to 200ms minimum.
Don't think there's anything you can do about other events from things that legitimately affect these sinks/sources.

@mk-fg
Owner

mk-fg commented Mar 23, 2022

> You can debounce these events easily - i.e. upon receiving event, check it once, delay next check to 200ms minimum.

With threads or asyncio, I'd probably do it via something like:

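import threading, time

ts_last_check, min_delay = 0, 0.2
event, last_obj_id = threading.Event(), [None]  # or asyncio.Event(), with awaits below
while True:
  event.wait()  # Event carries no payload, so the id is passed via last_obj_id
  event.clear()
  # enforce min_delay between checks, no matter how fast events arrive
  time.sleep(max(0, min_delay - (time.monotonic() - ts_last_check)))
  ts_last_check = time.monotonic()
  do_stuff_if_port_or_profile_changed(last_obj_id[0])  # your own handler

...where you set last_obj_id[0] = obj_id and then run event.set() from the other thread/coroutine/etc that listens for those and does the id-filtering.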

@RESP3CT88
Author

RESP3CT88 commented Mar 23, 2022

Sorry I meant to say get_peak_sample rather than get_subscribe_peak in my last reply.

I'm doing the threading with the PySide6 library (Qt6). I'm trying to understand your example.

This is what my thread looks like that runs get_peak_sample() for reading the sound levels of a sink. I need to call get_peak_sample() multiple times a second to get the volume level between 0.0 and 1.0, but I don't want the pulseaudio events generated from it. I understand that this is what debouncing does, I am just not sure where to intercept the events in order to delay them before reading them with event_callback_set(print_events).

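import logging, pulsectl
from PySide6.QtCore import QObject

# Thread that checks sink sound level
class AudioTestSinkMonitor(QObject):
    def start(self):
        with pulsectl.Pulse() as pulse:
            # look up the monitor source once, not on every loop iteration
            source = pulse.sink_list()[0].monitor_source_name
            while self.monitor:  # assumed to be a stop flag set elsewhere
                logging.info(str(pulse.get_peak_sample(source, 0.2)))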

Where are you suggesting that I can debounce the events that are generated from me running get_peak_sample in my example?

@mk-fg
Owner

mk-fg commented Mar 23, 2022

What generates events doesn't really matter for the purposes of such debouncing, idea is only to enforce min 200ms interval between any checks via simple time.sleep(), regardless of why things change in pulse rapidly.

Pretty sure if you'll look at the code that handles pulse events or get to implementing it, you'll figure it out, regardless of whether it'll be anything like my example :)

@mk-fg mk-fg closed this as completed Mar 23, 2022
@mk-fg
Owner

mk-fg commented Mar 23, 2022

> I need to call get_peak_sample() multiple times a second to get the volume level between 0.0 and 1.0

That sounds like a bit of a strange thing to do, but idk what the app does, so maybe necessary.

> But don't want the pulseaudio events generated from it.
> I understand that this is what debouncing does

No no, it doesn't suppress events or whatever, it's only supposed to delay the check for port/profile that you do in response to these, or at least that was the idea in my example.
(oh, also, maybe use mutable obj_id_set there, so that handler can check multiple things piled-on there and not lose anything, though again, it's just an example, there're infinite ways to express things in code)
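
For illustration, the set-based variant could look something like the sketch below. `DebouncedChecker` and its method names are made up for this example, and the handler is whatever does the actual port/profile check. The listener thread calls `notify()` for each event that passed the id-filtering, and the checking thread calls `run_once()` in a loop. The `clock` and `sleep` parameters are only there so that the timing can be tested without real delays.

```python
import threading
import time


class DebouncedChecker:
    """Collects object ids from an event listener and handles them at most
    once per min_delay seconds, without dropping any id."""

    def __init__(self, handler, min_delay=0.2, clock=time.monotonic, sleep=time.sleep):
        self._handler, self._min_delay = handler, min_delay
        self._clock, self._sleep = clock, sleep  # injectable for testing
        self._pending, self._lock = set(), threading.Lock()
        self._event = threading.Event()
        self._ts_last_check = float('-inf')

    def notify(self, obj_id):
        """Call from the listener thread for each event that passed filtering."""
        with self._lock:
            self._pending.add(obj_id)
        self._event.set()

    def run_once(self, timeout=None):
        """Wait for pending ids, enforce the minimum interval, handle each id once.
        Returns the set of handled ids (empty if the wait timed out)."""
        if not self._event.wait(timeout):
            return set()
        self._sleep(max(0, self._min_delay - (self._clock() - self._ts_last_check)))
        with self._lock:
            ids, self._pending = self._pending, set()
            self._event.clear()
        self._ts_last_check = self._clock()
        for obj_id in ids:
            self._handler(obj_id)
        return ids


# 50 rapid events for one sink plus one for another collapse into two checks
checked = []
debouncer = DebouncedChecker(checked.append)
for _ in range(50):
    debouncer.notify(23)
debouncer.notify(7)
debouncer.run_once()
print(sorted(checked))  # [7, 23]
```

Events are not suppressed here either: every one of them still arrives, they just get folded into one check per object per interval.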

@RESP3CT88
Author

RESP3CT88 commented Mar 23, 2022

Yes, understood.

My app is simply displaying a bar like this when music is playing:
https://blog.frame.io/wp-content/uploads/2018/01/Premiere-Pro-Audio-Tools-10-Audio-Meters.gif

Should I not be using get_peak_sample?

@mk-fg
Owner

mk-fg commented Mar 23, 2022

Hm, yeah, that's kinda what pavucontrol tool does too.

It uses the same thing as get_peak_sample, but it never closes that sampling stream, which I guess is the best way to do it in your case as well.
Instead of constantly open/read-N-samples/close in a loop with get_peak_sample, maybe take that function and run it basically forever, updating GUI bars from e.g. "samples within last 100 ms", which you can easily store in some ring buffer (a list with index going 0-1-2-3-...-0-1-2-3-...).

Would be way more efficient and none of those pesky change events!
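
A minimal sketch of such a ring buffer, just to illustrate the idea. `PeakRing` is a made-up name, and `push()` is assumed to be called from the stream read callback with each new sample. With the rate=25 sample spec that get_peak_sample uses, each slot covers 40 ms.

```python
import array


class PeakRing:
    """Fixed-size ring buffer of recent peak samples."""

    def __init__(self, size):
        self._samples = array.array('d', [0.0] * size)
        self._pos = 0  # next slot to overwrite: 0-1-2-...-(size-1)-0-1-...

    def push(self, sample):
        """Meant to be called from the stream read callback for each sample."""
        self._samples[self._pos] = sample
        self._pos = (self._pos + 1) % len(self._samples)

    def peak(self):
        """Loudest sample in the window, clamped to 1.0 like get_peak_sample()."""
        return min(1.0, max(self._samples))


ring = PeakRing(3)  # at 25 samples/s, 3 slots cover roughly the last 120 ms
for sample in (0.1, 0.9, 0.2, 0.3, 0.4):
    ring.push(sample)
print(ring.peak())  # the 0.9 sample has already been overwritten
```

The GUI would then call `peak()` on its own timer, so how often the bar is redrawn is independent of how often samples arrive.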

@RESP3CT88
Author

Yes that would be great! I'll look into rewriting that function to avoid all those change events. Thanks!

@RESP3CT88
Author

@mk-fg

Any ideas on how to change this function to run forever? It uses c.pa.stream_set_read_callback(s, read_cb, None) to point to a callback function.

from pulsectl import Pulse
import pulsectl._pulsectl as c

class PulseNew(Pulse):
    # no __init__ override needed here: Pulse.__init__ is inherited as-is

    def get_peak_sample(self, source, timeout, stream_idx=None):
        samples, proplist = [0], c.pa.proplist_from_string('application.id=org.PulseAudio.pavucontrol')
        ss = c.PA_SAMPLE_SPEC(format=c.PA_SAMPLE_FLOAT32BE, rate=25, channels=1)
        s = c.pa.stream_new_with_proplist(self._ctx, 'peak detect', c.byref(ss), None, proplist)
        c.pa.proplist_free(proplist)

        @c.PA_STREAM_REQUEST_CB_T
        def read_cb(s, bs, userdata):
            buff, bs = c.c_void_p(), c.c_int(bs)
            c.pa.stream_peek(s, buff, c.byref(bs))
            try:
                if not buff or bs.value < 4: return
                # This assumes that native byte order for floats is BE, same as pavucontrol
                samples[0] = max(samples[0], c.cast(buff, c.POINTER(c.c_float))[0])
                        
            finally:
                # stream_drop() flushes buffered data (incl. buff=NULL "hole" data)
                # # stream.h: "should not be called if the buffer is empty"
                if bs.value: c.pa.stream_drop(s)
                
        if stream_idx is not None: c.pa.stream_set_monitor_stream(s, stream_idx)
        c.pa.stream_set_read_callback(s, read_cb, None)
        if source is not None: source = str(source).encode('utf-8')
        
        try:
            c.pa.stream_connect_record( s, source,
                    c.PA_BUFFER_ATTR(fragsize=4, maxlength=2**32-1),
                    c.PA_STREAM_DONT_MOVE | c.PA_STREAM_PEAK_DETECT |
                    c.PA_STREAM_ADJUST_LATENCY | c.PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND )
        except c.pa.CallError:
            c.pa.stream_unref(s)
            raise

        try: self._pulse_poll(timeout)
        finally:
            try: c.pa.stream_disconnect(s)
            except c.pa.CallError: pass # stream was removed

        return min(1.0, samples[0])

@mk-fg
Owner

mk-fg commented Mar 24, 2022

I'd do it something like this:

import threading, array, itertools as it
import pulsectl, pulsectl._pulsectl as c


class SamplerThread(threading.Thread):
  def __init__(self, sample_count, **sampler_opts):
    super().__init__(name='sampler', daemon=True)
    self._samples = array.array('d', b'\0'*8*sample_count)
    self._sampler_opts = sampler_opts

  def run(self):
    with pulsectl.Pulse('sampler') as pulse:
      self.run_sampler(pulse, self._samples, **self._sampler_opts)

  def run_sampler(self, pulse, samples, source, sample_rate=25, stream_idx=None):
    proplist = c.pa.proplist_from_string('application.id=org.PulseAudio.pavucontrol')
    ss = c.PA_SAMPLE_SPEC(format=c.PA_SAMPLE_FLOAT32BE, rate=sample_rate, channels=1)
    s = c.pa.stream_new_with_proplist(pulse._ctx, 'peak detect', c.byref(ss), None, proplist)
    c.pa.proplist_free(proplist)
    sample_ptrs = it.chain.from_iterable(map(range, it.repeat(len(samples))))

    @c.PA_STREAM_REQUEST_CB_T
    def read_cb(s, bs, userdata):
      buff, bs = c.c_void_p(), c.c_int(bs)
      c.pa.stream_peek(s, buff, c.byref(bs))
      try:
        if not buff or bs.value < 4: return
        samples[next(sample_ptrs)] = c.cast(buff, c.POINTER(c.c_float))[0]
      finally:
        if bs.value: c.pa.stream_drop(s)

    if stream_idx is not None: c.pa.stream_set_monitor_stream(s, stream_idx)
    c.pa.stream_set_read_callback(s, read_cb, None)
    if source is not None: source = str(source).encode('utf-8')

    try:
      c.pa.stream_connect_record( s, source,
        c.PA_BUFFER_ATTR(fragsize=4, maxlength=2**32-1),
        c.PA_STREAM_DONT_MOVE | c.PA_STREAM_PEAK_DETECT |
        c.PA_STREAM_ADJUST_LATENCY | c.PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND )
    except c.pa.CallError:
      c.pa.stream_unref(s)
      raise

    try: pulse._pulse_poll()
    finally:
      try: c.pa.stream_disconnect(s)
      except c.pa.CallError: pass

  def get_peak(self): return max(self._samples)


### ...loop to occasionally get current peak value somewhere in the main thread

import re, time

def main_app():
  with pulsectl.Pulse('myapp') as pulse:
    sink = next( sink for sink in pulse.sink_list()
      if re.search(r'^alsa_output\.pci-.*\.analog-stereo$', sink.name) )
    sampler = SamplerThread(10, source=sink.monitor_source)

    sampler.start()
    peak_alltime = 0
    print(f'Current volume on: {sink.description} [{sink.name}]')
    while True:
      time.sleep(0.2)
      peak = sampler.get_peak()
      peak_alltime = max(peak_alltime, peak)
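      # peak_alltime stays 0 until the first non-silent sample, so avoid dividing by it
      n = round(100 * peak / peak_alltime) if peak_alltime else 0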
      print('\r[ ' + '#'*n + ' '*(100-n) + ' ]', end='', flush=True)

main_app()

But you didn't seem to even try to do anything there, so idk if getting more examples will help in the same way, unless they fit into your code exactly :)

@RESP3CT88
Author

RESP3CT88 commented Mar 24, 2022

This is very helpful, it works great for sinks, thanks!

I have tried adapting your example to work for a source (microphone). It works properly for about 5 seconds, and then PulseAudio suspends the source. See here: https://i.imgur.com/tMrZuBy.gif

Here is the log output of PulseAudio, when it stops working:

I: [pulseaudio] module-suspend-on-idle.c: Source alsa_input.pci-0000_00_1f.3.analog-stereo idle for too long, suspending ...
I: [alsa-source-ALC3235 Analog] alsa-source.c: Device suspended...
D: [pulseaudio] source.c: alsa_input.pci-0000_00_1f.3.analog-stereo: suspend_cause: (none) -> IDLE
D: [pulseaudio] source.c: alsa_input.pci-0000_00_1f.3.analog-stereo: state: RUNNING -> SUSPENDED
I: [pulseaudio] core.c: All sinks and sources are suspended, vacuuming memory

It seems PulseAudio thinks the source/microphone is idle, so it suspends it. This happens even when I am actively talking into the microphone.

If I remove c.PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND from the call to c.pa.stream_connect_record, then it works properly and does not suspend the source. I just don't know if this is the best way.

See below for how I have changed your example to work with a source.

import threading, array, itertools as it
import pulsectl, pulsectl._pulsectl as c


class SamplerThread(threading.Thread):
  def __init__(self, sample_count, **sampler_opts):
    super().__init__(name='sampler', daemon=True)
    self._samples = array.array('d', b'\0'*8*sample_count)
    self._sampler_opts = sampler_opts

  def run(self):
    with pulsectl.Pulse('sampler') as pulse:
      self.run_sampler(pulse, self._samples, **self._sampler_opts)

  def run_sampler(self, pulse, samples, source, sample_rate=25, stream_idx=None):
    proplist = c.pa.proplist_from_string('application.id=org.PulseAudio.pavucontrol')
    ss = c.PA_SAMPLE_SPEC(format=c.PA_SAMPLE_FLOAT32BE, rate=sample_rate, channels=1)
    s = c.pa.stream_new_with_proplist(pulse._ctx, 'peak detect', c.byref(ss), None, proplist)
    c.pa.proplist_free(proplist)
    sample_ptrs = it.chain.from_iterable(map(range, it.repeat(len(samples))))

    @c.PA_STREAM_REQUEST_CB_T
    def read_cb(s, bs, userdata):
      buff, bs = c.c_void_p(), c.c_int(bs)
      c.pa.stream_peek(s, buff, c.byref(bs))
      try:
        if not buff or bs.value < 4: return
        samples[next(sample_ptrs)] = c.cast(buff, c.POINTER(c.c_float))[0]
      finally:
        if bs.value: c.pa.stream_drop(s)

    if stream_idx is not None: c.pa.stream_set_monitor_stream(s, stream_idx)
    c.pa.stream_set_read_callback(s, read_cb, None)
    if source is not None: source = str(source).encode('utf-8')

    try:
      c.pa.stream_connect_record( s, source,
        c.PA_BUFFER_ATTR(fragsize=4, maxlength=2**32-1),
        c.PA_STREAM_DONT_MOVE | c.PA_STREAM_PEAK_DETECT |
        c.PA_STREAM_ADJUST_LATENCY | c.PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND )
    except c.pa.CallError:
      c.pa.stream_unref(s)
      raise

    try: pulse._pulse_poll()
    finally:
      try: c.pa.stream_disconnect(s)
      except c.pa.CallError: pass

  def get_peak(self): return max(self._samples)


### ...loop to occasionally get current peak value somewhere in the main thread

import re, time

def main_app():
  with pulsectl.Pulse('myapp') as pulse:
    source = next( source for source in pulse.source_list()
      if re.search(r'^alsa_input\.pci-.*\.analog-stereo$', source.name) )
    sampler = SamplerThread(10, source=source.name)

    sampler.start()
    peak_alltime = 0
    print(f'Current volume on: {source.description} [{source.name}]')
    while True:
      time.sleep(0.2)
      peak = sampler.get_peak()
      peak_alltime = max(peak_alltime, peak)
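      # peak_alltime stays 0 until the first non-silent sample, so avoid dividing by it
      n = round(100 * peak / peak_alltime) if peak_alltime else 0
      #print('\r[ ' + '#'*n + ' '*(100-n) + ' ]', end='', flush=True)
      print(peak / peak_alltime if peak_alltime else 0.0)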

main_app()

@mk-fg
Owner

mk-fg commented Mar 25, 2022

> If I remove c.PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND from the call to c.pa.stream_connect_record, then it works properly and does not suspend the source

Hm, I think maybe it suspends the source because of some other stream flags in combination with this one.

But you don't have to guess, just check the PulseAudio code: grep for that exact "PA_STREAM_DONT_INHIBIT" thing, which will get you to a place - likely in some file like source.c - with a couple of if's that control all the logic, where you can see exactly why/when stuff gets suspended.
There are also docs ofc, and headers usually document stuff extensively, but I often find it easier to check how it's coded by reading a couple of "if" statements :)

@mhthies
Contributor

mhthies commented Mar 25, 2022

I already encountered that issue in pulsectl-asyncio and recently added an option to control this flag.

The PulseAudio flag actually does exactly what it says: normally, PulseAudio suspends sources and sinks when no stream has been running on them for a few seconds. Each running stream related to that source/sink inhibits the suspension, unless it has the DONT_INHIBIT_AUTO_SUSPEND flag set.

Typically, you want to inhibit suspension when monitoring source devices (so the source doesn't get suspended if no other application is listening to it), but you don't want to inhibit suspension when monitoring a sink's monitor source (so the sink can get suspended when no application is playing audio to it).
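
As a rough sketch of that rule applied to the sampler examples above: `peak_stream_flags` is a hypothetical helper, not part of pulsectl or pulsectl-asyncio. It takes the namespace that provides the PA_STREAM_* constants (pulsectl._pulsectl in those examples) and a bool that the caller has to work out, and returns the flags to pass to stream_connect_record.

```python
from types import SimpleNamespace


def peak_stream_flags(c, is_sink_monitor):
    """Flags for a peak-detect record stream.

    c: namespace providing the PA_STREAM_* constants
    is_sink_monitor: True when recording from a sink's monitor source
    """
    flags = c.PA_STREAM_DONT_MOVE | c.PA_STREAM_PEAK_DETECT | c.PA_STREAM_ADJUST_LATENCY
    if is_sink_monitor:
        # metering must not keep an otherwise idle sink awake
        flags |= c.PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND
    # for a real source (microphone) the flag is left out, so that the
    # running meter stream inhibits suspension of the source
    return flags


# Stand-in for the constants so the sketch runs without a pulse server;
# the bit values are placeholders, real code should pass the library module
stub = SimpleNamespace(
    PA_STREAM_DONT_MOVE=1, PA_STREAM_PEAK_DETECT=2,
    PA_STREAM_ADJUST_LATENCY=4, PA_STREAM_DONT_INHIBIT_AUTO_SUSPEND=8)
print(peak_stream_flags(stub, is_sink_monitor=True))   # 15
print(peak_stream_flags(stub, is_sink_monitor=False))  # 7
```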

@mk-fg
Owner

mk-fg commented Mar 25, 2022

Ah, and I guess @RESP3CT88 knows about it, as they've filed that issue as well :)

@RESP3CT88
Author

Thanks a lot guys! Your tips are helpful and I'm very grateful for the support. Btw, both of your projects are so amazing I have used both :)
