Skip to content

Commit

Permalink
Use conditions to sleep so that we can be woken up
Browse files Browse the repository at this point in the history
  • Loading branch information
mikeboers committed Feb 11, 2016
1 parent 6e20d2f commit 9758498
Showing 1 changed file with 26 additions and 11 deletions.
37 changes: 26 additions & 11 deletions sgevents/eventlog.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import time
import pprint
import logging
from threading import Condition

from .event import Event
from .logs import update_log_meta
Expand Down Expand Up @@ -32,7 +33,7 @@ class EventLog(object):
def __init__(self, shotgun=None, last_id=None, last_time=None, extra_fields=None):

self.shotgun = get_shotgun(shotgun)

# We need to force a timeout.
self.shotgun.config.timeout_secs = 30.1 # Marginally more than a multiple of 3.

Expand All @@ -53,6 +54,9 @@ def __init__(self, shotgun=None, last_id=None, last_time=None, extra_fields=None
#: The time of the last event we have seen.
self.last_time = last_time or None

self._signal_wake_up = Condition()
self._signal_sleep = Condition()

self.return_fields = list(Event.return_fields)
if extra_fields:
self.return_fields.extend(extra_fields)
Expand All @@ -71,7 +75,7 @@ def process_events_forever(self, func, *args, **kwargs):
return
except:
log.exception('error calling %s with event %d:\n%s' % (get_func_name(func), event.id, event.dumps(pretty=True)))

except KeyboardInterrupt:
return
except:
Expand All @@ -85,10 +89,26 @@ def process_events_forever(self, func, *args, **kwargs):
log.exception('Error %d during event iteration; sleeping for 10s' % error_count)
else:
log.warning('Error %d during event iteration; sleeping for 10s' % error_count, exc_info=True)
time.sleep(10)
self._sleep(10)
else:
log.warning('iter_events_forever returned; sleeping for 10s')
time.sleep(10)
self._sleep(10)

def _sleep(self, delay):
# If anything is waiting for us to sleep, let them know.
with self._signal_sleep:
self._signal_sleep.notify_all()
# Sleep until something wakes us up.
delay = min(delay, 60)
with self._signal_wake_up:
self._signal_wake_up.wait(delay)

def wake_up(self, wait=False, timeout=30.0):
with self._signal_wake_up:
self._signal_wake_up.notify_all()
if wait:
with self._signal_sleep:
self._signal_sleep.wait(timeout)

def iter_events_forever(self, batch_size=DEFAULT_COUNT, idle_delay=3.0):
"""Yields :class:`Event` objects as they become availible.
Expand All @@ -107,7 +127,7 @@ def iter_events_forever(self, batch_size=DEFAULT_COUNT, idle_delay=3.0):
handle_event(event)
"""

last_time = time.time()
warn_at = 600

Expand All @@ -129,7 +149,7 @@ def iter_events_forever(self, batch_size=DEFAULT_COUNT, idle_delay=3.0):
warn_at += 600
log.info('no new events in last %d minutes' % minutes)

time.sleep(idle_delay)
self._sleep(idle_delay)

def iter_events(self, count=DEFAULT_COUNT, wrap=True):
"""Polls for new events, filtering with :func:`filter_new`.
Expand Down Expand Up @@ -299,8 +319,3 @@ def find_entities(self, limit, filters=None, **kwargs):
limit=limit,
**kwargs
)





0 comments on commit 9758498

Please sign in to comment.