Skip to content

Commit

Permalink
make some minor repairs to the hec.dq
Browse files Browse the repository at this point in the history
  • Loading branch information
jettero committed Jan 5, 2020
1 parent 8cd07f4 commit 6400e61
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 34 deletions.
35 changes: 11 additions & 24 deletions hubblestack/hec/dq.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import shutil
import json
from collections import deque
from hubblestack.utils.misc import numbered_file_split_key
from hubblestack.utils.encoding import encode_something_to_bytes, decode_something_to_string

__all__ = [
'QueueTypeError', 'QueueCapacityError', 'MemQueue', 'DiskQueue',
Expand Down Expand Up @@ -37,6 +39,7 @@ def check_type(self, item):
if not isinstance(item, self.ok_types):
raise QueueTypeError('type({0}) is not ({1})'.format(type(item), self.ok_types))


class NoQueue(object):
cn = 0
def put(self, *a, **kw):
Expand All @@ -49,22 +52,6 @@ def __bool__(self):
return False
__nonzero__ = __bool__ # stupid python2

def numbered_file_split_key(x):
""" for sorting purposes, split filenames like '238048.11', '238048.17',
'238048.0' into lists of integers. E.g.:
for fname in sorted(filenames, key=numbered_file_split_key):
do_things_ordered_by_integer_sort()
"""
try:
return [int(i) for i in x.split('.')]
except:
pass
try:
return [int(x)]
except:
pass
return list()

class DiskQueue(OKTypesMixin):
sep = b' '
Expand All @@ -85,6 +72,7 @@ def __bool__(self):
__nonzero__ = __bool__ # stupid python2

def compress(self, dat):
dat = encode_something_to_bytes(dat)
if not self.compression:
return dat
def _bz2(x):
Expand All @@ -100,7 +88,8 @@ def unlink_(self, fname):
os.unlink(name)

def decompress(self, dat):
if str(dat).startswith('BZ'):
dat = encode_something_to_bytes(dat)
if dat.startswith(b'BZ'):
try:
return bz2.BZ2Decompressor().decompress(dat)
except IOError:
Expand Down Expand Up @@ -147,8 +136,6 @@ def put(self, item, **meta):
f = os.path.join(d, remainder)
with open(f, 'wb') as fh:
log.debug('writing item to disk cache')
if isinstance(bstr, str):
bstr = str.encode(bstr)
fh.write(bstr)
if meta:
with open(f + '.meta', 'w') as fh:
Expand Down Expand Up @@ -176,23 +163,23 @@ def peek(self):
"""
for fname in self.files:
with open(fname, 'rb') as fh:
return self.decompress(fh.read()), self.read_meta(fname)
return decode_something_to_string(self.decompress(fh.read())), self.read_meta(fname)

def get(self):
""" get the next item from the queue
returns: data_octets, meta_data_dict
"""
for fname in self.files:
with open(fname, 'rb') as fh:
ret = self.decompress(fh.read())
ret = ret, self.read_meta(fname)
dat = self.decompress(fh.read())
mdat = self.read_meta(fname)
sz = os.stat(fname).st_size
self.unlink_(fname)
self.cn -= 1
self.sz -= sz
if self.double_check_cnsz:
self._count(double_check_only=True, tag='get')
return ret
return decode_something_to_string(dat), mdat

def getz(self, sz=SPLUNK_MAX_MSG):
""" fetch items from the queue and concatenate them together using the
Expand Down Expand Up @@ -235,7 +222,7 @@ def getz(self, sz=SPLUNK_MAX_MSG):
#
# occasionally this will return something pessimistic
meta_data[k] = max(meta_data[k])
return ret, meta_data
return decode_something_to_string(ret), meta_data

def pop(self):
""" remove the next item from the queue (do not return it); useful with .peek() """
Expand Down
12 changes: 12 additions & 0 deletions hubblestack/utils/encoding.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,15 @@ def encode_base64(starting_string, format_chained=True, chained=None, chained_st
ret = base64.b64encode(starting_string)

return bool(ret), ret

def encode_something_to_bytes(x):
""" take strings or bytes or whatever and convert to bytes """
if isinstance(x, (bytes,bytearray)):
return x
return x.encode('utf-8')

def decode_something_to_string(x):
""" take strings or bytes or whatever and convert to string """
if isinstance(x, (bytes,bytearray)):
return x.decode('utf-8')
return x
18 changes: 18 additions & 0 deletions hubblestack/utils/misc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# coding: utf-8

def numbered_file_split_key(x):
""" for sorting purposes, split filenames like '238048.11', '238048.17',
'238048.0' into lists of integers. E.g.:
for fname in sorted(filenames, key=numbered_file_split_key):
do_things_ordered_by_integer_sort()
"""
try:
return [int(i) for i in x.split('.')]
except:
pass
try:
return [int(x)]
except:
pass
return list()
38 changes: 28 additions & 10 deletions tests/unittests/test_hec_dq.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,33 @@ def samp():

@pytest.fixture
def dq():
return DiskQueue(TEST_DQ_DIR, size=100, fresh=True)
return DiskQueue(TEST_DQ_DIR, fresh=True)

def test_disk_queue(dq):
@pytest.fixture
def dqc():
return DiskQueue(TEST_DQ_DIR + ".bz2", fresh=True, compression=9)

def _test_disk_queue(dq):
borked = False

dq.put('one', testinator=3)
dq.put('two', testinator=4)
dq.put('three', testinator=5)

assert len(dq) == 13
assert dq.peek() == (b'one', {'testinator': 3})
assert dq.get() == (b'one', {'testinator': 3})
assert dq.peek() == (b'two', {'testinator': 4})
assert len(dq) == 9
if not dq.compression:
# NOTE: with the huffman headers (or whatever), the size of the dq is
# probably quite a lot larger than the expected 13. the test is
# essentially meaningless unless the uncompressed message was large
# enough… then we could test to see if the dq len was smaller than
# expected or something… let's just skip this for the compressed dq
assert len(dq) == 13

assert dq.peek() == ('one', {'testinator': 3})
assert dq.get() == ('one', {'testinator': 3})
assert dq.peek() == ('two', {'testinator': 4})

if not dq.compression:
assert len(dq) == 9

assert dq.getz() == ('two three', {'testinator': 5})
assert len(dq) == 0
Expand All @@ -37,18 +50,23 @@ def test_disk_queue(dq):
assert dq.getz(8) == ('one two', {})
assert dq.getz(8) == ('three', {})

def test_disk_queue(dq):
_test_disk_queue(dq)

def test_disk_queue_with_compression(dqc):
_test_disk_queue(dqc)

def _test_pop(samp,q):
for i in samp:
q.put(i)
for i in samp:
assert q.peek() == (str.encode(i), {})
assert q.peek() == (i, {})
q.pop()

def test_dq_pop(samp,dq):
_test_pop(samp,dq)

def test_disk_queue_put_estimator():
dq = DiskQueue(TEST_DQ_DIR, fresh=True)
def test_disk_queue_put_estimator(dq):
for item in ['hi-there-{}'.format(x) for x in range(20)]:
pre = dq.cn, dq.sz
dq.put(item)
Expand Down

0 comments on commit 6400e61

Please sign in to comment.