Comparing changes

base fork: rhettg/Ziggy
base: 1aef29a648
...
head fork: rhettg/Ziggy
compare: 84f37880f3
  • 12 commits
  • 12 files changed
  • 0 commit comments
  • 1 contributor
5 CHANGES
@@ -0,0 +1,5 @@
+ziggy (0.1.0)
+
+ * Initial Release
+
+-- Rhett Garber <rhettg@gmail.com> Thu, 22 Aug 2012 15:53:00 -0700
3  bin/ziggyctl
@@ -54,6 +54,9 @@ def main():
print "Lag: %f secs" % resp['lag']
print
+ for name, name_data in resp['hosts'].iteritems():
+ print " %32s Last %s (%d secs ago)" % (name, datetime.datetime.fromtimestamp(name_data['last']), time.time() - name_data['last'])
+ print
for name, name_data in resp['events'].iteritems():
print " %32s Last %s (%d secs ago)" % (name, datetime.datetime.fromtimestamp(name_data['last']), time.time() - name_data['last'])
else:
17 bin/ziggyd
@@ -85,10 +85,14 @@ class LogFileStream(object):
def log_file_path(self):
day_str = datetime.datetime.now().strftime('%Y%m%d')
- return os.path.join(self.log_path + '/', "%s-%s.log" % (self.name, day_str))
+ return os.path.normpath(os.path.join(self.log_path + '/', "%s/%s-%s.log" % (day_str, self.name, day_str)))
def open(self):
self.stream_filename = self.log_file_path()
+ try:
+ os.makedirs(os.path.dirname(self.stream_filename))
+ except OSError:
+ pass
log.info("Opening file stream for %s: %s", self.name, self.stream_filename)
self.stream = io.open(self.stream_filename, "ab")
@@ -109,7 +113,8 @@ class LogFileStream(object):
def collect_stats(stats, event):
type_name = event['type']
- send_time = event['end_time']
+ send_time = event['end']
+ host = event['host']
stats['last'] = time.time()
stats['lag'] = time.time() - send_time
@@ -118,6 +123,10 @@ def collect_stats(stats, event):
stats['events'].setdefault(type_name, {})
stats['events'][type_name]['last'] = send_time
+ stats.setdefault('hosts', {})
+ stats['hosts'].setdefault(host, {})
+ stats['hosts'][host]['last'] = send_time
+
def build_stats(stats):
return stats
@@ -172,7 +181,7 @@ def main():
forward_sock.connect("tcp://%s" % options.forward)
forward_sock.hwm = 1000
- stats = {}
+ stats = {'last': None, 'lag': None, 'events': {}, 'hosts': {}}
log_files = {}
continue_running = True
log.info("Starting IO Loop")
@@ -215,7 +224,7 @@ def main():
log_files[type_name] = LogFileStream(options.log_path, type_name)
log.debug("writing to %s", type_name)
- log_files[type_name].write(event['start_time'], event_data)
+ log_files[type_name].write(event['start'], event_data)
elif control_sock in ready:
control_data = control_sock.recv()
request = bson.loads(control_data)
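The path change above nests each stream's log file in a per-day subdirectory, and open() now creates that directory on demand, ignoring the OSError raised when it already exists. A minimal sketch of the resulting layout (base path and stream name are illustrative, not from the diff):

    import datetime
    import os

    log_path = '/var/log/ziggy'   # hypothetical base path
    name = 'request'              # hypothetical stream name

    day_str = datetime.datetime.now().strftime('%Y%m%d')
    print os.path.normpath(os.path.join(log_path + '/', "%s/%s-%s.log" % (day_str, name, day_str)))
    # e.g. /var/log/ziggy/20120822/request-20120822.log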
69 bin/ziggyview
@@ -9,6 +9,8 @@ import pprint
import bson
import zmq
+import ziggy.client
+
log = logging.getLogger('ziggyview')
def setup_logging(options):
@@ -22,50 +24,6 @@ def setup_logging(options):
log_format = "%(asctime)s %(levelname)s:%(name)s: %(message)s"
logging.basicConfig(level=level, format=log_format, stream=sys.stdout)
-
-def decode_stream(stream):
- while True:
- # BSON documents are prefixed with their total size as a 32-bit
- # little-endian int. We peek at the low two bytes of that size (using the
- # magic of buffered io), then load the documents one structure at a time
- size_data = stream.peek(2)[:2]
- if len(size_data) < 2:
- break
-
- size = struct.unpack("H", size_data)[0]
- data = stream.read(size)
- yield bson.loads(data)
-
-def subscribe_stream(context, host, port, type_name):
- sock = context.socket(zmq.SUB)
-
- prefix = False
- if type_name:
- if type_name.endswith('*'):
- prefix = True
- subscription = type_name[:-1]
- else:
- subscription = type_name
- else:
- subscription = ""
-
- sock.setsockopt(zmq.SUBSCRIBE, subscription)
- log.info("Connecting to %s:%d" % (host, port))
- sock.connect("tcp://%s:%d" % (host, port))
-
- while True:
- parts = sock.recv_multipart()
- if len(parts) == 2:
- channel, data = parts
- # If the client only wants exact matches, we'll skip this guy.
- if not prefix and subscription and channel != subscription:
- continue
-
- yield bson.loads(data)
- else:
- log.info("Done receiving")
- break
-
def main():
parser = argparse.ArgumentParser()
parser.add_argument('--verbose', '-v', dest='verbose', action='append_const', const=True, default=list())
@@ -74,6 +32,7 @@ def main():
parser.add_argument('--host', '-H', dest='host', action='store', default="127.0.0.1:3513")
parser.add_argument('--log-path', '-l', dest='log_path', action='store', default=None)
parser.add_argument('--pretty', '-p', dest='pretty', action='store_true', default=False)
+ parser.add_argument('--group', dest='group', action='store_true', default=False)
options = parser.parse_args()
@@ -81,22 +40,7 @@ def main():
if sys.stdin.isatty():
log.info("Loading stream from ziggyd")
- context = zmq.Context()
- poller = zmq.Poller()
- sock = context.socket(zmq.REQ)
- sock.connect("tcp://%s" % options.host)
- poller.register(sock, zmq.POLLIN)
-
- sock.send(bson.dumps({'cmd': 'SOCK_STREAM'}))
-
- result = dict(poller.poll(5000))
- if sock in result:
- result = bson.loads(sock.recv())
- host, _ = options.host.split(':')
- out_stream = subscribe_stream(context, host, result['port'], options.type_name)
- else:
- print >>sys.stderr, "Error connecting to server"
- sys.exit(1)
+ out_stream = ziggy.client.subscribe_stream(options.host, options.type_name)
else:
if options.type_name is not None:
parser.error("Can't specify a name from stdin")
@@ -104,7 +48,10 @@ def main():
log.info("Loading stream from stdin")
stdin = io.open(sys.stdin.fileno(), mode='rb', closefd=False)
- out_stream = decode_stream(stdin)
+ out_stream = ziggy.client.decode_stream(stdin)
+
+ if options.group:
+ out_stream = ziggy.client.Grouper(out_stream)
for line in out_stream:
if options.pretty:
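The refactor above moves ziggyview's inline ZeroMQ plumbing into the new ziggy.client module, and the new --group flag wraps the stream so events sharing an id come out together. A sketch of the same flow used as a library (the host is the diff's default; the channel name is illustrative, with a trailing '*' requesting a prefix subscription):

    import ziggy.client

    # Subscribe to 'foo' and its sub-events such as 'foo.bar'.
    stream = ziggy.client.subscribe_stream("127.0.0.1:3513", "foo*")

    for event_group in ziggy.client.Grouper(stream):
        for event in event_group:
            print event['type'], event['body']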
2  setup.py
@@ -21,7 +21,7 @@
setup(
name='ziggy',
#version=ziggy.__version__,
- version='0.0.1',
+ version='0.1.0',
description='Ziggy Python Application Logging',
long_description=open('README.md').read(),
author='Rhett Garber',
45 tests/client_test.py
@@ -0,0 +1,45 @@
+import pprint
+from testify import *
+
+import ziggy
+from ziggy import client
+
+class SimpleGrouperTest(TestCase):
+ @setup
+ def build_stream(self):
+ events = [{'id': 1, 'type': 'foo.bar', 'body': 'Hello World'},
+ {'id': 1, 'type': 'foo', 'body': 'All Done'}]
+
+ self.stream = (event for event in events)
+
+ @setup
+ def build_grouper(self):
+ self.grouper = client.Grouper(self.stream)
+
+ def test(self):
+ output = list(self.grouper)
+ assert_equal(len(output), 1)
+ assert_equal(len(output[0]), 2)
+
+ assert_equal(self.grouper.size, 0)
+
+class MaxGrouperTest(TestCase):
+ @setup
+ def build_stream(self):
+ events = [
+ {'id': 1, 'type': 'foo.bar', 'body': 'Hello World'},
+ {'id': 2, 'type': 'foo.bar', 'body': 'Hello World 2'},
+ {'id': 1, 'type': 'foo', 'body': 'All Done'}]
+
+ self.stream = (event for event in events)
+
+ @setup
+ def build_grouper(self):
+ self.grouper = client.Grouper(self.stream, max_size=1)
+
+ def test(self):
+ output = list(self.grouper)
+ assert_equal(len(output), 1)
+ assert_equal(len(output[0]), 1)
+
+
15 tests/tornado_app.py
@@ -31,6 +31,20 @@ def get(self):
self.write("Hello, world")
self.finish()
+
+class AsyncCrashHandler(ziggy.tornado_utils.SampleRequestHandler):
+ @tornado.web.asynchronous
+ @tornado.gen.engine
+ def get(self):
+ loop = tornado.ioloop.IOLoop.instance()
+
+ req_id = self.ziggy.id
+
+ called = yield tornado.gen.Task(loop.add_timeout, time.time() + random.randint(1, 5))
+
+ raise Exception("This Handler is Broken!")
+
+
class ManualAsyncHandler(ziggy.tornado_utils.SampleRequestHandler):
@tornado.web.asynchronous
def get(self):
@@ -52,6 +66,7 @@ def _complete_get(self):
application = tornado.web.Application([
(r"/", MainHandler),
(r"/async", AsyncHandler),
+ (r"/async_crash", AsyncCrashHandler),
(r"/async2", ManualAsyncHandler),
])
5 ziggy/__init__.py
@@ -10,7 +10,7 @@
"""
__title__ = 'ziggy'
-__version__ = '0.0.1'
+__version__ = '0.1.0'
__build__ = 0
__author__ = 'Rhett Garber'
__license__ = 'ISC'
@@ -23,6 +23,7 @@
from .context import Context, set, append, add
from . import context as _context_mod
from .errors import Error
+from .logger import LogHandler
from .timer import timeit
log = logging.getLogger(__name__)
@@ -44,6 +45,6 @@ def configure(host, port, recorder=None):
network.init(host, port)
context._recorder_function = network.send
else:
- log.warning("Empty ziggy configuration")
+ log.info("Empty ziggy configuration")
context._recorder_function = None
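For context, configure() installs the recorder that Context objects report to; given a host and port it sends events to ziggyd over the network, and with no recorder at all it now logs at info level instead of warning. A sketch of typical application setup (the address is illustrative, not a documented default):

    import ziggy

    ziggy.configure('127.0.0.1', 3514)   # hypothetical ziggyd collector address

    with ziggy.Context('request'):
        ziggy.set('user', 'rhettg')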
147 ziggy/client.py
@@ -0,0 +1,147 @@
+# -*- coding: utf-8 -*-
+
+"""
+ziggy.client
+~~~~~~~~
+
+This module provides utilities for writing client applications which connect to ziggyd or consume ziggy data.
+
+:copyright: (c) 2012 by Rhett Garber
+:license: ISC, see LICENSE for more details.
+
+"""
+import collections
+import logging
+import struct
+
+import bson
+import zmq
+
+log = logging.getLogger(__name__)
+
+def decode_stream(stream):
+ """A generator which reads data out of the buffered file stream, unpacks and decodes the ziggy events
+
+ This is useful for parsing on-disk log files generated by ziggyd
+ """
+ while True:
+ # BSON documents are prefixed with their total size as a 32-bit
+ # little-endian int. We peek at the low two bytes of that size (using the
+ # magic of buffered io), then load the documents one structure at a time
+ size_data = stream.peek(2)[:2]
+ if len(size_data) < 2:
+ break
+
+ size = struct.unpack("H", size_data)[0]
+ data = stream.read(size)
+ yield bson.loads(data)
+
+def retrieve_stream_host(context, control_host):
+ poller = zmq.Poller()
+ sock = context.socket(zmq.REQ)
+ sock.connect("tcp://%s" % control_host)
+ poller.register(sock, zmq.POLLIN)
+
+ sock.send(bson.dumps({'cmd': 'SOCK_STREAM'}))
+
+ result = dict(poller.poll(5000))
+ if sock in result:
+ result = bson.loads(sock.recv())
+ host, _ = control_host.split(':')
+ return "%s:%d" % (host, result['port'])
+ else:
+ log.warning("Failed to connect to server")
+ return None
+
+def subscribe_stream(control_host, subscribe):
+ context = zmq.Context()
+
+ while True:
+ stream_host = retrieve_stream_host(context, control_host)
+ if stream_host is None:
+ return
+
+ sock = context.socket(zmq.SUB)
+
+ prefix = False
+ if subscribe:
+ if subscribe.endswith('*'):
+ prefix = True
+ subscription = subscribe[:-1]
+ else:
+ subscription = subscribe
+ else:
+ subscription = ""
+
+ sock.setsockopt(zmq.SUBSCRIBE, subscription)
+ log.info("Connecting to %s" % (stream_host,))
+ sock.connect("tcp://%s" % (stream_host,))
+
+ # Now that we are connected, loop almost forever emitting events.
+ # If we fail to receive any events within the specified timeout, we'll quit
+ # and verify that we are connected to a valid stream.
+ poller = zmq.Poller()
+ poller.register(sock, zmq.POLLIN)
+ while True:
+ result = dict(poller.poll(5000))
+ if sock not in result:
+ break
+
+ parts = sock.recv_multipart()
+ if len(parts) == 2:
+ channel, data = parts
+ # If the client only wants exact matches, we'll skip this guy.
+ if not prefix and subscription and channel != subscription:
+ continue
+
+ yield bson.loads(data)
+ else:
+ break
+
+
+class Grouper(object):
+ """Utility for grouping events and sub-events together.
+
+ Events fed into a Grouper are joined by their common 'id'. Encountering the
+ parent event type will trigger emitting a list of all events and sub-events
+ for that single id.
+
+ This assumes that the parent event will be the last encountered.
+
+ So for example, you might do something like:
+
+ stream = ziggy.client.decode_stream(stdin)
+ for event_group in ziggy.client.Grouper(stream):
+ ... do some processing of the event group ...
+
+ """
+ def __init__(self, stream, max_size=1000):
+ self.max_size = max_size
+ self.stream = stream
+ self.dict = collections.OrderedDict()
+
+ @property
+ def size(self):
+ return len(self.dict)
+
+ def __iter__(self):
+ for event in self.stream:
+
+ while self.size > self.max_size:
+ self.dict.popitem(last=False)
+
+ try:
+ self.dict[event['id']].append(event)
+ except KeyError:
+ self.dict[event['id']] = [event]
+
+ if '.' not in event['type']:
+ yield self.dict.pop(event['id'])
+
+ raise StopIteration
+
+
+
+
+
+
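Taken together, decode_stream() and Grouper make it easy to replay ziggyd's on-disk logs. A sketch, using a hypothetical path in the per-day layout ziggyd now writes; decode_stream() needs a buffered binary stream because it peek()s at each document's BSON size prefix:

    import io

    from ziggy import client

    with io.open('/var/log/ziggy/20120822/request-20120822.log', 'rb') as stream:
        for event_group in client.Grouper(client.decode_stream(stream)):
            # Each group is a parent event plus its sub-events, joined by id.
            for event in event_group:
                print event['type'], event['id']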
6 ziggy/context.py
@@ -111,8 +111,10 @@ def add(self, key, value):
def to_dict(self):
return {'id': self.id,
'type': self.name,
- 'start_time': self.start_time,
- 'end_time': time.time(),
+ 'host': os.uname()[1],
+ 'pid': os.getpid(),
+ 'start': self.start_time,
+ 'end': time.time(),
'body': self.data
}
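With this change an event on the wire carries the shorter start/end keys plus the emitting host and pid. Roughly what a serialized event now looks like (all values illustrative):

    event = {
        'id': 42,                     # shared by an event and its sub-events
        'type': 'request',
        'host': 'app1.example.com',   # os.uname()[1]
        'pid': 12345,                 # os.getpid()
        'start': 1345675980.0,        # epoch seconds
        'end': 1345675980.25,
        'body': {'user': 'rhettg'},
    }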
36 ziggy/logger.py
@@ -0,0 +1,36 @@
+# -*- coding: utf-8 -*-
+
+"""
+ziggy.logger
+~~~~~~~~
+
+This module provides integration between ziggy and the standard Python logging module.
+:copyright: (c) 2012 by Rhett Garber
+:license: ISC, see LICENSE for more details.
+
+"""
+import logging
+import traceback
+
+from .context import Context
+
+class LogHandler(logging.Handler):
+ """Handler to provide log events as ziggy events.
+
+ Records standard fields such as logger name, level, the message and, if an
+ exception was provided, the string-formatted exception.
+
+ The type name, if not specified, will be something like '<my parent context>.log'
+ """
+ def __init__(self, name=None):
+ super(LogHandler, self).__init__()
+
+ self.name = name
+
+ def emit(self, record):
+ with Context(self.name or '.log') as c:
+ c.set('name', record.name)
+ c.set('level', record.levelname)
+ c.set('msg', record.getMessage())
+ if record.exc_info:
+ c.set('exception', ''.join(traceback.format_exception(*record.exc_info)))
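The handler attaches like any stdlib logging handler; records emitted while a ziggy context is active become '.log' events nested under it. A sketch (the logger name is illustrative):

    import logging

    import ziggy

    logging.getLogger().addHandler(ziggy.LogHandler())

    log = logging.getLogger('myapp')
    try:
        1 / 0
    except ZeroDivisionError:
        log.exception("math is hard")   # the 'exception' field gets the traceback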
18 ziggy/tornado_utils.py
@@ -52,8 +52,8 @@ def decorate(func):
@functools.wraps(func)
def wrapper(self, *args, **kwargs):
self.ziggy = ziggy.Context(type_name)
+ self.ziggy.start()
try:
- self.ziggy.start()
return func(self, *args, **kwargs)
finally:
# We're done executing in this context for the time being. Either we've already
@@ -65,6 +65,17 @@ def wrapper(self, *args, **kwargs):
return decorate
+def wrap_exception(func):
+ @functools.wraps(func)
+ def wrapper(self, *args, **kwargs):
+ self.ziggy.start()
+ try:
+ return func(self, *args, **kwargs)
+ finally:
+ self.ziggy.stop()
+
+ return wrapper
+
def wrap_finish(func):
@functools.wraps(func)
def wrapper(self, *args, **kwargs):
@@ -106,13 +117,14 @@ def write(self, chunk):
ziggy.add('response_size', len(chunk))
return super(SampleRequestHandler, self).write(chunk)
- def finish(self):
- res = super(SampleRequestHandler, self).finish()
+ def finish(self, *args, **kwargs):
+ res = super(SampleRequestHandler, self).finish(*args, **kwargs)
ziggy.set('response_status_code', self._status_code)
return res
_execute = wrap_execute('request')(tornado.web.RequestHandler._execute)
finish = wrap_finish(finish)
+ _stack_context_handle_exception = wrap_exception(tornado.web.RequestHandler._stack_context_handle_exception)
# We need a custom version of this decorator so that we can pass in our ziggy
