-
-
Notifications
You must be signed in to change notification settings - Fork 65
/
ubxhelpers.py
624 lines (480 loc) · 16.5 KB
/
ubxhelpers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
"""
Collection of UBX helper methods which can be used
outside the UBXMessage or UBXReader classes.
Created on 15 Dec 2020
:author: semuadmin
:copyright: SEMU Consulting © 2020
:license: BSD 3-Clause
"""
import struct
from datetime import datetime, timedelta
from math import cos, pi, sin, trunc
from pynmeagps.nmeatypes_core import NMEA_HDR
import pyubx2.exceptions as ube
import pyubx2.ubxtypes_configdb as ubcdb
import pyubx2.ubxtypes_core as ubt
from pyubx2.ubxtypes_core import (
NMEA_PROTOCOL,
POLL,
RTCM3_PROTOCOL,
SET,
UBX_HDR,
UBX_PROTOCOL,
)
from pyubx2.ubxtypes_decodes import FIXTYPE, GNSSLIST
# EPOCH start date (week numbers are counted from here)
EPOCH0 = datetime(year=1980, month=1, day=6)
# leap second offset in seconds, valid as from 1/1/2017
LEAPOFFSET = 18
# seconds in week
SIW = 7 * 24 * 3600
def att2idx(att: str) -> object:
    """
    Extract the integer index (or indices) from a grouped attribute name.

    e.g. svid_06 -> 6; gnssId_103 -> 103, gsid_03_04 -> (3,4), tow -> 0

    :param str att: grouped attribute name e.g. svid_01
    :return: index as int, tuple of ints for nested groups, or 0 if the
        name carries no group suffix or a suffix is not numeric
    :rtype: int or tuple for nested group
    """

    # everything after the first underscore-separated token is an index
    _, *suffixes = att.split("_")
    try:
        if not suffixes:  # not grouped
            return 0
        if len(suffixes) == 1:  # one group level
            return int(suffixes[0])
        # nested group level(s)
        return tuple(int(part) for part in suffixes)
    except ValueError:
        return 0
def att2name(att: str) -> str:
    """
    Strip any group index suffix from an attribute name.

    e.g. svid_06 -> svid; gnssId_103 -> gnssId, tow -> tow

    :param str att: grouped attribute name e.g. svid_01
    :return: name without index e.g. svid
    :rtype: str
    """

    # keep only the text in front of the first underscore
    name, _, _ = att.partition("_")
    return name
def calc_checksum(content: bytes) -> bytes:
    """
    Compute the two-byte 8-bit Fletcher checksum of a byte sequence.

    :param bytes content: message content, excluding header and checksum bytes
    :return: checksum
    :rtype: bytes
    """

    sum_a = sum_b = 0
    for octet in content:
        # both running sums are kept modulo 256
        sum_a = (sum_a + octet) & 0xFF
        sum_b = (sum_b + sum_a) & 0xFF
    return bytes((sum_a, sum_b))
def isvalid_checksum(message: bytes) -> bool:
    """
    Check that the trailing two bytes of a message match the checksum
    calculated over its content.

    :param bytes message: message including header and checksum bytes
    :return: checksum valid flag
    :rtype: bool
    """

    # the first two bytes (header) and last two bytes (checksum)
    # are excluded from the calculation
    body = message[2:-2]
    trailer = message[-2:]
    return calc_checksum(body) == trailer
def atttyp(att: str) -> str:
    """
    Return the type letter of an attribute type code.

    :param str att: attribute type e.g. 'U002'
    :return: type of attribute as string e.g. 'U'
    :rtype: str
    """

    # slicing (rather than indexing) yields '' for an empty string
    return att[:1]
def attsiz(att: str) -> int:
    """
    Return the size in bytes encoded in an attribute type code.

    :param str att: attribute type e.g. 'U002'
    :return: size of attribute in bytes
    :rtype: int
    """

    # characters 2 to 4 of the type code hold the size as decimal digits
    size_digits = att[1:4]
    return int(size_digits)
def itow2utc(itow: int) -> datetime.time:
    """
    Convert GPS Time Of Week to a UTC time of day.

    Only the time-of-day portion of the result is returned; the date
    is discarded.

    :param int itow: GPS Time Of Week in milliseconds
    :return: UTC time hh.mm.ss
    :rtype: datetime.time
    """

    gps_seconds = itow / 1000
    # subtract the leap second offset to go from GPS time to UTC
    elapsed = timedelta(seconds=gps_seconds - LEAPOFFSET)
    return (EPOCH0 + elapsed).time()
