Commit e9dae68

Make entity filtering more robust, such that internal errors should not throw us into a loop
mikeboers committed Sep 8, 2015
1 parent 4423df5 commit e9dae68
Showing 2 changed files with 61 additions and 41 deletions.
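
At its core, the change turns batch filtering into a per-entity check that updates tracking state before an event is yielded, so an internal error while handling one event can no longer send the reader back to refetch the same entity forever. A minimal sketch of that yield-once contract (hypothetical names, not code from this commit):

    # Mark an entity as seen *before* yielding it, so an exception
    # raised by the consumer cannot cause the entity to be replayed
    # on the next poll.
    def iter_once(entities, seen_ids):
        for entity in entities:
            if entity['id'] in seen_ids:
                continue
            seen_ids.add(entity['id'])  # record first...
            yield entity                # ...then yield; a raise here cannot rewind
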
2 changes: 1 addition & 1 deletion sgevents/__main__.py
@@ -11,7 +11,7 @@
 args = parser.parse_args()
 
 
-for e in EventLog().iter_events():
+for e in EventLog().iter_events_forever():
     if args.dumps:
         if args.verbose: print >> sys.stderr, e.summary
         print e.dumps(pretty=args.pretty)
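
The script-facing effect of the rename: the old iter_events looped forever, while the new iter_events (formerly read) performs a single poll and iter_events_forever owns the endless loop. A hedged usage sketch (assumes EventLog is importable and can reach Shotgun; not code from this commit):

    event_log = EventLog()

    # One-shot poll: yields only the new events found right now.
    for event in event_log.iter_events():
        print event.summary

    # Endless stream: polls in batches, sleeping between empty polls.
    for event in event_log.iter_events_forever():
        print event.summary
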
100 changes: 60 additions & 40 deletions sgevents/eventlog.py
@@ -41,8 +41,8 @@ def __init__(self, shotgun=None, last_id=None, last_time=None, extra_fields=None
         self.missing_ids = {}
 
         #: How long to track missing IDs until we give up on them;
-        #: defaults to 15 seconds.
-        self.id_timeout = 15.0
+        #: defaults to 30 seconds.
+        self.id_timeout = 30.0
 
         #: The time of the last event we have seen.
         self.last_time = last_time or None
@@ -54,7 +54,7 @@ def __init__(self, shotgun=None, last_id=None, last_time=None, extra_fields=None
     def process_events_forever(self, func, *args, **kwargs):
         while True:
             try:
-                for event in self.iter_events(*args, **kwargs):
+                for event in self.iter_events_forever(*args, **kwargs):
                     try:
                         with update_log_meta(event=event.id):
                             log.info(event.summary)
@@ -69,8 +69,11 @@ def process_events_forever(self, func, *args, **kwargs):
             except:
                 log.exception('error during event iteration; sleeping for 10s')
                 time.sleep(10)
+            else:
+                log.warning('iter_events_forever returned; sleeping for 10s')
+                time.sleep(10)
 
-    def iter_events(self, batch_size=100, idle_delay=3.0):
+    def iter_events_forever(self, batch_size=100, idle_delay=3.0):
         """Yields :class:`Event` objects as they become available.
 
         :param int batch_size: The number of events to read from the API at once.
@@ -83,81 +86,100 @@ def iter_events(self, batch_size=100, idle_delay=3.0):
         ::
 
-            for event in event_log.iter():
+            for event in event_log.iter_events_forever():
                 handle_event(event)
 
         """
 
         while True:
 
-            batch = self.read(batch_size)
+            batch = self.iter_events(batch_size)
             for e in batch:
                 yield e
 
             if not batch:
                 time.sleep(idle_delay)
 

-    def read(self, count=100):
+    def iter_events(self, count=100, wrap=True):
         """Polls for new events, filtering with :func:`filter_new`.
 
+        The EventLog assumes that once an event has been yielded OR an exception
+        raised, the underlying event has been processed and will not be yielded
+        again by subsequent calls. This is required for our error handling
+        to not go into an infinite loop in case we have internal errors for
+        specific events (e.g. in the specialization classes).
+
         :param int count: The number of events to read from the API at once.
-        :return list: of new :class:`Event`.
+        :param bool wrap: Return :class:`Event` instead of raw ``EventLogEntry``.
+        :return generator: of events (or entities).
 
         """
+        raw_entities = self.find_next_entities(count)
+        for entity in raw_entities:
+            if self.filter_new(entity):
+                yield Event.factory(entity) if wrap else entity
 
+    def find_next_entities(self, count=100):
+        """Find the next raw ``EventLogEntry`` entities.
+
+        This method does NOT update the ``last_id`` or ``last_time`` fields;
+        use :meth:`filter_new` on each entity for that.
+
+        """

         if self.max_complete_id:
-            entities = self._find(count, filters=[('id', 'greater_than', self.max_complete_id)])
+            entities = self.find_entities(count, filters=[('id', 'greater_than', self.max_complete_id)])
         else:
             if self.last_time:
                 # everything since the last time
                 log.info('starting at most recent event since %s' % self.last_time)
-                entities = self._find(count, filters=[('created_at', 'greater_than', self.last_time)])
+                entities = self.find_entities(count, filters=[('created_at', 'greater_than', self.last_time)])
             else:
                 # the last event
                 log.info('starting at most recent event')
-                entities = self._find(1, order=[{
+                entities = self.find_entities(1, order=[{
                     'field_name': 'created_at',
                     'direction': 'desc',
                 }])
                 if entities:
-                    log.info('most recent event is %d at %s' % (entities[0].id, entities[0].created_at))
+                    log.info('most recent event is %d at %s' % (entities[0]['id'], entities[0]['created_at']))
 
-        return self.filter_new(entities) if entities else []
+        return entities or []

-    def filter_new(self, entities):
-        """Filter out entities which we have seen before.
+    def filter_new(self, entity):
+        """Filter out entities which we have seen before, and update our records.
 
         Due to the transaction model of Shotgun's underlying database, it is
         possible for events with lower IDs to be created after those with
         higher IDs. This method is primarily dealing with remembering those
         gaps, and making sure we don't skip those events if they do eventually
         show up in the log stream.
 
+        :param dict entity: A raw ``EventLogEntry`` entity.
+        :return: The entity if it has not been seen before, else ``None``.
+
         """
 
+        # We try to be aggressively defensive in this function, doing
+        # things in a manner/order such that if we made a mistake (or get
+        # odd entities from Shotgun) then we do not go into an infinite loop
+        # of fetching and processing the same entities over and over.
+
+        id_ = entity['id']
+        entity_is_new = id_ > self.max_partial_id or id_ in self.missing_ids
+
         now = time.time()
         newly_missed = []
-        unseen_entities = []
-
-        # TODO: be really defensive in here re: exceptions being raised
-        # and causing our tracking data to corrupt if we keep polling afterwards.
-
-        for e in entities:
-
-            if e.id > self.max_partial_id or e.id in self.missing_ids:
-                unseen_entities.append(e)
-
-                # If we have run before, and there is a gap being introduced by
-                # this event, then track it.
-                if self.max_partial_id:
-                    for i in xrange(self.max_partial_id + 1, e.id):
-                        log.info('newly missed event?? %s' % e.summary)
-                        newly_missed.append(i)
-                        self.missing_ids[i] = now
-
-                self.max_partial_id = max(self.max_partial_id, e.id)
-                self.missing_ids.pop(e.id, None)
+        # If we have run before, and there is a gap being introduced by
+        # this event, then track it.
+        if self.max_partial_id:
+            for i in xrange(self.max_partial_id + 1, id_):
+                newly_missed.append(i)
+                self.missing_ids[i] = now
+
+        self.max_partial_id = max(self.max_partial_id, id_)
+        self.missing_ids.pop(id_, None)
 
         if newly_missed:
             log.warning('missing %d event id%s: %s' % (
@@ -184,19 +206,17 @@ def filter_new(self, entities):
             self.max_complete_id = self.max_partial_id
 
         # We don't use this ourselves after the first scan, but it may be nice to have.
-        self.last_time = max(e['created_at'] for e in entities)
-
-        return unseen_entities
+        self.last_time = max(self.last_time, entity['created_at']) if self.last_time else entity['created_at']
 
+        return entity if entity_is_new else None
 
-    def _find(self, limit, filters=None, **kwargs):
-        raw_events = self.shotgun.find('EventLogEntry',
+    def find_entities(self, limit, filters=None, **kwargs):
+        return self.shotgun.find('EventLogEntry',
             filters or [],
             self.return_fields,
             limit=limit,
             **kwargs
         )
-        return [Event.factory(e) for e in raw_events]
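
The seeding behaviour that find_entities now serves is easiest to read as three cases. A hypothetical mirror of the branching in find_next_entities (helper name invented for illustration; filters as in the diff above):

    def describe_next_query(max_complete_id, last_time):
        if max_complete_id:
            # Normal operation: everything past the newest complete ID.
            return dict(filters=[('id', 'greater_than', max_complete_id)])
        if last_time:
            # Resuming: everything created since the last seen time.
            return dict(filters=[('created_at', 'greater_than', last_time)])
        # Cold start: only the single most recent event, to seed our state.
        return dict(limit=1, order=[{'field_name': 'created_at',
                                     'direction': 'desc'}])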



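To see the new gap tracking at work, consider events committing out of ID order. A worked sketch (hypothetical driver: assumes an EventLog can be built without a live Shotgun connection, and that we may set its tracking state by hand):

    import datetime

    event_log = EventLog()
    event_log.max_partial_id = 101              # pretend event 101 was already seen

    when = datetime.datetime(2015, 9, 8, 12, 0)
    e103 = {'id': 103, 'created_at': when}
    assert event_log.filter_new(e103) is e103   # new: 103 > max_partial_id
    assert 102 in event_log.missing_ids         # the gap is remembered

    e102 = {'id': 102, 'created_at': when}
    assert event_log.filter_new(e102) is e102   # late arrival is still accepted
    assert event_log.filter_new(e102) is None   # but only the first time

If event 102 never shows up, its entry in missing_ids ages past id_timeout (now 30 seconds), the ID is abandoned, and max_complete_id advances so later polls stop asking for it.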
