/
test_osc.py
213 lines (168 loc) · 7.66 KB
/
test_osc.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
import asyncio
import pytest
@pytest.mark.asyncio
async def test_nodes():
    """Build a small ``OscNode`` tree and verify addresses, lookup and dispatch.

    Nodes are created through ``add_child`` using a ``children`` mapping, a
    slash-separated path and a plain name; the result is checked through
    ``parent``, ``osc_address``, ``find`` and ``walk``.  Finally a message is
    fed into every node and the payloads captured by the listener callbacks
    are compared against what was sent.
    """
    from vidhubcontrol.interfaces.osc.node import OscNode

    class Listener(object):
        """Collects callback payloads, keyed by the node's ``osc_address``."""

        def __init__(self):
            self.messages_received = {}
            self.tree_messages_received = {}

        def on_message_received(self, node, client_address, *messages):
            bucket = self.messages_received.setdefault(node.osc_address, [])
            bucket.extend(messages)

        def on_tree_message_received(self, node, client_address, *messages):
            bucket = self.tree_messages_received.setdefault(node.osc_address, [])
            bucket.extend(messages)

    root = OscNode('root')
    assert root.osc_address == '/root'

    # Branch A: leaves are declared up front through the ``children`` mapping.
    branch_a = root.add_child('branchA', children={'leaf1': {}, 'leaf2': {}})
    assert branch_a.parent == root
    assert branch_a.osc_address == '/root/branchA'
    # The lookups are performed but their results are not inspected further.
    leaf_a1 = root.find('branchA/leaf1')
    leaf_a2 = root.find('branchA/leaf2')

    # Branch B: the first leaf is added by path, the second on the branch itself.
    leaf_b1 = root.add_child('branchB/leaf1')
    branch_b = leaf_b1.parent
    leaf_b2 = branch_b.add_child('leaf2')
    assert leaf_b1.parent == leaf_b2.parent == branch_b
    assert branch_b.parent == root
    assert branch_b.osc_address == '/root/branchB'
    assert leaf_b1.osc_address == '/root/branchB/leaf1'
    assert root.find('branchB/leaf1') == leaf_b1
    assert root.find('branchB/leaf2') == leaf_b2

    # Branch C: both leaves are added by path from the root.
    leaf_c1 = root.add_child('branchC/leaf1')
    leaf_c2 = root.add_child('branchC/leaf2')
    print(repr(leaf_c1))
    print(repr(leaf_c2))

    all_nodes = {node.osc_address: node for node in root.walk()}
    print(all_nodes.keys())
    expected = {
        '/root',
        '/root/branchA',
        '/root/branchA/leaf1',
        '/root/branchA/leaf2',
        '/root/branchB',
        '/root/branchB/leaf1',
        '/root/branchB/leaf2',
        '/root/branchC',
        '/root/branchC/leaf1',
        '/root/branchC/leaf2',
    }
    assert set(all_nodes) == expected

    # Per-node callbacks are bound on every node; the tree callback is bound
    # on the root only.
    listener = Listener()
    for node in all_nodes.values():
        node.bind(on_message_received=listener.on_message_received)
    root.bind(on_tree_message_received=listener.on_tree_message_received)

    payload = ['foo', 'bar']
    for node in all_nodes.values():
        node.on_osc_dispatcher_message(node.osc_address, None, *payload)

    # Both callback kinds must have recorded the payload once per address,
    # even though the tree callback was only attached to the root.
    assert set(listener.messages_received) == set(all_nodes)
    assert set(listener.tree_messages_received) == set(all_nodes)
    for address in all_nodes:
        assert listener.messages_received[address] == ['foo', 'bar']
        assert listener.tree_messages_received[address] == ['foo', 'bar']
