This notebook explores concurrency in Python 3.8, following the tutorial at https://realpython.com/python-concurrency/

Warning: Jupyter Notebook and especially Google Colab are not the best places to test concurrency. Crashes and unexpected behaviour can occur.

#I/O Bound Concurrency

A program is I/O bound when it spends most of its time waiting for I/O operations, such as reading from a hard disk or fetching data over the internet.
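A minimal sketch of what "waiting" looks like, with `time.sleep` standing in for network latency (`fake_download` is a hypothetical helper, not a real request). It compares the wall-clock time of ten fake downloads with the CPU time the process actually used.

```python
import time


def fake_download(delay=0.05):
    # Hypothetical stand-in for a network call: the process just waits.
    time.sleep(delay)


wall_start = time.perf_counter()
cpu_start = time.process_time()
for _ in range(10):
    fake_download()
wall = time.perf_counter() - wall_start
cpu = time.process_time() - cpu_start

print(f"wall clock: {wall:.2f} s, CPU time: {cpu:.4f} s")
```

On a typical run the wall-clock time is about half a second while the CPU time is close to zero; that idle gap is what concurrency tries to reclaim.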

##Synchronous Version - without any deliberate concurrency

The following code downloads the same two web pages 100 times each, one after another. A `requests.Session` reuses HTTP persistent (keep-alive) connections across requests, which is faster than opening a new connection for every download.

`download_site` fetches a single URL and prints the size of the response body.

`download_all_sites` runs `download_site` on each URL in a list, reusing one session for all of them.

In [1]:
import requests
import time

def download_site(url, session):
    with session.get(url) as response:
        print(f"Read {len(response.content)} bytes from {url}")


def download_all_sites(urls):
    with requests.Session() as session:
        for url in urls:
            download_site(url, session)


if __name__ == "__main__":
    urls = [
        "https://www.linkedin.com/in/sagnik-biswas-7740981b6/",
        "https://github.com/sagnikbiswas"
    ] * 100
    start_time = time.time()
    download_all_sites(urls)
    time_took = time.time() - start_time
    print(f"Downloaded {len(urls)} pages in {time_took} seconds")

Read 1529 bytes from https://www.linkedin.com/in/sagnik-biswas-7740981b6/
Read 1474 bytes from https://github.com/sagnikbiswas
Read 1529 bytes from https://www.linkedin.com/in/sagnik-biswas-7740981b6/
Read 1474 bytes from https://github.com/sagnikbiswas
Read 1529 bytes from https://www.linkedin.com/in/sagnik-biswas-7740981b6/
Read 1474 bytes from https://github.com/sagnikbiswas
Read 1529 bytes from https://www.linkedin.com/in/sagnik-biswas-7740981b6/
Read 1474 bytes from https://github.com/sagnikbiswas
Read 1529 bytes from https://www.linkedin.com/in/sagnik-biswas-7740981b6/
Read 1474 bytes from https://github.com/sagnikbiswas
Read 1529 bytes from https://www.linkedin.com/in/sagnik-biswas-7740981b6/
Read 1474 bytes from https://github.com/sagnikbiswas
Read 1529 bytes from https://www.linkedin.com/in/sagnik-biswas-7740981b6/
Read 1474 bytes from https://github.com/sagnikbiswas
Read 1529 bytes from https://www.linkedin.com/in/sagnik-biswas-7740981b6/
Read 1474 bytes from https://github.c

##Threading Version

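With `threading`, we can run multiple threads concurrently in a single process, so while one thread waits for a response, another can send its own request. In CPython the Global Interpreter Lock (GIL) lets only one thread execute Python bytecode at a time, but it is released while a thread blocks on network I/O, which is why threads still help here.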

`ThreadPoolExecutor` creates and maintains a pool of up to `max_workers` threads. Its `.map` method calls the given function on every item of the iterable, spreading the calls across the pool so they run concurrently.
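A small sketch of two properties of `executor.map` worth knowing (`slow_square` is a made-up task): results come back in input order even when later items finish first, and an exception raised in a worker only resurfaces when you iterate over the results. The `download_all_sites` cell below never iterates over what `executor.map` returns, so an exception raised inside `download_site` would go unnoticed there.

```python
import concurrent.futures
import time


def slow_square(x):
    # Smaller inputs sleep longer, so they finish last.
    time.sleep(0.01 * (5 - x))
    if x == 3:
        raise ValueError("bad input: 3")
    return x * x


collected = []
error = None
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    try:
        # The iterator yields results in input order, not completion order.
        for value in executor.map(slow_square, range(5)):
            collected.append(value)
    except ValueError as exc:
        # The worker's exception is re-raised here, when its result is reached.
        error = exc

print(collected, repr(error))
```

Wrapping the call in `list(...)` is a common way to force every result (and every exception) to be retrieved.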

`requests.Session` is not thread-safe, so each thread needs its own session object.

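`threading.local()` creates an object whose attributes are private to each thread. We create `thread_local` only once, but each thread that sets `thread_local.session` gets its own session, invisible to the other threads.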
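A sketch of that isolation (the names `local_data` and `worker` are illustrative): three threads write to the same `threading.local()` object, wait at a barrier until all have written, then read back. With an ordinary shared object every thread would see the last write; with `threading.local()` each sees its own value.

```python
import threading

local_data = threading.local()
barrier = threading.Barrier(3)
seen = {}


def worker(name):
    local_data.value = name          # each thread writes to its own private slot
    barrier.wait()                   # wait until all three threads have written
    seen[name] = local_data.value    # read back after everyone else has written


threads = [threading.Thread(target=worker, args=(f"thread-{i}",)) for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(seen)
print(hasattr(local_data, "value"))  # the main thread never set it
```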

As we increase the number of threads via `max_workers`, the run may get faster up to a point, after which the overhead of creating and switching between threads outweighs the benefit of concurrency.
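A rough way to see the effect without hitting real servers: time 20 fake 50 ms I/O tasks (`fake_io` is a stand-in based on `time.sleep`) with different pool sizes. This toy only shows the plateau (with 20 tasks, workers beyond 20 have nothing extra to do); where thread overhead starts to make things slower depends on the machine and the workload.

```python
import concurrent.futures
import time


def fake_io(_):
    time.sleep(0.05)  # simulated network wait


def timed_run(n_tasks, max_workers):
    start = time.perf_counter()
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        list(executor.map(fake_io, range(n_tasks)))
    return time.perf_counter() - start


timings = {workers: timed_run(20, workers) for workers in (1, 5, 20, 100)}
for workers, seconds in timings.items():
    print(f"{workers:>3} workers: {seconds:.2f} s")
```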

In [2]:
import concurrent.futures
import threading


thread_local = threading.local()


def get_session():
    if not hasattr(thread_local, "session"):
        thread_local.session = requests.Session()
    return thread_local.session


def download_site(url):
    session = get_session()
    with session.get(url) as response:
        print(f"Read {len(response.content)} bytes from {url}")


def download_all_sites(urls):
    with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
        executor.map(download_site, urls)


if __name__ == "__main__":
    urls = [
        "https://www.linkedin.com/in/sagnik-biswas-7740981b6/",
        "https://github.com/sagnikbiswas"
    ] * 100
    start_time = time.time()
    download_all_sites(urls)
    duration = time.time() - start_time
    print(f"Downloaded {len(urls)} pages in {duration} seconds")


Read 1474 bytes from https://github.com/sagnikbiswas
Read 1474 bytes from https://github.com/sagnikbiswas
Read 1474 bytes from https://github.com/sagnikbiswas
Read 1474 bytes from https://github.com/sagnikbiswas
Read 1474 bytes from https://github.com/sagnikbiswas
Read 1474 bytes from https://github.com/sagnikbiswas
Read 1529 bytes from https://www.linkedin.com/in/sagnik-biswas-7740981b6/
Read 1474 bytes from https://github.com/sagnikbiswas
Read 1474 bytes from https://github.com/sagnikbiswas
Read 1474 bytes from https://github.com/sagnikbiswas
Read 1529 bytes from https://www.linkedin.com/in/sagnik-biswas-7740981b6/
Read 1529 bytes from https://www.linkedin.com/in/sagnik-biswas-7740981b6/
Read 1529 bytes from https://www.linkedin.com/in/sagnik-biswas-7740981b6/
Read 1474 bytes from https://github.com/sagnikbiswas
Read 1529 bytes from https://www.linkedin.com/in/sagnik-biswas-7740981b6/
Read 1529 bytes from https://www.linkedin.com/in/sagnik-biswas-7740981b6/
Read 1529 bytes from https

A major pitfall of threading is race conditions between threads. We can already see a mild symptom in the previous example: lines from different threads are printed interleaved and out of order.

In the code below, up to 5000 threads race to increment the shared global variable `counter` a total of `100,000 * 100` times. The result is (probably) less than `10,000,000`, because `counter += 1` is a read, an add, and a write rather than one indivisible step. If a thread reads `counter` and is switched out before writing the new value back, any increments other threads make in the meantime are overwritten when it resumes, so they are lost.

For me, Google Colab consistently gave a wrong result, while on a local machine the effect was more subtle.
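`counter += 1` looks like a single step, but the standard `dis` module shows that it compiles to separate bytecode instructions that load the global, add, and store it back. A thread switch can happen between any two of them. The function name `increment` is illustrative; exact opcode names differ between Python versions.

```python
import dis

counter = 0


def increment():
    global counter
    counter += 1


# Print the bytecode; e.g. the add is INPLACE_ADD in 3.8 but BINARY_OP in 3.11+.
dis.dis(increment)

opnames = [instr.opname for instr in dis.get_instructions(increment)]
```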

In [3]:
import concurrent.futures

counter = 0

def increment_counter(fake_value):
    global counter
    for _ in range(100):
        counter += 1


if __name__ == "__main__":
    fake_data = range(100000)
    counter = 0
    with concurrent.futures.ThreadPoolExecutor(max_workers=5000) as executor:
        executor.map(increment_counter, fake_data)
    print(counter)

7425606
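One standard fix is to guard the read-modify-write with a `threading.Lock`, so only one thread at a time can run it. This sketch is scaled down from the cell above (1,000 × 100 increments and 100 workers instead of 100,000 × 100 and 5000) so it finishes quickly; `counter_lock` is an illustrative name. The lock makes the result correct but serializes the increments, so the threads no longer help with speed.

```python
import concurrent.futures
import threading

counter = 0
counter_lock = threading.Lock()


def increment_counter(fake_value):
    global counter
    for _ in range(100):
        # Only one thread at a time may run the read-modify-write below.
        with counter_lock:
            counter += 1


fake_data = range(1_000)
with concurrent.futures.ThreadPoolExecutor(max_workers=100) as executor:
    list(executor.map(increment_counter, fake_data))  # consume results so errors surface

print(counter)  # 100000
```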


##Asyncio Version

With `asyncio`, we do not create separate threads. Instead we rely on cooperative (non-preemptive) multitasking: a single event loop runs many *tasks*, and a task voluntarily hands control back to the loop, at an `await`, whenever it has to wait for an I/O response. The loop then runs another task that is ready.
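A minimal sketch of that hand-over, with `asyncio.sleep` standing in for waiting on I/O (`fake_fetch` is a hypothetical coroutine). Task A starts first but waits longer, so B finishes before it. The script uses `asyncio.run`, which refuses to run inside an already running event loop such as Jupyter's; in a notebook cell, `await main()` works instead.

```python
import asyncio


async def fake_fetch(name, delay, log):
    log.append(f"{name} start")
    await asyncio.sleep(delay)  # hands control back to the event loop while "waiting"
    log.append(f"{name} done")


async def main():
    log = []
    await asyncio.gather(fake_fetch("A", 0.2, log), fake_fetch("B", 0.1, log))
    return log


log = asyncio.run(main())
print(log)  # ['A start', 'B start', 'B done', 'A done']
```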

For this to work, the I/O library itself must cooperate with asyncio. `requests` is blocking and would stall the event loop, so we use the async HTTP client `aiohttp` instead.
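A sketch of why the library matters (all names are illustrative): three coroutines each wait 0.1 s, once with the blocking `time.sleep`, which behaves like a `requests` call and stalls the whole event loop, and once with `await asyncio.sleep`. Only the cooperative version overlaps the waits. Like any script using `asyncio.run`, it needs `await` instead inside Jupyter.

```python
import asyncio
import time


async def blocking_task():
    time.sleep(0.1)  # blocks the entire event loop, as a requests call would


async def cooperative_task():
    await asyncio.sleep(0.1)  # yields to the event loop while waiting


async def run_three(task_factory):
    start = time.perf_counter()
    await asyncio.gather(*(task_factory() for _ in range(3)))
    return time.perf_counter() - start


blocking_time = asyncio.run(run_three(blocking_task))
cooperative_time = asyncio.run(run_three(cooperative_task))
print(f"blocking: {blocking_time:.2f} s, cooperative: {cooperative_time:.2f} s")
```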

In [4]:
!pip install aiohttp



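We need to install `nest-asyncio` to run this in Colab (and Jupyter), because the notebook already runs its own event loop, and `run_until_complete` would otherwise fail with "This event loop is already running".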

In [5]:
!pip install nest-asyncio
import nest_asyncio
nest_asyncio.apply()



In [9]:
import asyncio
import time
import aiohttp

async def download_site(session, url):
    async with session.get(url) as response:
        # content_length is read from the Content-Length HTTP header if it's
        # present; otherwise it is None, so we fall back to "Some"
        length = response.content_length or "Some"

        print(f"Read {length} bytes from {url}")

async def download_all_sites(urls):
    async with aiohttp.ClientSession() as session:
        tasks = []
        for url in urls:
            task = asyncio.ensure_future(download_site(session, url))
            tasks.append(task)

        await asyncio.gather(*tasks, return_exceptions=True)

if __name__ == "__main__":
    urls = [
        "https://www.linkedin.com/in/sagnik-biswas-7740981b6/",
        "https://github.com/sagnikbiswas"
    ] * 100

    start_time = time.time()
    asyncio.get_event_loop().run_until_complete(download_all_sites(urls))
    # asyncio.run is causing RuntimeError('Event loop is closed') in Win10
    time_took = time.time() - start_time
    print(f"Downloaded {len(urls)} pages in {time_took} seconds")

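By far the fastest method, and without the dangers of threading: tasks switch only at `await` points, so the code between two `await`s cannot be interrupted by another task, and a race like the `counter += 1` one above cannot happen. Shared state that is read before an `await` and written after it still needs care.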
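A sketch contrasting both cases (all names hypothetical): 50 tasks each add 1 to a shared counter 100 times. Without an `await` inside the update the total is always exact; once a task awaits between reading and writing, the other tasks run in that gap and their updates get overwritten.

```python
import asyncio

counter = 0


async def safe_increment():
    global counter
    for _ in range(100):
        counter += 1  # no await in between, so no other task can run mid-update


async def unsafe_increment():
    global counter
    for _ in range(100):
        current = counter
        await asyncio.sleep(0)  # yields to other tasks between the read and the write
        counter = current + 1


async def run_tasks(coro_fn, n_tasks=50):
    global counter
    counter = 0
    await asyncio.gather(*(coro_fn() for _ in range(n_tasks)))
    return counter


safe_total = asyncio.run(run_tasks(safe_increment))
unsafe_total = asyncio.run(run_tasks(unsafe_increment))
print(safe_total, unsafe_total)  # safe_total is 5000; unsafe_total is lower
```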

##Multiprocessing version

`multiprocessing` lets us use multiple CPU cores. Each process runs its own instance of the Python interpreter, and data has to be copied between processes, so the overhead is much higher than with threads or asyncio.
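Part of that overhead is that work crosses process boundaries by pickling: the function and its arguments are serialized, sent to a worker, and the result is sent back. A sketch of one practical consequence (`square` is a hypothetical example function): a module-level function pickles by reference to its name, while a lambda does not, which is why passing a lambda to `multiprocessing.Pool.map` fails. Run it as a script so `square` lives in `__main__`.

```python
import pickle


def square(x):
    return x * x


# Module-level functions are pickled by reference (module name + function name).
restored = pickle.loads(pickle.dumps(square))
print(restored(4))  # 16

# A lambda has no importable name, so pickle refuses it.
try:
    pickle.dumps(lambda x: x * x)
    lambda_picklable = True
except (pickle.PicklingError, AttributeError):
    lambda_picklable = False
print(lambda_picklable)  # False
```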

In [6]:
import requests
import multiprocessing
import time

session = None


def set_global_session():
    global session
    if not session:
        session = requests.Session()


def download_site(url):
    with session.get(url) as response:
        name = multiprocessing.current_process().name
        print(f"{name}:Read {len(response.content)} bytes from {url}")


def download_all_sites(urls):
    with multiprocessing.Pool(initializer=set_global_session) as pool:
        pool.map(download_site, urls)


if __name__ == "__main__":
    urls = [
        "https://www.linkedin.com/in/sagnik-biswas-7740981b6/",
        "https://github.com/sagnikbiswas"
    ] * 100
    start_time = time.time()
    download_all_sites(urls)
    time_took = time.time() - start_time
    print(f"Downloaded {len(urls)} pages in {time_took} seconds")


ForkPoolWorker-1:Read 1474 bytes from https://github.com/sagnikbiswas
ForkPoolWorker-2:Read 1529 bytes from https://www.linkedin.com/in/sagnik-biswas-7740981b6/
ForkPoolWorker-2:Read 1474 bytes from https://github.com/sagnikbiswas
ForkPoolWorker-1:Read 1529 bytes from https://www.linkedin.com/in/sagnik-biswas-7740981b6/
ForkPoolWorker-1:Read 1474 bytes from https://github.com/sagnikbiswas
ForkPoolWorker-2:Read 1529 bytes from https://www.linkedin.com/in/sagnik-biswas-7740981b6/
ForkPoolWorker-2:Read 1474 bytes from https://github.com/sagnikbiswas
ForkPoolWorker-1:Read 1529 bytes from https://www.linkedin.com/in/sagnik-biswas-7740981b6/
ForkPoolWorker-1:Read 1474 bytes from https://github.com/sagnikbiswas
ForkPoolWorker-1:Read 1529 bytes from https://www.linkedin.com/in/sagnik-biswas-7740981b6/
ForkPoolWorker-2:Read 1529 bytes from https://www.linkedin.com/in/sagnik-biswas-7740981b6/
ForkPoolWorker-1:Read 1474 bytes from https://github.com/sagnikbiswas
ForkPoolWorker-2:Read 1474 bytes f

#CPU Bound

A program is CPU bound when it spends most of its time doing computation rather than waiting for I/O.
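For contrast with the I/O-bound case, a sketch that times the same sum-of-squares computation as `cpu_bound` below (here as a hypothetical `sum_of_squares`, without the print and with a smaller input so it finishes quickly). On an otherwise idle machine, the CPU time comes out close to the wall-clock time, because the process is busy the whole time rather than waiting.

```python
import time


def sum_of_squares(number):
    # Same computation as cpu_bound, minus the print.
    return sum(i * i for i in range(number))


n = 2_000_000
wall_start = time.perf_counter()
cpu_start = time.process_time()
result = sum_of_squares(n)
wall = time.perf_counter() - wall_start
cpu = time.process_time() - cpu_start

print(f"wall clock: {wall:.2f} s, CPU time: {cpu:.2f} s")
```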

##Synchronous - Single CPU

In [7]:
import time
import multiprocessing


def cpu_bound(number):
    name = multiprocessing.current_process().name
    print(f"{name} is crunching numbers")
    return sum(i * i for i in range(number))


def find_sums(numbers):
    for number in numbers:
        cpu_bound(number)


if __name__ == "__main__":
    numbers = [10_000_000 + x for x in range(20)]

    start_time = time.time()
    find_sums(numbers)
    duration = time.time() - start_time
    print(f"Duration {duration} seconds")


MainProcess is crunching numbers
MainProcess is crunching numbers
MainProcess is crunching numbers
MainProcess is crunching numbers
MainProcess is crunching numbers
MainProcess is crunching numbers
MainProcess is crunching numbers
MainProcess is crunching numbers
MainProcess is crunching numbers
MainProcess is crunching numbers
MainProcess is crunching numbers
MainProcess is crunching numbers
MainProcess is crunching numbers
MainProcess is crunching numbers
MainProcess is crunching numbers
MainProcess is crunching numbers
MainProcess is crunching numbers
MainProcess is crunching numbers
MainProcess is crunching numbers
MainProcess is crunching numbers
Duration 24.826056003570557 seconds


##Multiprocessing

`multiprocessing` really shines in CPU-bound programs, because more CPUs or cores can do more number crunching. Each process has its own interpreter (and its own GIL), so the tasks run truly in parallel.

`asyncio` and `threading` are counterproductive here: the single CPU is already fully used, and there is no idle waiting into which concurrency could slip more computation. In CPython the GIL also keeps threads from running Python code in parallel. So we pay the overhead without any benefit.
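A sketch of the asyncio half of this argument: two coroutines that only compute (via a hypothetical `crunch` helper doing the same sum of squares as `cpu_bound`) never reach an `await`, so the event loop cannot switch between them and they simply run one after the other.

```python
import asyncio


def crunch(n):
    # Same computation as cpu_bound, without the print.
    return sum(i * i for i in range(n))


async def cpu_task(name, log):
    log.append(f"{name} start")
    crunch(200_000)  # no await anywhere: this task keeps the loop until it finishes
    log.append(f"{name} end")


async def main():
    log = []
    await asyncio.gather(cpu_task("A", log), cpu_task("B", log))
    return log


log = asyncio.run(main())
print(log)  # ['A start', 'A end', 'B start', 'B end']
```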

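(On Google Colab this gave no speed-up: the pool started only two workers, `ForkPoolWorker-3` and `ForkPoolWorker-4`, and the run took about as long as the single-process version. Run it on a local machine with several cores to see the speed-up.)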

In [8]:
import multiprocessing
import time


def cpu_bound(number):
    name = multiprocessing.current_process().name
    print(f"{name} is crunching numbers")
    return sum(i * i for i in range(number))


def find_sums(numbers):
    with multiprocessing.Pool() as pool:
        pool.map(cpu_bound, numbers)


if __name__ == "__main__":
    numbers = [10_000_000 + x for x in range(20)]

    start_time = time.time()
    find_sums(numbers)
    duration = time.time() - start_time
    print(f"Duration {duration} seconds")


ForkPoolWorker-3 is crunching numbers
ForkPoolWorker-4 is crunching numbers
ForkPoolWorker-4 is crunching numbers
ForkPoolWorker-3 is crunching numbers
ForkPoolWorker-4 is crunching numbers
ForkPoolWorker-3 is crunching numbers
ForkPoolWorker-4 is crunching numbers
ForkPoolWorker-3 is crunching numbers
ForkPoolWorker-4 is crunching numbers
ForkPoolWorker-3 is crunching numbers
ForkPoolWorker-4 is crunching numbers
ForkPoolWorker-3 is crunching numbers
ForkPoolWorker-4 is crunching numbers
ForkPoolWorker-3 is crunching numbers
ForkPoolWorker-4 is crunching numbers
ForkPoolWorker-3 is crunching numbers
ForkPoolWorker-4 is crunching numbers
ForkPoolWorker-3 is crunching numbers
ForkPoolWorker-4 is crunching numbers
ForkPoolWorker-4 is crunching numbers
Duration 25.99473738670349 seconds
