## Asyncio Asynchronous Programming Lab
This lab is designed to get you familiar with the concepts of asynchronous programming, distributed system architectures such as async client server, service, pub-sub, and layered architectures, and web interfaces / microservices interfaces to start contributing Python microservices for monitoring and observability within a large tech environment.

In [1]:
print("Hello world")

Hello world


In [103]:
import asyncio
from time import time, sleep
import requests
import pdb

In [3]:
# This is a native coroutine
async def add(x, y):
    return x + y

In [4]:
# This is another native coroutine
async def add_wrapper(x, y):
    return await add(1, 2)

In [5]:
res = await add_wrapper(2, 3)

# Exercise: Implement the timeit function decorator without arguments, and then implement it WITHD arguments (for rounding)

In [35]:
def timeit(func, *args, **kwargs):
    def wrapper():
        start_time = time()
        ret = func(*args, **kwargs)
        end_time = time()
        elapsed_time = round(end_time - start_time, 2)
        print(f"Total time elapsed for function {func.__name__}: {elapsed_time} seconds")
    return wrapper

In [40]:
@timeit
def hello():
    sleep(0.0123)
    print("hello")

In [41]:
hello()

hello
Total time elapsed for function hello: 0.01 seconds


In [62]:
def a():
    return
a.__name__

'a'

In [86]:
def timeit(sig_fig=3):
    def _timeit(func, *args, **kwargs):
        def wrapper():
            start_time = time()
            ret = func(*args, **kwargs)
            end_time = time()
            elapsed_time = round(end_time - start_time, sig_fig)
            print(f"Total time elapsed for function {func.__name__}: {elapsed_time} seconds")
            return ret
        # Set wrapper name to calling function name
        wrapper.__name__ = func.__name__
        return wrapper
    return _timeit

In [87]:
@timeit()
def hello():
    sleep(0.0123)
    print("hello")

In [88]:
hello()

hello
Total time elapsed for function hello: 0.013 seconds


In [82]:
hello.__name__

'hello'

# Exercise: Implement serial and async versions of blogdom.py (adapter for local server) and time their execution
To do this, you will also need to set up a very simple echo server in another terminal. Do this with async.

In [123]:
URL = "http://127.0.0.1:9000"
N = 10

@timeit()
def serial_send_request():
    response = requests.get(URL)
    return response

@timeit()
async def async_send_request():
    response = requests.get(URL)
    return response

In [124]:
@timeit()
def serial_multiple():
    for _ in range(N):
        response = serial_send_request()
        print(f"Response returned status code: {response.status_code if response else "None"}")

In [92]:
serial_multiple()

Total time elapsed for function serial_send_request: 0.07 seconds
Response returned status code: 200
Total time elapsed for function serial_send_request: 0.47 seconds
Response returned status code: 200
Total time elapsed for function serial_send_request: 0.463 seconds
Response returned status code: 200
Total time elapsed for function serial_send_request: 0.959 seconds
Response returned status code: 200
Total time elapsed for function serial_send_request: 0.29 seconds
Response returned status code: 200
Total time elapsed for function serial_send_request: 0.431 seconds
Response returned status code: 200
Total time elapsed for function serial_send_request: 0.158 seconds
Response returned status code: 200
Total time elapsed for function serial_send_request: 0.823 seconds
Response returned status code: 200
Total time elapsed for function serial_send_request: 0.307 seconds
Response returned status code: 200
Total time elapsed for function serial_send_request: 0.28 seconds
Response returned s

In [109]:
await async_send_request()


Total time elapsed for function async_send_request: 0.0 seconds


<Response [200]>

In [141]:
# @timeit()
async def async_multiple():
    tasks = [async_send_request() for _ in range(N)]
    for task in asyncio.as_completed(tasks):
        response = await task
        print(f"Response returned status code: {response.status_code if response else None}")

In [133]:
start_time = time()
await async_multiple()
end_time = time()
print(f"Real elapsed time: {end_time - start_time:.2f} secs")

