-
Notifications
You must be signed in to change notification settings - Fork 0
/
telemetry.py
94 lines (71 loc) · 3.14 KB
/
telemetry.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import random
class TelemetryClient(object):
# The communication with the server is simulated in this implementation.
# Because the focus of the exercise is on the other class.
DIAGNOSTIC_MESSAGE = "AT#UD"
def __init__(self):
self._online_status = False
self._diagnosticMessageJustSent = False
def get_online_status(self):
return self._online_status
def connect(self, telemetry_server_connection_string):
if (telemetry_server_connection_string is None or telemetry_server_connection_string == ""):
raise Exception()
# Fake the connection with 20% chances of success
success = random.randint(1, 10) <= 2
self._online_status = success
def disconnect(self):
self._online_status = False
def send(self, message):
if (message is None or message == ""):
raise Exception()
# The simulation of Send() actually just remember if the last message sent was a diagnostic message.
# This information will be used to simulate the Receive(). Indeed there is no real server listening.
if (message == TelemetryClient.DIAGNOSTIC_MESSAGE):
self._diagnosticMessageJustSent = True
else:
self._diagnosticMessageJustSent = False
def receive(self):
if (self._diagnosticMessageJustSent):
# Simulate the reception of the diagnostic message
message = """\
LAST TX rate................ 100 MBPS\r\n
HIGHEST TX rate............. 100 MBPS\r\n
LAST RX rate................ 100 MBPS\r\n
HIGHEST RX rate............. 100 MBPS\r\n
BIT RATE.................... 100000000\r\n
WORD LEN.................... 16\r\n
WORD/FRAME.................. 511\r\n
BITS/FRAME.................. 8192\r\n
MODULATION TYPE............. PCM/FM\r\n
TX Digital Los.............. 0.75\r\n
RX Digital Los.............. 0.10\r\n
BEP Test.................... -5\r\n
Local Rtrn Count............ 00\r\n
Remote Rtrn Count........... 00"""
self._diagnosticMessageJustSent = False
else:
# Simulate the reception of a response message returning a random message.
message = ""
messageLength = random.randint(0, 50) + 60
i = messageLength
while(i >= 0):
message += chr((random.randint(0, 40) + 86))
i -= 1
return message
class TelemetryDiagnosticControls:
_DiagnosticChannelConnectionString = "*111#"
def __init__(self):
self._telemetry_client = TelemetryClient()
self.diagnostic_info = ""
def check_transmission(self):
self.diagnostic_info = ""
self._telemetry_client.disconnect()
retryLeft = 3
while (self._telemetry_client.get_online_status() == False and retryLeft > 0):
self._telemetry_client.connect(TelemetryDiagnosticControls._DiagnosticChannelConnectionString)
retryLeft -= 1
if self._telemetry_client.get_online_status() == False:
raise Exception("Unable to connect.")
self._telemetry_client.send(TelemetryClient.DIAGNOSTIC_MESSAGE)
self.diagnostic_info = self._telemetry_client.receive()