39 changes: 1 addition & 38 deletions moztelemetry/heka/message_parser.py
@@ -82,7 +82,7 @@ def _parse_heka_record(record):
 def _add_field(container, keys, value):
     if len(keys) == 1:
         blob = value[0] if len(value) else ""
-        container[keys[0]] = _lazyjson(blob)
+        container[keys[0]] = _parse_json(blob)
         return
 
     key = keys.pop(0)
@@ -99,43 +99,6 @@ def _parse_json(string):
     return result
 
 
-def _lazyjson(content):
-    if not isinstance(content, basestring):
-        raise ValueError("Argument must be a string.")
-
-    if content.startswith("{"):
-        default = {}
-    elif content.startswith("["):
-        default = []
-    else:
-        try:
-            return float(content) if '.' in content or 'e' in content.lower() else int(content)
-        except:
-            return content
-
-    class WrapperType(type(default)):
-        pass
-
-    def wrap(method_name):
-        def _wrap(*args, **kwargs):
-            if not hasattr(WrapperType, '__cache__'):
-                setattr(WrapperType, '__cache__', _parse_json(content))
-
-            cached = WrapperType.__cache__
-            method = getattr(cached, method_name)
-            return method(*args[1:], **kwargs)
-
-        return _wrap
-
-    wrapper = WrapperType(default)
-    for k, v in type(default).__dict__.iteritems():
-        if k == "__doc__":
-            continue
-        else:
-            setattr(WrapperType, k, wrap(k))
-    return wrapper
-
-
 _record_separator = 0x1e
 
 
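Note: with `_lazyjson` removed, `_add_field` parses every field value eagerly through `_parse_json`. The snippet below is a minimal, self-contained sketch of that eager behaviour; the standalone helper name `add_field`, the use of the standard-library `json` module, the `setdefault` nesting, and the raw-string fallback are assumptions for illustration, not the module's actual code.

```python
import json


def add_field(container, keys, value):
    """Standalone sketch: nest a dotted field name and parse its value eagerly."""
    if len(keys) == 1:
        blob = value[0] if len(value) else ""
        try:
            container[keys[0]] = json.loads(blob)  # parsed immediately, no lazy wrapper
        except ValueError:
            container[keys[0]] = blob  # assumed fallback: keep the raw string
        return

    key = keys.pop(0)
    container = container.setdefault(key, {})
    add_field(container, keys, value)


record = {}
add_field(record, "f1.test".split("."), ['{"b": "bee"}'])
print(record)  # {'f1': {'test': {'b': 'bee'}}}
```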
51 changes: 34 additions & 17 deletions tests/heka/test_message_parser.py
@@ -6,7 +6,6 @@
 import pytest
 import ujson
 from google.protobuf.message import DecodeError
-from mock import MagicMock
 from moztelemetry.heka import message_parser
 from moztelemetry.util.streaming_gzip import streaming_gzip_wrapper
 
@@ -97,22 +96,40 @@ def test_json_fallback():
     assert TOO_BIG == message_parser._parse_json(str(TOO_BIG))
 
 
-def test_lazy_parsing(data_dir, monkeypatch):
-    mock_parse_json = MagicMock(name='_parse_json',
-                                wraps=message_parser._parse_json)
-    monkeypatch.setattr(message_parser, '_parse_json', mock_parse_json)
+def test_json_keys():
+    class Message():
+        pass
 
-    # this heka message has 20 json fields, only one of which should be parsed
-    # on first load
-    filename = "{}/test_telemetry_gzip.heka".format(data_dir)
-    with open(filename, "rb") as o:
-        heka_message = message_parser.parse_heka_message(streaming_gzip_wrapper(o)).next()
+    class Field():
+        pass
+
+    class Record():
+        def __init__(self):
+            self.message = Message()
+
+    record = Record()
+    record.message.timestamp = 1
+    record.message.type = "t"
+    record.message.hostname = "h"
+    record.message.payload = '{"a": 1}'
+
+    f1 = Field()
+    f1.name = "f1.test"
+    f1.value_string = ['{"b": "bee"}']
+    f1.value_type = 0
+
+    record.message.fields = [f1]
+
+    parsed = message_parser._parse_heka_record(record)
+
+    expected = {"a": 1, "f1": {"test": {"b": "bee"}}}
+    expected["meta"] = {
+        "Timestamp": 1,
+        "Type": "t",
+        "Hostname": "h",
+    }
 
-    # should only have parsed json *once* to get the payload field (other
-    # json/dictionary fields of the message should be parsed lazily)
-    assert mock_parse_json.call_count == 1
+    serialized = json.dumps(parsed)
+    e_serialized = json.dumps(expected)
 
-    # deep copying the heka message should cause the lazily evaluated fields
-    # to be evaluated
-    copy.deepcopy(heka_message)
-    assert mock_parse_json.call_count == 20 # 19 lazily instantiated fields + original call
+    assert serialized == e_serialized
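For contrast, the deleted `test_lazy_parsing` exercised the behaviour that `_lazyjson` provided: a field's JSON text was parsed only when the wrapped value was first used, and because the wrapper intercepted every `dict`/`list` method, even `copy.deepcopy` forced the remaining fields to be parsed (hence the expected jump from 1 to 20 `_parse_json` calls). The class below is a much-reduced sketch of that idea, using the standard-library `json` module and only a few intercepted methods; it illustrates the pattern, not the removed implementation.

```python
import json


class LazyJSONDict(dict):
    """Sketch of a lazily parsed JSON object: the text is decoded only the
    first time the dictionary's contents are actually touched."""

    def __init__(self, text):
        super().__init__()
        self._text = text
        self._parsed = False

    def _ensure_parsed(self):
        if not self._parsed:
            self._parsed = True
            self.update(json.loads(self._text))

    def __getitem__(self, key):
        self._ensure_parsed()
        return super().__getitem__(key)

    def __iter__(self):
        self._ensure_parsed()
        return super().__iter__()

    def __len__(self):
        self._ensure_parsed()
        return super().__len__()


lazy = LazyJSONDict('{"b": "bee"}')  # nothing parsed yet
print(lazy["b"])                     # first access triggers json.loads -> "bee"
```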