# Tests for DataMux

In [1]:
import logging

# Root logger: timestamp-prefixed messages at INFO level and above.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)-15s %(message)s",
)

In [2]:
from dotenv import load_dotenv

# Load settings from a `.env` file into the process environment (python-dotenv).
# NOTE(review): no visible cell reads these variables directly; presumably the
# `serving` client relies on them — confirm, or drop this cell.
load_dotenv()

True

In [3]:
from serving import WebsocketClient

# DataMux server endpoint; the connection log shows the resulting URL
# (ws://localhost:3300/ws).
server_host = "localhost"
server_port = 3300

# Create the websocket client and open the connection.
# NOTE(review): backend="avro" presumably selects Avro message serialization, and
# a DataMux server is assumed to be listening on the host/port above — confirm.
client = WebsocketClient(server_host, server_port, backend="avro")
await client.connect()

2023-08-09 23:19:02,328 Connected to DataMux: ws://localhost:3300/ws


In [4]:
import asyncio

# Most recent listing replies, filled in by the background receive loop;
# None means "no reply received yet".
collections = collection_streams = live_streams = None


async def log_messages():
    """Receive messages from the DataMux client forever and cache listing replies.

    Each message is a ``(topic, content)`` pair obtained from
    ``client.recvmsg()``. Replies to the three listing requests are stored in
    module-level variables so that later cells can assert on them:

    * ``LIST_COLLECTIONS``        -> ``collections``        (``content["collections"]``)
    * ``LIST_COLLECTION_STREAMS`` -> ``collection_streams`` (``content["streams"]``)
    * ``LIST_LIVE_STREAMS``       -> ``live_streams``       (``content["streams"]``)

    Messages on any other topic are received and discarded by this function.

    Raises:
        Exception: whatever ``client.recvmsg()`` or the reply handling raises.
            The error is logged first and then re-raised, so it also remains
            available on the task object.
    """
    global collections
    global collection_streams
    global live_streams

    try:
        while True:
            topic, content = await client.recvmsg()
            # A message carries exactly one topic, so the checks are mutually exclusive.
            if topic == client.protocol.LIST_COLLECTIONS:
                collections = content["collections"]
            elif topic == client.protocol.LIST_COLLECTION_STREAMS:
                collection_streams = content["streams"]
            elif topic == client.protocol.LIST_LIVE_STREAMS:
                live_streams = content["streams"]
    except asyncio.CancelledError:
        # Cancelling the task is the normal way to stop this loop; not an error.
        raise
    except Exception:
        # This coroutine runs as a background task that nothing awaits, and the
        # notebook keeps a reference to the task, so asyncio would never report
        # the failure on its own. Log it here so a dead receive loop is visible.
        logging.exception("DataMux receive loop stopped unexpectedly")
        raise


# Run the receive loop as a background task on the notebook's already-running
# event loop. Keeping the task in a variable holds a reference to it and allows
# stopping it later with `logging_task.cancel()`.
logging_task = asyncio.create_task(log_messages())

In [5]:
await client.sendmsg(*client.datamux.list_collections())
await asyncio.sleep(2)
assert collections

