# Listening for events

Another requirement was the ability to listen for events.
Waiting for an event is often a blocking operation, which can stall or even deadlock the other tasks.
We need a way to wait for events without blocking the event loop.
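
To illustrate the problem, here is a minimal sketch (not part of the original notebook) that contrasts a blocking wait with a cooperative one. The names `ticker`, `blocking_wait`, `cooperative_wait` and `demo` are made up for this illustration, and `time.sleep` merely stands in for any blocking read.

```python
import asyncio
import time


async def ticker(log, ticks=3, interval=0.1):
    # A well-behaved task: it yields to the event loop on every await.
    for i in range(ticks):
        log.append(f"tick {i}")
        await asyncio.sleep(interval)


async def blocking_wait(log, delay=0.5):
    # time.sleep() never yields, so no other task can run in the meantime.
    time.sleep(delay)
    log.append("event (blocking)")


async def cooperative_wait(log, delay=0.5):
    # asyncio.sleep() suspends only this coroutine; the loop keeps running.
    await asyncio.sleep(delay)
    log.append("event (cooperative)")


async def demo(waiter):
    # Run the waiter and the ticker concurrently and record what happened.
    log = []
    await asyncio.gather(waiter(log), ticker(log))
    return log


blocking_log = asyncio.run(demo(blocking_wait))
cooperative_log = asyncio.run(demo(cooperative_wait))
print(blocking_log)
print(cooperative_log)
```

With the blocking wait, the ticker cannot even start until the wait is over. With the cooperative wait, all ticks are logged while the waiter is still pending.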

In [1]:
import asyncio
import async_timeout
import random
import time

import app_state  # for singleton 'keep_running'
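
The `app_state` module itself is not shown in this notebook. A minimal sketch, assuming it is nothing more than a module holding a `keep_running` flag: Python caches imported modules, so every import refers to the same object, which is what makes the flag behave like a singleton. The stand-in below is built in memory only to keep the example self-contained; `same_state` is a made-up alias.

```python
import sys
import types

# Assumed contents of app_state.py: a single module-level flag.
module = types.ModuleType("app_state")
module.keep_running = True
sys.modules["app_state"] = module  # stands in for an app_state.py file on disk

import app_state
import app_state as same_state  # a second import returns the cached module

# Flipping the flag through one name is visible through the other.
same_state.keep_running = False
print(app_state.keep_running)  # False
```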

In [2]:
# Override print to collect the output, so it can be shown in a single cell as if it had been printed in a terminal
import os

real_print = print
lines = []
def print(text):
    global lines
    lines.append(text)

Here's a simple event source emulating events that occur randomly within a configurable timespan.

In [3]:
class EventSource(object):
    def __init__(self, max_timespan=2):
        self.max_timespan = max_timespan

    async def read(self):
        await asyncio.sleep(self.max_timespan * random.random())
        return "Hello"

event_source = EventSource(1.5)

And here is the callback that the `event_listener` executes on every event.

In [4]:
async def event_notifier(event):
    print(f"New {event=} at {time.perf_counter()-start:.1f} seconds.")

This is a simplified implementation of the `event_listener`. To make it cooperate with the other tasks, `async_timeout` interrupts the wait for events after one second, so the loop can check `keep_running` again.

In [5]:
async def event_listener(event_source, callback):
    while app_state.keep_running:
        try:
            async with async_timeout.timeout(1):
                event = await event_source.read()
                if event:
                    await callback(event)
        except asyncio.TimeoutError:
            # no events so far
            pass
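
The same pattern can also be sketched with the standard library's `asyncio.wait_for` instead of the third-party `async_timeout` package. This is an alternative under stated assumptions, not the notebook's implementation: `app_state` is replaced by a hypothetical `SimpleNamespace` stand-in, the `timeout` parameter and the `collect` and `main` helpers are made up, and the delays are shortened so the example finishes quickly.

```python
import asyncio
import random
import types

# Hypothetical stand-in for the notebook's app_state module.
app_state = types.SimpleNamespace(keep_running=True)


class EventSource:
    """Same idea as the notebook's EventSource: an event after a random delay."""

    def __init__(self, max_timespan=2):
        self.max_timespan = max_timespan

    async def read(self):
        await asyncio.sleep(self.max_timespan * random.random())
        return "Hello"


async def event_listener(event_source, callback, timeout=1):
    while app_state.keep_running:
        try:
            # wait_for cancels read() and raises TimeoutError when time is up.
            event = await asyncio.wait_for(event_source.read(), timeout)
        except asyncio.TimeoutError:
            continue  # no event yet; loop around and re-check keep_running
        if event:
            # The callback runs outside the timeout, so it is never cut short.
            await callback(event)


async def exit_after(delay):
    await asyncio.sleep(delay)
    app_state.keep_running = False


async def main():
    received = []

    async def collect(event):
        received.append(event)

    await asyncio.gather(
        exit_after(0.5),
        event_listener(EventSource(0.1), collect, timeout=0.2),
    )
    return received


received = asyncio.run(main())
print(f"Received {len(received)} events.")
```

In both variants the listener re-checks `keep_running` only after a read returns or the timeout expires, so it can keep running for up to one timeout period after the flag is cleared. Note also that a timeout cancels the pending `read()`; with a real event source this may drop an event that was about to arrive, depending on how the source handles cancellation.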

In [6]:
async def run_forever():
    while app_state.keep_running:
        print(f"Still running @ {time.perf_counter()-start:.1f} seconds")
        await asyncio.sleep(1)
    print("Done with all the work.")

In [7]:
async def exit_after(delay):
    # Clear the shared flag after `delay` seconds so that all loops terminate.
    await asyncio.sleep(delay)
    app_state.keep_running = False

In [8]:
app_state.keep_running = True
start = time.perf_counter()

_ = await asyncio.gather(
    exit_after(5),
    run_forever(),
    event_listener(event_source, event_notifier),
)

end = time.perf_counter()
print(f"Execution finished after {end-start:.1f} seconds.")

In [9]:
real_print("\n".join(lines))
lines = []

Still running @ 0.0 seconds
Still running @ 1.0 seconds
New event='Hello' at 1.4 seconds.
Still running @ 2.0 seconds
Still running @ 3.0 seconds
Still running @ 4.0 seconds
New event='Hello' at 4.2 seconds.
New event='Hello' at 4.7 seconds.
Done with all the work.
Execution finished after 5.7 seconds.
