diff --git a/moztelemetry/heka/message_parser.py b/moztelemetry/heka/message_parser.py index df7ec93..af23a59 100644 --- a/moztelemetry/heka/message_parser.py +++ b/moztelemetry/heka/message_parser.py @@ -82,7 +82,7 @@ def _parse_heka_record(record): def _add_field(container, keys, value): if len(keys) == 1: blob = value[0] if len(value) else "" - container[keys[0]] = _lazyjson(blob) + container[keys[0]] = _parse_json(blob) return key = keys.pop(0) @@ -99,43 +99,6 @@ def _parse_json(string): return result -def _lazyjson(content): - if not isinstance(content, basestring): - raise ValueError("Argument must be a string.") - - if content.startswith("{"): - default = {} - elif content.startswith("["): - default = [] - else: - try: - return float(content) if '.' in content or 'e' in content.lower() else int(content) - except: - return content - - class WrapperType(type(default)): - pass - - def wrap(method_name): - def _wrap(*args, **kwargs): - if not hasattr(WrapperType, '__cache__'): - setattr(WrapperType, '__cache__', _parse_json(content)) - - cached = WrapperType.__cache__ - method = getattr(cached, method_name) - return method(*args[1:], **kwargs) - - return _wrap - - wrapper = WrapperType(default) - for k, v in type(default).__dict__.iteritems(): - if k == "__doc__": - continue - else: - setattr(WrapperType, k, wrap(k)) - return wrapper - - _record_separator = 0x1e diff --git a/tests/heka/test_message_parser.py b/tests/heka/test_message_parser.py index adf6113..1ce3c5a 100644 --- a/tests/heka/test_message_parser.py +++ b/tests/heka/test_message_parser.py @@ -6,7 +6,6 @@ import pytest import ujson from google.protobuf.message import DecodeError -from mock import MagicMock from moztelemetry.heka import message_parser from moztelemetry.util.streaming_gzip import streaming_gzip_wrapper @@ -97,22 +96,40 @@ def test_json_fallback(): assert TOO_BIG == message_parser._parse_json(str(TOO_BIG)) -def test_lazy_parsing(data_dir, monkeypatch): - mock_parse_json = MagicMock(name='_parse_json', - wraps=message_parser._parse_json) - monkeypatch.setattr(message_parser, '_parse_json', mock_parse_json) +def test_json_keys(): + class Message(): + pass - # this heka message has 20 json fields, only one of which should be parsed - # on first load - filename = "{}/test_telemetry_gzip.heka".format(data_dir) - with open(filename, "rb") as o: - heka_message = message_parser.parse_heka_message(streaming_gzip_wrapper(o)).next() + class Field(): + pass + + class Record(): + def __init__(self): + self.message = Message() + + record = Record() + record.message.timestamp = 1 + record.message.type = "t" + record.message.hostname = "h" + record.message.payload = '{"a": 1}' + + f1 = Field() + f1.name = "f1.test" + f1.value_string = ['{"b": "bee"}'] + f1.value_type = 0 + + record.message.fields = [f1] + + parsed = message_parser._parse_heka_record(record) + + expected = {"a": 1, "f1": {"test": {"b": "bee"}}} + expected["meta"] = { + "Timestamp": 1, + "Type": "t", + "Hostname": "h", + } - # should only have parsed json *once* to get the payload field (other - # json/dictionary fields of the message should be parsed lazily) - assert mock_parse_json.call_count == 1 + serialized = json.dumps(parsed) + e_serialized = json.dumps(expected) - # deep copying the heka message should cause the lazily evaluated fields - # to be evaluated - copy.deepcopy(heka_message) - assert mock_parse_json.call_count == 20 # 19 lazily instantiated fields + original call + assert serialized == e_serialized