Permalink
Browse files

bus.py - hack data compression for "tcp_point_data" name

  • Loading branch information...
m3d committed Jan 5, 2019
1 parent 4a54ccf commit a16b02f08bfef2544c18f60e7aaefba4bf23918b
Showing with 19 additions and 1 deletion.
  1. +8 −1 osgar/bus.py
  2. +11 −0 osgar/test_bus.py
@@ -2,6 +2,7 @@
Internal bus for communication among modules
"""
import time
import zlib
from queue import Queue
from datetime import timedelta

@@ -27,11 +28,17 @@ def __init__(self, logger, name='', out={}):
idx = self.logger.register('.'.join([self.name, publish_name]))
self.stream_id[publish_name] = idx
self._is_alive = True
self.compressed_output = (name == 'tcp_point_data') # hack

def publish(self, channel, data):
with self.logger.lock:
stream_id = self.stream_id[channel] # local maping of indexes
timestamp = self.logger.write(stream_id, serialize(data))
if self.compressed_output:
to_write = zlib.compress(serialize(data))
else:
to_write = serialize(data)
timestamp = self.logger.write(stream_id, to_write)

for queue, input_channel in self.out[channel]:
queue.put((timestamp, input_channel, data))
return timestamp
@@ -141,4 +141,15 @@ def test_bus_sleep(self):
bus = LogBusHandlerInputsOnly(logger, inputs={})
bus.sleep(0.1)

def test_compression(self):
logger = MagicMock()
logger.register = MagicMock(return_value=1)
bus = BusHandler(logger, out={'raw':[]}, name='tcp_point_data')
# TODO proper definition what should be compressed - now hardcoded to name "tcp_point_data"
self.assertTrue(bus.compressed_output)
bus.publish('raw', b'\00'*10000)
logger.write.assert_called_once_with(1,
b'x\x9c\xed\xc1\x01\r\x00\x00\x08\x03 #\xbc\x85\xfdC\xd8\xcb\x1e\x1fp\x9b' +
b'\x01\x00\x00\x00\x00\x00\x00\x00\x00\x80\x02\x0f\x9f\xba\x00\xfd')

# vim: expandtab sw=4 ts=4

0 comments on commit a16b02f

Please sign in to comment.