Total time elapsed for function async_multiple: 0.0 seconds
Total time elapsed for function async_send_request: 0.0 seconds
Total time elapsed for function async_send_request: 0.0 seconds
Total time elapsed for function async_send_request: 0.0 seconds
Total time elapsed for function async_send_request: 0.0 seconds
Total time elapsed for function async_send_request: 0.0 seconds
Total time elapsed for function async_send_request: 0.0 seconds
Total time elapsed for function async_send_request: 0.0 seconds
Total time elapsed for function async_send_request: 0.0 seconds
Total time elapsed for function async_send_request: 0.0 seconds
Total time elapsed for function async_send_request: 0.0 seconds
Response returned status code: 200
Response returned status code: 200
Response returned status code: 200
Response returned status code: 200
Response returned status code: 200
Response returned status code: 200
Response returned status code: 200
Response returned status code: 200
Response returned st

# Exercise: Fix the above asynchronous implementation and figure out why it's not working. Write at least 5 sentences explaining where you went wrong

In [158]:
@timeit()
async def async_send_request():
    loop = asyncio.get_running_loop()
    response = await loop.run_in_executor(None, requests.get, URL)
    return response

In [159]:
resp = await async_send_request()

Total time elapsed for function async_send_request: 0.0 seconds


In [160]:
resp

<Response [200]>

In [171]:
start_time = time()
await async_multiple()
end_time = time()
print(f"Real elapsed time: {end_time - start_time:.2f} secs")

Total time elapsed for function async_send_request: 0.0 seconds
Total time elapsed for function async_send_request: 0.0 seconds
Total time elapsed for function async_send_request: 0.0 seconds
Total time elapsed for function async_send_request: 0.0 seconds
Total time elapsed for function async_send_request: 0.0 seconds
Total time elapsed for function async_send_request: 0.0 seconds
Total time elapsed for function async_send_request: 0.0 seconds
Total time elapsed for function async_send_request: 0.0 seconds
Total time elapsed for function async_send_request: 0.0 seconds
Total time elapsed for function async_send_request: 0.0 seconds
Response returned status code: 200
Response returned status code: 200
Response returned status code: 200
Response returned status code: 200
Response returned status code: 200
Response returned status code: 200
Response returned status code: 200
Response returned status code: 200
Response returned status code: 200
Response returned status code: 200
Real elaps

What was the problem?

The problem was twofold. Here is our original `async_send_request` function:

```
async def async_send_request():
    response = requests.get(URL)
    return response
```

The problem is in the line
```
response = requests.get(URL)
```
This is a SYNCHRONOUS, BLOCKING call. The golden rule in asynchronous programming is that all components of the system must be asynchronous. Otherwise, the event loop will never regain control, causing your performance gains to be lost. The solution is to let the event loop know about this call by running it within the context of the executor, like so:

```
response = await loop.run_in_executor(None, requests.get, URL)
```

The `run_in_executor` function lets asyncio know about the synchronous function. That way, it can do other things if it blocks. The `None` argument tells it to do it within the context of the default event loop. So the whole corrected function looks like so:

```
async def async_send_request():
    loop = asyncio.get_running_loop()
    response = await loop.run_in_executor(None, requests.get, URL)
    return response
```

Now the whole thing is asynchronous!

---

The second problem is with the Tornado server. The original server code looked like so:

```
import asyncio
import tornado
import time
from random import random

class MainHandler(tornado.web.RequestHandler):
    def get(self):
        sleep_time = random()
        time.sleep(sleep_time)
        self.write("Hello, world")

def make_app():
    return tornado.web.Application([
        (r"/", MainHandler),
    ])

async def main():
    app = make_app()
    app.listen(8888)
    await asyncio.Event().wait()

if __name__ == "__main__":
    asyncio.run(main())
```

However, there were several problems with the `get` function: it was synchronous!
    - 1) The function was defined using `def get` instead of `async def get`, rendering it synchronous and therefore, blocking to the server's event loop.
    - 2) The function used the synchronous `time.sleep(sleep_time)` which is again, blocking.

The changes we made were to 1) redefine the function to be asynchronous and 2) replace the synchronous `time.sleep` with its asynchronous equivalent: `asyncio.sleep`. The complete code is below:

```
import asyncio
import tornado
from random import random

class MainHandler(tornado.web.RequestHandler):
    async def get(self):
        sleep_time = random()
        await asyncio.sleep(sleep_time)
        self.write(f"Slept for {sleep_time:.2f} seconds")

def make_app():
    return tornado.web.Application([
        (r"/", MainHandler),
    ])

async def main():
    app = make_app()
    app.listen(9000)
    await asyncio.Event().wait()

if __name__ == "__main__":
    asyncio.run(main())
```


