From e774ab8f1e1a3e2fed848ab184c9fb8269e691cf Mon Sep 17 00:00:00 2001 From: adrian-kong Date: Mon, 9 May 2022 10:38:28 +1000 Subject: [PATCH 1/3] using handler + framer --- python/sbp/sbp2json.py | 79 +++++++----------------------------------- 1 file changed, 13 insertions(+), 66 deletions(-) diff --git a/python/sbp/sbp2json.py b/python/sbp/sbp2json.py index 2471f9aeb6..d6ed04bb5e 100644 --- a/python/sbp/sbp2json.py +++ b/python/sbp/sbp2json.py @@ -2,26 +2,17 @@ import sys -import io - import json -from construct.core import StreamError - -import sbp.msg -import sbp.table - -from sbp.msg import SBP_PREAMBLE -from sbp.msg import UnpackError +from sbp.client import Handler, Framer - -DEFAULT_JSON='rapidjson' -JSON_CHOICES=['json', 'rapidjson'] +DEFAULT_JSON = 'rapidjson' +JSON_CHOICES = ['json', 'rapidjson'] try: import rapidjson except ImportError: - DEFAULT_JSON='json' - JSON_CHOICES=['json'] + DEFAULT_JSON = 'json' + JSON_CHOICES = ['json'] try: memoryview @@ -69,56 +60,12 @@ def dump(args, res): sys.stdout.write("\n") -def sbp_main(args): - header_len = 6 - reader = io.open(args.file.fileno(), 'rb') - buf = memoryview(bytearray(4096)) - unconsumed_offset = 0 - read_offset = 0 - buffer_remaining = len(buf) - include = set(args.include) - while True: - if buffer_remaining == 0: - buf[0:(read_offset - unconsumed_offset)] = buf[unconsumed_offset:read_offset] - read_offset = read_offset - unconsumed_offset - unconsumed_offset = 0 - buffer_remaining = len(buf) - read_offset - mv = buf[read_offset:] - read_length = reader.readinto(mv) - if read_length == 0: - unconsumed = read_offset - unconsumed_offset - if unconsumed != 0: - sys.stderr.write("unconsumed: {}\n".format(unconsumed)) - sys.stderr.flush() - break - read_offset += read_length - buffer_remaining -= read_length - while True: - bytes_available = read_offset - unconsumed_offset - b = buf[unconsumed_offset:(unconsumed_offset + bytes_available)] - if len(b) == 0: - break - if b[0] != SBP_PREAMBLE: - consumed = 1 - else: - try: - m = sbp.msg.SBP.unpack(b) - if not include or m.msg_type in include: - m = sbp.table.dispatch(m) - dump(args, m) - consumed = header_len + m.length + 2 - except (UnpackError, StreamError): - break - except ValueError: - consumed = 1 - unconsumed_offset += consumed - - def module_main(): - args = get_args() - if not args: - sys.exit(1) - try: - sbp_main(args) - except KeyboardInterrupt: - pass + if args := get_args(): + with Handler(Framer(args.file.read, None, verbose=True)) as source: + try: + it = source.__iter__() + while msg_meta := next(it, None): + dump(args, msg_meta[0]) + except KeyboardInterrupt: + pass From 447e373ac2a367ae1c318145545973c32437798b Mon Sep 17 00:00:00 2001 From: adrian-kong Date: Tue, 10 May 2022 07:58:27 +1000 Subject: [PATCH 2/3] no walrus! and changed to use for loop --- python/sbp/sbp2json.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/python/sbp/sbp2json.py b/python/sbp/sbp2json.py index d6ed04bb5e..ddafe8bb77 100644 --- a/python/sbp/sbp2json.py +++ b/python/sbp/sbp2json.py @@ -61,11 +61,11 @@ def dump(args, res): def module_main(): - if args := get_args(): + args = get_args() + if args: with Handler(Framer(args.file.read, None, verbose=True)) as source: try: - it = source.__iter__() - while msg_meta := next(it, None): - dump(args, msg_meta[0]) + for msg, meta in source: + dump(args, msg) except KeyboardInterrupt: pass From b5debccd7ff956c96848d731b0269fe7a0973df0 Mon Sep 17 00:00:00 2001 From: adrian-kong Date: Tue, 10 May 2022 08:51:56 +1000 Subject: [PATCH 3/3] fixes the ordering and breaks when finished reading --- python/sbp/client/framer.py | 6 +++++- python/sbp/sbp2json.py | 9 +++++++-- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/python/sbp/client/framer.py b/python/sbp/client/framer.py index 17b976dfe9..623d64ec70 100644 --- a/python/sbp/client/framer.py +++ b/python/sbp/client/framer.py @@ -48,7 +48,8 @@ def __init__(self, dispatcher=dispatch, into_buffer=True, skip_metadata=False, - sender_id_filter_list=[]): + sender_id_filter_list=[], + stop_when_empty=False): self._read = read self._write = write self._verbose = verbose @@ -59,6 +60,7 @@ def __init__(self, self._into_buffer = into_buffer self._skip_metadata = skip_metadata self._sender_id_filter_list = sender_id_filter_list + self._stop_when_empty = stop_when_empty def __iter__(self): self._broken = False @@ -86,6 +88,8 @@ def __next__(self): while msg is None: try: msg = self._receive() + if self._stop_when_empty and not msg: + raise StopIteration if self._broken: raise StopIteration except IOError: diff --git a/python/sbp/sbp2json.py b/python/sbp/sbp2json.py index ddafe8bb77..55556d91a7 100644 --- a/python/sbp/sbp2json.py +++ b/python/sbp/sbp2json.py @@ -63,9 +63,14 @@ def dump(args, res): def module_main(): args = get_args() if args: - with Handler(Framer(args.file.read, None, verbose=True)) as source: + with Handler(Framer(args.file.read, None, verbose=True, stop_when_empty=True), autostart=False) as source: try: - for msg, meta in source: + it = iter(source) + source.start() + msg, meta = next(it, (None, None)) + while msg: dump(args, msg) + msg, meta = next(it, (None, None)) except KeyboardInterrupt: pass + args.file.close()