Skip to content

Commit

Permalink
Merge pull request #50 from romanroibu/events-publishing-support
Browse files Browse the repository at this point in the history
Events publishing support
  • Loading branch information
papr committed Dec 16, 2019
2 parents fb2ad72 + 31c299f commit 68608de
Show file tree
Hide file tree
Showing 3 changed files with 255 additions and 83 deletions.
62 changes: 62 additions & 0 deletions examples/ndsi-recv-events.py
@@ -0,0 +1,62 @@
import time

# https://github.com/pupil-labs/pyndsi/tree/v1.0
import ndsi # Main requirement

EVENT_TYPE = "event" # Type of sensors that we are interested in
SENSORS = {} # Will store connected sensors


def main():
# Start auto-discovery of Pupil Invisible Companion devices
network = ndsi.Network(formats={ndsi.DataFormat.V4}, callbacks=(on_network_event,))
network.start()

try:
# Event loop, runs until interrupted
while network.running:
# Check for recently connected/disconnected devices
if network.has_events:
network.handle_event()

# Iterate over all connected devices
for event_sensor in SENSORS.values():
# Fetch recent sensor configuration changes,
# required for pyndsi internals
while event_sensor.has_notifications:
event_sensor.handle_notification()

# Fetch recent event data
for event in event_sensor.fetch_data():
# Output: EventValue(timestamp, label)
print(event_sensor, event)

time.sleep(0.1)

# Catch interruption and disconnect gracefully
except (KeyboardInterrupt, SystemExit):
network.stop()


def on_network_event(network, event):
# Handle event sensor attachment
if event["subject"] == "attach" and event["sensor_type"] == EVENT_TYPE:
# Create new sensor, start data streaming,
# and request current configuration
sensor = network.sensor(event["sensor_uuid"])
sensor.set_control_value("streaming", True)
sensor.refresh_controls()

# Save sensor s.t. we can fetch data from it in main()
SENSORS[event["sensor_uuid"]] = sensor
print(f"Added sensor {sensor}...")

# Handle event sensor detachment
if event["subject"] == "detach" and event["sensor_uuid"] in SENSORS:
# Known sensor has disconnected, remove from list
SENSORS[event["sensor_uuid"]].unlink()
del SENSORS[event["sensor_uuid"]]
print(f"Removed sensor {event['sensor_uuid']}...")


main() # Execute example
116 changes: 92 additions & 24 deletions ndsi/formatter.py
Expand Up @@ -12,11 +12,17 @@


__all__ = [
'DataFormat', 'DataFormatter', 'DataMessage',
'VideoDataFormatter', 'VideoValue',
'GazeDataFormatter', 'GazeValue',
'AnnotateDataFormatter', 'AnnotateValue',
'IMUDataFormatter', 'IMUValue',
"DataFormat",
"DataFormatter",
"DataMessage",
"VideoDataFormatter",
"VideoValue",
"GazeDataFormatter",
"GazeValue",
"AnnotateDataFormatter",
"AnnotateValue",
"IMUDataFormatter",
"IMUValue",
]


Expand All @@ -37,15 +43,16 @@ class DataFormat(enum.Enum):
"""
`DataFormat` enum represents the format for serializing and deserializing data between NDSI hosts and clients.
"""
V3 = 'v3'
V4 = 'v4'

V3 = "v3"
V4 = "v4"

@staticmethod
def latest() -> 'DataFormat':
def latest() -> "DataFormat":
return max(DataFormat.supported_formats(), key=lambda f: f.version_major)

@staticmethod
def supported_formats() -> typing.Set['DataFormat']:
def supported_formats() -> typing.Set["DataFormat"]:
return set(DataFormat)

@property
Expand All @@ -62,13 +69,12 @@ class DataMessage(typing.NamedTuple):
body: bytes


DT = typing.TypeVar('DataValue')
DT = typing.TypeVar("DataValue")


class DataFormatter(typing.Generic[DT], abc.ABC):

@abc.abstractstaticmethod
def get_formatter(format: DataFormat) -> 'DataFormatter':
def get_formatter(format: DataFormat) -> "DataFormatter":
pass

@abc.abstractmethod
Expand All @@ -87,7 +93,8 @@ class UnsupportedFormatter(DataFormatter[typing.Any]):
"""
Represents a formatter that is not supported for a specific data format and sensor type combination.
"""
def get_formatter(format: DataFormat) -> 'UnsupportedFormatter':

def get_formatter(format: DataFormat) -> "UnsupportedFormatter":
return UnsupportedFormatter()

def encode_msg(value: typing.Any) -> DataMessage:
Expand All @@ -114,7 +121,9 @@ def reset(self):

@staticmethod
@functools.lru_cache(maxsize=1, typed=True)
def get_formatter(format: DataFormat) -> typing.Union['VideoDataFormatter', UnsupportedFormatter]:
def get_formatter(
format: DataFormat,
) -> typing.Union["VideoDataFormatter", UnsupportedFormatter]:
if format == DataFormat.V3:
return _VideoDataFormatter_V3()
if format == DataFormat.V4:
Expand All @@ -129,7 +138,7 @@ class _VideoDataFormatter_V3(VideoDataFormatter):
def decode_msg(self, data_msg: DataMessage) -> VideoValue:
meta_data = struct.unpack("<LLLLdLL", data_msg.header)
meta_data = list(meta_data)
meta_data[4] *= 1e6 # Convert timestamp s -> us
meta_data[4] *= 1e6 # Convert timestamp s -> us
meta_data = tuple(meta_data)
if meta_data[0] == VIDEO_FRAME_FORMAT_MJPEG:
return self._frame_factory.create_jpeg_frame(data_msg.body, meta_data)
Expand All @@ -138,14 +147,14 @@ def decode_msg(self, data_msg: DataMessage) -> VideoValue:
self._newest_h264_frame = frame or self._newest_h264_frame
return self._newest_h264_frame
else:
raise StreamError('Frame was not of format MJPEG or H264')
raise StreamError("Frame was not of format MJPEG or H264")