# Exercise: Fix the timeit decorator above and adapt it to be able work for both synchronous and asynchronous 

Make it as good as possible with minimal duplication of code.
Also write at least 5 sentences on why the previous naive version didn't work for asynchronous functions.

In [203]:
import inspect
from functools import wraps

def timeit_fixed(sig_fig=3):
    def _timeit(func):
        if inspect.iscoroutinefunction(func):
            
            @wraps(func)
            async def async_wrapper(*args, **kwargs):
                start_time = time()
                ret = await func(*args, **kwargs)
                end_time = time()
                elapsed_time = round(end_time - start_time, sig_fig)
                print(f"Total time elapsed for asynchronous function {func.__name__}: {elapsed_time} seconds")
                return ret
            wrapper = async_wrapper
        else:

            @wraps(func)
            def sync_wrapper(*args, **kwargs):
                start_time = time()
                ret = func(*args, **kwargs)
                end_time = time()
                elapsed_time = round(end_time - start_time, sig_fig)
                print(f"Total time elapsed for synchronous function {func.__name__}: {elapsed_time} seconds")
                return ret
            wrapper = sync_wrapper

        return wrapper
    return _timeit

In [204]:
@timeit_fixed()
async def async_send_request():
    loop = asyncio.get_running_loop()
    response = await loop.run_in_executor(None, requests.get, URL)
    return response

In [205]:
@timeit_fixed()
async def async_multiple(N):
    tasks = [async_send_request() for _ in range(N)]
    for task in asyncio.as_completed(tasks):
        response = await task
        print(f"Response returned status code: {response.status_code if response else None}")

In [206]:
await async_multiple(10)

Total time elapsed for asynchronous function async_send_request: 0.069 seconds
Response returned status code: 200
Total time elapsed for asynchronous function async_send_request: 0.261 seconds
Total time elapsed for asynchronous function async_send_request: 0.26 seconds
Response returned status code: 200
Response returned status code: 200
Total time elapsed for asynchronous function async_send_request: 0.294 seconds
Response returned status code: 200
Total time elapsed for asynchronous function async_send_request: 0.397 seconds
Total time elapsed for asynchronous function async_send_request: 0.396 seconds
Response returned status code: 200
Response returned status code: 200
Total time elapsed for asynchronous function async_send_request: 0.493 seconds
Response returned status code: 200
Total time elapsed for asynchronous function async_send_request: 0.717 seconds
Response returned status code: 200
Total time elapsed for asynchronous function async_send_request: 0.825 seconds
Response r

In [207]:
@timeit_fixed()
def serial_send_request():
    response = requests.get(URL)
    return response

In [208]:
@timeit_fixed()
def serial_multiple(N):
    for _ in range(N):
        response = serial_send_request()
        print(f"Response returned status code: {response.status_code if response else "None"}")

In [209]:
serial_multiple(10)

Total time elapsed for synchronous function serial_send_request: 0.234 seconds
Response returned status code: 200
Total time elapsed for synchronous function serial_send_request: 0.764 seconds
Response returned status code: 200
Total time elapsed for synchronous function serial_send_request: 0.731 seconds
Response returned status code: 200
Total time elapsed for synchronous function serial_send_request: 0.134 seconds
Response returned status code: 200
Total time elapsed for synchronous function serial_send_request: 0.25 seconds
Response returned status code: 200
Total time elapsed for synchronous function serial_send_request: 0.348 seconds
Response returned status code: 200
Total time elapsed for synchronous function serial_send_request: 0.924 seconds
Response returned status code: 200
Total time elapsed for synchronous function serial_send_request: 0.067 seconds
Response returned status code: 200
Total time elapsed for synchronous function serial_send_request: 0.074 seconds
Response r

The previous `@timeit` function decorator returned this erroneous output when the `await async_multiple()` command was executed:

