1
+ import logging
1
2
import unittest
2
3
from threading import Event
3
4
9
10
class TestNetwork (unittest .TestCase ):
10
11
11
12
def setUp (self ):
12
- network = canopen .Network ()
13
- with self .assertLogs ():
14
- network .add_node (2 , SAMPLE_EDS )
15
- network .add_node (3 , network [2 ].object_dictionary )
16
- self .network = network
13
+ self .network = canopen .Network ()
17
14
18
- def test_add_node (self ):
19
- node = self . network [ 2 ]
20
- self .assertIsInstance ( node , canopen . Node )
21
- self .assertEqual ( node . id , 2 )
15
+ def test_network_add_node (self ):
16
+ # Add using str.
17
+ with self .assertLogs ():
18
+ node = self .network . add_node ( 2 , SAMPLE_EDS )
22
19
self .assertEqual (self .network [2 ], node )
23
- self .assertEqual (len (self .network ), 2 )
20
+ self .assertEqual (node .id , 2 )
21
+ self .assertIsInstance (node , canopen .RemoteNode )
22
+
23
+ # Add using OD.
24
+ node = self .network .add_node (3 , self .network [2 ].object_dictionary )
25
+ self .assertEqual (self .network [3 ], node )
26
+ self .assertEqual (node .id , 3 )
27
+ self .assertIsInstance (node , canopen .RemoteNode )
28
+
29
+ # Add using RemoteNode.
30
+ with self .assertLogs ():
31
+ node = canopen .RemoteNode (4 , SAMPLE_EDS )
32
+ self .network .add_node (node )
33
+ self .assertEqual (self .network [4 ], node )
34
+ self .assertEqual (node .id , 4 )
35
+ self .assertIsInstance (node , canopen .RemoteNode )
36
+
37
+ # Add using LocalNode.
38
+ with self .assertLogs ():
39
+ node = canopen .LocalNode (5 , SAMPLE_EDS )
40
+ self .network .add_node (node )
41
+ self .assertEqual (self .network [5 ], node )
42
+ self .assertEqual (node .id , 5 )
43
+ self .assertIsInstance (node , canopen .LocalNode )
44
+
45
+ # Verify that we've got the correct number of nodes.
46
+ self .assertEqual (len (self .network ), 4 )
24
47
25
- def test_notify (self ):
48
+ def test_network_add_node_upload_eds (self ):
49
+ # Will err because we're not connected to a real network.
50
+ with self .assertLogs (level = logging .ERROR ):
51
+ self .network .add_node (2 , SAMPLE_EDS , upload_eds = True )
52
+
53
+ def test_network_create_node (self ):
54
+ with self .assertLogs ():
55
+ self .network .create_node (2 , SAMPLE_EDS )
56
+ self .network .create_node (3 , SAMPLE_EDS )
57
+ node = canopen .RemoteNode (4 , SAMPLE_EDS )
58
+ self .network .create_node (node )
59
+ self .assertIsInstance (self .network [2 ], canopen .LocalNode )
60
+ self .assertIsInstance (self .network [3 ], canopen .LocalNode )
61
+ self .assertIsInstance (self .network [4 ], canopen .RemoteNode )
62
+
63
+ def test_network_check (self ):
64
+ self .network .connect (interface = "virtual" )
65
+
66
+ def cleanup ():
67
+ # We must clear the fake exception installed below, since
68
+ # .disconnect() implicitly calls .check() during test tear down.
69
+ self .network .notifier .exception = None
70
+ self .network .disconnect ()
71
+
72
+ self .addCleanup (cleanup )
73
+ self .assertIsNone (self .network .check ())
74
+
75
+ class Custom (Exception ):
76
+ pass
77
+
78
+ self .network .notifier .exception = Custom ("fake" )
79
+ with self .assertRaisesRegex (Custom , "fake" ):
80
+ with self .assertLogs (level = logging .ERROR ):
81
+ self .network .check ()
82
+ with self .assertRaisesRegex (Custom , "fake" ):
83
+ with self .assertLogs (level = logging .ERROR ):
84
+ self .network .disconnect ()
85
+
86
+ def test_network_notify (self ):
87
+ with self .assertLogs ():
88
+ self .network .add_node (2 , SAMPLE_EDS )
26
89
node = self .network [2 ]
27
90
self .network .notify (0x82 , b'\x01 \x20 \x02 \x00 \x01 \x02 \x03 \x04 ' , 1473418396.0 )
28
91
self .assertEqual (len (node .emcy .active ), 1 )
29
92
self .network .notify (0x702 , b'\x05 ' , 1473418396.0 )
30
93
self .assertEqual (node .nmt .state , 'OPERATIONAL' )
31
94
self .assertListEqual (self .network .scanner .nodes , [2 ])
32
95
33
- def test_send (self ):
34
- bus = can .interface .Bus (interface = "virtual" , channel = 1 )
96
+ def test_network_send_message (self ):
97
+ bus = can .interface .Bus (interface = "virtual" )
35
98
self .addCleanup (bus .shutdown )
36
99
37
- self .network .connect (interface = "virtual" , channel = 1 )
100
+ self .network .connect (interface = "virtual" )
38
101
self .addCleanup (self .network .disconnect )
39
102
40
103
# Send standard ID
@@ -52,16 +115,123 @@ def test_send(self):
52
115
self .assertEqual (msg .arbitration_id , 0x12345 )
53
116
self .assertTrue (msg .is_extended_id )
54
117
55
- def test_send_periodic (self ):
118
+ def test_network_subscribe_unsubscribe (self ):
119
+ N_HOOKS = 3
120
+ accumulators = [] * N_HOOKS
121
+
122
+ self .network .connect (interface = "virtual" , receive_own_messages = True )
123
+ self .addCleanup (self .network .disconnect )
124
+
125
+ for i in range (N_HOOKS ):
126
+ accumulators .append ([])
127
+ def hook (* args , i = i ):
128
+ accumulators [i ].append (args )
129
+ self .network .subscribe (i , hook )
130
+
131
+ self .network .notify (0 , bytes ([1 , 2 , 3 ]), 1000 )
132
+ self .network .notify (1 , bytes ([2 , 3 , 4 ]), 1001 )
133
+ self .network .notify (1 , bytes ([3 , 4 , 5 ]), 1002 )
134
+ self .network .notify (2 , bytes ([4 , 5 , 6 ]), 1003 )
135
+
136
+ self .assertEqual (accumulators [0 ], [(0 , bytes ([1 , 2 , 3 ]), 1000 )])
137
+ self .assertEqual (accumulators [1 ], [
138
+ (1 , bytes ([2 , 3 , 4 ]), 1001 ),
139
+ (1 , bytes ([3 , 4 , 5 ]), 1002 ),
140
+ ])
141
+ self .assertEqual (accumulators [2 ], [(2 , bytes ([4 , 5 , 6 ]), 1003 )])
142
+
143
+ self .network .unsubscribe (0 )
144
+ self .network .notify (0 , bytes ([7 , 7 , 7 ]), 1004 )
145
+ # Verify that no new data was added to the accumulator.
146
+ self .assertEqual (accumulators [0 ], [(0 , bytes ([1 , 2 , 3 ]), 1000 )])
147
+
148
+ def test_network_subscribe_multiple (self ):
149
+ N_HOOKS = 3
150
+ self .network .connect (interface = "virtual" , receive_own_messages = True )
151
+ self .addCleanup (self .network .disconnect )
152
+
153
+ accumulators = []
154
+ hooks = []
155
+ for i in range (N_HOOKS ):
156
+ accumulators .append ([])
157
+ def hook (* args , i = i ):
158
+ accumulators [i ].append (args )
159
+ hooks .append (hook )
160
+ self .network .subscribe (0x20 , hook )
161
+
162
+ self .network .notify (0xaa , bytes ([1 , 1 , 1 ]), 2000 )
163
+ self .network .notify (0x20 , bytes ([2 , 3 , 4 ]), 2001 )
164
+ self .network .notify (0xbb , bytes ([2 , 2 , 2 ]), 2002 )
165
+ self .network .notify (0x20 , bytes ([3 , 4 , 5 ]), 2003 )
166
+ self .network .notify (0xcc , bytes ([3 , 3 , 3 ]), 2004 )
167
+
168
+ BATCH1 = [
169
+ (0x20 , bytes ([2 , 3 , 4 ]), 2001 ),
170
+ (0x20 , bytes ([3 , 4 , 5 ]), 2003 ),
171
+ ]
172
+ for n , acc in enumerate (accumulators ):
173
+ with self .subTest (hook = n ):
174
+ self .assertEqual (acc , BATCH1 )
175
+
176
+ # Unsubscribe the second hook; dispatch a new message.
177
+ self .network .unsubscribe (0x20 , hooks [1 ])
178
+
179
+ BATCH2 = 0x20 , bytes ([4 , 5 , 6 ]), 2005
180
+ self .network .notify (* BATCH2 )
181
+ self .assertEqual (accumulators [0 ], BATCH1 + [BATCH2 ])
182
+ self .assertEqual (accumulators [1 ], BATCH1 )
183
+ self .assertEqual (accumulators [2 ], BATCH1 + [BATCH2 ])
184
+
185
+ # Unsubscribe the first hook; dispatch yet another message.
186
+ self .network .unsubscribe (0x20 , hooks [0 ])
187
+
188
+ BATCH3 = 0x20 , bytes ([5 , 6 , 7 ]), 2006
189
+ self .network .notify (* BATCH3 )
190
+ self .assertEqual (accumulators [0 ], BATCH1 + [BATCH2 ])
191
+ self .assertEqual (accumulators [1 ], BATCH1 )
192
+ self .assertEqual (accumulators [2 ], BATCH1 + [BATCH2 ] + [BATCH3 ])
193
+
194
+ # Unsubscribe the rest (only one remaining); dispatch a new message.
195
+ self .network .unsubscribe (0x20 )
196
+ self .network .notify (0x20 , bytes ([7 , 7 , 7 ]), 2007 )
197
+ self .assertEqual (accumulators [0 ], BATCH1 + [BATCH2 ])
198
+ self .assertEqual (accumulators [1 ], BATCH1 )
199
+ self .assertEqual (accumulators [2 ], BATCH1 + [BATCH2 ] + [BATCH3 ])
200
+
201
+ def test_network_context_manager (self ):
202
+ with self .network .connect (interface = "virtual" ):
203
+ pass
204
+ with self .assertRaisesRegex (RuntimeError , "Not connected" ):
205
+ self .network .send_message (0 , [])
206
+
207
+ def test_network_item_access (self ):
208
+ with self .assertLogs ():
209
+ self .network .add_node (2 , SAMPLE_EDS )
210
+ self .network .add_node (3 , SAMPLE_EDS )
211
+ self .assertEqual ([2 , 3 ], [node for node in self .network ])
212
+
213
+ # Check __delitem__.
214
+ del self .network [2 ]
215
+ self .assertEqual ([3 ], [node for node in self .network ])
216
+ with self .assertRaises (KeyError ):
217
+ del self .network [2 ]
218
+
219
+ # Check __setitem__.
220
+ old = self .network [3 ]
221
+ with self .assertLogs ():
222
+ new = canopen .Node (3 , SAMPLE_EDS )
223
+ self .network [3 ] = new
224
+
225
+ # Check __getitem__.
226
+ self .assertNotEqual (self .network [3 ], old )
227
+ self .assertEqual ([3 ], [node for node in self .network ])
228
+
229
+ def test_network_send_periodic (self ):
56
230
DATA1 = bytes ([1 , 2 , 3 ])
57
231
DATA2 = bytes ([4 , 5 , 6 ])
58
232
COB_ID = 0x123
59
233
PERIOD = 0.1
60
- self .network .connect (
61
- interface = "virtual" ,
62
- channel = 1 ,
63
- receive_own_messages = True
64
- )
234
+ self .network .connect (interface = "virtual" , receive_own_messages = True )
65
235
self .addCleanup (self .network .disconnect )
66
236
67
237
acc = []
0 commit comments