def utc2itow(utc: datetime) -> tuple:
    """
    Convert UTC datetime to GPS Week Number, Time Of Week.

    The leap second offset is applied before the value is split into
    week number and time of week, so a UTC time falling within the last
    LEAPOFFSET seconds of a week correctly rolls over into the next
    GPS week (time of week is always less than one week). The split is
    done with exact timedelta arithmetic; sub-millisecond remainders
    are discarded.

    :param datetime utc: datetime (naive, same convention as EPOCH0)
    :return: GPS Week Number, Time of Week in milliseconds
    :rtype: tuple
    """

    # elapsed GPS time since the epoch = elapsed UTC + leap second offset
    gps_elapsed = (utc - EPOCH0) + timedelta(seconds=LEAPOFFSET)
    # divmod on timedeltas yields (whole weeks as int, remainder as timedelta)
    wno, into_week = divmod(gps_elapsed, timedelta(seconds=SIW))
    itow = into_week // timedelta(milliseconds=1)
    return wno, itow
def gpsfix2str(fix: int) -> str:
    """
    Look up the descriptive string for a GPS fix type.

    :param int fix: GPS fix type (0-5)
    :return: GPS fix type as string; the number itself as a string
        if it has no entry in FIXTYPE
    :rtype: str
    """

    try:
        label = FIXTYPE[fix]
    except KeyError:
        # unknown fix type - fall back to the raw number
        label = str(fix)
    return label
def dop2str(dop: float) -> str:
    """
    Classify a Dilution of Precision value as a descriptive string.

    :param float dop: dilution of precision as float
    :return: dilution of precision as string
    :rtype: str
    """

    if dop == 1:
        return "Ideal"
    # upper bound (inclusive) for each rating, in ascending order
    ratings = (
        (2, "Excellent"),
        (5, "Good"),
        (10, "Moderate"),
        (20, "Fair"),
    )
    for limit, rating in ratings:
        if dop <= limit:
            return rating
    return "Poor"
def gnss2str(gnss_id: int) -> str:
    """
    Look up the descriptive string for a GNSS ID
    ('GPS', 'GLONASS', etc.).

    :param int gnss_id: GNSS identifier as integer (0-6)
    :return: GNSS identifier as string; the number itself as a string
        if it has no entry in GNSSLIST
    :rtype: str
    """

    try:
        label = GNSSLIST[gnss_id]
    except KeyError:
        # unknown identifier - fall back to the raw number
        label = str(gnss_id)
    return label
def key_from_val(dictionary: dict, value) -> str:
    """
    Helper method - reverse lookup of the dictionary key holding a
    given (unique) value. The first matching key in iteration order
    is returned.

    :param dict dictionary: dictionary
    :param object value: unique dictionary value
    :return: dictionary key
    :rtype: str
    :raises: KeyError: if no key found for value
    """

    # private sentinel so that any key (including None) can be returned
    missing = object()
    found = next(
        (key for key, held in dictionary.items() if held == value), missing
    )
    if found is missing:
        raise KeyError(f"No key found for value {value}")
    return found
def get_bits(bitfield: bytes, bitmask: int) -> int:
    """
    Get integer value of specified (masked) bit(s) in a UBX bitfield (attribute type 'X')

    e.g. to get value of bits 6,7 in bitfield b'\\x89' (binary 0b10001001)::

        get_bits(b'\\x89', 0b11000000) = get_bits(b'\\x89', 192) = 2

    The bitfield bytes are read as one big-endian unsigned integer.
    An empty bitmask selects no bits and yields 0 (previously this
    never returned); an empty bitfield is treated as the value 0.

    :param bytes bitfield: bitfield byte(s)
    :param int bitmask: bitmask as integer (= Σ(2**n), where n is the number of the bit)
    :return: value of masked bit(s)
    :rtype: int
    """

    if bitmask == 0:
        # no bit is set, so the alignment loop below would never terminate
        return 0

    value = int.from_bytes(bitfield, "big")
    shift = 0
    # slide the mask right until its lowest set bit reaches position 0
    while not bitmask & 1:
        bitmask >>= 1
        shift += 1
    return (value >> shift) & bitmask
