Skip to content

Commit

Permalink
refactor: make EventSource emit only events that can be processed
Browse files Browse the repository at this point in the history
  • Loading branch information
cquintana92 committed May 24, 2024
1 parent a2dfc2a commit 1cf7106
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 29 deletions.
2 changes: 1 addition & 1 deletion event_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def main(mode: Mode, dry_run: bool):
LOG.i("Starting with HttpEventSink")
sink = HttpEventSink()

runner = Runner(source=source, sink=sink, force_execution=mode == Mode.DEAD_LETTER)
runner = Runner(source=source, sink=sink)
runner.run()


Expand Down
7 changes: 6 additions & 1 deletion events/event_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,12 @@ def __listen(self, on_event: Callable[[SyncEvent], NoReturn]):
webhook_id = int(notify.payload)
event = SyncEvent.get_by(id=webhook_id)
if event is not None:
on_event(event)
if event.mark_as_taken():
on_event(event)
else:
LOG.info(
f"Event {event.id} was handled by another runner"
)
else:
LOG.info(f"Could not find event with id={notify.payload}")
except Exception as e:
Expand Down
45 changes: 18 additions & 27 deletions events/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,44 +8,35 @@


class Runner:
def __init__(
self, source: EventSource, sink: EventSink, force_execution: bool = False
):
def __init__(self, source: EventSource, sink: EventSink):
self.__source = source
self.__sink = sink
self.__force_execution = force_execution

def run(self):
self.__source.run(self.__on_event)

@newrelic.agent.background_task()
def __on_event(self, event: SyncEvent):
try:
can_process = event.mark_as_taken()
if can_process or self.__force_execution:
event_created_at = event.created_at
start_time = arrow.now()
success = self.__sink.process(event)
if success:
event_id = event.id
SyncEvent.delete(event.id, commit=True)
LOG.info(f"Marked {event_id} as done")
event_created_at = event.created_at
start_time = arrow.now()
success = self.__sink.process(event)
if success:
event_id = event.id
SyncEvent.delete(event.id, commit=True)
LOG.info(f"Marked {event_id} as done")

end_time = arrow.now() - start_time
time_between_taken_and_created = start_time - event_created_at
end_time = arrow.now() - start_time
time_between_taken_and_created = start_time - event_created_at

newrelic.agent.record_custom_metric(
"Custom/sync_event_processed", 1
)
newrelic.agent.record_custom_metric(
"Custom/sync_event_process_time", end_time.total_seconds()
)
newrelic.agent.record_custom_metric(
"Custom/sync_event_elapsed_time",
time_between_taken_and_created.total_seconds(),
)
else:
LOG.info(f"{event.id} was handled by another runner")
newrelic.agent.record_custom_metric("Custom/sync_event_processed", 1)
newrelic.agent.record_custom_metric(
"Custom/sync_event_process_time", end_time.total_seconds()
)
newrelic.agent.record_custom_metric(
"Custom/sync_event_elapsed_time",
time_between_taken_and_created.total_seconds(),
)
except Exception as e:
LOG.warn(f"Exception processing event [id={event.id}]: {e}")
newrelic.agent.record_custom_metric("Custom/sync_event_failed", 1)

0 comments on commit 1cf7106

Please sign in to comment.