Skip to content

Commit

Permalink
rip out heartbeats for something simpler that allows users to achieve…
Browse files Browse the repository at this point in the history
… the same effect
  • Loading branch information
mahmoud committed Mar 4, 2016
1 parent 523c899 commit e1f8278
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 56 deletions.
14 changes: 1 addition & 13 deletions lithoxyl/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

from actors import IntervalThreadActor

DEFAULT_HEARTBEAT_MS = 200
DEFAULT_JOIN_TIMEOUT = 0.5

LITHOXYL_CONTEXT = None
Expand All @@ -32,8 +31,6 @@ class LithoxylContext(object):
def __init__(self, **kwargs):
self.loggers = []

self.heartbeat_interval = kwargs.pop('heartbeat', DEFAULT_HEARTBEAT_MS)

self.async_mode = False
self.async_actor = None
self.async_timeout = DEFAULT_JOIN_TIMEOUT
Expand All @@ -43,7 +40,7 @@ def enable_async(self, **kwargs):
update_loggers = kwargs.pop('update_loggers', True)
update_sigterm = kwargs.pop('update_sigterm', True)
update_actor = kwargs.pop('update_actor', True)
actor_kw = {'task': self._async_task,
actor_kw = {'task': self.flush,
'interval': kwargs.pop('interval', None),
'max_interval': kwargs.pop('max_interval', None),
# be very careful when not daemonizing thread
Expand Down Expand Up @@ -93,15 +90,6 @@ def disable_async(self, **kwargs):
self.flush()
self.async_mode = False

def _async_task(self):
self.heartbeat()
self.flush()

def heartbeat(self, complete_record=None, force=False):
for logger in self.loggers:
logger.on_heartbeat(complete_record=complete_record, force=force)
return

def flush(self):
for logger in self.loggers:
logger.flush()
Expand Down
47 changes: 15 additions & 32 deletions lithoxyl/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,8 @@ def __init__(self, name, sinks=None, **kwargs):
self.record_queue = deque(maxlen=QUEUE_LIMIT)
self.async_mode = kwargs.pop('async', self.context.async_mode)
self.async_lock = RLock()
self.heartbeat_interval = kwargs.pop('heartbeat',
self.context.heartbeat_interval)
self.last_heartbeat = time.time() - (self.heartbeat_interval / 1000.0)
self.preflush_hooks = []
self.last_flush = time.time()

self.module = kwargs.pop('module', None)
self._module_offset = kwargs.pop('module_offset', 0)
Expand All @@ -118,6 +117,11 @@ def flush(self):
# only one flush allowed to run at a time
# ensures that records are delivered to sinks in order
with self.async_lock:
for preflush_hook in self.preflush_hooks:
try:
preflush_hook(self)
except Exception:
pass
queue = self.record_queue
while queue:
rec_type, rec = queue.popleft()
Expand All @@ -133,6 +137,9 @@ def flush(self):
elif rec_type == 'comment':
for comment_hook in self._comment_hooks:
comment_hook(rec)
else:
pass # TODO
self.last_flush = time.time()
return

@property
Expand All @@ -150,7 +157,6 @@ def set_sinks(self, sinks):
self._warn_hooks = []
self._complete_hooks = []
self._exc_hooks = []
self._hb_hooks = []
self._comment_hooks = []
for s in sinks:
self.add_sink(s)
Expand Down Expand Up @@ -178,9 +184,6 @@ def add_sink(self, sink):
exc_hook = getattr(sink, 'on_exception', None)
if callable(exc_hook):
self._exc_hooks.append(exc_hook)
hb_hook = getattr(sink, 'on_heartbeat', None)
if callable(hb_hook):
self._hb_hooks.append(hb_hook)
comment_hook = getattr(sink, 'on_comment', None)
if callable(comment_hook):
self._comment_hooks.append(comment_hook)
Expand Down Expand Up @@ -222,36 +225,16 @@ def on_exception(self, exc_record, exc_type, exc_obj, exc_tb):
exc_hook(exc_record, exc_type, exc_obj, exc_tb)
return

def on_heartbeat(self, complete_record=None, force=False):
if force:
pass
elif not self.heartbeat_interval:
return
elif self.heartbeat_interval > (time.time() - self.last_heartbeat) * 1000:
return
if not complete_record:
# heartbeat takes special construction to avoid hitting other hooks
rec_name = '%s_heartbeat' % self.name
root_rec = self.record_type(logger=self, level=CRITICAL,
name=rec_name, frame=sys._getframe(1))
root_rec.begin_record = BeginRecord(root_rec, self.last_heartbeat,
'{} previous heartbeat',
(self.name,))
root_rec.complete_record = CompleteRecord(root_rec, time.time(),
'{} current heartbeat ({duration_msecs} since previous)',
(self.name,), 'success')
complete_record = root_rec.complete_record
for hb_hook in self._hb_hooks:
hb_hook(complete_record)
self.last_heartbeat = time.time()
return

def comment(self, message, *a, **kw):
root = self.record_type(logger=self,
level=CRITICAL,
name='comment',
data=kw)
rec = CommentRecord(root, time.time(), message, a)
cur_time = time.time()
root.begin_record = BeginRecord(root, cur_time, 'comment', ())
root.complete_record = CompleteRecord(root, cur_time,
'comment', (), 'success')
rec = CommentRecord(root, cur_time, message, a)
if self.async_mode:
self.record_queue.append(('comment', rec))
else:
Expand Down
3 changes: 0 additions & 3 deletions lithoxyl/sinks.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,9 +322,6 @@ def _on_begin(self, record):
entry = self.formatter.on_begin(record)
return self.emitter.on_begin(record, entry)

def _on_hearbeat(self, record):
pass

def __repr__(self):
cn = self.__class__.__name__
return ('<%s filters=%r formatter=%r emitter=%r>'
Expand Down
18 changes: 10 additions & 8 deletions playground.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@

from lithoxyl import context

context.get_context().enable_async()
#context.get_context().enable_async()

log = Logger('test', sinks=[stderr_sink], heartbeat=1000)
log = Logger('test', sinks=[stderr_sink])

with log.critical('first'):
print 'did some work'
Expand All @@ -42,19 +42,21 @@
print os.getpid()


class HeartbeatSink(object):
def on_heartbeat(self, complete_record):
print complete_record.message


class CommentSink(object):
def on_comment(self, comment_record):
print comment_record, comment_record.message
# import pdb;pdb.set_trace()


def emit_cur_time_hook(logger):
logger.comment('simpler heartbeats for a simpler time')

log.add_sink(HeartbeatSink())

log.preflush_hooks.append(emit_cur_time_hook)
log.add_sink(CommentSink())

# log.flush()


log.comment('{} {hah}', 'hah!', hah='HAH!')

Expand Down

0 comments on commit e1f8278

Please sign in to comment.