def val2bytes(val, att: str) -> bytes:
    """
    Serialise a value into bytes according to its UBX attribute type.

    :param object val: attribute value e.g. 25
    :param str att: attribute type e.g. 'U004'
    :return: attribute value as bytes
    :rtype: bytes
    :raises: UBXTypeError
    """

    if att == ubt.CH:  # single variable-length string (e.g. INF-NOTICE)
        return val.encode("utf-8", "backslashreplace")

    size = attsiz(att)
    kind = atttyp(att)
    if kind in ("C", "X"):  # byte or char - passed through as-is
        return val
    if kind in ("E", "L", "U"):  # unsigned integer
        return val.to_bytes(size, byteorder="little", signed=False)
    if kind == "A":  # array of unsigned integers, one byte per element
        return b"".join(
            val[i].to_bytes(1, byteorder="little", signed=False)
            for i in range(size)
        )
    if kind == "I":  # signed integer
        return val.to_bytes(size, byteorder="little", signed=True)
    if att == ubt.R4:  # single precision floating point
        return struct.pack("<f", val)
    if att == ubt.R8:  # double precision floating point
        return struct.pack("<d", val)
    raise ube.UBXTypeError(f"Unknown attribute type {att}")
def bytes2val(valb: bytes, att: str) -> object:
    """
    Deserialise bytes into a value according to the UBX attribute type.

    :param bytes valb: attribute value in byte format e.g. b'\\x19\\x00\\x00\\x00'
    :param str att: attribute type e.g. 'U004'
    :return: attribute value as int, float, str or bytes
    :rtype: object
    :raises: UBXTypeError
    """

    if att == ubt.CH:  # single variable-length string (e.g. INF-NOTICE)
        return valb.decode("utf-8", "backslashreplace")

    kind = atttyp(att)
    if kind in ("X", "C"):  # byte or char - passed through as-is
        return valb
    if kind in ("E", "L", "U"):  # unsigned integer
        return int.from_bytes(valb, "little", signed=False)
    if kind == "A":  # array of unsigned integers, one byte per element
        return [valb[i] for i in range(attsiz(att))]
    if kind == "I":  # signed integer
        return int.from_bytes(valb, "little", signed=True)
    if att == ubt.R4:  # single precision floating point
        return struct.unpack("<f", valb)[0]
    if att == ubt.R8:  # double precision floating point
        return struct.unpack("<d", valb)[0]
    raise ube.UBXTypeError(f"Unknown attribute type {att}")
def nomval(att: str) -> object:
    """
    Return the nominal (zero / empty) value for a UBX attribute type.

    :param str att: attribute type e.g. 'U004'
    :return: attribute value as int, float, str or bytes
    :rtype: object
    :raises: UBXTypeError
    """

    if att == "CH":  # variable-length string
        return ""

    kind = atttyp(att)
    if kind in ("X", "C"):  # null bytes to the attribute's size
        return b"\x00" * attsiz(att)
    if kind == "R":  # floating point
        return 0.0
    if kind in ("E", "I", "L", "U"):  # integer types
        return 0
    if kind == "A":  # array of unsigned integers
        return [0] * attsiz(att)
    raise ube.UBXTypeError(f"Unknown attribute type {att}")
def msgclass2bytes(msgclass: int, msgid: int) -> tuple:
    """
    Convert message class/id integers to bytes.

    Each integer is converted separately via val2bytes using attribute
    type ubt.U1, and the two results are returned as a pair (they are
    not concatenated).

    :param int msgclass: message class as integer e.g. 6
    :param int msgid: message ID as integer e.g. 1
    :return: tuple of (message class, message id) as bytes
        e.g. (b'\\x06', b'\\x01')
    :rtype: tuple
    """

    return (val2bytes(msgclass, ubt.U1), val2bytes(msgid, ubt.U1))
def msgstr2bytes(msgclass: str, msgid: str) -> tuple:
    """
    Convert plain text UBX message class and ID to bytes.

    The class is resolved by reverse lookup in ubt.UBX_CLASSES and the
    ID by reverse lookup in ubt.UBX_MSGIDS; the two results are
    returned as a pair (they are not concatenated).

    :param str msgclass: message class as str e.g. 'CFG'
    :param str msgid: message ID as str e.g. 'CFG-MSG'
    :return: tuple of (message class, message id) as bytes
        e.g. (b'\\x06', b'\\x01')
    :rtype: tuple
    :raises: UBXMessageError
    """

    try:
        clsid = key_from_val(ubt.UBX_CLASSES, msgclass)
        # second byte of the matching UBX_MSGIDS key is the message id
        idbyte = key_from_val(ubt.UBX_MSGIDS, msgid)[1:2]
    except KeyError as err:
        raise ube.UBXMessageError(
            f"Undefined message, class {msgclass}, id {msgid}"
        ) from err
    return (clsid, idbyte)
