Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Began work on Extractor. TODO: finish tests

  • Loading branch information...
commit 8dd6d4f90d443096ebd1bf8d24d2695816558043 1 parent be05132
@danielrichman danielrichman authored
Showing with 194 additions and 0 deletions.
  1. +34 −0 habitat/tests/test_uploader.py
  2. +160 −0 habitat/uploader.py
View
34 habitat/tests/test_uploader.py
@@ -500,3 +500,37 @@ def check(x):
self.uthr.join()
self.mocker.VerifyAll()
+
+
+# Mox-esque class that is 'equal' to another string if the value it is
+# initialised is contained in that string; used to avoid writing out the
+# large extractor log messages in the tests
+class EqualIfIn:
+ def __init__(self, test):
+ self.test = test
+ def __eq__(self, rhs):
+ return isinstance(rhs, basestring) and self.test.lower() in rhs.lower()
+ def __repr__(self):
+ return "<EqIn " + repr(self.test) + ">"
+
+
+class TestExtractorManager(object):
+ pass
+
+
+class TestUKHASExtractor(object):
+ def setup(self):
+ self.mocker = mox.Mox()
+ self.uplr = self.mocker.CreateMock(uploader.Uploader)
+ self.mgr = uploader.ExtractorManager(self.uplr)
+ self.ukhas_extractor = uploader.UKHASExtractor()
+ self.mgr.add(self.ukhas_extractor)
+
+ self.mocker.StubOutWithMock(self.mgr, "status")
+ self.mocker.StubOutWithMock(self.mgr, "data")
+
+ def teardown(self):
+ self.mocker.UnsetStubs()
+
+ def test_pass(self):
+ pass
View
160 habitat/uploader.py
@@ -438,3 +438,163 @@ def run(self):
self.caught_exception()
self._queue.task_done()
+
+
+class ExtractorManager(object):
+ """
+ Manage one or more :class:`Extractor` objects, and handle their logging.
+
+ The extractor manager maintains a list of :class:`Extractor` objects.
+ Any :meth:`push` or :meth:`skipped` calls are passed directly to each
+ added Extractor in turn. If any Extractor produces logging output, or
+ parsed data, it is returned to the :meth:`status` and :meth:`data` methods,
+ which the user should override.
+
+ The ExtractorManager also handles thread safety for all Extractors
+ (i.e., it holds a lock while pushing data to each extractor). Your
+ :meth:`status` and :meth:`data` methods should be thread safe if you want
+ to call the ExtractorManager from more than one thread.
+ """
+
+ PUSH_NONE = 0x0
+ PUSH_BAUDOT_HACK = 0x1
+
+ def __init__(self, uploader):
+ """uploader: an :class:`Uploader` or :class:`UploaderThread` object"""
+ self.uploader = uploader
+ self._extractors = []
+
+ def add(self, extractor):
+ """Add the extractor object to the manager"""
+ self._extractors.append(extractor)
+ extractor.manager = self
+
+ def push(self, b, flags=PUSH_NONE):
+ """
+ Push a received byte of data, b, to all extractors.
+
+ b must be of type str (i.e., ascii, not unicode) and of length 1.
+
+ The optional argument flags provides additional information about
+ the data. The only flag, currently, is
+ ExtractorManager.PUSH_BAUDOT_HACK. This is used when decoding baudot,
+ which doesn't support the '*' character, as the UKHASExtractor needs
+ to know to replace all '#' characters with '*'s.
+ """
+
+ assert len(b) == 0 and isinstance(b, str)
+
+ for e in self._extractors:
+ e.push(b, flags)
+
+ def skipped(self, n):
+ """
+ Tell all extractors that approximately n undecodable bytes have passed
+
+ This advises extractors that some bytes couldn't be decoded for
+ whatever reason, but were transmitted. This can assist some
+ fixed-size packet formats in recovering from errors if one byte is
+ dropped, say, due to the start bit being flipped. It also causes
+ Extractors to 'give up' after a certain amount of time has passed.
+ """
+ for e in self._extractors:
+ e.skipped(n)
+
+ def status(self, msg):
+ """Logging method, called by Extractors when something happens"""
+ return NotImplementedError
+
+ def data(self, d):
+ """Called by Extractors if they are able to parse extracted data"""
+ return NotImplementedError
+
+
+class Extractor(object):
+ """
+ A base class for an Extractor.
+
+ An extractor is responsible for identifying telemetry in a stream of bytes,
+ and extracting them as standalone strings. This may be by using start/end
+ delimiters, or packet lengths, or whatever. Extracted strings are passed
+ to :meth:`Uploader.payload_telemetry` via the ExtractorManager.
+
+ An extractor may optionally attempt to parse the data it has extracted.
+ This does not affect the upload of extracted data, and offical parsing
+ is done by the habitat server, but may be useful to display in a GUI.
+ It could even be a stripped down parser capable of only a subset of the
+ full protocol, or able to parse the bare minimum only. If it succeeds,
+ the result is passed to :meth:`ExtractorManager.data`.
+ """
+
+ def __init__(self):
+ self.manager = None
+
+ def push(self, b, flags):
+ """see :meth:`ExtractorManager.push`"""
+ raise NotImplementedError
+
+ def skipped(self, n):
+ """see :meth:`ExtractorManager.skipped`"""
+ raise NotImplementedError
+
+
+class UKHASExtractor(Extractor):
+ def __init__(self):
+ super(UKHASExtractor, self).__init__()
+ self.last = None
+ self.buffer = ""
+ self.garbage_count = 0
+ self.extracting = False
+
+ def push(self, b, flags):
+ if b == '\r':
+ b = '\n'
+
+ if self.last == '$' and b == '$':
+ self.buffer = b
+ self.garbage_count = 0
+ self.extracting = True
+
+ self.manager.status("UKHAS Extractor: found start delimiter")
+
+ elif self.extracting and b == '\n':
+ self.buffer += b
+ self.manager.uploader.payload_telemetry(self.buffer)
+
+ self.manager.status("UKHAS Extractor: extracted string")
+
+ try:
+ # TODO self.manager.data(self.crude_parse(self.buffer))
+ pass
+ except (ValueError, KeyError) as e:
+ self.manager.status("UKHAS Extractor: crude parse failed: " +
+ str(e))
+
+ self.manager.data()
+
+ self.buffer = None
+ self.extracting = False
+
+ elif self.extracting:
+ if flags & ExtractorManager.PUSH_BAUDOT_HACK and b == '#':
+ # baudot doesn't support '*', we use '#'
+ b = '*'
+
+ self.buffer += b
+
+ if b < 0x20 or b > 0x7E:
+ # Non ascii chars
+ self.garbage_count += 1
+
+ # Sane limits to avoid uploading tonnes of garbage
+ if len(self.buffer) > 1000 or self.garbage_count > 16:
+ self.manager.status("UKHAS Extractor: giving up")
+
+ self.buffer = None
+ self.extracting = False
+
+ self.last = b
+
+ def skipped(self, n):
+ for i in xrange(n):
+ self.push("\0")
Please sign in to comment.
Something went wrong with that request. Please try again.