From 1f3097920de039650bdace5b39ba126bb06e7b34 Mon Sep 17 00:00:00 2001 From: Pedro Furrix Date: Thu, 6 Nov 2025 10:47:47 +0000 Subject: [PATCH 1/7] add callable stop flag to event listener --- src/open_ephys/streaming/README.md | 3 +-- src/open_ephys/streaming/event_listener.py | 20 +++++++++++++++----- 2 files changed, 16 insertions(+), 7 deletions(-) diff --git a/src/open_ephys/streaming/README.md b/src/open_ephys/streaming/README.md index d5f5434..77eab6c 100644 --- a/src/open_ephys/streaming/README.md +++ b/src/open_ephys/streaming/README.md @@ -83,5 +83,4 @@ stream.start(ttl_callback=ttl_callback, spike_callback=spike_callback) ``` -To stop listening, press `ctrl-C`. - +To stop listening, call the `stream.stop()` method from another thread or press `ctrl-C`. diff --git a/src/open_ephys/streaming/event_listener.py b/src/open_ephys/streaming/event_listener.py index a67382f..35cbfa6 100644 --- a/src/open_ephys/streaming/event_listener.py +++ b/src/open_ephys/streaming/event_listener.py @@ -99,6 +99,7 @@ def __init__(self, ip_address="127.0.0.1", port=5557): self.socket = self.context.socket(zmq.SUB) self.socket.connect(self.url) self.socket.setsockopt(zmq.SUBSCRIBE, b"") + self.running = False # Stop flag print("Initialized EventListener at " + self.url) @@ -120,8 +121,9 @@ def start( """ print("Starting EventListener") - - while True: + self.running = True # Set running flag + + while self.running: try: parts = self.socket.recv_multipart() @@ -133,7 +135,15 @@ def start( spike_callback(info) else: ttl_callback(info) - except KeyboardInterrupt: - print() # Add final newline - break + print("Stopped by KeyboardInterrupt") # Add final newline + break + except Exception as e: + print(f"Error: {e}") + print("EventListener stopped.") + print("") + + def stop(self): + """Call this method to stop the listener""" + self.running = False + From 0fd5cc8d0eff8d389466f1c59b42ad11b56391ac Mon Sep 17 00:00:00 2001 From: Josh Siegle Date: Thu, 6 Nov 2025 15:05:33 -0800 Subject: [PATCH 2/7] Increment version to 1.0.1 --- src/open_ephys/__init__.py | 2 +- src/open_ephys/streaming/event_listener.py | 7 +++---- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/src/open_ephys/__init__.py b/src/open_ephys/__init__.py index 5becc17..5c4105c 100755 --- a/src/open_ephys/__init__.py +++ b/src/open_ephys/__init__.py @@ -1 +1 @@ -__version__ = "1.0.0" +__version__ = "1.0.1" diff --git a/src/open_ephys/streaming/event_listener.py b/src/open_ephys/streaming/event_listener.py index 35cbfa6..483406b 100644 --- a/src/open_ephys/streaming/event_listener.py +++ b/src/open_ephys/streaming/event_listener.py @@ -122,7 +122,7 @@ def start( print("Starting EventListener") self.running = True # Set running flag - + while self.running: try: parts = self.socket.recv_multipart() @@ -136,8 +136,8 @@ def start( else: ttl_callback(info) except KeyboardInterrupt: - print("Stopped by KeyboardInterrupt") # Add final newline - break + print("Stopped by KeyboardInterrupt") # Add final newline + break except Exception as e: print(f"Error: {e}") print("EventListener stopped.") @@ -146,4 +146,3 @@ def start( def stop(self): """Call this method to stop the listener""" self.running = False - From 2b44a762d9b60e4979fbdbdf30201e97498cce0e Mon Sep 17 00:00:00 2001 From: Josh Siegle Date: Thu, 6 Nov 2025 15:13:54 -0800 Subject: [PATCH 3/7] Remove print statement --- src/open_ephys/streaming/event_listener.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/open_ephys/streaming/event_listener.py b/src/open_ephys/streaming/event_listener.py index 483406b..303aace 100644 --- a/src/open_ephys/streaming/event_listener.py +++ b/src/open_ephys/streaming/event_listener.py @@ -140,8 +140,7 @@ def start( break except Exception as e: print(f"Error: {e}") - print("EventListener stopped.") - print("") + break def stop(self): """Call this method to stop the listener""" From d295d9667bda3b0c8752ff8f8390642ddac5b026 Mon Sep 17 00:00:00 2001 From: Josh Siegle Date: Thu, 6 Nov 2025 15:40:30 -0800 Subject: [PATCH 4/7] Use threading.Event() rather than boolean to stop EventListener --- src/open_ephys/streaming/event_listener.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/src/open_ephys/streaming/event_listener.py b/src/open_ephys/streaming/event_listener.py index 303aace..c4dcc5f 100644 --- a/src/open_ephys/streaming/event_listener.py +++ b/src/open_ephys/streaming/event_listener.py @@ -24,6 +24,7 @@ import zmq import json +import threading def default_spike_callback(info): @@ -99,7 +100,8 @@ def __init__(self, ip_address="127.0.0.1", port=5557): self.socket = self.context.socket(zmq.SUB) self.socket.connect(self.url) self.socket.setsockopt(zmq.SUBSCRIBE, b"") - self.running = False # Stop flag + self.socket.setsockopt(zmq.RCVTIMEO, 1000) # 1 second timeout + self._stop_event = threading.Event() print("Initialized EventListener at " + self.url) @@ -121,9 +123,9 @@ def start( """ print("Starting EventListener") - self.running = True # Set running flag + self._stop_event.clear() - while self.running: + while not self._stop_event.is_set(): try: parts = self.socket.recv_multipart() @@ -135,8 +137,11 @@ def start( spike_callback(info) else: ttl_callback(info) + except zmq.Again: + # Timeout occurred, continue loop to check stop flag + continue except KeyboardInterrupt: - print("Stopped by KeyboardInterrupt") # Add final newline + print("Stopped by KeyboardInterrupt") break except Exception as e: print(f"Error: {e}") @@ -144,4 +149,4 @@ def start( def stop(self): """Call this method to stop the listener""" - self.running = False + self._stop_event.set() From a935a3b5fe52d233af814564a30309241dcff344 Mon Sep 17 00:00:00 2001 From: Pedro Furrix Date: Fri, 7 Nov 2025 10:29:16 +0000 Subject: [PATCH 5/7] update _event_ instead of stop flag --- src/open_ephys/streaming/event_listener.py | 24 ++++++++++++---------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/src/open_ephys/streaming/event_listener.py b/src/open_ephys/streaming/event_listener.py index 35cbfa6..4f6338b 100644 --- a/src/open_ephys/streaming/event_listener.py +++ b/src/open_ephys/streaming/event_listener.py @@ -24,7 +24,7 @@ import zmq import json - +import threading def default_spike_callback(info): """ @@ -99,8 +99,8 @@ def __init__(self, ip_address="127.0.0.1", port=5557): self.socket = self.context.socket(zmq.SUB) self.socket.connect(self.url) self.socket.setsockopt(zmq.SUBSCRIBE, b"") - self.running = False # Stop flag - + self.socket.setsockopt(zmq.RCVTIMEO, 1000) # 1 second timeout + self._stop_event = threading.Event() print("Initialized EventListener at " + self.url) def start( @@ -121,9 +121,9 @@ def start( """ print("Starting EventListener") - self.running = True # Set running flag - - while self.running: + self._stop_event.clear() # Clear stop event + + while not self._stop_event.is_set(): try: parts = self.socket.recv_multipart() @@ -135,15 +135,17 @@ def start( spike_callback(info) else: ttl_callback(info) + except zmq.Again: + # Timeout occurred, continue loop to check stop flag + continue except KeyboardInterrupt: - print("Stopped by KeyboardInterrupt") # Add final newline - break + print("Stopped by KeyboardInterrupt") + break except Exception as e: print(f"Error: {e}") - print("EventListener stopped.") - print("") + print("EventListener stopped") def stop(self): """Call this method to stop the listener""" - self.running = False + self._stop_event.set() From 4bbc4cfa4bdaeca0e1ad4dc0d742d26dc9fdda64 Mon Sep 17 00:00:00 2001 From: Josh Siegle Date: Fri, 7 Nov 2025 15:17:25 -0800 Subject: [PATCH 6/7] Add threading example to streaming README --- src/open_ephys/streaming/README.md | 30 +++++++++++++++++++++++++++++- 1 file changed, 29 insertions(+), 1 deletion(-) diff --git a/src/open_ephys/streaming/README.md b/src/open_ephys/streaming/README.md index 77eab6c..55eed5d 100644 --- a/src/open_ephys/streaming/README.md +++ b/src/open_ephys/streaming/README.md @@ -83,4 +83,32 @@ stream.start(ttl_callback=ttl_callback, spike_callback=spike_callback) ``` -To stop listening, call the `stream.stop()` method from another thread or press `ctrl-C`. +To stop listening, press `ctrl-C`. + + +### Running the event listener in a separate thread + +To receive and respond to events in a separate thread, you can use the Python `threading` library: + +```python +import threading +from open_ephys.streaming import EventListener + +stream = EventListener(port=5557) + +thread = threading.Thread( + target=self.stream.start, + args=(self.ttl_callback,self.spike_callback), # Arguments to the target function + daemon=True # Ensures the main program doesn't exit when the thread finishes + ) + +thread.start() +``` + +To stop listening, call the `EventListener.stop()` method, and allow the thread to shut down gracefully by calling `thread.join()`: + +```python +stream.stop() +thread.join() +``` + From 690c5ed07c240429ea52db8b5478f0a1f20bcd2a Mon Sep 17 00:00:00 2001 From: Josh Siegle Date: Fri, 7 Nov 2025 15:20:32 -0800 Subject: [PATCH 7/7] Fix typo in README --- src/open_ephys/streaming/README.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/open_ephys/streaming/README.md b/src/open_ephys/streaming/README.md index 55eed5d..caf5435 100644 --- a/src/open_ephys/streaming/README.md +++ b/src/open_ephys/streaming/README.md @@ -97,9 +97,9 @@ from open_ephys.streaming import EventListener stream = EventListener(port=5557) thread = threading.Thread( - target=self.stream.start, - args=(self.ttl_callback,self.spike_callback), # Arguments to the target function - daemon=True # Ensures the main program doesn't exit when the thread finishes + target = stream.start, + args = (ttl_callback, spike_callback), # Arguments to the target function + daemon = True # Ensures the main program doesn't exit when the thread finishes ) thread.start()