Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Loading…

parse messages from logs #241

Merged
merged 8 commits into from

3 participants

@smn
Owner

original version only did SMPP output, this one parses Vumi message objects written to stdout

@smn smn was assigned
@hodgestar
Owner

This needs to have develop merged in because #65 changed how options are processed a bit (and renamed a few things). The inject_message script also needs to be fixed.

@hodgestar
Owner

I updated the LogParser docstring but I don't know know where the 'smpp' and 'vumi' formats are expected to be encountered. Presumably 'smpp' is output by the SMPP transport? What outputs the 'vumi' format?

Other than this documentation enhancement the branch looks good to land.

@hodgestar
Owner

The string 'Inbound:' used in the 'vumi' format regex doesn't occur anywhere in the vumi code base except in parse_log_messages itself.

@smn
Owner

Hm, you're right, its in the output of one of @jerith's campaigns dispatchers.

@hodgestar
Owner

Should we perhaps not standardize on a single Inbound message logging format and then renamed the 'smpp' options to 'smpp-0.3' or something? Also, the SMPP logging line should probably be changed to '%r' % message instead of '%s'.

@hodgestar hodgestar merged commit a55930c into develop
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
This page is out of date. Refresh to see the latest.
View
6 vumi/scripts/inject_messages.py
@@ -18,6 +18,7 @@ class InjectorOptions(VumiOptions):
]
def postOptions(self):
+ VumiOptions.postOptions(self)
if not self['transport-name']:
raise usage.UsageError("Please provide the "
"transport-name parameter.")
@@ -61,12 +62,9 @@ def process_line(self, line):
@inlineCallbacks
def main(options):
- vumi_options = {}
verbose = options['verbose']
- for opt in [i[0] for i in Options.optParameters]:
- vumi_options[opt] = options.pop(opt)
- worker_creator = WorkerCreator(vumi_options)
+ worker_creator = WorkerCreator(options.vumi_options)
worker_creator.create_worker_by_class(
MessageInjector, options)
View
123 vumi/scripts/parse_log_messages.py
@@ -0,0 +1,123 @@
+# -*- test-case-name: vumi.scripts.tests.test_parse_log_messages -*-
+import sys
+import re
+from twisted.python import usage
+from twisted.internet import reactor
+from twisted.internet.defer import maybeDeferred
+import datetime
+from vumi.message import to_json
+
+
+DATE_PATTERN = re.compile(
+ r'(?P<year>\d{4})-(?P<month>\d{2})-(?P<day>\d{2}) '
+ r'(?P<hour>\d{2}):(?P<minute>\d{2}):(?P<second>\d{2})')
+LOG_PATTERN = {
+ 'vumi': re.compile(
+ r'(?P<date>[\d\-\:\s]+)\+0000 .* '
+ r'Inbound: <Message payload="(?P<message>.*)">'),
+ 'smpp': re.compile(
+ r'(?P<date>[\d\-\:\s]+)\+0000 .* '
+ r'PUBLISHING INBOUND: (?P<message>.*)'),
+ }
+
+
+class Options(usage.Options):
+ optParameters = [
+ ["from", "f", None,
+ "Ignore any log lines prior to timestamp [YYYY-MM-DD HH:MM:SS]"],
+ ["until", "u", None,
+ "Ignore any log lines after timestamp [YYYY-MM-DD HH:MM:SS]"],
+ ["format", None, "vumi",
+ "Message format, one of: [vumi, smpp] (default vumi)"],
+ ]
+
+ longdesc = """Parses inbound messages logged by a Vumi worker from stdin
+ and outputs them as JSON encoded Vumi messages to stdout. Useful
+ along with the `inject_messages.py` script to replay failed inbound
+ messages. The two formats supported currently are 'vumi' (which is a
+ simple custom format used by some third-party workers) and 'smpp' (which is
+ used for logging inbound messages by the SMPP transport).
+ """
+
+
+def parse_date(string, pattern):
+ match = pattern.match(string)
+ if match:
+ return dict((k, int(v)) for k, v in match.groupdict().items())
+ return {}
+
+
+class LogParser(object):
+ """
+ Parses Vumi TransportUserMessages from a log file and writes
+ simple JSON serialized Vumi TransportUserMessages to stdout.
+
+ Regular expression may be passed in to specify log and date
+ format.
+
+ Two common output formats are the one used by SMPP for logging:
+
+ `YYYY-MM-DD HH:MM:SS+0000 <bits of text> PUBLISHING INBOUND: <json
+ message>`
+
+ and the one used by some Vumi campaign workers:
+
+ `YYYY-MM-DD HH:MM:SS+0000 <bits of text> Inbound: <Message
+ payload="<json message>">`
+ """
+
+ def __init__(self, options, date_pattern=None, log_pattern=None):
+ self.date_pattern = date_pattern or DATE_PATTERN
+ self.log_pattern = log_pattern or LOG_PATTERN.get(options['format'])
+ self.start = options['from']
+ if self.start:
+ self.start = datetime.datetime(**parse_date(self.start,
+ self.date_pattern))
+ self.stop = options['until']
+ if self.stop:
+ self.stop = datetime.datetime(**parse_date(self.stop,
+ self.date_pattern))
+ self.parse()
+
+ def parse(self):
+ while True:
+ line = sys.stdin.readline()
+ if not line:
+ break
+ self.readline(line)
+
+ def emit(self, obj):
+ sys.stdout.write('%s\n' % (obj,))
+
+ def readline(self, line):
+ match = self.log_pattern.match(line)
+ if match:
+ data = match.groupdict()
+ date = datetime.datetime(**parse_date(data['date'],
+ self.date_pattern))
+ if self.start and self.start > date:
+ return
+ if self.stop and date > self.stop:
+ return
+ self.emit(to_json(eval(data['message'])))
+
+
+if __name__ == '__main__':
+ try:
+ options = Options()
+ options.parseOptions()
+ except usage.UsageError, errortext:
+ print '%s: %s' % (sys.argv[0], errortext)
+ print '%s: Try --help for usage details.' % (sys.argv[0])
+ sys.exit(1)
+
+ def _eb(f):
+ f.printTraceback()
+
+ def _main():
+ maybeDeferred(LogParser, options
+ ).addErrback(_eb
+ ).addCallback(lambda _: reactor.stop())
+
+ reactor.callLater(0, _main)
+ reactor.run()
View
92 vumi/scripts/parse_smpp_log_messages.py
@@ -1,92 +0,0 @@
-# -*- test-case-name: vumi.scripts.tests.test_parse_smpp_log_messages -*-
-import sys
-import re
-import json
-from twisted.python import usage
-from twisted.internet import reactor
-from twisted.internet.defer import maybeDeferred
-from datetime import datetime
-
-
-class Options(usage.Options):
- optParameters = [
- ["from", "f", None,
- "Parse log lines starting from timestamp [YYYY-MM-DD HH:MM:SS]"],
- ["until", "u", None,
- "Parse log lines starting from timestamp [YYYY-MM-DD HH:MM:SS]"],
- ]
-
-DATE_PATTERN = re.compile(r'(?P<year>\d{4})-(?P<month>\d{2})-(?P<day>\d{2}) ' +
- r'(?P<hour>\d{2}):(?P<minute>\d{2}):(?P<second>\d{2})')
-LOG_PATTERN = re.compile(r'(?P<date>[\d\-\:\s]+)\+0000 .* ' +
- 'PUBLISHING INBOUND: (?P<message>.*)')
-
-
-def parse_date(string):
- match = DATE_PATTERN.match(string)
- if match:
- return dict((k, int(v)) for k, v in match.groupdict().items())
- return {}
-
-
-class LogParser(object):
- """
- Parses lines from the SMPP log in the following format:
-
- YYYY-MM-DD HH:MM:SS+0000 <bits of text> PUBLISHING INBOUND: <json message>
-
- And publishes the TransportUserMessage bits to stdout for reprocessing
- or re-injecting into queues.
-
- """
-
- def __init__(self, options):
- self.start = options['from']
- if self.start:
- self.start = datetime(**parse_date(self.start))
- self.stop = options['until']
- if self.stop:
- self.stop = datetime(**parse_date(self.stop))
- self.parse()
-
- def parse(self):
- while True:
- line = sys.stdin.readline()
- if not line:
- break
- self.readline(line)
-
- def emit(self, obj):
- sys.stdout.write('%s\n' % (obj,))
-
- def readline(self, line):
- match = LOG_PATTERN.match(line)
- if match:
- data = match.groupdict()
- date = datetime(**parse_date(data['date']))
- if self.start and self.start > date:
- return
- if self.stop and date > self.stop:
- return
- self.emit(json.dumps(eval(data['message'])))
-
-
-if __name__ == '__main__':
- try:
- options = Options()
- options.parseOptions()
- except usage.UsageError, errortext:
- print '%s: %s' % (sys.argv[0], errortext)
- print '%s: Try --help for usage details.' % (sys.argv[0])
- sys.exit(1)
-
- def _eb(f):
- f.printTraceback()
-
- def _main():
- maybeDeferred(LogParser, options
- ).addErrback(_eb
- ).addCallback(lambda _: reactor.stop())
-
- reactor.callLater(0, _main)
- reactor.run()
View
87 vumi/scripts/tests/test_parse_log_messages.py
@@ -0,0 +1,87 @@
+from twisted.trial.unittest import TestCase
+from vumi.scripts.parse_log_messages import LogParser
+from pkg_resources import resource_string
+import json
+
+
+class DummyLogParser(LogParser):
+ def __init__(self, *args, **kwargs):
+ super(DummyLogParser, self).__init__(*args, **kwargs)
+ self.emit_log = []
+
+ def parse(self):
+ pass
+
+ def emit(self, obj):
+ self.emit_log.append(obj)
+
+
+SAMPLE_LINE = (
+ "2012-04-12 10:52:23+0000 [WorkerAMQClient,client] "
+ "Inbound: <Message payload=\"{u'transport_name': u'transport_name',"
+ " 'network_operator': 'MNO', u'transport_metadata': {}, u'group':"
+ " None, u'from_addr': u'+27123456780', u'timestamp':"
+ " datetime.datetime(2012, 4, 12, 10, 52, 23, 329989),"
+ " u'to_addr': u'*120*12345*489665#', u'content': u'hello world',"
+ " u'message_version': u'20110921', u'transport_type': u'ussd',"
+ " u'helper_metadata': {}, u'in_reply_to': None, u'session_event':"
+ " u'new', u'message_id': u'b1893fa98ff4485299e3781f73ebfbb6',"
+ " u'message_type': u'user_message'}\">")
+
+
+class ParseSMPPLogMessagesTestCase(TestCase):
+
+ def test_parsing_of_line(self):
+ parser = DummyLogParser({
+ 'from': None,
+ 'until': None,
+ 'format': 'vumi',
+ })
+ parser.readline(SAMPLE_LINE)
+
+ parsed = json.loads(parser.emit_log[0])
+ expected = {
+ "content": "hell0 world",
+ "transport_type": "ussd",
+ "to_addr": "*120*12345*489665#",
+ "message_id": "b1893fa98ff4485299e3781f73ebfbb6",
+ "from_addr": "+27123456780"
+ }
+ for key in expected.keys():
+ self.assertEqual(parsed.get('key'), expected.get('key'))
+
+ def test_parsing_of_smpp_line(self):
+ parser = DummyLogParser({
+ 'from': None,
+ 'until': None,
+ 'format': 'smpp',
+ })
+ parser.readline(
+ "2011-11-15 02:04:48+0000 [EsmeTransceiver,client] "
+ "PUBLISHING INBOUND: {'content': u'AFN9WH79', 'transport_type': "
+ "'sms', 'to_addr': '1458', 'message_id': 'ec443820-62a8-4051-92e7"
+ "-66adaa487d20', 'from_addr': '23xxxxxxxx'}")
+
+ self.assertEqual(json.loads(parser.emit_log[0]), {
+ "content": "AFN9WH79",
+ "transport_type": "sms",
+ "to_addr": "1458",
+ "message_id": "ec443820-62a8-4051-92e7-66adaa487d20",
+ "from_addr": "23xxxxxxxx"
+ })
+
+ def test_parse_of_smpp_lines_with_limits(self):
+ sample = resource_string(__name__, 'sample-smpp-output.log')
+ parser = DummyLogParser({
+ 'from': '2011-11-15 00:23:59',
+ 'until': '2011-11-15 00:24:26',
+ 'format': 'smpp',
+ })
+ for line in sample.split('\n'):
+ parser.readline(line)
+
+ self.assertEqual(len(parser.emit_log), 2)
+ self.assertEqual(json.loads(parser.emit_log[0].strip())['content'],
+ "CODE2")
+ self.assertEqual(json.loads(parser.emit_log[1].strip())['content'],
+ "CODE3")
View
51 vumi/scripts/tests/test_parse_smpp_log_messages.py
@@ -1,51 +0,0 @@
-from twisted.trial.unittest import TestCase
-from vumi.scripts.parse_smpp_log_messages import LogParser
-from pkg_resources import resource_string
-import json
-
-
-class DummyLogParser(LogParser):
- def __init__(self, *args, **kwargs):
- super(DummyLogParser, self).__init__(*args, **kwargs)
- self.emit_log = []
-
- def parse(self):
- pass
-
- def emit(self, obj):
- self.emit_log.append(obj)
-
-
-class ParseSMPPLogMessagesTestCase(TestCase):
-
- def test_parsing_of_line(self):
- parser = DummyLogParser({
- 'from': None,
- 'until': None,
- })
- parser.readline("2011-11-15 02:04:48+0000 [EsmeTransceiver,client] "
- "PUBLISHING INBOUND: {'content': u'AFN9WH79', 'transport_type': "
- "'sms', 'to_addr': '1458', 'message_id': 'ec443820-62a8-4051-92e7"
- "-66adaa487d20', 'from_addr': '23xxxxxxxx'}")
- self.assertEqual(json.loads(parser.emit_log[0]), {
- "content": "AFN9WH79",
- "transport_type": "sms",
- "to_addr": "1458",
- "message_id": "ec443820-62a8-4051-92e7-66adaa487d20",
- "from_addr": "23xxxxxxxx"
- })
-
- def test_parse_of_line_with_limits(self):
- sample = resource_string(__name__, 'sample-smpp-output.log')
- parser = DummyLogParser({
- 'from': '2011-11-15 00:23:59',
- 'until': '2011-11-15 00:24:26'
- })
- for line in sample.split('\n'):
- parser.readline(line)
-
- self.assertEqual(len(parser.emit_log), 2)
- self.assertEqual(json.loads(parser.emit_log[0].strip())['content'],
- "CODE2")
- self.assertEqual(json.loads(parser.emit_log[1].strip())['content'],
- "CODE3")
Something went wrong with that request. Please try again.