
pybci #70

Closed
LMBooth opened this issue May 23, 2023 · 22 comments

@LMBooth commented May 23, 2023

Hey, this is more of a discussion point, so please do close or remove if necessary. For others who may be interested: I've made a configurable Python-based brain-computer interface (BCI) class that gathers epoch data from single or multiple LSL data streams for a selected number of seconds before and after a marker is received on a given marker stream. A given or custom class can then be used to extract feature data, and a custom PyTorch, sklearn or TensorFlow model can be passed for classifying once enough training data is collected.

For anyone looking to do machine learning on LSL data, it would be great to get some feedback/input/recommendations for features from others who are using LSL-enabled hardware in Python. The ReadTheDocs site gives a more detailed overview and installation instructions: https://pybci.readthedocs.io/en/latest/

Git link: https://github.com/LMBooth/pybci

@cboulay (Contributor) commented May 24, 2023

Hi @LMBooth . Nicely done on the project and thank you for contributing to the LSL ecosystem!

I noticed in the documentation that you mentioned performance issues. If you are concerned about performance, I recommend a couple enhancements.

Don't re-allocate

Wherever possible, use pull_chunk with the dest_obj parameter.

Typically you'd create a buffer that is larger than you'd reasonably expect to receive in a given iteration, then pass that to _, timestamps = pull_chunk(..., max_samples=n_samples_in_buffer, dest_obj=buffer), followed by view = buffer[:len(timestamps)].

This is MUCH more efficient than reallocating memory every time new data comes in.
Example.
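A minimal sketch of that pattern, assuming a pylsl inlet on a hypothetical 8-channel float32 stream (the stream type, channel count and buffer size are made up for illustration):

import numpy as np
from pylsl import StreamInlet, resolve_byprop

# Resolve and open an inlet for an assumed "EEG"-typed stream.
info = resolve_byprop("type", "EEG", timeout=5.0)[0]
inlet = StreamInlet(info)

n_samples_in_buffer = 4096  # comfortably more than one iteration's worth
buffer = np.zeros((n_samples_in_buffer, 8), dtype=np.float32)

while True:
    # dest_obj writes straight into the pre-allocated array; no new allocation per call.
    _, timestamps = inlet.pull_chunk(timeout=1.0, max_samples=n_samples_in_buffer, dest_obj=buffer)
    view = buffer[:len(timestamps)]  # a view, not a copy
    # ... process `view` ...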

Checking async

As you probably know, due to the GIL, Python doesn't have true multi-threading unless it's explicitly supported in the underlying C code. While you could use multiprocessing, this is much more complicated than threading, especially for sharing data.

So the first thing to do would be to set up a silly test to see if pull_chunk(timeout=5.0) is truly non-blocking during the timeout.

import threading
import time

class MyInlet(threading.Thread):
    def run(self):
        # ... set up my_inlet for an irregular-rate stream ...
        while True:
            data, timestamps = my_inlet.pull_chunk(timeout=5.0)
            print(f"Received {len(timestamps)} samples.")

class Waiter(threading.Thread):
    def run(self):
        while True:
            print("waiting...")
            time.sleep(1)

def run():
    MyInlet().start()
    time.sleep(1.0)
    Waiter().start()

Run an irregular stream, run the Python script, and see if you get roughly five "waiting..." printouts per "Received 0 samples."

If it's not blocking then I guess we're done. It's async by default!
If it is blocking then the following might be better:

def run(self):
    # ...
    while not self.closeEvent.is_set():
        # timeout=0.0 returns immediately with whatever is already buffered.
        _, timestamps = self.inlet.pull_chunk(timeout=0.0, max_samples=n_samples_in_buffer, dest_obj=buffer)
        n_received = len(timestamps)
        if n_received == 0:
            time.sleep(0.1)  # Let the other threads go for a bit.
            continue
        view = buffer[:n_received]
        # ...

asyncio

The above is an artificial way of doing async. Python has asyncio which, once you're accustomed to it, might lead to better async programming.

I haven't tested this, but in the simplest case it would look something like this:

async def run(self):
    # ...
    while not self.closeEvent.is_set():
        _, timestamps = self.inlet.pull_chunk(timeout=0.0, max_samples=n_samples_in_buffer, dest_obj=buffer)
        n_received = len(timestamps)
        if n_received == 0:
            await asyncio.sleep(0.1)  # Execution is returned to the event loop here.
            continue
        view = buffer[:n_received]
        # ...

The truly difficult part of using asyncio is making sure that every other long-running function within the same thread is also modified to be a coroutine, which means it needs to await a Python built-in async function somewhere; that might be as simple as adding an await asyncio.sleep.

If you like this pattern then we can talk about making an async_pull_chunk method in pylsl so you don't have to set up that boilerplate.
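Purely as a sketch of what such a helper could look like (async_pull_chunk is hypothetical, not an existing pylsl method): one simple option is to offload the blocking call to a thread-pool executor so the event loop stays responsive.

import asyncio

async def async_pull_chunk(inlet, timeout=0.0, max_samples=1024, dest_obj=None):
    # Hypothetical helper: run the blocking pull_chunk in the default
    # thread-pool executor and await its result.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        None,
        lambda: inlet.pull_chunk(timeout=timeout, max_samples=max_samples, dest_obj=dest_obj),
    )

# Usage (inside a coroutine):
#   data, timestamps = await async_pull_chunk(inlet, timeout=1.0)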

@dmedine (Contributor) commented May 24, 2023

Just one comment on async and pull operations in LSL: it's currently not possible to do it. The LSL pull functions all block. The only way to introduce actual asynchronousness is to introduce a callback function in the API and in the underlying implementation to handle events when new data is ready. I once asked how to do this in the context of the C# wrapper on Stack Overflow: https://stackoverflow.com/questions/68735759/is-it-possible-to-make-an-asynchronous-c-c-function-awaitable-in-c-sharp-witho

The actual liblsl code that I paraphrase in that post is here: https://github.com/sccn/liblsl/blob/master/src/consumer_queue.h#L75-L106

What Chad suggests is a decent workaround (and I do essentially the same thing in my C# wrapper), but what this code actually does is return control to the calling thread during the sleep times, and that only happens every so often---perhaps more or less frequently than new data actually becomes available. With true async, you only have to ask once and then it calls you back when it is ready.
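For illustration, the workaround can be dressed up as a callback in Python (a hypothetical helper, assuming a pylsl inlet); the worker thread still blocks/polls inside pull_chunk, so it is not true async:

import threading

def start_callback_pump(inlet, on_chunk, stop_event, poll_timeout=0.5):
    # Calls on_chunk(data, timestamps) whenever new samples arrive.
    # Not true async: a worker thread still blocks inside pull_chunk.
    def _pump():
        while not stop_event.is_set():
            data, timestamps = inlet.pull_chunk(timeout=poll_timeout)
            if timestamps:
                on_chunk(data, timestamps)
    t = threading.Thread(target=_pump, daemon=True)
    t.start()
    return t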

That being said, not having proper async in LSL probably isn't contributing too much to any performance issues. It is also possible that the infrastructure of event handling, though not blocking, wouldn't necessarily result in a performance enhancement, at least not all the time. So far, no one has complained about this except for me and I only wanted it so that I could say that I have fancy async methods in my custom C# wrapper.

@LMBooth (Author) commented May 24, 2023

Thank you for the recommendations above @cboulay and @dmedine. I'll have a look at those, get some rigid benchmarks down for how I'm doing it now, and see if I can get any major improvements after the new look at pull_chunk and asyncio.

For streams with regular sample rates I create deque FIFOs with set lengths dictated by the largest time window needed and the device sample rate. For devices with irregular sample rates I create the deque based on a theoretical maximum received sample rate, but due to the inconsistency the deques have to be sliced up in a more complicated way relating to each sample's corresponding timestamp.
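For illustration only (not pybci's actual code; the sample rate, window lengths and channel count are made up), the regular-rate case can look like fixed-length deques sized from the window and sample rate:

from collections import deque

sr = 250                  # assumed device sample rate (Hz)
tmin, tmax = 0.5, 1.0     # assumed seconds before/after a marker
n_channels = 8
fifo_length = int((tmin + tmax) * sr)

# One fixed-length FIFO per channel; old samples fall off the left automatically.
data_fifos = [deque(maxlen=fifo_length) for _ in range(n_channels)]

def on_chunk(chunk):
    # `chunk` is one pull_chunk result: a list of samples, each holding n_channels values.
    for sample in chunk:
        for ch, value in enumerate(sample):
            data_fifos[ch].append(value)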

I have an option where one marker on the marker stream can signify a long window of time; this long window can be split up into smaller, overlapping windows. This is useful in constant conversion mode, where time windows may want to be short, around 0.5-1 seconds. I think this is far more computationally heavy in my AsyncDataReceiverThread.py compared to my DataReceiverThread.py.

In DataReceiverThread the main thing I use is itertools.islice to cut up the FIFOs, which isn't too bad:
sliceDataFIFOs = [list(itertools.islice(d, fifoLength - int((self.globalEpochSettings.tmin+self.globalEpochSettings.tmax) * self.sr), fifoLength)) for d in dataFIFOs]

In AsyncDataReceiverThread I use the timestamps as a reference to slice up the deques, which is where I think the overhead is being produced. Sorry if my terminology has been confusing: where I've written AsyncDataReceiver, maybe "irregular" would be better phrasing. Thank you for letting me formalise some of these thoughts too, as it's giving me ideas of where to look to improve.
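As an illustration of timestamp-based slicing for irregular-rate streams (again not pybci's actual code; ts_fifo, data_fifos, marker_ts, tmin and tmax are assumed inputs):

import itertools
import numpy as np

def slice_epoch(ts_fifo, data_fifos, marker_ts, tmin, tmax):
    # Convert the timestamp deque to an array and find the window edges.
    ts = np.fromiter(ts_fifo, dtype=np.float64)
    start = int(np.searchsorted(ts, marker_ts - tmin, side="left"))
    stop = int(np.searchsorted(ts, marker_ts + tmax, side="right"))
    # Slice every channel FIFO to the same index range.
    return [list(itertools.islice(d, start, stop)) for d in data_fifos]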

I have played with multiprocessing before when I've had large or multiple EEG datasets to crunch and features to extract. So maybe having an option to put the classifier thread on a separate core, so it only needs to have feature data passed to it on some form of queue and can then be queried for its feature/classifier data, would be handy for more complex model fitting, actually parallelising some of the compute. I'm writing up a plan on my whiteboard to isolate what's taking the longest computationally in my code at the moment, and will report back in a few days/weeks when I've got some numbers and progress.
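A rough sketch of that idea with the multiprocessing module (the names and sentinel protocol are made up; which core the process lands on is left to the OS scheduler):

import multiprocessing as mp

def classifier_worker(feature_queue, result_queue):
    # Hypothetical worker: consume feature vectors, fit/predict, push results back.
    while True:
        features, label = feature_queue.get()
        if features is None:                 # (None, None) acts as a shutdown sentinel
            break
        # ... model.fit / model.predict would go here ...
        result_queue.put(("prediction", 0))  # placeholder result

if __name__ == "__main__":
    feature_q, result_q = mp.Queue(), mp.Queue()
    proc = mp.Process(target=classifier_worker, args=(feature_q, result_q), daemon=True)
    proc.start()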

@LMBooth (Author) commented May 24, 2023

We're non-blocking!
[screenshot: interleaved "waiting..." and "Received 0 samples." output]

@LMBooth (Author) commented May 25, 2023

My intuition was wrong: it seems the most computationally taxing part is my generic feature extractor. Not surprising in hindsight, since I was passing it 25 channels of data and computing 17 different features on each of them; perhaps I should only select a couple of features by default instead of the whole list. Slicing up the data on regular or irregular data streams seems super quick, typically less than 1 ms in both cases.

The other main factor is the classifiers. The sklearn model training and testing can be really quick, ~20-100 ms, and testing a feature set is around ~5-50 ms for some simple SVM models. The TensorFlow models can take a lot longer (>60 s in some cases) when first fitting, then around 1-2 seconds after adding single epochs to the model, and around 0.5-1 seconds for predicting a label. Based on that, I think I'll work on putting the classifier stuff on a separate core, with data able to queue in and the user able to query the classifier core through the pybci object, i.e. ask for current estimates, get current classifier info/feature data, or see whether classifying is complete or not.

Thinking about multiprocessing, and asking for any thoughts @cboulay and @dmedine: I think I should allow the user to instantiate on one core, which then creates the marker+data+feature threads on another core, then the classifier on yet another core. My thinking is that the end user would potentially feel the least 'blocking' and 'lag'?

@dmedine (Contributor) commented May 25, 2023

In general (imho) forcing processes to run on specific cores is a bad idea. For one thing, it may crash your computer. Another is that there is a bottleneck in throughput that comes with sharing memory between CPUs. The benefit from running a process on another CPU core may not be as great as you think, and then you have to shuttle memory between CPU stacks and the heap---which is slow.

As far as my limited understanding of computer architecture and modern OS resource management goes, a multi-core CPU is treated basically the same way a single-core CPU is. Your operating system won't let one core idle while another one works; it will timeshare threads wherever its scheduling algorithm decides it should. This means that there is no way of knowing which core is going to be a good choice for your thread. The only situation where this would really make sense is running on bare metal, where you don't have an operating system handling thousands of processes all the time. I would let the operating system handle the assignment of resources because, 99% of the time, it will do a better job than any person.

That being said, with deep neural nets you get a big speed-up by running the model on a graphics card. However, you have the same issue with handing memory back and forth, so you need to do that as infrequently as possible.

One thing you can also do is set the thread priority to the highest one your OS has to offer.

At the end of the day, it probably isn't realistic (or necessary) to run all the classifiers under the sun all the time in real time (maybe I misunderstand you, and this isn't what you're doing).

@dmedine (Contributor) commented May 25, 2023

Another comment I have is that this test doesn't really tell you anything about blocking. Those threads are not sharing any data, so there is no reason to believe they would block each other. The calls to pull*'s underlying implementation do block in the sense that they do not hand control back to the caller. If pull calls were in fact non-blocking, there would be tens of thousands of printouts of 'Received 0 samples' between the printouts of 'waiting...'. Calling thread.start, however, does hand control back, because it returns after the thread is created and added to the pool. This is why you can kick off the two threads sequentially and let them run simultaneously.

However, there will not always be 5 'waiting...' per 'Received 0 samples', because sleep times are not accurate. Sometimes there will be 4 or 6 (and perhaps other numbers). This is one reason that synchronization can be desirable. The main reason you want synchronicity---or callbacks, which will also prevent this---is to avoid deadlock and race conditions. What you really don't want is somebody trying to write to 'data' while somebody else is reading from it. Usually a process will crash if you try to do this, which is better than indeterminate behavior (race condition) or two threads waiting on each other for a shared resource (deadlock).
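A minimal sketch of guarding a shared buffer with a lock so reads and writes can't interleave (hypothetical names, not from pybci):

import threading

buffer_lock = threading.Lock()
shared_data = []                  # hypothetical buffer shared between threads

def writer(new_chunk):
    with buffer_lock:             # only one thread touches shared_data at a time
        shared_data.extend(new_chunk)

def reader():
    with buffer_lock:
        return list(shared_data)  # copy out under the lock, process outside it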

@cboulay (Contributor) commented May 25, 2023

As David mentioned, I think you need to separate the idea of 'process' and 'core' in this discussion. Doing things in multiple processes is great. Pinning processes to cores is a bad idea unless you absolutely need to do it, which is probably only true on embedded systems.

To your questions:

Almost always you'll want the main thread (in C++, or the main process in Python) to be responsive to user input, so it makes sense to have the user instantiate there.

In normal circumstances, I would thoroughly support having a thread / process dedicated to data ingest. However, with LSL that isn't really necessary. liblsl handles buffering the data for you (default is 6 minutes!) so you don't have to worry about pulling consistently to clean out the buffer. It's OK to let the data ingest thread/process have variable iteration intervals.

That being said, you'll want your data handling code to be amenable to different-sized chunks. Sometimes you'll get 20 msec of data, sometimes 200 msec. If your signal processing pipeline can only handle exactly 20 msec chunks, you're in for a bad time, having to run lots of extra loops.
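One way to stay agnostic to chunk size is a simple ring buffer that accepts whatever arrives (a sketch with made-up sizes, not pybci's implementation):

import numpy as np

class RingBuffer:
    # Keeps the newest n_samples regardless of how big each incoming chunk is.
    def __init__(self, n_samples, n_channels):
        self.buf = np.zeros((n_samples, n_channels), dtype=np.float32)
        self.write = 0

    def push(self, chunk):
        for row in np.asarray(chunk, dtype=np.float32):
            self.buf[self.write % len(self.buf)] = row
            self.write += 1

    def latest(self, n):
        idx = (self.write - n + np.arange(n)) % len(self.buf)
        return self.buf[idx]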

IMO the best trick to reduce the user's perception of lag is to use your decoder output to parameterize a state-space model. A regularly-updating thread/process then asks the state-space model to predict the effector state (mouse position, robot arm kinematics, etc.) XXX msec from now, where XXX is the average system lag between the time a sample arrives at the digitizer and the time the effector state gets updated, minus the normal latency from intention update in the brain to (e.g.) finger movement (25-100 msec depending on muscle group).
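As a toy illustration of predicting ahead by the system lag (a constant-velocity model with made-up numbers, not a full state-space filter):

import numpy as np

def predict_effector(position, velocity, lag_s):
    # x(t + lag) is approximately x(t) + v(t) * lag for a constant-velocity state.
    return position + velocity * lag_s

# Usage: render the predicted position instead of the raw (lagged) decoder output.
cursor_pos = np.array([0.2, 0.4])
cursor_vel = np.array([0.5, -0.1])     # units per second (made up)
cursor_drawn = predict_effector(cursor_pos, cursor_vel, lag_s=0.08)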

@cboulay (Contributor) commented May 25, 2023

@dmedine , thanks for pointing out the subtlety in asynchronous and non-blocking.

That test is only meant to test Python's Global Interpreter Lock. For example, if you were to replace pull_chunk(timeout=5.0) with something that didn't release the lock, you might only see one printout from the waiter thread between each iteration of the inlet thread.

It's not always clear which operations release the GIL. I know time.sleep is one of them so that's why I suggested the alternative of timeout=0 with an explicit sleep.

@LMBooth , can you repeat the above test, but please add , time.time() to the print statements?

@dmedine (Contributor) commented May 25, 2023

Ah. I was not familiar with the GIL until now. I'm a pretty terrible python programmer. That test makes much more sense now.

@LMBooth (Author) commented May 25, 2023

Ahhh, I was misunderstanding the workings of multiprocessing; I thought it pinned the operations to cores. Thanks for clarifying - I was hoping there'd be some miracle parallel compute I could exploit.

"At the end of the day, it probably isn't realistic (or necessary) to run all the classifiers under the sun all the time in real time (maybe I misunderstand you, and this isn't what you're doing)."
Sorry if I'm not being clear in my descriptions: only one machine learning model is tested or trained at a time.

So far I've been using time windows of around 0.5-3 seconds for varying epoch lengths, which is handled in the signal processing pipeline; it would be interesting to test the limits of this for smaller windows and see where it starts falling apart.

With the time added to the print:
[screenshot: the same test output with time.time() added to each print]

@dmedine (Contributor) commented May 25, 2023

Ahhh, I was misunderstanding the workings of multiprocessing; I thought it pinned the operations to cores. Thanks for clarifying - I was hoping there'd be some miracle parallel compute I could exploit.

Only in the case of using the GPU for highly parallel computations such as the outputs of neurons in a DNN model.

@LMBooth (Author) commented May 25, 2023

Ahhh, I was misunderstanding the workings of multiprocessing; I thought it pinned the operations to cores. Thanks for clarifying - I was hoping there'd be some miracle parallel compute I could exploit.

Only in the case of using the GPU for highly parallel computations such as the outputs of neurons in a DNN model.

I'll have a look at this tomorrow, see how long it takes to get data in and out of the GPU, and see how much quicker on average it can perform the fitting and predicting. Thanks for the recommendation, I'll report back when I've had a chance!
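For what it's worth, a rough way to time the host-to-GPU transfer in PyTorch might look like this (sizes are made up; assumes a CUDA-capable machine):

import time
import torch

device = "cuda" if torch.cuda.is_available() else "cpu"
features = torch.randn(32, 25 * 17)          # e.g. 32 epochs x (25 channels * 17 features)

t0 = time.perf_counter()
features_gpu = features.to(device)
if device == "cuda":
    torch.cuda.synchronize()                 # wait for the copy to actually finish
print(f"transfer took {(time.perf_counter() - t0) * 1e3:.2f} ms on {device}")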

@dmedine (Contributor) commented May 25, 2023

I have muddied the waters with my ignorance of Python. You are correct that multiprocessing (the Python module) will put the spawned process on a new core (if it can). If your program can guarantee that you have fewer processes than cores, then you should by all means use this---but I still think it might be dangerous. One of my laptops has 12 cores, another has 4. If you spawn more processes than cores, the scheduler will make them wait (according to a Stack Overflow post that I read: https://stackoverflow.com/questions/65810983/multiprocessing-in-python-vs-number-of-cores). That means that code I write on laptop A will not work the same way on laptop B.

However, my intuition tells me that this is only part of the story. The notion of a 'process' is a CPU construct. That means that it encapsulates many different instructions that get handled per CPU cycle. The OS is therefore able to run more than one process on a single core and may indeed choose to do so. Right now there are about 250 'processes' that are 'running' (most of them currently consuming 0 percent of my CPU) on my 12-core machine. If your code has enough moments of idling, the CPU might want to interleave it with another one of your processes on the same core.

What your OS will not want to do is switch contexts (move one process or thread to another CPU), because this requires copying CPU cache to RAM and back to CPU cache on the new core. But it will still do it if the scheduler decides it needs to.

I should add that every time I have ever tried to force my computer to run all the cores at once, it crashed. However, I was doing that with Linux's POSIX threads, which let one force the OS to pin a thread to a chosen CPU core. Your OS hates you for it and will die. I would imagine the Python team made sure this wouldn't happen when they made that module. Sorry for the confusion.

@LMBooth (Author) commented May 25, 2023

That's fine, it's good to discuss in more detail and learn together. I'm going to have a play around with a few things with multiprocessing and passing stuff to the GPU to see if I can improve where things are taking the longest. Thanks for the ideas guys! I remember putting a catch in when playing with multiprocessing before, checking the number of cores and only opening a number of processes somewhat less than that, so I'd likely look to make the feature optional. That is, if I even see any major improvement in latency of classification to action; it could be that sufficient pruning (dropping unnecessary channels and features) and choosing simpler models is enough for near-real-time classification.

Regarding the state-space estimation stuff, I like the sound of that, though that was the one third-year module I did not do too well in. Could be time to brush up.

@LMBooth (Author) commented Jun 3, 2023

Hi again @cboulay, just wanted to say thanks again for your recommendations. I've replaced my two threads for handling 0 Hz devices and normal-sample-rate devices with a single data-slicing thread using pre-allocated numpy arrays and pull_chunk, instead of pulling each sample and piping it into deque FIFOs, and I believe it's much more efficient now. I've added some levels of logging so it's possible to time some functionality, namely classifier training and testing time as well as feature extraction time. Finally, PyTorch is now added and tested to work too. Time to update the documentation.
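As an aside, that kind of timing can be as simple as wrapping the calls with the logging module (a sketch; extract_features, epoch and the logger name are hypothetical, not pybci's actual code):

import logging
import time

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("pybci.timing")    # hypothetical logger name

t0 = time.perf_counter()
features = extract_features(epoch)            # hypothetical feature-extraction call
logger.debug("feature extraction took %.1f ms", (time.perf_counter() - t0) * 1e3)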

@cboulay (Contributor) commented Oct 22, 2023

@LMBooth , FYI I ran into this great library that I think you would like: https://riverml.xyz/

I think they didn't prioritize efficiency because most of their use-cases are in a domain where data ingestion is slower than decoding. Nevertheless, it's got a great API for real-time signal processing.

@LMBooth (Author) commented Oct 24, 2023

Oooo, thank you for the recommendation @cboulay! I've got an ML accel+gyro project coming up after I finish my current investigation in a month or two, so I'll have a play with riverml to see how it compares!

@LMBooth (Author) commented Apr 4, 2024

Hi @cboulay, I was wondering if you've come across labgraph and have any experience to speak of?

@cboulay (Contributor) commented Apr 5, 2024

Yes, I liked what I read, and the API was pretty nice. If only there were something similar that wasn't so difficult to get working... ;)

About 6 months ago I discovered ezmsg and I've been using it and contributing to it ever since. ezmsg is a reimagining of LabGraph in pure Python, but is somehow faster! Its API is almost identical to LabGraph's API.

I've been evangelizing ezmsg ever since and I have created some documentation and intro materials for it. For example, here is a list of other ezmsg-namespaced packages:

  • ezmsg-vispy -- Package + examples for visualizing running ezmsg data.
  • ezmsg-panel -- Use Panel web framework with ezmsg.
  • ezmsg-blackrock -- Receive data from Blackrock Neuroport via pycbsdk
  • ezmsg-unicorn -- Receive data from g.tec Unicorn through reverse-engineered bluetooth protocol.
  • ezmsg-lsl -- Send/Receive data to/from LSL streams.
  • ezmsg-openbci -- Receive data from OpenBCI Cyton
  • ezmsg-gadget -- Run ezmsg on a Raspberry Pi and turn it into a keyboard and mouse.
  • ezmsg-ssvep
  • ezmsg-tasks -- Psychophysics tasks using Panel web framework
  • bcpi -- Full BCI system on Raspberry Pi that leverages ezmsg

@LMBooth (Author) commented Apr 5, 2024

Awesome, thanks for the recommendation! ezmsg looks ezier than labgraph to get my head round; it does as it says on the tin. (Cthulhu is a scary name to give a submodule.)

Once I have some free time I do plan to update pybci to optionally output marker classifications to LSL; then I was shown LabGraph and thought maybe I could expand to other modules too.

@cboulay (Contributor) commented Jul 3, 2024

Closing because I think the original issue is resolved, but happy to continue conversing.

@cboulay closed this as completed Jul 3, 2024