2023-08-09 23:19:02,830 topic: b'list_collections'
2023-08-09 23:19:02,831 content: {'collections': [{'name': 'n_back', 'description': 'Eye movements of participants during a n-back task', 'keywords': ['n-back'], 'authors': [{'name': 'Andrew T. Duchowski', 'affiliation': 'Clemson University', 'email': 'duchowski@clemson.edu'}], 'streams': {'gaze': {'name': 'Gaze', 'description': 'gaze point', 'unit': '', 'frequency': 1000.0, 'fields': {'x': {'name': 'gaze x', 'description': 'Normalized X-Gaze Point', 'dtype': 'f32'}, 'y': {'name': 'gaze y', 'description': 'Normalized Y-Gaze Point', 'dtype': 'f32'}}, 'index': {'t': {'name': 'timestamp', 'description': 'Recording Timestamp', 'dtype': 'f32'}}, 'node': {'device': None, 'uri': None, 'inputs': {}, 'outputs': {}}, 'attrs': {}}, 'pupil': {'name': 'Pupil', 'description': 'pupil diameter', 'unit': 'mm', 'frequency': 1000.0, 'fields': {'d': {'name': 'pupil d', 'description': 'Pupil Diameter', 'dtype': 'f32'}}, 'index': {'t': {'name': 'timestamp',

In [6]:
await client.sendmsg(*client.datamux.list_collection_streams("adhd_sin"))
await asyncio.sleep(2)
assert collection_streams

2023-08-09 23:19:05,197 topic: b'list_collection_streams'
2023-08-09 23:19:05,215 content: {'collection_name': 'adhd_sin', 'streams': [{'name': 'Gaze', 'description': 'gaze point', 'unit': '', 'frequency': 60.0, 'fields': {'x': {'name': 'gaze x', 'description': 'Normalized X-Gaze Point', 'dtype': 'f32'}, 'y': {'name': 'gaze y', 'description': 'Normalized Y-Gaze Point', 'dtype': 'f32'}}, 'index': {'t': {'name': 'timestamp', 'description': 'Recording Timestamp', 'dtype': 'f32'}}, 'node': {'device': None, 'uri': None, 'inputs': {}, 'outputs': {}}, 'attrs': {'diagnosis': 'Non-ADHD', 'gender': 'female', 'noise': '0', 'participant': '12', 'question': '10', 'collection': 'adhd_sin', 'id': 'gaze'}}, {'name': 'Pupil', 'description': 'pupil diameter', 'unit': 'mm', 'frequency': 60.0, 'fields': {'d': {'name': 'pupil d', 'description': 'Pupil Diameter', 'dtype': 'f32'}}, 'index': {'t': {'name': 'timestamp', 'description': 'Recording Timestamp', 'dtype': 'f32'}}, 'node': {'device': None, 'uri': Non

In [7]:
# Target of the replay/restream requests: one "Gaze" stream of the "adhd_sin"
# collection. The attributes match those reported for that stream in the
# list_collection_streams reply.
collection_name = "adhd_sin"
stream_name = "Gaze"
attrs = {
    "diagnosis": "Non-ADHD",
    "gender": "female",
    "noise": "0",
    "participant": "12",
    "question": "10",
    "collection": "adhd_sin",
    "id": "gaze",
}

In [8]:
await client.sendmsg(
    *client.datamux.replay_collection_stream(collection_name, stream_name, attrs)
)
await asyncio.sleep(10)

2023-08-09 23:19:06,423 topic: b'replay_collection_stream'
2023-08-09 23:19:06,424 content: {'collection_name': 'adhd_sin', 'stream_name': 'Gaze', 'attrs': {'diagnosis': 'Non-ADHD', 'gender': 'female', 'noise': '0', 'participant': '12', 'question': '10', 'collection': 'adhd_sin', 'id': 'gaze'}, 'status': 1}
2023-08-09 23:19:06,426 topic: b'data'
2023-08-09 23:19:06,427 content: {'index': {'t': 0.0}, 'value': {'x': 973.0, 'y': 748.0}}
2023-08-09 23:19:06,438 topic: b'data'
2023-08-09 23:19:06,440 content: {'index': {'t': 16.670000076293945}, 'value': {'x': 976.0, 'y': 752.0}}
2023-08-09 23:19:06,456 topic: b'data'
2023-08-09 23:19:06,458 content: {'index': {'t': 33.33000183105469}, 'value': {'x': 975.0, 'y': 750.0}}
2023-08-09 23:19:06,474 topic: b'data'
2023-08-09 23:19:06,476 content: {'index': {'t': 50.0}, 'value': {'x': 973.0, 'y': 748.0}}
2023-08-09 23:19:06,491 topic: b'data'
2023-08-09 23:19:06,492 content: {'index': {'t': 66.66000366210938}, 'value': {'x': 972.0, 'y': 745.0}}
20

In [9]:
await client.sendmsg(
    *client.datamux.restream_collection_stream(collection_name, stream_name, attrs)
)
await asyncio.sleep(2)

2023-08-09 23:19:16,450 topic: b'restream_collection_stream'
2023-08-09 23:19:16,451 content: {'collection_name': 'adhd_sin', 'stream_name': 'Gaze', 'attrs': {'diagnosis': 'Non-ADHD', 'gender': 'female', 'noise': '0', 'participant': '12', 'question': '10', 'collection': 'adhd_sin', 'id': 'gaze'}, 'status': 1}


In [10]:
await client.sendmsg(*client.datamux.list_live_streams())
await asyncio.sleep(2)
assert live_streams

2023-08-09 23:19:19,466 topic: b'list_live_streams'
2023-08-09 23:19:19,468 content: {'streams': [{'name': 'Gaze', 'description': 'gaze point', 'unit': '', 'frequency': 60.0, 'fields': {'x': {'name': 'gaze x', 'description': 'Normalized X-Gaze Point', 'dtype': 'f32'}, 'y': {'name': 'gaze y', 'description': 'Normalized Y-Gaze Point', 'dtype': 'f32'}}, 'index': {'t': {'name': 'timestamp', 'description': 'Recording Timestamp', 'dtype': 'f32'}}, 'node': None, 'attrs': {'diagnosis': 'Non-ADHD', 'gender': 'female', 'noise': '0', 'participant': '12', 'question': '10', 'collection': 'adhd_sin', 'id': 'gaze', 'dfds_mode': 'restream'}}]}


In [11]:
assert live_streams
live_stream = live_streams[0]

ls_name = live_stream["name"]
ls_attrs = live_stream["attrs"]
await client.sendmsg(*client.datamux.relay_live_streams(ls_name, ls_attrs))

2023-08-09 23:19:20,492 topic: b'relay_live_stream'
2023-08-09 23:19:20,493 content: {'stream_name': 'Gaze', 'status': 1}
2023-08-09 23:19:20,511 topic: b'data'
2023-08-09 23:19:20,512 content: {'index': {'t': 1590389.603234798}, 'value': {'x': 973.0, 'y': 748.0}}
2023-08-09 23:19:20,528 topic: b'data'
2023-08-09 23:19:20,529 content: {'index': {'t': 16.670000076293945}, 'value': {'x': 976.0, 'y': 752.0}}
2023-08-09 23:19:20,545 topic: b'data'
2023-08-09 23:19:20,546 content: {'index': {'t': 33.33000183105469}, 'value': {'x': 975.0, 'y': 750.0}}
2023-08-09 23:19:20,563 topic: b'data'
2023-08-09 23:19:20,564 content: {'index': {'t': 50.0}, 'value': {'x': 973.0, 'y': 748.0}}
2023-08-09 23:19:20,580 topic: b'data'
2023-08-09 23:19:20,581 content: {'index': {'t': 66.66000366210938}, 'value': {'x': 972.0, 'y': 745.0}}
2023-08-09 23:19:20,598 topic: b'data'
2023-08-09 23:19:20,599 content: {'index': {'t': 83.33000183105469}, 'value': {'x': 966.0, 'y': 747.0}}
2023-08-09 23:19:20,615 topic: b