def cfgname2key(name: str) -> tuple:
    """
    Look up the hexadecimal key and data type registered for a
    configuration database key name.

    :param str name: config key as string e.g. "CFG_NMEA_PROTVER"
    :return: tuple of (key, type)
    :rtype: tuple: (int, str)
    :raises: UBXMessageError
    """

    database = ubcdb.UBX_CONFIG_DATABASE
    try:
        entry = database[name]
    except KeyError as err:
        # unknown name - report as a UBX message error
        raise ube.UBXMessageError(
            f"Undefined configuration database key {name}"
        ) from err
    return entry
def cfgkey2name(keyid: int) -> tuple:
    """
    Return key name and data type for given
    configuration database hexadecimal key.

    If the key is not present in the configuration database, a generic
    name ``CFG_<hex key>`` is returned with an 'X' type whose size is
    derived from the storage size identifier held in the key's high
    bits (bits 28 and above). This is read arithmetically rather than
    from the first character of the hex string, so keys whose top
    nibble is a-f, or which have fewer than eight hex digits, raise
    UBXMessageError instead of leaking a ValueError or picking up the
    wrong nibble.

    :param int keyid: config key as integer e.g. 0x20930001
    :return: tuple of (keyname, type)
    :rtype: tuple: (str, str)
    :raises: UBXMessageError
    """

    for name, (kid, typ) in ubcdb.UBX_CONFIG_DATABASE.items():
        if keyid == kid:
            return (name, typ)

    # undocumented configuration database key
    # type is derived from keyID
    try:
        size = ubcdb.UBX_CONFIG_STORSIZE[keyid >> 28]
    except KeyError as err:
        raise ube.UBXMessageError(
            f"Invalid configuration database key {hex(keyid)}"
        ) from err
    return (f"CFG_{hex(keyid)}", f"X{size:03d}")
def protocol(raw: bytes) -> int:
    """
    Gets protocol of raw message.

    The decision is made on the first two bytes only. Input shorter
    than two bytes cannot carry a recognisable header and is reported
    as unknown (0) rather than raising IndexError.

    :param bytes raw: raw (binary) message
    :return: protocol type (1 = NMEA, 2 = UBX, 4 = RTCM3, 0 = unknown)
    :rtype: int
    """

    hdr = raw[0:2]
    if len(hdr) < 2:
        return 0
    if hdr == UBX_HDR:
        return UBX_PROTOCOL
    if hdr in NMEA_HDR:
        return NMEA_PROTOCOL
    # RTCM3: first byte 0xD3, upper six bits of second byte all zero
    if hdr[0] == 0xD3 and (hdr[1] & ~0x03) == 0:
        return RTCM3_PROTOCOL
    return 0
def hextable(raw: bytes, cols: int = 8) -> str:
    """
    Render raw (binary) data as a table of hexadecimal rows e.g.

    000: 2447 4e47 5341 2c41 2c33 2c33 342c 3233 | b'$GNGSA,A,3,34,23' |

    Each row shows the byte offset, `cols` groups of two bytes in hex,
    and the bytes representation of the same data.

    :param bytes raw: raw (binary) data
    :param int cols: number of columns in hex table (8)
    :return: table of hex data
    :rtype: str
    """

    width = cols * 4  # hex characters per row (4 per column)
    digits = raw.hex()
    rows = []
    for offset in range(0, len(digits), width):
        # pad the final row with spaces so the columns stay aligned
        chunk = digits[offset : offset + width].ljust(width, " ")
        groups = "".join(
            f"{chunk[pos : pos + 4]} " for pos in range(0, width, 4)
        )
        # two hex characters per byte, hence offset // 2
        rows.append(f"{offset // 2:03}: {groups} | {bytes.fromhex(chunk)} |\n")
    return "".join(rows)
def cel2cart(elevation: float, azimuth: float) -> tuple:
    """
    Project celestial coordinates (degrees) onto Cartesian x,y coordinates.

    :param float elevation: elevation
    :param float azimuth: azimuth
    :return: cartesian x,y coordinates; (0, 0) if either argument is
        not a float or int
    :rtype: tuple
    """

    numeric = (float, int)
    if not isinstance(elevation, numeric) or not isinstance(azimuth, numeric):
        return (0, 0)

    # degrees to radians
    ele = elevation * pi / 180
    azi = azimuth * pi / 180
    horizontal = cos(ele)
    return (cos(azi) * horizontal, sin(azi) * horizontal)