class _VideoDataFormatter_V4(VideoDataFormatter):
def decode_msg(self, data_msg: DataMessage) -> VideoValue:
meta_data = struct.unpack("<LLLLQLL", data_msg.header)
meta_data = list(meta_data)
meta_data[4] /= 1e3 # Convert timestamp ns -> us
meta_data[4] /= 1e3 # Convert timestamp ns -> us
meta_data = tuple(meta_data)
if meta_data[0] == VIDEO_FRAME_FORMAT_MJPEG:
return self._frame_factory.create_jpeg_frame(data_msg.body, meta_data)
Expand All @@ -154,7 +163,7 @@ def decode_msg(self, data_msg: DataMessage) -> VideoValue:
self._newest_h264_frame = frame or self._newest_h264_frame
return self._newest_h264_frame
else:
raise StreamError('Frame was not of format MJPEG or H264')
raise StreamError("Frame was not of format MJPEG or H264")


##########
Expand All @@ -169,7 +178,9 @@ class AnnotateValue(typing.NamedTuple):
class AnnotateDataFormatter(DataFormatter[AnnotateValue]):
@staticmethod
@functools.lru_cache(maxsize=1, typed=True)
def get_formatter(format: DataFormat) -> typing.Union['AnnotateDataFormatter', UnsupportedFormatter]:
def get_formatter(
format: DataFormat,
) -> typing.Union["AnnotateDataFormatter", UnsupportedFormatter]:
if format == DataFormat.V3:
return _AnnotateDataFormatter_V3()
if format == DataFormat.V4:
Expand Down Expand Up @@ -207,7 +218,9 @@ class GazeValue(typing.NamedTuple):
class GazeDataFormatter(DataFormatter[GazeValue]):
@staticmethod
@functools.lru_cache(maxsize=1, typed=True)
def get_formatter(format: DataFormat) -> typing.Union['GazeDataFormatter', UnsupportedFormatter]:
def get_formatter(
format: DataFormat,
) -> typing.Union["GazeDataFormatter", UnsupportedFormatter]:
if format == DataFormat.V3:
return UnsupportedFormatter()
if format == DataFormat.V4:
Expand All @@ -220,7 +233,7 @@ def encode_msg(self, value: GazeValue) -> DataMessage:

class _GazeDataFormatter_V4(GazeDataFormatter):
def decode_msg(self, data_msg: DataMessage) -> GazeValue:
ts, = struct.unpack("<Q", data_msg.header)
(ts,) = struct.unpack("<Q", data_msg.header)
ts *= NANO
x, y = struct.unpack("<ff", data_msg.body)
return GazeValue(x=x, y=y, timestamp=ts)
Expand All @@ -242,7 +255,9 @@ class IMUValue(typing.NamedTuple):
class IMUDataFormatter(DataFormatter[IMUValue]):
@staticmethod
@functools.lru_cache(maxsize=1, typed=True)
def get_formatter(format: DataFormat) -> typing.Union['IMUDataFormatter', UnsupportedFormatter]:
def get_formatter(
format: DataFormat,
) -> typing.Union["IMUDataFormatter", UnsupportedFormatter]:
if format == DataFormat.V3:
return _IMUDataFormatter_V3()
if format == DataFormat.V4:
Expand All @@ -267,7 +282,9 @@ class _IMUDataFormatter_V3(IMUDataFormatter):
)

def decode_msg(self, data_msg: DataMessage) -> IMUValue:
content = np.frombuffer(data_msg.body, dtype=self.CONTENT_DTYPE).view(np.recarray)
content = np.frombuffer(data_msg.body, dtype=self.CONTENT_DTYPE).view(
np.recarray
)
return IMUValue(*content)


Expand All @@ -285,5 +302,56 @@ class _IMUDataFormatter_V4(IMUDataFormatter):
)

def decode_msg(self, data_msg: DataMessage) -> IMUValue:
content = np.frombuffer(data_msg.body, dtype=self.CONTENT_DTYPE).view(np.recarray)
content = np.frombuffer(data_msg.body, dtype=self.CONTENT_DTYPE).view(
np.recarray
)
return IMUValue(*content)


##########


class EventValue(typing.NamedTuple):
timestamp: float
label: str


class EventDataFormatter(DataFormatter[EventValue]):
@staticmethod
@functools.lru_cache(maxsize=1, typed=True)
def get_formatter(
format: DataFormat,
) -> typing.Union["EventDataFormatter", UnsupportedFormatter]:
if format == DataFormat.V3:
return UnsupportedFormatter()
if format == DataFormat.V4:
return _EventDataFormatter_V4()
raise ValueError(format)

def encode_msg(self, value: EventValue) -> DataMessage:
raise NotImplementedError()


class _EventDataFormatter_V4(EventDataFormatter):

_encoding_lookup = {
0: "utf-8",
}

def decode_msg(self, data_msg: DataMessage) -> EventValue:
"""
1. sensor UUID
2. header:
- int_64 timestamp_le
- uint32 body_length_le
- uint32 encoding_le
= 0 -> "utf-8"
3. body:
- `encoding_le` encoded string of lenght `body_length_le`
"""
ts, len_, enc_code = struct.unpack("<qii", data_msg.header)
ts *= NANO
enc = self._encoding_lookup[enc_code]
body = data_msg.body.bytes[:len_]
label = body.decode(enc)
return EventValue(label=label, timestamp=ts)

0 comments on commit 68608de

Please sign in to comment.