@pytest.mark.asyncio
async def test_interface():
    """Drive an ``OscInterface`` wrapping a ``DummyBackend`` from a client node tree.

    The test starts the interface and a second ``OSCUDPServer`` acting as the
    client, mirrors the interface's label / crosspoint / preset addresses in a
    client-side ``OscNode`` tree, calls ``ensure_message`` on those nodes with
    the interface's server address, and asserts that the vidhub's labels,
    crosspoints and presets end up with the values that were passed.

    Both servers are stopped in ``finally`` blocks so a failing assertion does
    not leave them running for the remainder of the test session.
    """
    from vidhubcontrol.interfaces.osc.node import OscNode
    from vidhubcontrol.interfaces.osc.interface import OscInterface
    from vidhubcontrol.interfaces.osc.server import OSCUDPServer, OscDispatcher
    from vidhubcontrol.backends.dummy import DummyBackend

    interface = OscInterface()
    vidhub = DummyBackend(device_name='dummy-name')
    await interface.add_vidhub(vidhub)
    await interface.start()

    # Only set once ``client.start()`` has returned, so the cleanup below
    # never calls ``stop()`` on a client that was not started.
    started_client = None
    try:
        client_node = OscNode('vidhubcontrol')
        client_dispatcher = OscDispatcher()
        client_node.osc_dispatcher = client_dispatcher

        # The client uses the interface's host address on the next port up.
        client_addr = (str(interface.hostiface.ip), interface.hostport + 1)
        client = OSCUDPServer(client_addr, client_dispatcher)
        await client.start()
        started_client = client

        server_addr = interface.server._server_address

        # The vidhub must be reachable both by id and by device name.
        assert interface.root_node.find('vidhubs/by-id/dummy') is not None
        assert interface.root_node.find('vidhubs/by-name/dummy-name') is not None

        for i, _ in enumerate(vidhub.output_labels):
            addr = 'vidhubs/by-id/dummy/labels/output/{}'.format(i)
            assert interface.root_node.find(addr) is not None
            cnode = client_node.add_child(addr)
            cnode.ensure_message(server_addr, 'FOO OUT {}'.format(i))
            by_name = 'vidhubs/by-name/dummy-name/labels/output/{}'.format(i)
            assert interface.root_node.find(by_name) is not None

        for i, _ in enumerate(vidhub.input_labels):
            addr = 'vidhubs/by-id/dummy/labels/input/{}'.format(i)
            assert interface.root_node.find(addr) is not None
            cnode = client_node.add_child(addr)
            cnode.ensure_message(server_addr, 'FOO IN {}'.format(i))
            by_name = 'vidhubs/by-name/dummy-name/labels/input/{}'.format(i)
            assert interface.root_node.find(by_name) is not None

        for out_idx, _ in enumerate(vidhub.crosspoints):
            addr = 'vidhubs/by-id/dummy/crosspoints/{}'.format(out_idx)
            assert interface.root_node.find(addr) is not None
            cnode = client_node.add_child(addr)
            cnode.ensure_message(server_addr, 2)
            # Use this loop's own index: the previous code formatted the
            # address with ``i`` left over from the input-label loop, so the
            # same single node was checked on every iteration.
            by_name = 'vidhubs/by-name/dummy-name/crosspoints/{}'.format(out_idx)
            assert interface.root_node.find(by_name) is not None

        # Fixed delay before checking that the vidhub state has changed.
        await asyncio.sleep(2)

        for i, lbl in enumerate(vidhub.output_labels):
            assert lbl == 'FOO OUT {}'.format(i)
        for i, lbl in enumerate(vidhub.input_labels):
            assert lbl == 'FOO IN {}'.format(i)
        for xpt in vidhub.crosspoints:
            assert xpt == 2

        # Test presets
        preset_node = client_node.add_child('vidhubs/by-id/dummy/presets')
        preset_node.add_child('recall')
        preset_node.add_child('store')
        crosspoint_node = client_node.find('vidhubs/by-id/dummy/crosspoints')

        class PresetAwait(object):
            """Binds to one vidhub event and lets the test await its next firing."""

            def __init__(self, event_name):
                self.event = asyncio.Event()
                self.args = None
                self.kwargs = None
                vidhub.bind(**{event_name: self.on_vidhub_event})

            def on_vidhub_event(self, *args, **kwargs):
                self.args = args
                self.kwargs = kwargs
                self.event.set()

            async def wait(self):
                """Wait for the event, re-arm it, and return ``(args, kwargs)``."""
                await self.event.wait()
                self.event.clear()
                return self.args, self.kwargs

        # Store one preset per input, each routing every output to that input.
        waiter = PresetAwait('on_preset_stored')
        for i in range(vidhub.num_inputs):
            xpts = [i] * vidhub.num_outputs
            crosspoint_node.ensure_message(server_addr, *xpts)
            await asyncio.sleep(.2)
            assert vidhub.crosspoints == xpts
            name = 'preset_{}'.format(i)
            preset_node.find('store').ensure_message(server_addr, i, name)
            await waiter.wait()
            assert vidhub.presets[i].name == name
            assert vidhub.presets[i].crosspoints == dict(enumerate(xpts))

        assert len(vidhub.presets) == vidhub.num_inputs

        # Recall each preset in turn and expect it to become active.
        waiter = PresetAwait('on_preset_active')
        for preset in vidhub.presets:
            assert not preset.active
            preset_node.find('recall').ensure_message(server_addr, preset.index)
            await waiter.wait()
            assert preset.active
    finally:
        # Same shutdown order as before (client first, then interface), but
        # now also executed when an assertion above fails.
        try:
            if started_client is not None:
                await started_client.stop()
        finally:
            await interface.stop()
@pytest.mark.asyncio
async def test_interface_config(tempconfig):
    """Check that a vidhub added to the ``Config`` shows up in the ``OscInterface``.

    A ``DummyBackend`` with ``device_id='foo'`` is added to the config after
    the interface has started.  Once ``'foo'`` appears in
    ``interface.vidhubs``, the crosspoint and label nodes for every output and
    input must exist under ``vidhubs/by-id/foo``.

    Args:
        tempconfig: pytest fixture; its string form is passed to
            ``Config.load``.

    Raises:
        asyncio.TimeoutError: if the vidhub does not appear in the interface
            within the polling timeout.
    """
    from vidhubcontrol.config import Config
    from vidhubcontrol.backends.dummy import DummyBackend
    from vidhubcontrol.interfaces.osc.interface import OscInterface

    config = Config.load(str(tempconfig))
    interface = OscInterface(config=config)
    await config.start()
    await interface.start()
    # NOTE(review): ``config`` is started but not stopped here; no stop method
    # for it is visible in this file -- confirm whether one should be called.
    try:
        vidhub = await DummyBackend.create_async(device_id='foo')
        config.add_vidhub(vidhub)

        async def wait_for_vidhub():
            """Poll until the interface lists the vidhub added to the config."""
            while 'foo' not in interface.vidhubs:
                await asyncio.sleep(.1)

        # Bound the polling so a regression fails the test instead of
        # hanging the whole run indefinitely.
        await asyncio.wait_for(wait_for_vidhub(), timeout=10)

        vidhub_node = interface.root_node.find('vidhubs/by-id/foo')
        for i in range(vidhub.num_outputs):
            assert vidhub_node.find('crosspoints/{}'.format(i)) is not None
            assert vidhub_node.find('labels/output/{}'.format(i)) is not None
        for i in range(vidhub.num_inputs):
            assert vidhub_node.find('labels/input/{}'.format(i)) is not None
    finally:
        # Stop the interface whether or not the assertions passed, as the
        # sibling ``test_interface`` does at its end.
        await interface.stop()