Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Events publishing support #50

Merged
merged 5 commits into from Dec 16, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
62 changes: 62 additions & 0 deletions examples/ndsi-recv-events.py
@@ -0,0 +1,62 @@
import time

# https://github.com/pupil-labs/pyndsi/tree/v1.0
import ndsi # Main requirement

EVENT_TYPE = "event" # Type of sensors that we are interested in
SENSORS = {} # Will store connected sensors


def main():
# Start auto-discovery of Pupil Invisible Companion devices
network = ndsi.Network(formats={ndsi.DataFormat.V4}, callbacks=(on_network_event,))
network.start()

try:
# Event loop, runs until interrupted
while network.running:
# Check for recently connected/disconnected devices
if network.has_events:
network.handle_event()

# Iterate over all connected devices
for event_sensor in SENSORS.values():
# Fetch recent sensor configuration changes,
# required for pyndsi internals
while event_sensor.has_notifications:
event_sensor.handle_notification()

# Fetch recent event data
for event in event_sensor.fetch_data():
# Output: EventValue(timestamp, label)
print(event_sensor, event)

time.sleep(0.1)

# Catch interruption and disconnect gracefully
except (KeyboardInterrupt, SystemExit):
network.stop()


def on_network_event(network, event):
# Handle event sensor attachment
if event["subject"] == "attach" and event["sensor_type"] == EVENT_TYPE:
# Create new sensor, start data streaming,
# and request current configuration
sensor = network.sensor(event["sensor_uuid"])
sensor.set_control_value("streaming", True)
sensor.refresh_controls()

# Save sensor s.t. we can fetch data from it in main()
SENSORS[event["sensor_uuid"]] = sensor
print(f"Added sensor {sensor}...")

# Handle event sensor detachment
if event["subject"] == "detach" and event["sensor_uuid"] in SENSORS:
# Known sensor has disconnected, remove from list
SENSORS[event["sensor_uuid"]].unlink()
del SENSORS[event["sensor_uuid"]]
print(f"Removed sensor {event['sensor_uuid']}...")


main() # Execute example
116 changes: 92 additions & 24 deletions ndsi/formatter.py
Expand Up @@ -12,11 +12,17 @@


__all__ = [
'DataFormat', 'DataFormatter', 'DataMessage',
'VideoDataFormatter', 'VideoValue',
'GazeDataFormatter', 'GazeValue',
'AnnotateDataFormatter', 'AnnotateValue',
'IMUDataFormatter', 'IMUValue',
"DataFormat",
"DataFormatter",
"DataMessage",
"VideoDataFormatter",
"VideoValue",
"GazeDataFormatter",
"GazeValue",
"AnnotateDataFormatter",
"AnnotateValue",
"IMUDataFormatter",
"IMUValue",
]


Expand All @@ -37,15 +43,16 @@ class DataFormat(enum.Enum):
"""
`DataFormat` enum represents the format for serializing and deserializing data between NDSI hosts and clients.
"""
V3 = 'v3'
V4 = 'v4'

V3 = "v3"
V4 = "v4"

@staticmethod
def latest() -> 'DataFormat':
def latest() -> "DataFormat":
return max(DataFormat.supported_formats(), key=lambda f: f.version_major)

@staticmethod
def supported_formats() -> typing.Set['DataFormat']:
def supported_formats() -> typing.Set["DataFormat"]:
return set(DataFormat)

@property
Expand All @@ -62,13 +69,12 @@ class DataMessage(typing.NamedTuple):
body: bytes


DT = typing.TypeVar('DataValue')
DT = typing.TypeVar("DataValue")


class DataFormatter(typing.Generic[DT], abc.ABC):

@abc.abstractstaticmethod
def get_formatter(format: DataFormat) -> 'DataFormatter':
def get_formatter(format: DataFormat) -> "DataFormatter":
pass

@abc.abstractmethod
Expand All @@ -87,7 +93,8 @@ class UnsupportedFormatter(DataFormatter[typing.Any]):
"""
Represents a formatter that is not supported for a specific data format and sensor type combination.
"""
def get_formatter(format: DataFormat) -> 'UnsupportedFormatter':

def get_formatter(format: DataFormat) -> "UnsupportedFormatter":
return UnsupportedFormatter()

def encode_msg(value: typing.Any) -> DataMessage:
Expand All @@ -114,7 +121,9 @@ def reset(self):

@staticmethod
@functools.lru_cache(maxsize=1, typed=True)
def get_formatter(format: DataFormat) -> typing.Union['VideoDataFormatter', UnsupportedFormatter]:
def get_formatter(
format: DataFormat,
) -> typing.Union["VideoDataFormatter", UnsupportedFormatter]:
if format == DataFormat.V3:
return _VideoDataFormatter_V3()
if format == DataFormat.V4:
Expand All @@ -129,7 +138,7 @@ class _VideoDataFormatter_V3(VideoDataFormatter):
def decode_msg(self, data_msg: DataMessage) -> VideoValue:
meta_data = struct.unpack("<LLLLdLL", data_msg.header)
meta_data = list(meta_data)
meta_data[4] *= 1e6 # Convert timestamp s -> us
meta_data[4] *= 1e6 # Convert timestamp s -> us
meta_data = tuple(meta_data)
if meta_data[0] == VIDEO_FRAME_FORMAT_MJPEG:
return self._frame_factory.create_jpeg_frame(data_msg.body, meta_data)
Expand All @@ -138,14 +147,14 @@ def decode_msg(self, data_msg: DataMessage) -> VideoValue:
self._newest_h264_frame = frame or self._newest_h264_frame
return self._newest_h264_frame
else:
raise StreamError('Frame was not of format MJPEG or H264')
raise StreamError("Frame was not of format MJPEG or H264")


