Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion python/sbp/client/framer.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ def __init__(self,
dispatcher=dispatch,
into_buffer=True,
skip_metadata=False,
sender_id_filter_list=[]):
sender_id_filter_list=[],
stop_when_empty=False):
self._read = read
self._write = write
self._verbose = verbose
Expand All @@ -59,6 +60,7 @@ def __init__(self,
self._into_buffer = into_buffer
self._skip_metadata = skip_metadata
self._sender_id_filter_list = sender_id_filter_list
self._stop_when_empty = stop_when_empty

def __iter__(self):
self._broken = False
Expand Down Expand Up @@ -86,6 +88,8 @@ def __next__(self):
while msg is None:
try:
msg = self._receive()
if self._stop_when_empty and not msg:
raise StopIteration
if self._broken:
raise StopIteration
except IOError:
Expand Down
82 changes: 17 additions & 65 deletions python/sbp/sbp2json.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,17 @@

import sys

import io

import json

from construct.core import StreamError

import sbp.msg
import sbp.table

from sbp.msg import SBP_PREAMBLE
from sbp.msg import UnpackError
from sbp.client import Handler, Framer


DEFAULT_JSON='rapidjson'
JSON_CHOICES=['json', 'rapidjson']
DEFAULT_JSON = 'rapidjson'
JSON_CHOICES = ['json', 'rapidjson']
try:
import rapidjson
except ImportError:
DEFAULT_JSON='json'
JSON_CHOICES=['json']
DEFAULT_JSON = 'json'
JSON_CHOICES = ['json']

try:
memoryview
Expand Down Expand Up @@ -69,56 +60,17 @@ def dump(args, res):
sys.stdout.write("\n")


def sbp_main(args):
header_len = 6
reader = io.open(args.file.fileno(), 'rb')
buf = memoryview(bytearray(4096))
unconsumed_offset = 0
read_offset = 0
buffer_remaining = len(buf)
include = set(args.include)
while True:
if buffer_remaining == 0:
buf[0:(read_offset - unconsumed_offset)] = buf[unconsumed_offset:read_offset]
read_offset = read_offset - unconsumed_offset
unconsumed_offset = 0
buffer_remaining = len(buf) - read_offset
mv = buf[read_offset:]
read_length = reader.readinto(mv)
if read_length == 0:
unconsumed = read_offset - unconsumed_offset
if unconsumed != 0:
sys.stderr.write("unconsumed: {}\n".format(unconsumed))
sys.stderr.flush()
break
read_offset += read_length
buffer_remaining -= read_length
while True:
bytes_available = read_offset - unconsumed_offset
b = buf[unconsumed_offset:(unconsumed_offset + bytes_available)]
if len(b) == 0:
break
if b[0] != SBP_PREAMBLE:
consumed = 1
else:
try:
m = sbp.msg.SBP.unpack(b)
if not include or m.msg_type in include:
m = sbp.table.dispatch(m)
dump(args, m)
consumed = header_len + m.length + 2
except (UnpackError, StreamError):
break
except ValueError:
consumed = 1
unconsumed_offset += consumed


def module_main():
args = get_args()
if not args:
sys.exit(1)
try:
sbp_main(args)
except KeyboardInterrupt:
pass
if args:
with Handler(Framer(args.file.read, None, verbose=True, stop_when_empty=True), autostart=False) as source:
try:
it = iter(source)
source.start()
msg, meta = next(it, (None, None))
while msg:
dump(args, msg)
msg, meta = next(it, (None, None))
except KeyboardInterrupt:
pass
args.file.close()