Skip to content
This repository has been archived by the owner on Oct 1, 2018. It is now read-only.

Remove backtracking. #5

Merged
merged 1 commit into from Apr 8, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion setup.py
Expand Up @@ -2,7 +2,7 @@

setup(name='telemetry-tools',
description='Utility code to work with Mozilla Telemetry data.',
version='1.0.9',
version='1.1.0',
author='Mozilla',
url='https://github.com/mozilla/telemetry-tools',
packages=['telemetry', 'telemetry.util'],
Expand Down
43 changes: 1 addition & 42 deletions telemetry/util/heka_message.py
Expand Up @@ -18,43 +18,6 @@
_record_separator = 0x1e


class BacktrackableFile:
    """Readable stream wrapper that can rewind to a later record separator.

    Everything read from the wrapped stream is also appended to an in-memory
    buffer.  Calling ``backtrack()`` discards the buffered data up to the next
    record separator (searching from the second buffered character onwards)
    and arranges for subsequent ``read()`` calls to replay the remainder
    before touching the underlying stream again.
    """

    def __init__(self, stream):
        """Wrap *stream*, an object exposing ``read(size)`` and ``close()``."""
        self._stream = stream
        self._buffer = StringIO()

    def read(self, size):
        """Return up to *size* characters, replaying buffered data first.

        Data still pending in the replay buffer is served first; only the
        shortfall is requested from the underlying stream.  Whatever comes
        from the stream is recorded in the buffer so it can be replayed after
        a later ``backtrack()``.
        """
        replayed = self._buffer.read(size)
        missing = size - len(replayed)

        if missing == 0:
            return replayed

        # The buffer position is now at its end, so this write appends.
        fresh = self._stream.read(missing)
        self._buffer.write(fresh)

        return replayed + fresh

    def close(self):
        """Close the replay buffer and the underlying stream."""
        self._buffer.close()
        # isinstance (rather than an exact type comparison) so that
        # subclasses of boto's Key get the same connection cleanup.
        if isinstance(self._stream, boto.s3.key.Key):
            if self._stream.resp:  # Hack! Connections are kept around otherwise!
                self._stream.resp.close()

            self._stream.close(True)
        else:
            self._stream.close()

    def backtrack(self):
        """Rewind to the next record separator seen in the buffered data.

        The search starts at index 1 so that a separator sitting at the very
        beginning of the buffer is skipped.  If a separator is found, the
        buffer is replaced by the data from that separator onwards and
        rewound to its start; otherwise the buffer is simply emptied.
        """
        seen = self._buffer.getvalue()
        index = seen.find(chr(_record_separator), 1)

        self._buffer = StringIO()
        if index >= 0:
            self._buffer.write(seen[index:])
            self._buffer.seek(0)


class UnpackedRecord():
def __init__(self, raw, header, message=None, error=None):
self.raw = raw
Expand Down Expand Up @@ -168,7 +131,7 @@ def unpack_string(string, **kwargs):
return unpack(StringIO(string), **kwargs)


def unpack(fin, raw=False, verbose=False, strict=False, backtrack=False, try_snappy=True):
def unpack(fin, raw=False, verbose=False, strict=False, try_snappy=True):
record_count = 0
bad_records = 0
total_bytes = 0
Expand All @@ -184,10 +147,6 @@ def unpack(fin, raw=False, verbose=False, strict=False, backtrack=False, try_sna
elif verbose:
print e

if backtrack and type(e) == DecodeError:
fin.backtrack()
continue

if r is None:
break

Expand Down
22 changes: 0 additions & 22 deletions telemetry/util/test_heka_message.py
Expand Up @@ -52,28 +52,6 @@ def test_unpack_strict(self):
threw = True
self.assertEquals(expected_exceptions[t], threw)

def test_backtracking_with_initial_separator(self):
    # Test backtracking when the separator appears at the first character:
    # the only separator read so far is at index 0, so backtracking drops
    # the buffered data and the next read continues from the stream.
    w = hm.BacktrackableFile(StringIO("\x1eFOOBAR"))
    self.assertEqual("\x1eFOOB", w.read(5))
    w.backtrack()
    self.assertEqual("AR", w.read(5))

def test_backtracking_with_mid_separator(self):
    # Test backtracking when separator was read: the data from the
    # separator onwards must be replayed by the next read.
    w = hm.BacktrackableFile(StringIO("FOOBAR\x1eFOOBAR"))
    self.assertEqual("FOOBAR\x1eFOO", w.read(10))
    w.backtrack()
    self.assertEqual("\x1eFOOBAR", w.read(10))

def test_backtracking_without_separator(self):
    # Test backtracking when separator wasn't read: nothing is replayed,
    # and reading simply resumes where the stream left off.
    w = hm.BacktrackableFile(StringIO("FOOBAR\x1eFOOBAR"))
    self.assertEqual("FOOBA", w.read(5))
    w.backtrack()
    self.assertEqual("R\x1eFOO", w.read(5))
    self.assertEqual("BAR", w.read(5))


if __name__ == "__main__":
unittest.main()