forked from arduino/arduino-create-agent
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_ws.py
210 lines (173 loc) Β· 7.75 KB
/
test_ws.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
# Copyright 2022 Arduino SA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import time
import json
import base64
import pytest
from sys import platform
from common import running_on_ci
message = []
@pytest.mark.skipif(
platform == "darwin",
reason="on macOS the user is prompted to install certificates",
)
def test_ws_connection(socketio):
print('my sid is', socketio.sid)
assert socketio.sid is not None
@pytest.mark.skipif(
platform == "darwin",
reason="on macOS the user is prompted to install certificates",
)
def test_list(socketio, message):
socketio.emit('command', 'list')
time.sleep(.2)
print (message)
assert any("list" in i for i in message)
assert any("Ports" in i for i in message)
# NOTE run the following tests with a board connected to the PC
@pytest.mark.skipif(
running_on_ci(),
reason="VMs have no serial ports",
)
def test_open_serial_default(socketio, serial_port, baudrate, message):
general_open_serial(socketio, serial_port, baudrate, message, "default")
@pytest.mark.skipif(
running_on_ci(),
reason="VMs have no serial ports",
)
def test_open_serial_timed(socketio, serial_port, baudrate, message):
general_open_serial(socketio, serial_port, baudrate, message, "timed")
@pytest.mark.skipif(
running_on_ci(),
reason="VMs have no serial ports",
)
def test_open_serial_timedraw(socketio, serial_port, baudrate, message):
general_open_serial(socketio, serial_port, baudrate, message, "timedraw")
# NOTE run the following tests with a board connected to the PC and with the sketch found in tests/testdata/SerialEcho.ino on it be sure to change serial_address in conftest.py
@pytest.mark.skipif(
running_on_ci(),
reason="VMs have no serial ports",
)
def test_send_serial_default(socketio, close_port, serial_port, baudrate, message):
general_send_serial(socketio, close_port, serial_port, baudrate, message, "default")
@pytest.mark.skipif(
running_on_ci(),
reason="VMs have no serial ports",
)
def test_send_serial_timed(socketio, close_port, serial_port, baudrate, message):
general_send_serial(socketio, close_port, serial_port, baudrate, message, "timed")
@pytest.mark.skipif(
running_on_ci(),
reason="VMs have no serial ports",
)
def test_send_serial_timedraw(socketio, close_port, serial_port, baudrate, message):
general_send_serial(socketio, close_port, serial_port, baudrate, message, "timedraw")
@pytest.mark.skipif(
running_on_ci(),
reason="VMs have no serial ports",
)
def test_send_emoji_serial_default(socketio, close_port, serial_port, baudrate, message):
general_send_emoji_serial(socketio, close_port, serial_port, baudrate, message, "default")
@pytest.mark.skipif(
running_on_ci(),
reason="VMs have no serial ports",
)
def test_send_emoji_serial_timed(socketio, close_port, serial_port, baudrate, message):
general_send_emoji_serial(socketio, close_port, serial_port, baudrate, message, "timed")
@pytest.mark.skipif(
running_on_ci(),
reason="VMs have no serial ports",
)
def test_send_emoji_serial_timedraw(socketio, close_port, serial_port, baudrate, message):
general_send_emoji_serial(socketio, close_port, serial_port, baudrate, message, "timedraw")
def general_open_serial(socketio, serial_port, baudrate, message, buffertype):
open_serial_port(socketio, serial_port, baudrate, message, buffertype)
# test the closing of the serial port, we are gonna use close_port for the other tests
socketio.emit('command', 'close ' + serial_port)
time.sleep(.2)
print (message)
#check if port has been closed
assert any("\"IsOpen\": false," in i for i in message)
def general_send_serial(socketio, close_port, serial_port, baudrate, message, buffertype):
open_serial_port(socketio, serial_port, baudrate, message, buffertype)
# send the string "ciao" using the serial connection
socketio.emit('command', 'send ' + serial_port + ' ciao')
time.sleep(1)
print(message)
# check if the send command has been registered
assert any("send " + serial_port + " ciao" in i for i in message)
#check if message has been sent back by the connected board
if buffertype == "timedraw":
output = decode_output(extract_serial_data(message))
elif buffertype in ("default", "timed"):
output = extract_serial_data(message)
assert "ciao" in output
# the serial connection is closed by close_port() fixture: even if in case of test failure
def general_send_emoji_serial(socketio, close_port, serial_port, baudrate, message, buffertype):
open_serial_port(socketio, serial_port, baudrate, message, buffertype)
# send a lot of emoji: they can be messed up
socketio.emit('command', 'send ' + serial_port + ' /"π§π§π§π§π§π§π§π§π§π§/"')
time.sleep(1)
print(message)
# check if the send command has been registered
assert any("send " + serial_port + " /\"π§π§π§π§π§π§π§π§π§π§/\"" in i for i in message)
if buffertype == "timedraw":
output = decode_output(extract_serial_data(message))
elif buffertype in ("default", "timed"):
output = extract_serial_data(message)
assert "/\"π§π§π§π§π§π§π§π§π§π§/\"" in output
# the serial connection is closed by close_port() fixture: even if in case of test failure
def open_serial_port(socketio, serial_port, baudrate, message, buffertype):
#open a new serial connection with the specified buffertype
socketio.emit('command', 'open ' + serial_port + ' ' + baudrate + ' ' + buffertype)
# give time to the message var to be filled
time.sleep(.5)
print(message)
# the serial connection should be open now
assert any("\"IsOpen\": true" in i for i in message)
@pytest.mark.skipif(
running_on_ci(),
reason="VMs have no serial ports",
)
def test_sendraw_serial(socketio, close_port, serial_port, baudrate, message):
open_serial_port(socketio, serial_port, baudrate, message, "timedraw")
#test with bytes
integers = [1, 2, 3, 4, 5]
bytes_array=bytearray(integers)
encoded_integers = base64.b64encode(bytes_array).decode('ascii')
socketio.emit('command', 'sendraw ' + serial_port + ' ' + encoded_integers)
time.sleep(1)
print(message)
# check if the send command has been registered
assert any(("sendraw " + serial_port + ' ' + encoded_integers) in i for i in message)
#check if message has been sent back by the connected board
output = extract_serial_data(message) # TODO use decode_output()
print (output)
assert encoded_integers in output
# helper function used to extract serial data from its JSON representation
# NOTE make sure to pass a clean message (maybe reinitialize the message global var before populating it)
def extract_serial_data(msg):
serial_data = ""
for i in msg:
if "{\"P\"" in i:
print (json.loads(i)["D"])
serial_data+=json.loads(i)["D"]
print("serialdata:"+serial_data)
return serial_data
def decode_output(raw_output):
# print(raw_output)
base64_bytes = raw_output.encode('ascii') #encode rawoutput message into a bytes-like object
output_bytes = base64.b64decode(base64_bytes)
return output_bytes.decode('utf-8')