class _VideoDataFormatter_V4(VideoDataFormatter):
def decode_msg(self, data_msg: DataMessage) -> VideoValue:
meta_data = struct.unpack("<LLLLQLL", data_msg.header)
meta_data = list(meta_data)
meta_data[4] /= 1e3 # Convert timestamp ns -> us
meta_data[4] /= 1e3 # Convert timestamp ns -> us
meta_data = tuple(meta_data)
if meta_data[0] == VIDEO_FRAME_FORMAT_MJPEG:
return self._frame_factory.create_jpeg_frame(data_msg.body, meta_data)
Expand All @@ -154,7 +163,7 @@ def decode_msg(self, data_msg: DataMessage) -> VideoValue:
self._newest_h264_frame = frame or self._newest_h264_frame
return self._newest_h264_frame
else:
raise StreamError('Frame was not of format MJPEG or H264')
raise StreamError("Frame was not of format MJPEG or H264")


##########
Expand All @@ -169,7 +178,9 @@ class AnnotateValue(typing.NamedTuple):
class AnnotateDataFormatter(DataFormatter[AnnotateValue]):
@staticmethod
@functools.lru_cache(maxsize=1, typed=True)
def get_formatter(format: DataFormat) -> typing.Union['AnnotateDataFormatter', UnsupportedFormatter]:
def get_formatter(
format: DataFormat,
) -> typing.Union["AnnotateDataFormatter", UnsupportedFormatter]:
if format == DataFormat.V3:
return _AnnotateDataFormatter_V3()
if format == DataFormat.V4:
Expand Down Expand Up @@ -207,7 +218,9 @@ class GazeValue(typing.NamedTuple):
class GazeDataFormatter(DataFormatter[GazeValue]):
@staticmethod
@functools.lru_cache(maxsize=1, typed=True)
def get_formatter(format: DataFormat) -> typing.Union['GazeDataFormatter', UnsupportedFormatter]:
def get_formatter(
format: DataFormat,
) -> typing.Union["GazeDataFormatter", UnsupportedFormatter]:
if format == DataFormat.V3:
return UnsupportedFormatter()
if format == DataFormat.V4:
Expand All @@ -220,7 +233,7 @@ def encode_msg(self, value: GazeValue) -> DataMessage:

class _GazeDataFormatter_V4(GazeDataFormatter):
def decode_msg(self, data_msg: DataMessage) -> GazeValue:
ts, = struct.unpack("<Q", data_msg.header)
(ts,) = struct.unpack("<Q", data_msg.header)
ts *= NANO
x, y = struct.unpack("<ff", data_msg.body)
return GazeValue(x=x, y=y, timestamp=ts)
Expand All @@ -242,7 +255,9 @@ class IMUValue(typing.NamedTuple):
class IMUDataFormatter(DataFormatter[IMUValue]):
@staticmethod
@functools.lru_cache(maxsize=1, typed=True)
def get_formatter(format: DataFormat) -> typing.Union['IMUDataFormatter', UnsupportedFormatter]:
def get_formatter(
format: DataFormat,
) -> typing.Union["IMUDataFormatter", UnsupportedFormatter]:
if format == DataFormat.V3:
return _IMUDataFormatter_V3()
if format == DataFormat.V4:
Expand All @@ -267,7 +282,9 @@ class _IMUDataFormatter_V3(IMUDataFormatter):
)

def decode_msg(self, data_msg: DataMessage) -> IMUValue:
content = np.frombuffer(data_msg.body, dtype=self.CONTENT_DTYPE).view(np.recarray)
content = np.frombuffer(data_msg.body, dtype=self.CONTENT_DTYPE).view(
np.recarray
)
return IMUValue(*content)


Expand All @@ -285,5 +302,56 @@ class _IMUDataFormatter_V4(IMUDataFormatter):
)

def decode_msg(self, data_msg: DataMessage) -> IMUValue:
content = np.frombuffer(data_msg.body, dtype=self.CONTENT_DTYPE).view(np.recarray)
content = np.frombuffer(data_msg.body, dtype=self.CONTENT_DTYPE).view(
np.recarray
)
return IMUValue(*content)


##########


class EventValue(typing.NamedTuple):
timestamp: float
label: str


class EventDataFormatter(DataFormatter[EventValue]):
@staticmethod
@functools.lru_cache(maxsize=1, typed=True)
def get_formatter(
format: DataFormat,
) -> typing.Union["EventDataFormatter", UnsupportedFormatter]:
if format == DataFormat.V3:
return UnsupportedFormatter()
if format == DataFormat.V4:
return _EventDataFormatter_V4()
raise ValueError(format)

def encode_msg(self, value: EventValue) -> DataMessage:
raise NotImplementedError()


class _EventDataFormatter_V4(EventDataFormatter):

_encoding_lookup = {
0: "utf-8",
}

def decode_msg(self, data_msg: DataMessage) -> EventValue:
"""
1. sensor UUID
2. header:
- int_64 timestamp_le
- uint32 body_length_le
- uint32 encoding_le
= 0 -> "utf-8"
3. body:
- `encoding_le` encoded string of lenght `body_length_le`
"""
ts, len_, enc_code = struct.unpack("<qii", data_msg.header)
ts *= NANO
enc = self._encoding_lookup[enc_code]
body = data_msg.body.bytes[:len_]
label = body.decode(enc)
return EventValue(label=label, timestamp=ts)