def escapeall(val: bytes) -> str:
    """
    Render every byte as a hex escape e.g. b'\\x73' rather than b's'

    :param bytes val: bytes
    :return: string of escaped bytes
    :rtype: str
    """

    escaped = "".join(f"\\x{octet:02x}" for octet in val)
    return f"b'{escaped}'"
def val2sphp(val: float, scale: float = 1e-7) -> tuple:
    """
    Convert a float value into separate standard and high precisions components,
    multiplied by a scaling factor to render them as integers, as required by some
    CFG and NAV messages.

    e.g. 48.123456789 becomes (481234567, 89)

    The high precision component is the remainder in hundredths of a
    standard precision unit. If rounding pushes it to a full unit
    (+/-100), it is carried into the standard precision component so
    that the high precision component always lies within -99..99.

    :param float val: value as float
    :param float scale: scaling factor e.g. 1e-7
    :return: tuple of (standard precision, high precision)
    :rtype: tuple
    """

    scaled = val / scale
    val_sp = trunc(scaled)
    val_hp = round((scaled - val_sp) * 100)
    if abs(val_hp) >= 100:
        # remainder rounded up to a whole unit - carry it, keeping the sign
        val_sp += 1 if val_hp > 0 else -1
        val_hp = 0
    return val_sp, val_hp
def getinputmode(data: bytes) -> int:
    """
    Return input message mode (SET or POLL).

    A message is treated as a POLL if it is exactly 8 bytes long, if
    it is a CFG-VALGET, or if it is a CFG-INF, CFG-MSG, CFG-PRT or
    CFG-TP5 message of no more than 10 bytes. Anything else is a SET.

    :param bytes data: raw UBX input message
    :return: message mode (1 = SET, 2 = POLL)
    :rtype: int
    """

    # bytes 2-3 are matched against message class + id pairs
    clsid = data[2:4]
    if len(data) == 8:
        return POLL
    if clsid == b"\x06\x8b":  # CFG-VALGET
        return POLL
    # these messages are polled with a short payload
    short_polls = (
        b"\x06\x02",  # CFG-INF
        b"\x06\x01",  # CFG-MSG
        b"\x06\x00",  # CFG-PRT
        b"\x06\x31",  # CFG-TP5
    )
    if clsid in short_polls and len(data) <= 10:
        return POLL
    return SET
def process_monver(msg: object) -> dict:
    """
    Extract hardware, firmware and software version identifiers
    from a parsed MON-VER message into a dictionary.

    Reads the swVersion, hwVersion and extension_01 to extension_09
    attributes of the message; any that are absent are treated as
    "N/A" (versions) or empty (extensions).

    :param UBXMessage msg: UBX MON-VER config message
    :return: dict of version information
    :rtype: dict
    """

    def _text(attr: str, default: bytes) -> str:
        """Fetch a bytes attribute, strip null padding and decode it."""
        return getattr(msg, attr, default).replace(b"\x00", b"").decode()

    constellations = (
        "GPS",
        "GLO",
        "GAL",
        "BDS",
        "SBAS",
        "IMES",
        "QZSS",
        "NAVIC",
    )

    sw_version = _text("swVersion", b"N/A")
    sw_version = sw_version.replace("ROM CORE", "ROM")
    sw_version = sw_version.replace("EXT CORE", "Flash")
    hw_version = _text("hwVersion", b"N/A")
    fw_version = rom_version = "N/A"
    gnss_found = []

    for num in range(1, 10):
        ext = _text(f"extension_{num:02d}", b"")
        if "FWVER=" in ext:
            fw_version = ext.replace("FWVER=", "")
        # protocol version appears with either '=' or ' ' as separator
        if "PROTVER=" in ext:
            rom_version = ext.replace("PROTVER=", "")
        if "PROTVER " in ext:
            rom_version = ext.replace("PROTVER ", "")
        if "MOD=" in ext:
            # model name is prefixed to the hardware version
            model = ext.replace("MOD=", "")
            hw_version = f"{model} {hw_version}"
        gnss_found.extend(name for name in constellations if name in ext)

    return {
        "swversion": sw_version,
        "hwversion": hw_version,
        "fwversion": fw_version,
        "romversion": rom_version,
        # each supported system is followed by a single space
        "gnss": "".join(f"{name} " for name in gnss_found),
    }