Skip to content

Commit

Permalink
fix: event listener (#2119)
Browse files Browse the repository at this point in the history
* fix: commit transaction after taking event

* feat: allow to reconnect to postgres for event listener

* chore: log sync events pending to process to metrics

* fix: make dead_letter runner able to process events without needing to have lock on the event

* chore: close Session after reconnect

* refactor: make EventSource emit only events that can be processed
  • Loading branch information
cquintana92 committed May 24, 2024
1 parent 450322f commit 6862ed3
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 25 deletions.
3 changes: 3 additions & 0 deletions app/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -3699,7 +3699,10 @@ def mark_as_taken(self) -> bool:
AND taken_time IS NULL
"""
args = {"taken_time": arrow.now().datetime, "sync_event_id": self.id}

res = Session.execute(sql, args)
Session.commit()

return res.rowcount > 0

@classmethod
Expand Down
29 changes: 27 additions & 2 deletions events/event_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
_DEAD_LETTER_THRESHOLD_MINUTES = 10
_DEAD_LETTER_INTERVAL_SECONDS = 30

_POSTGRES_RECONNECT_INTERVAL_SECONDS = 5


class EventSource(ABC):
@abstractmethod
Expand All @@ -22,9 +24,19 @@ def run(self, on_event: Callable[[SyncEvent], NoReturn]):

class PostgresEventSource(EventSource):
def __init__(self, connection_string: str):
self.__connection = psycopg2.connect(connection_string)
self.__connection_string = connection_string
self.__connect()

def run(self, on_event: Callable[[SyncEvent], NoReturn]):
while True:
try:
self.__listen(on_event)
except Exception as e:
LOG.warn(f"Error listening to events: {e}")
sleep(_POSTGRES_RECONNECT_INTERVAL_SECONDS)
self.__connect()

def __listen(self, on_event: Callable[[SyncEvent], NoReturn]):
self.__connection.set_isolation_level(
psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT
)
Expand All @@ -44,12 +56,24 @@ def run(self, on_event: Callable[[SyncEvent], NoReturn]):
webhook_id = int(notify.payload)
event = SyncEvent.get_by(id=webhook_id)
if event is not None:
on_event(event)
if event.mark_as_taken():
on_event(event)
else:
LOG.info(
f"Event {event.id} was handled by another runner"
)
else:
LOG.info(f"Could not find event with id={notify.payload}")
except Exception as e:
LOG.warn(f"Error getting event: {e}")

def __connect(self):
self.__connection = psycopg2.connect(self.__connection_string)

from app.db import Session

Session.close()


class DeadLetterEventSource(EventSource):
@newrelic.agent.background_task()
Expand All @@ -73,3 +97,4 @@ def run(self, on_event: Callable[[SyncEvent], NoReturn]):
sleep(_DEAD_LETTER_INTERVAL_SECONDS)
except Exception as e:
LOG.warn(f"Error getting dead letter event: {e}")
sleep(_DEAD_LETTER_INTERVAL_SECONDS)
40 changes: 17 additions & 23 deletions events/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,31 +18,25 @@ def run(self):
@newrelic.agent.background_task()
def __on_event(self, event: SyncEvent):
try:
can_process = event.mark_as_taken()
if can_process:
event_created_at = event.created_at
start_time = arrow.now()
success = self.__sink.process(event)
if success:
event_id = event.id
SyncEvent.delete(event.id, commit=True)
LOG.info(f"Marked {event_id} as done")
event_created_at = event.created_at
start_time = arrow.now()
success = self.__sink.process(event)
if success:
event_id = event.id
SyncEvent.delete(event.id, commit=True)
LOG.info(f"Marked {event_id} as done")

end_time = arrow.now() - start_time
time_between_taken_and_created = start_time - event_created_at
end_time = arrow.now() - start_time
time_between_taken_and_created = start_time - event_created_at

newrelic.agent.record_custom_metric(
"Custom/sync_event_processed", 1
)
newrelic.agent.record_custom_metric(
"Custom/sync_event_process_time", end_time.total_seconds()
)
newrelic.agent.record_custom_metric(
"Custom/sync_event_elapsed_time",
time_between_taken_and_created.total_seconds(),
)
else:
LOG.info(f"{event.id} was handled by another runner")
newrelic.agent.record_custom_metric("Custom/sync_event_processed", 1)
newrelic.agent.record_custom_metric(
"Custom/sync_event_process_time", end_time.total_seconds()
)
newrelic.agent.record_custom_metric(
"Custom/sync_event_elapsed_time",
time_between_taken_and_created.total_seconds(),
)
except Exception as e:
LOG.warn(f"Exception processing event [id={event.id}]: {e}")
newrelic.agent.record_custom_metric("Custom/sync_event_failed", 1)
12 changes: 12 additions & 0 deletions monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,23 @@ def log_nb_db_connection():
newrelic.agent.record_custom_metric("Custom/nb_db_connections", nb_connection)


@newrelic.agent.background_task()
def log_pending_to_process_events():
r = Session.execute("select count(*) from sync_events WHERE taken_time IS NULL;")
events_pending = list(r)[0][0]

LOG.d("number of events pending to process %s", events_pending)
newrelic.agent.record_custom_metric(
"Custom/sync_events_pending_to_process", events_pending
)


if __name__ == "__main__":
exporter = MetricExporter(get_newrelic_license())
while True:
log_postfix_metrics()
log_nb_db_connection()
log_pending_to_process_events()
Session.close()

exporter.run()
Expand Down

0 comments on commit 6862ed3

Please sign in to comment.