```
Total time elapsed for function async_send_request: 0.0 seconds
Total time elapsed for function async_send_request: 0.0 seconds
Total time elapsed for function async_send_request: 0.0 seconds
Total time elapsed for function async_send_request: 0.0 seconds
Total time elapsed for function async_send_request: 0.0 seconds
Total time elapsed for function async_send_request: 0.0 seconds
Total time elapsed for function async_send_request: 0.0 seconds
Total time elapsed for function async_send_request: 0.0 seconds
Total time elapsed for function async_send_request: 0.0 seconds
Total time elapsed for function async_send_request: 0.0 seconds
Response returned status code: 200
Response returned status code: 200
Response returned status code: 200
Response returned status code: 200
Response returned status code: 200
Response returned status code: 200
Response returned status code: 200
Response returned status code: 200
Response returned status code: 200
Response returned status code: 200
```

This is because the original `@timeit` decorator looked like this, failing to distinguish between asynchronous and synchronous functions:

```
def timeit(sig_fig=3):
    def _timeit(func, *args, **kwargs):
        def wrapper():
            start_time = time()
            ret = func(*args, **kwargs)
            end_time = time()
            elapsed_time = round(end_time - start_time, sig_fig)
            print(f"Total time elapsed for function {func.__name__}: {elapsed_time} seconds")
            return ret
        # Set wrapper name to calling function name
        wrapper.__name__ = func.__name__
        return wrapper
    return _timeit
```

Let's consider what happens when the `func` argument is asynchronous. In this case, the `func` call `ret = func(*args, **kwargs)` will return an awaitable coroutine object, and not the result of the actual asynchronous function itself. This causes the function execution to appear impossibly fast. This is due to the nature of asynchronous functions. They are designed to return immediately so as to yield control to the event loop, freeing up CPU cycles for something else to execute.

The solution is to distinguish between asynchronous and non-asynchronous functions in the `timeit` decorator, like so:

```
def timeit_fixed(sig_fig=3):
    def _timeit(func):
        if inspect.iscoroutinefunction(func):
            
            @wraps(func)
            async def async_wrapper(*args, **kwargs):
                start_time = time()
                ret = await func(*args, **kwargs)
                end_time = time()
                elapsed_time = round(end_time - start_time, sig_fig)
                print(f"Total time elapsed for asynchronous function {func.__name__}: {elapsed_time} seconds")
                return ret
            wrapper = async_wrapper
        else:

            @wraps(func)
            def sync_wrapper(*args, **kwargs):
                start_time = time()
                ret = func(*args, **kwargs)
                end_time = time()
                elapsed_time = round(end_time - start_time, sig_fig)
                print(f"Total time elapsed for synchronous function {func.__name__}: {elapsed_time} seconds")
                return ret
            wrapper = sync_wrapper

        return wrapper
    return _timeit
```
Here we use a branching condition `inspect.iscoroutinefunction(func)` to check if the function passed in is async or not. If it is async, we return an async wrapper function, wrapped so as to `await` on it inside the function so we can time it correctly. The `await` keyword allows other async functions to execute concurrently and for their execution times to overlap. If isn't async, we do the same thing we did before. Now, we can handle async functions correctly. A note about this is that since functions are first-class citizens in Python, we can treat functions just like objects and do property checks on them and branching conditions to return different kinds of functions.

# Exercise: Parallelizing prime number checking: Find all prime numbers from 1-1 million 1) naively 2) by implementing asyncio.

You can reference the section from fluent Python.

# Exercise: Do the TORNADO library exercises. GO DDEP into Tornado and figure out how it works! Almost like you have a new gadget on your desktop, tinker with it and figure out how it works!!

# Exercises: After reading distributed systems "System Architecture" section, set up two FastAPI / microservices Python modules, one pub and one sub, with and without asyncio and profile

# Exercise: Refactor gazepointer module to use async / await syntax and test it out (write some tests) to make sure it works as desired

In [128]:
!python --version

Python 3.12.10


# Bonus Exercise: Do some large-scale statistical analysis on simulated latency / response times and plot the statistical distributions and how well they conform to theoretically calculated values (long-tail latency and throughput analysis)

# Bonus Exercise: Write your own static / dynamic code analysis tool to inspect a piece of static / running code with the `inspect` module, based on [this StackOverflow answer](https://stackoverflow.com/questions/36076619/test-if-function-or-method-is-normal-or-asynchronous)