# Synchronous Programming

This first example shows a somewhat contrived way of having a task retrieve work from a queue and process that work. A queue in Python is a nice FIFO (first in first out) data structure. It provides methods to put things in a queue and take them out again in the order they were inserted.
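
As a quick illustration of the FIFO behavior described above, here is a minimal sketch using the standard library queue module. The drain() helper is a hypothetical name introduced only for this illustration; it isn't part of the program that follows.

```python
import queue


def drain(work_queue):
    """Remove every item from the queue and return them in retrieval order."""
    items = []
    while not work_queue.empty():
        items.append(work_queue.get())
    return items


work_queue = queue.Queue()
for work in [15, 10, 5, 2]:
    work_queue.put(work)

# Items come back out in the same order they were put in
print(drain(work_queue))  # [15, 10, 5, 2]
```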

In this case, the work is to get a number from the queue and have a loop count up to that number. It prints to the console when the loop begins, and again to output the total. This program demonstrates one way for multiple synchronous tasks to process the work in a queue.

In [3]:
import queue

def task(name, work_queue):
    if work_queue.empty():
        print(f"Task {name} nothing to do")
    else:
        while not work_queue.empty():
            count = work_queue.get()
            total = 0
            print(f"Task {name} running")
            for x in range(count):
                total += 1
            print(f"Task {name} total: {total}")

def main():
    """
    This is the main entry point for the program
    """
    # Create the queue of work
    work_queue = queue.Queue()

    # Put some work in the queue
    for work in [15, 10, 5, 2]:
        work_queue.put(work)

    # Create some synchronous tasks
    tasks = [(task, "One", work_queue), (task, "Two", work_queue)]

    # Run the tasks
    for t, n, q in tasks:
        t(n, q)

if __name__ == "__main__":
    main()

Task One running
Task One total: 15
Task One running
Task One total: 10
Task One running
Task One total: 5
Task One running
Task One total: 2
Task Two nothing to do


### Let’s take a look at what each line does:

- Line 1 imports the queue module. This is where the program stores work to be done by the tasks. 
- Lines 3 to 13 define task(). This function pulls work out of work_queue and processes the work until there isn’t any more to do.
- Line 15 defines main() to run the program tasks.
- Line 20 creates the work_queue. All tasks use this shared resource to retrieve work.
- Lines 23 to 24 put work in work_queue. In this case, it’s just an arbitrary list of counts for the tasks to process.
- Line 27 creates a list of task tuples, with the parameter values those tasks will be passed.
- Lines 30 to 31 iterate over the list of task tuples, calling each one and passing the previously defined parameter values.
- Line 34 calls main() to run the program.

The task in this program is just a function accepting a string and a queue as parameters. When executed, it looks for anything in the queue to process. If there is work to do, then it pulls values off the queue, starts a for loop to count up to that value, and outputs the total at the end. It continues getting work off the queue until there is nothing left and it exits.

# Simple Cooperative Concurrency

The next version of the program allows the two tasks to work together. Adding a yield statement means the loop will yield control at the specified point while still maintaining its context. This way, the yielding task can be restarted later.

The yield statement turns task() into a generator. A generator function is called just like any other function in Python, but when the yield statement is executed, control is returned to the caller of the function. This is essentially a context switch, as control moves from the generator function to the caller.

The interesting part is that control can be given back to the generator function by calling next() on the generator. This is a context switch back to the generator function, which picks up execution with all function variables that were defined before the yield still intact.
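
The back-and-forth between a generator and its caller can be seen in isolation with a small sketch. The counter() function here is a hypothetical example, not the task() used in this program, and unlike task() it yields a value so the preserved state is visible.

```python
def counter(limit):
    """Count up to limit, handing control back to the caller after each step."""
    total = 0
    for _ in range(limit):
        total += 1
        yield total  # pause here; total survives until the next next() call


gen = counter(3)      # creates the generator object; no body code runs yet
first = next(gen)     # runs the body up to the first yield
second = next(gen)    # resumes after the yield with total still intact
print(first, second)  # 1 2
```

Once the body runs out of work, a further next() call raises StopIteration, which is how the caller learns the generator is finished.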

The while loop in main() takes advantage of this when it calls next(t). This statement restarts the task at the point where it previously yielded. All of this means that you’re in control when the context switch happens: when the yield statement is executed in task().

This is a form of cooperative multitasking. The program is yielding control of its current context so that something else can run. In this case, it allows the while loop in main() to run two instances of task() as a generator function. Each instance consumes work from the same queue. This is sort of clever, but it’s also a lot of work to get the same results as the first program. 

In [4]:
import queue

def task(name, queue):
    while not queue.empty():
        count = queue.get()
        total = 0
        print(f"Task {name} running")
        for x in range(count):
            total += 1
            yield
        print(f"Task {name} total: {total}")

def main():
    """
    This is the main entry point for the program
    """
    # Create the queue of work
    work_queue = queue.Queue()

    # Put some work in the queue
    for work in [15, 10, 5, 2]:
        work_queue.put(work)

    # Create some tasks
    tasks = [task("One", work_queue), task("Two", work_queue)]

    # Run the tasks
    done = False
    while not done:
        for t in list(tasks):  # iterate over a copy because tasks is modified below
            try:
                next(t)
            except StopIteration:
                tasks.remove(t)
            if len(tasks) == 0:
                done = True

if __name__ == "__main__":
    main()

Task One running
Task Two running
Task Two total: 10
Task Two running
Task One total: 15
Task One running
Task Two total: 5
Task One total: 2


You can see that both Task One and Task Two are running and consuming work from the queue. This is what’s intended, as both tasks are processing work, and each is responsible for two items in the queue. This is interesting, but again, it takes quite a bit of work to achieve these results.

The trick here is using the yield statement, which turns task() into a generator and performs a context switch. The program uses this context switch to give control to the while loop in main(), allowing two instances of a task to run cooperatively.

Notice how Task Two outputs its total first. This might lead you to think that the tasks are running asynchronously. However, this is still a synchronous program. It’s structured so the two tasks can trade contexts back and forth. The reason why Task Two outputs its total first is that it’s only counting to 10, while Task One is counting to 15. Task Two simply arrives at its total first, so it gets to print its output to the console before Task One.

Here’s what’s happening in the code above:

- Lines 3 to 11 define task() as before, but the addition of yield on Line 10 turns the function into a generator. This is where the context switch is made and control is handed back to the while loop in main().
- Line 25 creates the task list, but in a slightly different manner than you saw in the previous example code. In this case, each task is called with its parameters as it’s entered in the tasks list variable. Calling task() doesn’t run any of its code yet; it creates the generator object that the loop later advances with next().
- Lines 31 to 36 are the modifications to the while loop in main() that allow task() to run cooperatively. This is where control returns to the loop each time an instance of task() yields, allowing the loop to continue and run another task.
- Line 32 gives control back to task(), and continues its execution after the point where yield was called.
- Line 36 sets the done variable. The while loop ends when all tasks have been completed and removed from tasks.
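
The same round-robin idea can also be written with a collections.deque, which avoids removing items from a list while looping over it. This is only a sketch of an alternative structure, not the code used above; worker() and run_round_robin() are hypothetical names introduced for the illustration.

```python
from collections import deque


def worker(name, steps, log):
    """Hypothetical task: record each step, yielding control after every one."""
    for step in range(steps):
        log.append(f"{name}:{step}")
        yield


def run_round_robin(generators):
    """Advance each generator in turn until all of them are exhausted."""
    ready = deque(generators)
    while ready:
        gen = ready.popleft()
        try:
            next(gen)
        except StopIteration:
            continue       # finished: do not reschedule
        ready.append(gen)  # still has work: go to the back of the line


log = []
run_round_robin([worker("One", 2, log), worker("Two", 3, log)])
print(log)  # ['One:0', 'Two:0', 'One:1', 'Two:1', 'Two:2']
```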

# Cooperative Concurrency With Blocking Calls
The next version of the program is the same as the last, except for the addition of a time.sleep(delay) in the body of your task loop. This adds a delay based on the value retrieved from the work queue to every iteration of the task loop. The delay simulates the effect of a blocking call occurring in your task.

A blocking call is code that stops the CPU from doing anything else for some period of time. In the thought experiments above, if a parent wasn’t able to break away from balancing the checkbook until it was complete, that would be a blocking call.

time.sleep(delay) does the same thing in this example, because the CPU can’t do anything else but wait for the delay to expire.
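
To see the effect on its own, here is a minimal sketch that times two back-to-back time.sleep() calls. The run_blocking() helper and the short delays are assumptions made for the illustration; the point is only that the waits add up, because nothing else runs in the calling thread during each sleep.

```python
import time


def run_blocking(delays):
    """Sleep for each delay in turn and return the total elapsed seconds."""
    start = time.perf_counter()
    for delay in delays:
        time.sleep(delay)  # nothing else in this thread runs during the sleep
    return time.perf_counter() - start


elapsed = run_blocking([0.2, 0.1])
print(f"Elapsed: {elapsed:.1f}")
```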

In [6]:
!pip install codetiming

Collecting codetiming
  Downloading codetiming-1.2.0-py3-none-any.whl (10 kB)
Installing collected packages: codetiming
Successfully installed codetiming-1.2.0


In [7]:
import time
import queue
from codetiming import Timer

def task(name, queue):
    timer = Timer(text=f"Task {name} elapsed time: {{:.1f}}")
    while not queue.empty():
        delay = queue.get()
        print(f"Task {name} running")
        timer.start()
        time.sleep(delay)
        timer.stop()
        yield

def main():
    """
    This is the main entry point for the program
    """
    # Create the queue of work
    work_queue = queue.Queue()

    # Put some work in the queue
    for work in [15, 10, 5, 2]:
        work_queue.put(work)

    tasks = [task("One", work_queue), task("Two", work_queue)]

    # Run the tasks
    done = False
    with Timer(text="\nTotal elapsed time: {:.1f}"):
        while not done:
            for t in list(tasks):  # iterate over a copy because tasks is modified below
                try:
                    next(t)
                except StopIteration:
                    tasks.remove(t)
                if len(tasks) == 0:
                    done = True

if __name__ == "__main__":
    main()

Task One running
Task One elapsed time: 15.0
Task Two running
Task Two elapsed time: 10.0
Task One running
Task One elapsed time: 5.0
Task Two running
Task Two elapsed time: 2.0

Total elapsed time: 32.0


Here’s what’s different in the code above:

- Line 1 imports the time module to give the program access to time.sleep().
- Line 3 imports the Timer code from the codetiming module.
- Line 6 creates the Timer instance used to measure the time taken for each iteration of the task loop.
- Line 10 starts the timer instance.
- Line 11 changes task() to include a time.sleep(delay) to mimic an IO delay. This replaces the for loop that did the counting in previous example.
- Line 12 stops the timer instance and outputs the elapsed time since timer.start() was called.
- Line 30 creates a Timer context manager that will output the elapsed time the entire while loop took to execute.

# Cooperative Concurrency With Non-Blocking Calls
The next version of the program has been modified quite a bit. It makes use of Python async features using the async/await syntax and the asyncio package provided in Python 3.

The time and queue modules have been replaced with the asyncio package. This gives your program access to asynchronous friendly (non-blocking) sleep and queue functionality. The change to task() defines it as asynchronous with the addition of the async prefix on line 4. This indicates to Python that the function will be asynchronous.

The other big change is removing the time.sleep(delay) and yield statements, and replacing them with await asyncio.sleep(delay). This creates a non-blocking delay that will perform a context switch back to the event loop.

The while loop inside main() no longer exists. Instead of looping over the tasks list, there’s a call to await asyncio.gather(...). This tells asyncio two things:

- Create two tasks based on task() and start running them.
- Wait for both of these to be completed before moving forward.

In a standalone script, the last line of the program would be asyncio.run(main()), which creates what’s known as an event loop and uses it to run main(), which in turn runs the two instances of task(). A notebook already has a running event loop, which is why the cell below ends with await main() instead.

The event loop is at the heart of the Python async system. It runs all the code, including main(). When task code is executing, the CPU is busy doing work. When the await keyword is reached, a context switch occurs, and control passes back to the event loop. The event loop looks at all the tasks waiting for an event (in this case, an asyncio.sleep(delay) timeout) and passes control to a task with an event that’s ready.

await asyncio.sleep(delay) is non-blocking with regard to the CPU. Instead of waiting for the delay to time out, the task registers a sleep event with the event loop and performs a context switch by passing control to the event loop. The event loop continuously looks for completed events and passes control back to the task waiting for that event. In this way, the CPU can stay busy if work is available, while the event loop monitors the events that will happen in the future.
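
The hand-off at each await can be made visible with a small sketch that records when each task starts and ends. The nap() coroutine, its delays, and the shared log list are assumptions made for this illustration. The sketch is written as a standalone script, so it uses asyncio.run(); inside a notebook cell you would write await demo() instead.

```python
import asyncio


async def nap(name, delay, log):
    """Hypothetical task: note start and end around a non-blocking sleep."""
    log.append(f"{name} start")
    await asyncio.sleep(delay)  # control returns to the event loop here
    log.append(f"{name} end")


async def demo():
    """Run two naps together and return the order in which things happened."""
    log = []
    await asyncio.gather(nap("One", 0.2, log), nap("Two", 0.1, log))
    return log


log = asyncio.run(demo())
print(log)  # ['One start', 'Two start', 'Two end', 'One end']
```

Task Two starts before Task One has finished because Task One gave up control at its await, and Task Two ends first because its shorter sleep expires first.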

In [10]:
import asyncio
from codetiming import Timer

async def task(name, work_queue):
    timer = Timer(text=f"Task {name} elapsed time: {{:.1f}}")
    while not work_queue.empty():
        delay = await work_queue.get()
        print(f"Task {name} running")
        timer.start()
        await asyncio.sleep(delay)
        timer.stop()

async def main():
    """
    This is the main entry point for the program
    """
    # Create the queue of work
    work_queue = asyncio.Queue()

    # Put some work in the queue
    for work in [15, 10, 5, 2]:
        await work_queue.put(work)

    # Run the tasks
    with Timer(text="\nTotal elapsed time: {:.1f}"):
        await asyncio.gather(
            asyncio.create_task(task("One", work_queue)),
            asyncio.create_task(task("Two", work_queue)),
        )

if __name__ == "__main__":
    await main()  # top-level await works in a notebook cell; a script would use asyncio.run(main())



Task One running
Task Two running
Task Two elapsed time: 10.0
Task Two running
Task One elapsed time: 15.0
Task One running
Task Two elapsed time: 5.0
Task One elapsed time: 2.0

Total elapsed time: 17.0


- Line 1 imports asyncio to gain access to Python async functionality. This replaces the time import.
- Line 2 imports the Timer code from the codetiming module.
- Line 4 shows the addition of the async keyword in front of the task() definition. This informs the program that task can run asynchronously.
- Line 5 creates the Timer instance used to measure the time taken for each iteration of the task loop.
- Line 9 starts the timer instance.
- Line 10 replaces time.sleep(delay) with the non-blocking asyncio.sleep(delay), which also yields control (or switches contexts) back to the main event loop.
- Line 11 stops the timer instance and outputs the elapsed time since timer.start() was called.
- Line 18 creates the non-blocking asynchronous work_queue.
- Lines 21 to 22 put work into work_queue in an asynchronous manner using the await keyword.
- Line 25 creates a Timer context manager that will output the elapsed time the entire asyncio.gather() call took to execute.
- Lines 26 to 29 create the two tasks and gather them together, so the program will wait for both tasks to complete.
- Line 32 starts the program running asynchronously with await main(). This relies on the event loop that the notebook is already running; a standalone script would call asyncio.run(main()) to start one.

# Synchronous (Blocking) HTTP Calls
The next version of the program is kind of a step forward as well as a step back. The program is doing some actual work with real IO by making HTTP requests to a list of URLs and getting the page contents. However, it’s doing so in a blocking (synchronous) manner.

The program has been modified to import the wonderful requests module to make the actual HTTP requests. Also, the queue now contains a list of URLs, rather than numbers. In addition, task() no longer sleeps for a delay. Instead, requests gets the contents of a URL retrieved from the queue, and task() prints how long it took to do so.

In [5]:
import queue
import requests
from codetiming import Timer

def task(name, work_queue):
    timer = Timer(text=f"Task {name} elapsed time: {{:.1f}}")
    with requests.Session() as session:
        while not work_queue.empty():
            url = work_queue.get()
            print(f"Task {name} getting URL: {url}")
            timer.start()
            session.get(url)
            timer.stop()
            yield

def main():
    """
    This is the main entry point for the program
    """
    # Create the queue of work
    work_queue = queue.Queue()

    # Put some work in the queue
    for url in [
        "http://google.com",
        "http://yahoo.com",
        "http://linkedin.com",
        "http://apple.com",
        "http://microsoft.com",
        "http://facebook.com",
    ]:
        work_queue.put(url)

    tasks = [task("One", work_queue), task("Two", work_queue)]

    # Run the tasks
    done = False
    with Timer(text="\nTotal elapsed time: {:.1f}"):
        while not done:
            for t in list(tasks):  # iterate over a copy because tasks is modified below
                try:
                    next(t)
                except StopIteration:
                    tasks.remove(t)
                if len(tasks) == 0:
                    done = True

if __name__ == "__main__":
    main()


Task One getting URL: http://google.com
Task One elapsed time: 0.7
Task Two getting URL: http://yahoo.com
Task Two elapsed time: 3.1
Task One getting URL: http://linkedin.com
Task One elapsed time: 3.9
Task Two getting URL: http://apple.com
Task Two elapsed time: 1.0
Task One getting URL: http://microsoft.com
Task One elapsed time: 1.2
Task Two getting URL: http://facebook.com
Task Two elapsed time: 6.4

Total elapsed time: 16.4


- Line 2 imports requests, which provides a convenient way to make HTTP calls.
- Line 3 imports the Timer code from the codetiming module.
- Line 6 creates the Timer instance used to measure the time taken for each iteration of the task loop.
- Line 11 starts the timer instance.
- Line 12 introduces a delay, similar to the time.sleep(delay) call in the earlier blocking example. However, this time it calls session.get(url), which returns the contents of the URL retrieved from work_queue.
- Line 13 stops the timer instance and outputs the elapsed time since timer.start() was called.
- Lines 23 to 32 put the list of URLs into work_queue.
- Line 38 creates a Timer context manager that will output the elapsed time the entire while loop took to execute.

Just like in earlier versions of the program, yield turns task() into a generator. It also performs a context switch that lets the other task instance run.

Each task gets a URL from the work queue, retrieves the contents of the page, and reports how long it took to get that content.

As before, yield allows both your tasks to run cooperatively. However, since this program is running synchronously, each session.get() call blocks the CPU until the page is retrieved. Note the total time it took to run the entire program at the end. This will be meaningful for the next example.

# Asynchronous (Non-Blocking) HTTP Calls
This version of the program modifies the previous one to use Python async features. It also imports the aiohttp module, which is a library to make HTTP requests in an asynchronous fashion using asyncio.

The tasks here have been modified to remove the yield call since the code to make the HTTP GET call is no longer blocking. Awaiting that call also performs a context switch back to the event loop.

In [13]:
!pip install aiohttp

Collecting aiohttp
  Downloading aiohttp-3.6.2-py3-none-any.whl (441 kB)
Collecting multidict<5.0,>=4.5
  Downloading multidict-4.7.6-cp38-cp38-win_amd64.whl (48 kB)
Collecting yarl<2.0,>=1.0
  Downloading yarl-1.5.1-cp38-cp38-win_amd64.whl (128 kB)
Collecting async-timeout<4.0,>=3.0
  Downloading async_timeout-3.0.1-py3-none-any.whl (8.2 kB)
Installing collected packages: multidict, yarl, async-timeout, aiohttp
Successfully installed aiohttp-3.6.2 async-timeout-3.0.1 multidict-4.7.6 yarl-1.5.1


In [8]:
import asyncio
import aiohttp
from codetiming import Timer

async def task(name, work_queue):
    timer = Timer(text=f"Task {name} elapsed time: {{:.1f}}")
    async with aiohttp.ClientSession() as session:
        while not work_queue.empty():
            url = await work_queue.get()
            print(f"Task {name} getting URL: {url}")
            timer.start()
            async with session.get(url) as response:
                await response.text()
            timer.stop()

async def main():
    """
    This is the main entry point for the program
    """
    # Create the queue of work
    work_queue = asyncio.Queue()

    # Put some work in the queue
    for url in [
        "http://google.com",
        "http://yahoo.com",
        "http://linkedin.com",
        "http://apple.com",
        "http://microsoft.com",
        "http://facebook.com",
    ]:
        await work_queue.put(url)

    # Run the tasks
    with Timer(text="\nTotal elapsed time: {:.1f}"):
        await asyncio.gather(
            asyncio.create_task(task("One", work_queue)),
            asyncio.create_task(task("Two", work_queue)),
        )

if __name__ == "__main__":
    await main()  # top-level await works in a notebook cell; a script would use asyncio.run(main())

Task One getting URL: http://google.com
Task Two getting URL: http://yahoo.com
Task One elapsed time: 0.9
Task One getting URL: http://linkedin.com
Task One elapsed time: 4.6
Task One getting URL: http://apple.com
Task Two elapsed time: 6.1
Task Two getting URL: http://microsoft.com
Task One elapsed time: 1.1
Task One getting URL: http://facebook.com
Task Two elapsed time: 2.4
Task One elapsed time: 3.2

Total elapsed time: 9.7


- Line 2 imports the aiohttp library, which provides an asynchronous way to make HTTP calls.
- Line 3 imports the Timer code from the codetiming module.
- Line 5 marks task() as an asynchronous function.
- Line 6 creates the Timer instance used to measure the time taken for each iteration of the task loop.
- Line 7 creates an aiohttp session context manager.
- Line 11 starts the timer instance.
- Line 12 creates an aiohttp response context manager. It also makes an HTTP GET call to the URL taken from work_queue.
- Line 13 uses the response to get the text retrieved from the URL asynchronously.
- Line 14 stops the timer instance and outputs the elapsed time since timer.start() was called.
- Line 35 creates a Timer context manager that will output the elapsed time the entire asyncio.gather() call took to execute.

Take a look at the total elapsed time, as well as the individual times to get the contents of each URL. You’ll see that the duration is about half the cumulative time of all the HTTP GET calls. This is because the HTTP GET calls are running asynchronously. In other words, you’re effectively taking better advantage of the CPU by allowing it to make multiple requests at once.

Because the CPU is so fast, this example could likely create as many tasks as there are URLs. In this case, the program’s run time would be that of the single slowest URL retrieval.
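
A rough sketch of that one-task-per-URL idea is shown below. To keep it runnable without network access, the hypothetical fetch() coroutine stands in for the HTTP GET call by sleeping, and the per-URL delays are made-up values, not measurements. It is written as a standalone script, so it uses asyncio.run(); in a notebook cell you would await fetch_all(...) directly.

```python
import asyncio
import time


async def fetch(url, delay):
    """Stand-in for an HTTP GET: sleeps for delay seconds instead of using the network."""
    await asyncio.sleep(delay)
    return url


async def fetch_all(simulated):
    """Start one task per URL, wait for all of them, and time the whole batch."""
    start = time.perf_counter()
    tasks = [
        asyncio.create_task(fetch(url, delay))
        for url, delay in simulated.items()
    ]
    pages = await asyncio.gather(*tasks)
    return pages, time.perf_counter() - start


# Made-up delays, in seconds, standing in for real response times
simulated = {
    "http://google.com": 0.1,
    "http://yahoo.com": 0.3,
    "http://apple.com": 0.2,
}
pages, elapsed = asyncio.run(fetch_all(simulated))
print(pages, f"{elapsed:.1f}")
```

With these simulated delays, the elapsed time should land close to the longest single delay rather than the sum of all three.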