Skip to content

Commit

Permalink
Remove the unit parameter, use the LSL lib directly for timestamp cor…
Browse files Browse the repository at this point in the history
…rection
  • Loading branch information
mesca committed Jun 1, 2023
1 parent c525a11 commit 89fb8ee
Showing 1 changed file with 43 additions and 21 deletions.
64 changes: 43 additions & 21 deletions timeflux/nodes/lsl.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
"""

import os
import pandas as pd
import numpy as np
import uuid
Expand Down Expand Up @@ -35,6 +36,7 @@ class Send(Node):
format (string): The format type for each channel. Currently, only ``double64`` and ``string`` are supported.
rate (float): The nominal sampling rate. Set to ``0.0`` to indicate a variable sampling rate.
source (string, None): The unique identifier for the stream. If ``None``, it will be auto-generated.
config_path (string, None): The path to an LSL config file.
Example:
.. literalinclude:: /../examples/lsl.yaml
Expand All @@ -44,7 +46,15 @@ class Send(Node):

_dtypes = {"double64": np.number, "string": object}

def __init__(self, name, type="Signal", format="double64", rate=0.0, source=None):
def __init__(
self,
name,
type="Signal",
format="double64",
rate=0.0,
source=None,
config_path=None,
):
if not source:
source = str(uuid.uuid4())
self._name = name
Expand All @@ -53,6 +63,8 @@ def __init__(self, name, type="Signal", format="double64", rate=0.0, source=None
self._rate = rate
self._source = source
self._outlet = None
if config_path:
os.environ["LSLAPICFG"] = config_path

def update(self):
if isinstance(self.i.data, pd.core.frame.DataFrame):
Expand All @@ -77,7 +89,7 @@ def update(self):
values = self.i.data.select_dtypes(
include=[self._dtypes[self._format]]
).values
stamps = self.i.data.index.values.astype(np.float64)
stamps = self.i.data.index.values.astype(np.float64) / 1e9
for row, stamp in zip(values, stamps):
self._outlet.push_sample(row, stamp)

Expand All @@ -93,10 +105,13 @@ class Receive(Node):
prop (string): The property to look for during stream resolution (e.g., ``name``, ``type``, ``source_id``).
value (string): The value that the property should have (e.g., ``EEG`` for the type property).
timeout (float): The resolution timeout, in seconds.
unit (string): Unit of the timestamps (e.g., ``s``, ``ms``, ``us``, ``ns``). The LSL library uses seconds by default. Timeflux uses nanoseconds. Default: ``s``.
sync (string, None): The method used to synchronize timestamps. Use ``local`` if you receive the stream from another application on the same computer. Use ``network`` if you receive from another computer. Use ``None`` if you receive from a Timeflux instance on the same computer.
channels (list, None): Override the channel names. If ``None``, the names defined in the LSL stream will be used.
max_samples (int): The maximum number of samples to return per call.
clocksync (bool): Perform automatic clock synchronization.
dejitter (bool): Remove jitter from timestamps using a smoothing algorithm to the received timestamps.
monotonize (bool): Force the timestamps to be monotonically ascending. Only makes sense if timestamps are dejittered.
threadsafe (bool): Same inlet can be read from by multiple threads.
config_path (string, None): The path to an LSL config file.
Example:
.. literalinclude:: /../examples/lsl_multiple.yaml
Expand All @@ -109,23 +124,36 @@ def __init__(
prop="name",
value=None,
timeout=1.0,
unit="s",
sync="local",
channels=None,
max_samples=1024,
clocksync=True,
dejitter=False,
monotonize=False,
threadsafe=True,
config_path=None,
):
if not value:
raise ValueError("Please specify a stream name or a property and value.")
self._prop = prop
self._value = value
self._inlet = None
self._labels = None
self._unit = unit
self._sync = sync
self._channels = channels
self._timeout = timeout
self._max_samples = max_samples
self._offset = np.timedelta64(int((time() - pylsl.local_clock()) * 1e9), "ns")
self._flags = 0
self._offset = 0
if clocksync:
self._flags |= 1
self._offset = time() - pylsl.local_clock()
if dejitter:
self._flags |= 2
if monotonize:
self._flags |= 4
if threadsafe:
self._flags |= 8
if config_path:
os.environ["LSLAPICFG"] = config_path

def update(self):
if not self._inlet:
Expand All @@ -134,7 +162,8 @@ def update(self):
if not streams:
return
self.logger.debug("Stream acquired")
self._inlet = StreamInlet(streams[0])
self._flags = pylsl.proc_clocksync | pylsl.proc_dejitter
self._inlet = StreamInlet(streams[0], processing_flags=self._flags)
info = self._inlet.info()
self._meta = {
"name": info.name(),
Expand All @@ -154,15 +183,8 @@ def update(self):
if self._inlet:
values, stamps = self._inlet.pull_chunk(max_samples=self._max_samples)
if stamps:
stamps = pd.to_datetime(stamps, format=None, unit=self._unit)
if self._sync == "local":
stamps += self._offset
elif self._sync == "network":
stamps = (
stamps
+ np.timedelta64(
round(self._inlet.time_correction() * 1e9), "ns"
)
+ self._offset
)
if self._offset != 0:
self._offset
stamps = np.array(stamps) + self._offset
stamps = pd.to_datetime(stamps, format=None, unit="s")
self.o.set(values, stamps, self._labels, self._meta)

0 comments on commit 89fb8ee

Please sign in to comment.