diff --git a/hubblestack/hec/dq.py b/hubblestack/hec/dq.py index 6d7cbea4a..4dc707970 100644 --- a/hubblestack/hec/dq.py +++ b/hubblestack/hec/dq.py @@ -7,6 +7,8 @@ import shutil import json from collections import deque +from hubblestack.utils.misc import numbered_file_split_key +from hubblestack.utils.encoding import encode_something_to_bytes, decode_something_to_string __all__ = [ 'QueueTypeError', 'QueueCapacityError', 'MemQueue', 'DiskQueue', @@ -37,6 +39,7 @@ def check_type(self, item): if not isinstance(item, self.ok_types): raise QueueTypeError('type({0}) is not ({1})'.format(type(item), self.ok_types)) + class NoQueue(object): cn = 0 def put(self, *a, **kw): @@ -49,22 +52,6 @@ def __bool__(self): return False __nonzero__ = __bool__ # stupid python2 -def numbered_file_split_key(x): - """ for sorting purposes, split filenames like '238048.11', '238048.17', - '238048.0' into lists of integers. E.g.: - - for fname in sorted(filenames, key=numbered_file_split_key): - do_things_ordered_by_integer_sort() - """ - try: - return [int(i) for i in x.split('.')] - except: - pass - try: - return [int(x)] - except: - pass - return list() class DiskQueue(OKTypesMixin): sep = b' ' @@ -85,6 +72,7 @@ def __bool__(self): __nonzero__ = __bool__ # stupid python2 def compress(self, dat): + dat = encode_something_to_bytes(dat) if not self.compression: return dat def _bz2(x): @@ -100,7 +88,8 @@ def unlink_(self, fname): os.unlink(name) def decompress(self, dat): - if str(dat).startswith('BZ'): + dat = encode_something_to_bytes(dat) + if dat.startswith(b'BZ'): try: return bz2.BZ2Decompressor().decompress(dat) except IOError: @@ -147,8 +136,6 @@ def put(self, item, **meta): f = os.path.join(d, remainder) with open(f, 'wb') as fh: log.debug('writing item to disk cache') - if isinstance(bstr, str): - bstr = str.encode(bstr) fh.write(bstr) if meta: with open(f + '.meta', 'w') as fh: @@ -176,7 +163,7 @@ def peek(self): """ for fname in self.files: with open(fname, 'rb') as fh: - return self.decompress(fh.read()), self.read_meta(fname) + return decode_something_to_string(self.decompress(fh.read())), self.read_meta(fname) def get(self): """ get the next item from the queue @@ -184,15 +171,15 @@ def get(self): """ for fname in self.files: with open(fname, 'rb') as fh: - ret = self.decompress(fh.read()) - ret = ret, self.read_meta(fname) + dat = self.decompress(fh.read()) + mdat = self.read_meta(fname) sz = os.stat(fname).st_size self.unlink_(fname) self.cn -= 1 self.sz -= sz if self.double_check_cnsz: self._count(double_check_only=True, tag='get') - return ret + return decode_something_to_string(dat), mdat def getz(self, sz=SPLUNK_MAX_MSG): """ fetch items from the queue and concatenate them together using the @@ -235,7 +222,7 @@ def getz(self, sz=SPLUNK_MAX_MSG): # # occasionally this will return something pessimistic meta_data[k] = max(meta_data[k]) - return ret, meta_data + return decode_something_to_string(ret), meta_data def pop(self): """ remove the next item from the queue (do not return it); useful with .peek() """ diff --git a/hubblestack/utils/encoding.py b/hubblestack/utils/encoding.py index bbdcb8e56..62f62bead 100644 --- a/hubblestack/utils/encoding.py +++ b/hubblestack/utils/encoding.py @@ -40,3 +40,15 @@ def encode_base64(starting_string, format_chained=True, chained=None, chained_st ret = base64.b64encode(starting_string) return bool(ret), ret + +def encode_something_to_bytes(x): + """ take strings or bytes or whatever and convert to bytes """ + if isinstance(x, (bytes,bytearray)): + return x + return x.encode('utf-8') + +def decode_something_to_string(x): + """ take strings or bytes or whatever and convert to string """ + if isinstance(x, (bytes,bytearray)): + return x.decode('utf-8') + return x diff --git a/hubblestack/utils/misc.py b/hubblestack/utils/misc.py new file mode 100644 index 000000000..0557fd9bc --- /dev/null +++ b/hubblestack/utils/misc.py @@ -0,0 +1,18 @@ +# coding: utf-8 + +def numbered_file_split_key(x): + """ for sorting purposes, split filenames like '238048.11', '238048.17', + '238048.0' into lists of integers. E.g.: + + for fname in sorted(filenames, key=numbered_file_split_key): + do_things_ordered_by_integer_sort() + """ + try: + return [int(i) for i in x.split('.')] + except: + pass + try: + return [int(x)] + except: + pass + return list() diff --git a/tests/unittests/test_hec_dq.py b/tests/unittests/test_hec_dq.py index 6bb022bcf..28b9e33c6 100644 --- a/tests/unittests/test_hec_dq.py +++ b/tests/unittests/test_hec_dq.py @@ -12,20 +12,33 @@ def samp(): @pytest.fixture def dq(): - return DiskQueue(TEST_DQ_DIR, size=100, fresh=True) + return DiskQueue(TEST_DQ_DIR, fresh=True) -def test_disk_queue(dq): +@pytest.fixture +def dqc(): + return DiskQueue(TEST_DQ_DIR + ".bz2", fresh=True, compression=9) + +def _test_disk_queue(dq): borked = False dq.put('one', testinator=3) dq.put('two', testinator=4) dq.put('three', testinator=5) - assert len(dq) == 13 - assert dq.peek() == (b'one', {'testinator': 3}) - assert dq.get() == (b'one', {'testinator': 3}) - assert dq.peek() == (b'two', {'testinator': 4}) - assert len(dq) == 9 + if not dq.compression: + # NOTE: with the huffman headers (or whatever), the size of the dq is + # probably quite a lot larger than the expected 13. the test is + # essentially meaningless unless the uncompressed message was large + # enough… then we could test to see if the dq len was smaller than + # expected or something… let's just skip this for the compressed dq + assert len(dq) == 13 + + assert dq.peek() == ('one', {'testinator': 3}) + assert dq.get() == ('one', {'testinator': 3}) + assert dq.peek() == ('two', {'testinator': 4}) + + if not dq.compression: + assert len(dq) == 9 assert dq.getz() == ('two three', {'testinator': 5}) assert len(dq) == 0 @@ -37,18 +50,23 @@ def test_disk_queue(dq): assert dq.getz(8) == ('one two', {}) assert dq.getz(8) == ('three', {}) +def test_disk_queue(dq): + _test_disk_queue(dq) + +def test_disk_queue_with_compression(dqc): + _test_disk_queue(dqc) + def _test_pop(samp,q): for i in samp: q.put(i) for i in samp: - assert q.peek() == (str.encode(i), {}) + assert q.peek() == (i, {}) q.pop() def test_dq_pop(samp,dq): _test_pop(samp,dq) -def test_disk_queue_put_estimator(): - dq = DiskQueue(TEST_DQ_DIR, fresh=True) +def test_disk_queue_put_estimator(dq): for item in ['hi-there-{}'.format(x) for x in range(20)]: pre = dq.cn, dq.sz dq.put(item)