# Concurrency & Parallelism in Python

*Write a Python program that makes two HTTP requests in parallel. You can use any library or framework of your choosing, and you can call whatever APIs you like. The only rule is that the HTTP requests have to be made in parallel.*

*Consider the two requests below. A correct program will have received both responses in about 5 seconds instead of 8 seconds:*

- http://httpbin.org/delay/5
- http://httpbin.org/delay/3

## Oli's Solution

This notebook is adapted from a talk:

> Python Concurrency From the Ground Up
> 
> David Beazley, PyCon 2015.
> https://www.youtube.com/watch?v=MCs5OvhV9S4&t=1089s

My solution uses the `socket` library for making an HTTP request over a TCP socket.

It uses generators and a small event loop to make the requests concurrently.

This approach is purely didactic - it demonstrates that we can build our own event loop in Python without any fancy libraries or threading by using generators. Additionally, it allows us to write our own custom HTTP client.

The drawback is that this is definitely not production code - there is no error handling, there is no HTTPS support, and there are no tests!

### Import Statements

In [1]:
from collections import deque

import errno
import select
import socket
import time
import urllib.parse

### HTTP Client

Our HTTP client is a simple function that reads and writes plain text to a TCP socket. As it is just using `socket`, which is part of the standard library, we are not using `requests` or `aiohttp`.
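
To make the "plain text over a socket" idea concrete, here is a minimal sketch of how the bytes of a GET request could be assembled. The helper name `build_get_request` is hypothetical and not part of the notebook. It assumes an HTTP/1.1 request with only a `Host` header, uses the CRLF line endings that HTTP/1.1 specifies, and falls back to `/` when the URL has no path.

```python
from urllib.parse import urlparse


def build_get_request(url):
    """Return the bytes of a minimal HTTP/1.1 GET request for `url` (illustrative helper)."""
    parsed = urlparse(url)

    # An empty path is not a valid request target, so fall back to "/".
    path = parsed.path or "/"
    if parsed.query:
        path += "?" + parsed.query

    lines = [
        f"GET {path} HTTP/1.1",
        f"Host: {parsed.netloc}",
        "",  # The empty line marks the end of the headers.
        "",
    ]
    # Header lines are separated by CRLF.
    return "\r\n".join(lines).encode("ascii")


raw = build_get_request("http://httpbin.org/delay/3")
```

The result is exactly what would be written to the socket: a request line, one header, and an empty line.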

This function is a generator. Instead of running to completion and returning a single value, it can `yield` control back to its caller at specific points and later resume from where it left off.

We use these `yield` statements to suspend a task while it waits for the network, and to resume it once its socket is ready. This is what lets us write an event-driven program in Python.
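
As a small illustration of suspending and resuming, the sketch below uses a toy generator (the name `countdown` is made up for this example) and drives it by hand with `next()`, which is what an event loop does on our behalf.

```python
def countdown(n):
    """Yield n, n-1, ..., 1, pausing after each value."""
    while n > 0:
        # Execution stops here until the caller asks for the next value.
        yield n
        n -= 1


task = countdown(3)     # Calling the function only creates the generator; no code runs yet.
first = next(task)      # Runs the body until the first yield.
second = next(task)     # Resumes after the yield and runs until the next one.
remaining = list(task)  # Drains whatever is left.
```

Between the calls to `next()` the generator keeps its local state (`n`), which is why a task can pick up where it stopped.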

In [2]:
def http_get(url):
    parsed = urllib.parse.urlparse(url)
    
    target_host = parsed.netloc
    target_path = parsed.path

    # Plain HTTP uses port 80 by default. HTTPS is not supported, as that requires TLS encryption.
    target_port = 80
    
    # Create a TCP socket (IPv4, stream-oriented). We will write our request to it and read the response from it.
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    # Set socket to non-blocking mode.
    # Instead of blocking, we will need to test when our socket is ready to send or receive data.
    # Our event loop will use the `select` call to do this for us.
    sock.setblocking(False)

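    # Connect to the host via TCP on port 80. As our socket is non-blocking, this only starts the connection.
    # connect_ex() returns an error code instead of raising an exception. In non-blocking mode it normally
    # returns EINPROGRESS to signal that the connect operation is in progress (0 means it already completed).
    # Note that resolving the host name is still a blocking step.
    err = sock.connect_ex((target_host, target_port))
    if err not in (0, errno.EINPROGRESS):
        raise RuntimeError(f"Error in connect: {err}")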

    # Ask the event loop to wait for our socket to be ready to send data.
    yield 'send', sock

    # We are ready to send some data.
    # HTTP header lines end with CRLF ("\r\n"), and an empty line marks the end of the request headers.
    # Without it, the server will wait for our request to finish.
    request = [
        f"GET {target_path} HTTP/1.1",
        f"Host: {target_host}",
        "",
        "",
    ]
    request = "\r\n".join(request)
    
    # Send our request as bytes. As our socket is non-blocking, this line does not block.
    sock.send(request.encode('utf-8'))

    # Ask the event loop to wait for our socket to be ready to receive data.
    yield 'recv', sock

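    # We are now unblocked. Receive up to 4096 bytes, which is enough for these small responses.
    # In general, a single recv() call is not guaranteed to return the whole response.
    response = sock.recv(4096)

    # We have everything we need, so release the socket.
    sock.close()

    # As we are in a generator, we can yield the results of our processing.
    # I have chosen to yield the request and the response as plain text, but you can yield any data here.
    response = response.decode('utf-8')
    yield 'data', (request, response)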
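
The single `recv(4096)` call in `http_get` works for these small responses, but a larger response may arrive in several pieces. The following is a sketch of one way to keep reading, under stated assumptions: the helpers `response_complete` and `read_response` are hypothetical, they rely on a `Content-Length` header, and they do not handle chunked transfer encoding. A socket pair stands in for the server so the example runs without a network.

```python
import socket


def response_complete(buffer):
    """Return True once `buffer` holds the headers plus Content-Length bytes of body."""
    head, separator, body = buffer.partition(b"\r\n\r\n")
    if not separator:
        return False  # The blank line that ends the headers has not arrived yet.
    for line in head.split(b"\r\n")[1:]:
        name, _, value = line.partition(b":")
        if name.strip().lower() == b"content-length":
            return len(body) >= int(value.strip())
    # Assumption: with no Content-Length header, treat the response as having no body.
    return True


def read_response(sock):
    """Keep asking the event loop to wait for 'recv' until the response is complete."""
    buffer = b""
    while not response_complete(buffer):
        yield 'recv', sock
        chunk = sock.recv(4096)
        if not chunk:
            break  # The peer closed the connection.
        buffer += chunk
    yield 'data', buffer


# Drive the generator by hand, with a socket pair standing in for the server.
server, client = socket.socketpair()
task = read_response(client)
steps = [next(task)[0]]       # The task asks to wait for incoming data.
server.send(b"HTTP/1.1 200 OK\r\nContent-Length: 5\r\n\r\nhe")
steps.append(next(task)[0])   # Body is incomplete, so it asks to wait again.
server.send(b"llo")
why, raw = next(task)         # The rest has arrived, so the task yields its data.
steps.append(why)
server.close()
client.close()
```

Because the helper yields the same `(why, what)` tuples as `http_get`, the same kind of loop could sit inside a task in place of the single `recv` call.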


### Event Loop

Our event loop takes a list of generators and runs them until they are complete. Our generators must be written to be compatible with the event loop. In this case, our generators must `yield` tuples of events in the form of `(why, what)`.
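
Before any sockets are involved, the scheduling idea can be sketched with a round-robin loop over toy generators. The names `ticker` and `round_robin` are illustrative only. Unlike the event loop in this notebook, this toy version puts a task back on the queue after it yields `'data'`, so that the interleaving is visible.

```python
from collections import deque


def ticker(name, count):
    """Toy task: yields `count` ('data', message) tuples."""
    for i in range(count):
        yield 'data', f"{name}-{i}"


def round_robin(tasks):
    """Run each generator one step at a time until all are exhausted."""
    queue = deque(tasks)
    output = []
    while queue:
        task = queue.popleft()
        try:
            why, what = next(task)
        except StopIteration:
            continue  # This task has finished, so it is not put back on the queue.
        if why == 'data':
            output.append(what)
        queue.append(task)  # Give the other tasks a turn before resuming this one.
    return output


interleaved = round_robin([ticker("a", 2), ticker("b", 3)])
```

The two tasks take turns, one step each, which is the same cooperative scheduling the real loop performs between waits on sockets.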

We run each generator in turn until it yields a socket or some data. Sockets go into a holding area until there are no more tasks ready to run. We then call the `select` function, which blocks until at least one socket is ready. This is provided by your operating system, and you can read its documentation with `man select`:

```
select() and pselect() allow a program to monitor multiple file descriptors, waiting until one or more of the file descriptors become "ready" for some class of I/O operation (e.g., input possible). A file descriptor is considered ready if it is possible to perform the corresponding I/O operation (e.g., read(2)) without blocking.
```

By waiting on multiple file descriptors simultaneously, we can achieve concurrency. By using generators, we can ensure that we have "gathered" all of the tasks that have work to do before performing a blocking operation.
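
To see what "ready" means in practice, here is a small sketch that calls `select.select` on a local socket pair instead of a real server. The variable names are chosen for this example only.

```python
import select
import socket

# Two connected sockets stand in for a client and a remote server.
left, right = socket.socketpair()

# Nothing has been written yet, so `right` is not readable.
# A timeout of 0 makes select() poll and return immediately.
readable, _, _ = select.select([right], [], [], 0)
ready_before = right in readable

left.send(b"ping")

# Now there is data waiting, so select() reports `right` as readable.
readable, _, _ = select.select([right], [], [], 1.0)
ready_after = right in readable
message = right.recv(4096)

left.close()
right.close()
```

Passing several sockets in the first list is what lets one blocking call wait on many connections at once.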

There are many alternative implementations that use more scalable mechanisms than `select`, such as `epoll` on Linux and `kqueue` on BSD and macOS.
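
One such alternative in the standard library is the `selectors` module, which picks the most efficient mechanism available on the platform. The sketch below is not what this notebook uses. It only shows how a socket could be registered together with some attached data (here a placeholder string where an event loop would store the task).

```python
import selectors
import socket

sel = selectors.DefaultSelector()
left, right = socket.socketpair()

# Register interest in read-readiness and attach arbitrary data to the socket.
# An event loop would attach the suspended generator here.
sel.register(right, selectors.EVENT_READ, data="task-for-right")

left.send(b"ping")

# select() returns a list of (key, events) pairs for the sockets that are ready.
ready = sel.select(timeout=1.0)
woken = [key.data for key, events in ready]

sel.unregister(right)
sel.close()
left.close()
right.close()
```

Attaching the task as `data` would replace the `recv_wait` and `send_wait` dictionaries with a single lookup on the returned key.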

In [3]:
def run_tasks(tasks):
    output = []
    tasks = deque(tasks)
    send_wait = {}
    recv_wait = {}
    
    # This is our event loop.
    # It runs until every task has finished and no sockets are left waiting.
    while any([tasks, send_wait, recv_wait]):
        while not tasks:
            # While there is no python code to run, make a blocking call to `select`.
            # This function returns when any of our sockets are ready to send or receive.
            # When they have changed state, we retrieve the task generator using the socket as a key,
            # and put it back onto the task list for processing.
            can_recv, can_send, _ = select.select(recv_wait, send_wait, [])
            for s in can_recv:
                tasks.append(recv_wait.pop(s))
            for s in can_send:
                tasks.append(send_wait.pop(s))
        
        # Retrieve a generator from the queue.
        task = tasks.popleft()
        try:
            # next() advances the code until the next yield statement.
            # Each yield returns a tuple describing why and what we have yielded.
            why, what = next(task)
        except StopIteration:
            # We've reached the end of our generator function.
            # Continue our event loop.
            continue
            
        # We need to wait to be ready to receive data.
        # Put our socket (what) in the recv_wait dict.
        if why == 'recv':
            recv_wait[what] = task

        # We need to wait to be ready to send data.
        # Put our socket (what) in the send_wait dict.
        elif why == 'send':
            send_wait[what] = task

        # We finally have some data! Add the results to a list.
        # The task is not rescheduled, so 'data' should be the last thing a generator yields.
        elif why == 'data':
            output.append(what)

    return output

### Testing

The code below should take about 5 seconds, the duration of the slowest request, rather than the 8 seconds it would take to perform the two requests sequentially.

In [4]:
urls = [
    "http://httpbin.org/delay/5",
    "http://httpbin.org/delay/3",
]

start_time = time.time()
responses = run_tasks([http_get(url) for url in urls])
end_time = time.time()

duration_seconds = end_time - start_time

print(f"Took {duration_seconds:.2f} seconds to receive {len(urls)} responses")

Took 5.16 seconds to receive 2 responses


### Printing the request & response

A benefit of writing our own HTTP client is that we can expose the text-based nature of HTTP. Here we can see exactly the data that we read and wrote to our underlying TCP socket, and how the server responded.

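We can see that each request and response begins with header information in plain text, followed by a blank line. In a response, the body comes after that blank line. The second blank line after each request in the output is added by `print`. The responses are listed in the order they completed, so the 3-second request appears first.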
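
Since the format is just text, splitting a response into its parts takes only a few lines. The helper `split_response` below is a hypothetical sketch, not part of the notebook. It assumes a decoded HTTP/1.1 response with CRLF line endings and ignores details such as repeated headers.

```python
def split_response(raw):
    """Split a decoded HTTP/1.1 response into (status_line, headers, body)."""
    # The first empty line separates the headers from the body.
    head, _, body = raw.partition("\r\n\r\n")
    status_line, *header_lines = head.split("\r\n")

    headers = {}
    for line in header_lines:
        name, _, value = line.partition(":")
        # Header names are case-insensitive, so normalise them to lower case.
        headers[name.strip().lower()] = value.strip()
    return status_line, headers, body


sample = (
    "HTTP/1.1 200 OK\r\n"
    "Content-Type: application/json\r\n"
    "Content-Length: 2\r\n"
    "\r\n"
    "{}"
)
status_line, headers, body = split_response(sample)
```

Applied to the responses printed in this notebook, the same function would separate the JSON body from the header block.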

Your web browser probably uses HTTP/2 rather than HTTP/1.1 where possible. HTTP/2 is a binary protocol that can multiplex several requests over a single connection, which makes it more efficient for sending and receiving data.

What would you do to make the HTTP protocol more efficient?

In [5]:
for request, response in responses:
    print(request)
    print(response)

GET /delay/3 HTTP/1.1
Host: httpbin.org


HTTP/1.1 200 OK
Date: Fri, 12 Feb 2021 16:37:06 GMT
Content-Type: application/json
Content-Length: 249
Connection: keep-alive
Server: gunicorn/19.9.0
Access-Control-Allow-Origin: *
Access-Control-Allow-Credentials: true

{
  "args": {}, 
  "data": "", 
  "files": {}, 
  "form": {}, 
  "headers": {
    "Host": "httpbin.org", 
    "X-Amzn-Trace-Id": "Root=1-6026aeaf-08c03be06a7a6be529f79d12"
  }, 
  "origin": "86.153.157.254", 
  "url": "http://httpbin.org/delay/3"
}

GET /delay/5 HTTP/1.1
Host: httpbin.org


HTTP/1.1 200 OK
Date: Fri, 12 Feb 2021 16:37:08 GMT
Content-Type: application/json
Content-Length: 249
Connection: keep-alive
Server: gunicorn/19.9.0
Access-Control-Allow-Origin: *
Access-Control-Allow-Credentials: true

{
  "args": {}, 
  "data": "", 
  "files": {}, 
  "form": {}, 
  "headers": {
    "Host": "httpbin.org", 
    "X-Amzn-Trace-Id": "Root=1-6026aeaf-65da73b013a7ab8679e7f744"
  }, 
  "origin": "86.153.157.254", 
  "url": "http: