# Concurrent Programming with Python
## `async`, `await`, `asyncio`
**Date**: December 3rd, 2024<br/>
**Instructor**: Jan Janak &lt;janakj@cs.columbia.edu&gt;

**Disclaimer**: This lecture is work in progress. The content may change significantly in future iterations.

<hr/>

# Introduction

This lecture introduces concurrent (asynchronous) programming in Python by using it to control a toy robot. It follows a use-inspired style, introducing and explaining concurrency concepts as the need for them arises in a Python program controlling the robot's movements.

This lecture is part of a series of lecture and lab notes. The subsequent notes will introduce the threading, multiprocessing, and inter-process communication primitives available in Python.

## The Setup

This Jupyter notebook is similar to those that you may have seen in other courses like ENGI E1006 and COMS W2132. It is different from those notebooks in the following ways:

  1) The notebook (its IPython kernel) runs on a Raspberry Pi embedded in the robot
  1) It has access to the robot's hardware (3D mouse, speaker, microphone, servos, LEDs)
  1) It has access to the API keys for Google Cloud and OpenAI's ChatGPT API

When I edit the code cells in this notebook, I edit a running Python program on the robot. The following diagram illustrates the setup:

<img width=1024 src="../images/setup.png"/>

All the software used in this notebook is open-source. Check out the [GitHub repository](https://github.com/janakj/shaky-steve) or the robot's [home page](https://janakj.github.io/shaky-steve/).

## Steve's API

The robot is entirely programmed in Python. There is a custom package called `steve` providing classes to access individual subsystems:

<table style='width: 100%; font-size: 100%; border: 1px solid black'>
    <caption>Steve's low-level application programming interface (API)</caption>
    <colgroup>
       <col span="1" style="width: 15%">
       <col span="1" style="width: 20%">
       <col span="1" style="width: 55%">
       <col span="1" style="width: 10%">
    </colgroup>
    <thead style='text-align: left'>
        <tr>
            <th>Class</th>
            <th>Resources Used</th>
            <th>Description</th>
            <th>Blocking</th>
        </tr>
    </thead>
    <tbody>
        <tr>
            <td><tt><a href='../steve/jupyter.py'>LEDs</a></tt></td>
            <td>Hardware</td>
            <td>Controls the robot's four LED indicators</td>
            <td>No</td>
        </tr>
        <tr>
            <td><tt><a href='../steve/jupyter.py'>Servos</a></tt></td>
            <td>Hardware</td>
            <td>Controls the robot's six servo motors</td>
            <td>No</td>
        </tr>
        <tr>
            <td><tt><a href='../steve/audio.py'>AudioClip</a></tt></td>
            <td>Hardware (audio)</td>
            <td>Plays WAV files via the robot's loudspeaker</td>
            <td>Yes</td>
        </tr>
        <tr>
            <td><tt><a href='../steve/stt.py'>SpeechToText</a></tt></td>
            <td>Network (Google Cloud)</td>
            <td>Converts speech recorded by the robot's microphone to text</td>
            <td>Yes</td>
        </tr>
        <tr>
            <td><tt><a href='../steve/tts.py'>TextToSpeech</a></tt></td>
            <td>Network (Google Cloud)<br/>Hardware (audio)</td>
            <td>Converts text (words) to speech and sends the output to the robot's loudspeaker</td>
            <td>No</td>
        </tr>
        <tr>
            <td><tt><a href='../steve/wake.py'>WakeWord</a></tt></td>
            <td>Hardware (audio)</td>
            <td>Continuously listens for "Hey Steve"</td>
            <td>Yes</td>
        </tr>
        <tr>
            <td><tt><a href='../steve/chatgpt.py'>ChatGPT</a></tt></td>
            <td>Network (OpenAI Cloud)</td>
            <td>Expands given text using OpenAI's ChatGPT service</td>
            <td>Yes</td>
        </tr>
        <tr>
            <td><tt><a href='../steve/mouse.py'>Mouse</a></tt></td>
            <td>Hardware (USB peripheral)</td>
            <td>Reads button and motion events from the 3D mouse</td>
            <td>Yes</td>
        </tr>
    </tbody>
</table>

All of the above classes (and a few additional utility functions) can be imported in a single statement:

In [1]:
from steve import *

The following example shows how you can use the API:

In [None]:
chirp = AudioClip('../sounds/chirp.wav') # Load a .wav file
tts = TextToSpeech()                     # Create and start a Google Cloud Text-to-speech client
wake = WakeWord()                        # Create a wake word detector based on OpenWakeWord library

In [None]:
print('Listening for "Hey Steve"')
wake.detect()                            # Block until 'Hey Steve' has been detected
print('Detected')

chirp.play()                             # Play the chirp. Wait until the playback completes.
tts.say('Nice to hear from you!')        # Greet the user via text-to-speech

We will introduce the remaining APIs as we go.

# Servo Control

We will start by learning to move the robot. Obtain access to the servo API:

In [2]:
servos = Servos()
display(servos)

VBox(children=(FloatSlider(value=0.0, continuous_update=False, description='Torso', disabled=True, max=90.0, m…

There are six servo motors named "clamp", "wrist_ud", "wrist_lr", "elbow", "shoulder", and "torso". The servos start powered off. Writing a float value to a servo powers it on and moves it:

In [3]:
# Rotate torso 90 degrees counterclockwise
servos.torso.set(-90)

While the servo is powered on, it maintains its configured position (it resists being pushed).

To move to a different position, write a different value to it:

In [5]:
# Rotate torso 90 degrees clockwise
servos.torso.set(90)

**Note: the servo moves to the new position as quickly as possible. The write operation is non-blocking, i.e., it does not wait for the servo to reach the new position.**

# Learn: Blocking and Non-Blocking Operations

Note the last column called "Blocking" in the API table. Most APIs are blocking, i.e., the corresponding method does not return until the operation completes. This can take significant time, e.g., if the operation needs to wait for a hardware event, or if it communicates over the network.

In contrast, the servo and LED APIs are *non-blocking*, meaning they do not wait for the operation to complete. When you move a servo, the API returns *before* the servo reaches the desired state.

<img src="../images/blocking-non-blocking.png"/>

Non-blocking operations can be performed simultaneously, e.g., to move two or more servos:

In [6]:
# The two servos will move simultaneously
servos.torso.set(-90)
servos.wrist_lr.set(-90)

In [7]:
servos.torso.get()

-90.0

However, we cannot control the speed, and we do not know when the operations complete.

**When writing concurrent (multi-tasking) programs, understanding which operations are blocking and non-blocking is crucial. This is usually documented.**
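The difference can also be observed by timing a call. The following is a minimal sketch that runs without the robot: `blocking_play` and `non_blocking_set` are hypothetical stand-ins (not part of `steve`), and a background thread merely plays the role of the hardware that keeps working after a non-blocking call returns.

```python
import threading
import time

def blocking_play(duration):
    """Hypothetical blocking call: returns only after the work is done."""
    time.sleep(duration)

def non_blocking_set(duration):
    """Hypothetical non-blocking call: starts the work and returns at once."""
    worker = threading.Thread(target=time.sleep, args=(duration,))
    worker.start()
    return worker  # The caller may join() later if it needs to wait

def elapsed(func, *args):
    """Return the result of func(*args) and the wall-clock seconds the call took."""
    start = time.perf_counter()
    result = func(*args)
    return result, time.perf_counter() - start

_, t_blocking = elapsed(blocking_play, 0.2)
worker, t_non_blocking = elapsed(non_blocking_set, 0.2)
worker.join()  # Wait for the background work before moving on

print(f'blocking call returned after     {t_blocking:.3f} s')
print(f'non-blocking call returned after {t_non_blocking:.3f} s')
```

The blocking call should take about as long as the work itself, while the non-blocking call should return almost immediately even though the work is still in progress.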

# Speed Control

The only way to control the speed of a servo is with incremental updates, i.e., by periodically updating its state. We know how to do that in Python:

In [8]:
import math
from datetime import datetime, timedelta
from time import sleep

# This function incrementally moves the servo to the destination state
# "dst" (in degrees) with "speed" (in radians per second) and the given
# update "rate" (updates per second).

def move(servo, dst, speed=None, rate=50):
    # Get the servo's current state
    src = servo.get()

    if speed is not None:
        # If speed is set, compute the duration it will take to move
        # from the state "src" to "dst".
        duration = abs(dst - src) / 180 * math.pi / speed
    else:
        # If speed is unset, move as quickly as possible. With a duration
        # of 0 the code below degrades to a single update.
        duration = 0

    start = datetime.now()
    stop = start + timedelta(seconds=duration)
    while True:
        now = datetime.now()
        if now >= stop:  # We ran out of time
            servo.set(dst)  # Move to dst and return
            return dst

        i = (now - start).total_seconds() / duration
        if i > 1: i = 1

        servo.set((dst - src) * i + src)
        sleep(1 / rate)

In [9]:
display(servos)

VBox(children=(FloatSlider(value=-90.0, continuous_update=False, description='Torso', max=90.0, min=-90.0, rea…

In [10]:
move(servos.wrist_lr, 90, 0.1)

90

This is great, except the operation is now **blocking**! While `move` is running, I cannot do anything else. My _main execution thread_ is blocked, although the function just sleeps most of the time. Thus, we cannot use this approach to slowly move multiple servos simultaneously.
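To put a number on how long such a call blocks, the duration formula from `move` can be evaluated on its own. This is a small sketch, assuming (as the formula implies) that positions are in degrees and `speed` is in radians per second, and that the wrist starts at -90 degrees as set earlier. The helper name `move_duration` is hypothetical and not part of `steve`.

```python
import math

def move_duration(src, dst, speed):
    """Seconds needed to go from src to dst (degrees) at speed (radians/second)."""
    return math.radians(abs(dst - src)) / speed

# The wrist sweep from -90 to 90 degrees at 0.1 rad/s
duration = move_duration(-90, 90, 0.1)
updates = round(duration * 50)  # Number of updates at the default rate of 50 per second

print(f'{duration:.1f} s, about {updates} updates')
# → 31.4 s, about 1571 updates
```

In other words, the main execution thread would be unavailable for roughly half a minute for a single slow sweep.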

# Learn: CPU-Bound and I/O-Bound Operations

The `move` function can take a while to return, depending on the speed and distance. Yet, if you look at its source code, it is not busy. All statements except `sleep` are fast. The call to `sleep` suspends execution, allowing other processes (applications) to run while it waits for a timer to fire, i.e., for an external event.

The `move` function is **I/O-bound**. Its running time is dominated by waiting for external events (timers). In contrast, an operation is **CPU-bound** if its running time is dominated by CPU-intensive computations.
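One way to tell the two apart is to compare wall-clock time with CPU time. The sketch below uses `time.perf_counter` (wall clock) and `time.process_time` (CPU time consumed by the process); the functions `io_bound` and `cpu_bound` are illustrative stand-ins, not part of the robot's code.

```python
import time

def measure(func):
    """Return (wall-clock seconds, CPU seconds) spent in func()."""
    wall_start = time.perf_counter()
    cpu_start = time.process_time()
    func()
    return time.perf_counter() - wall_start, time.process_time() - cpu_start

def io_bound():
    time.sleep(0.2)  # Waits for a timer; uses almost no CPU

def cpu_bound():
    sum(i * i for i in range(1_000_000))  # Keeps the CPU busy the whole time

io_wall, io_cpu = measure(io_bound)
cpu_wall, cpu_cpu = measure(cpu_bound)

print(f'I/O-bound: wall {io_wall:.3f} s, CPU {io_cpu:.3f} s')
print(f'CPU-bound: wall {cpu_wall:.3f} s, CPU {cpu_cpu:.3f} s')
```

For the I/O-bound function the CPU time should be a small fraction of the wall-clock time; for the CPU-bound function the two should be close to each other.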

<img src="../images/io-cpu.png"/>

# Learn: Interleaving I/O-Bound Operations With `async` and `await`

Python 3.5 (2015) added the `async` and `await` syntax to the language (both became reserved keywords in Python 3.7). If you sprinkle your I/O-bound operations with these keywords, Python will let you run multiple such operations **concurrently** in the same (main) execution thread.

<img src='../images/interleaving.png'/>

_This would solve our problem, i.e., we could slowly move multiple servos simultaneously._

## Asynchronous Functions (`async`)

The `async` keyword can be used to mark a function as "asynchronous". We can apply that to our `move` function:

In [11]:
# This function incrementally moves the servo to the destination state
# "dst" (in degrees) with "speed" (in radians per second) and the given
# update "rate" (updates per second).

# NOTE async at the beginning of the next line
async def move(servo, dst, speed=None, rate=50):
    # Get the servo's current state
    src = servo.get()

    if speed is not None:
        # If speed is set, compute the duration it will take to move
        # from the state "src" to "dst".
        duration = abs(dst - src) / 180 * math.pi / speed
    else:
        # If speed is unset, move as quickly as possible. With a duration
        # of 0 the code below degrades to a single update.
        duration = 0

    start = datetime.now()
    stop = start + timedelta(seconds=duration)
    while True:
        now = datetime.now()
        if now >= stop:  # We ran out of time
            servo.set(dst)  # Move to dst and return
            return dst

        i = (now - start).total_seconds() / duration
        if i > 1: i = 1

        servo.set((dst - src) * i + src)
        sleep(1 / rate)

Let's invoke it and see what happens. Remember, the previous (synchronous) version would block until the movement completes.

In [12]:
move(servos.wrist_lr, 90, 0.1)

<coroutine object move at 0x106eb30a0>

This time, the function did not block, but it also did not run (no movement)! Furthermore, it returned a strange "coroutine" object. Let's try to understand it.

### From Subroutines to Coroutines

Recall (from E1006) what happens when you invoke a regular (synchronous) Python function:
  1) The main program (or another function) is temporarily suspended;
  1) Control is immediately passed to the invoked function;
  1) Once the function finishes and returns a value, the main program continues.

<img width=800 src="https://raw.githubusercontent.com/cucs-python/public/spring-2023/e1006/lectures/function_flow_of_control.png"/>

This process can be "nested", i.e., the function can invoke other functions, which can also invoke other functions, etc.

**We will refer to all the computation that happens to obtain the return value as a subroutine**. A subroutine is created immediately when you invoke a function and terminates when the function returns. The main execution thread is blocked while the subroutine is running. _The subroutine is an abstract concept (not represented with objects in Python)._

The `async` keyword modifies this behavior. When you invoke a function defined as `async`, it DOES NOT start executing immediately. Instead, you get a `coroutine` object representing the "future computation" (subroutine). This extra step makes it possible to create multiple coroutines to be started later:

In [13]:
movement1 = move(servos.torso, 90, 0.1)
movement2 = move(servos.wrist_lr, -90, 0.1)

In [14]:
movement1

<coroutine object move at 0x125117760>

In [15]:
movement2

<coroutine object move at 0x1251179a0>

We have two (not yet started) coroutines. Both will invoke the `move` function, but with different parameters. Being able to create multiple coroutines without starting them is a necessary, but not sufficient, condition for concurrency.
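The same behavior can be reproduced without the robot. The sketch below uses a hypothetical `greet` coroutine function and a `log` list to show that the function body does not run until the coroutine is started. It is written for a plain Python script, where `asyncio.run` starts an event loop; inside this notebook an event loop is already running, so you would write `await coro` instead.

```python
import asyncio

log = []

async def greet(name):
    log.append(f'hello {name}')  # Runs only once the coroutine is started
    return name.upper()

coro = greet('steve')                # Creates a coroutine object; the body has not run
created = asyncio.iscoroutine(coro)  # True
ran_before = list(log)               # Still empty at this point

result = asyncio.run(coro)           # Start an event loop and run the coroutine
print(created, ran_before, result, log)
# → True [] STEVE ['hello steve']
```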

## Cooperative Functions (`await`)

Once a coroutine is running, Python will not automatically interrupt it. If the coroutine knows it cannot make progress, e.g., when it reaches the `sleep` statement, it must explicitly tell Python to suspend the current coroutine and schedule another one, if there is any. The `await` keyword does exactly that.

Unfortunately, `await` cannot be used with arbitrary blocking functions such as `time.sleep`. The function must return a special "awaitable" object. Fortunately, the `asyncio` module provides alternatives for common blocking functions. Thus, we can replace `sleep` with `asyncio.sleep`.

In [16]:
import asyncio

async def move(servo, dst, speed=None, rate=50):
    # Get the servo's current state
    src = servo.get()

    if speed is not None:
        # If speed is set, compute the duration it will take to move
        # from the state "src" to "dst".
        duration = abs(dst - src) / 180 * math.pi / speed
    else:
        # If speed is unset, move as quickly as possible. With a duration
        # of 0 the code below degrades to a single update.
        duration = 0

    start = datetime.now()
    stop = start + timedelta(seconds=duration)
    while True:
        now = datetime.now()
        if now >= stop:  # We ran out of time
            servo.set(dst)  # Move to dst and return
            return dst

        i = (now - start).total_seconds() / duration
        if i > 1: i = 1

        servo.set((dst - src) * i + src)
        await asyncio.sleep(1 / rate)         # <===== "sleep" became "await asyncio.sleep"

Turning a subroutine into a cooperatively-scheduled coroutine requires the following:
  1. Define the function with `async`;
  1. Identify all blocking statements (only `sleep` in our example);
  1. Replace them with awaitable alternatives provided by the `asyncio` module;
  1. Label the statements with `await` to let Python switch to other coroutines.
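The effect of the last two steps can be measured without the robot. The following sketch, with hypothetical worker functions, runs three coroutines concurrently via `asyncio.gather`, once with the blocking `time.sleep` and once with `await asyncio.sleep`. It is meant for a plain Python script; in the notebook you would `await timed(...)` directly instead of calling `asyncio.run`.

```python
import asyncio
import time

async def blocking_worker(delay):
    time.sleep(delay)           # Blocks the whole thread; nothing else can run

async def cooperative_worker(delay):
    await asyncio.sleep(delay)  # Suspends this coroutine; others may run meanwhile

async def timed(worker, count=3, delay=0.2):
    """Run `count` workers concurrently and return the elapsed wall-clock time."""
    start = time.perf_counter()
    await asyncio.gather(*(worker(delay) for _ in range(count)))
    return time.perf_counter() - start

t_blocking = asyncio.run(timed(blocking_worker))
t_cooperative = asyncio.run(timed(cooperative_worker))

print(f'time.sleep:    {t_blocking:.2f} s')     # Roughly count * delay
print(f'asyncio.sleep: {t_cooperative:.2f} s')  # Roughly delay
```

Defining a function with `async` alone is not enough: the version that still calls `time.sleep` runs its workers one after another, whereas the cooperative version overlaps the waiting.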

## Concurrent Scheduling

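Finally, we can run two coroutines concurrently. The coroutine objects created earlier still refer to the old version of `move` that calls the blocking `sleep`, so we create fresh ones from the updated function:

In [17]:
import asyncio

# Create fresh coroutines from the updated (cooperative) move function
movement1 = move(servos.torso, 90, 0.1)
movement2 = move(servos.wrist_lr, -90, 0.1)

# Wrap the coroutines in named tasks and start executing as soon as possible
task1 = asyncio.create_task(movement1, name='Moving torso')
task2 = asyncio.create_task(movement2, name='Moving wrist')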

Voilà! We have two concurrently running tasks. Furthermore, our main execution thread is available for more work.

The tasks and coroutines are "awaitable" themselves, which means we can wait for the completion of one task before starting another:

In [None]:
movement1 = move(servos.torso, 90, 0.1)
movement2 = move(servos.wrist_lr, -90, 0.1)

await asyncio.create_task(movement1, name='Moving torso') # Suspends this cell until the torso stops moving
await asyncio.create_task(movement2, name='Moving wrist') # Suspends this cell until the wrist stops moving

The `asyncio` module provides utility functions to compose sequential and concurrent tasks (coroutines) in a natural and familiar style. This is a very powerful programming abstraction that helps keep the code readable and maintainable.
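As an illustration of such composition, the sketch below contrasts `asyncio.gather` (concurrent) with consecutive `await` statements (sequential). It is self-contained so that it runs without the robot: `FakeServo` is a hypothetical stand-in for a `steve` servo, and `move` is a simplified variant that takes a duration in seconds instead of a speed. In the notebook you would `await main()` rather than call `asyncio.run`.

```python
import asyncio
import time

class FakeServo:
    """Hypothetical stand-in for a steve servo; it only stores the last value."""
    def __init__(self, name, position=0.0):
        self.name = name
        self.position = position

    def get(self):
        return self.position

    def set(self, value):
        self.position = value

async def move(servo, dst, duration, rate=50):
    """Simplified move: interpolate linearly to dst over `duration` seconds."""
    src = servo.get()
    steps = max(1, round(duration * rate))
    for step in range(1, steps + 1):
        servo.set(src + (dst - src) * step / steps)
        await asyncio.sleep(duration / steps)
    return servo.get()

async def main():
    torso, wrist = FakeServo('torso'), FakeServo('wrist_lr')

    start = time.perf_counter()
    # Concurrent: both movements overlap and gather returns both results
    results = await asyncio.gather(move(torso, 90, 0.3), move(wrist, -90, 0.3))
    concurrent = time.perf_counter() - start

    start = time.perf_counter()
    # Sequential: the second movement starts only after the first completes
    await move(torso, 0, 0.3)
    await move(wrist, 0, 0.3)
    sequential = time.perf_counter() - start
    return results, concurrent, sequential

results, concurrent, sequential = asyncio.run(main())
print(results, f'{concurrent:.2f} s', f'{sequential:.2f} s')
```

The concurrent pair should finish in roughly the time of one movement, while the sequential pair should take about twice as long.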

# Limitations

  - Need for `await` and `asyncio` wrappers
  - "Out of luck" with external code that cannot be modified

# Threading

# Additional Resources

  * Shaky Steve's Python software is open-source. You can study (and improve) it at https://github.com/janakj/shaky-steve.
  * Steve also has a home page at https://janakj.github.io/shaky-steve/.
  * Python `asyncio` module documentation
  * Python `threading` module documentation