<a href="https://colab.research.google.com/github/leopedroso1/Mutiprocessing-Python/blob/main/Python_Concurrency_Asyncio_Threading_Multiprocessing.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### **What Is Concurrency?**

The dictionary definition of **concurrency is simultaneous occurrence**. In Python, the things that are occurring simultaneously are called by different names **(thread, task, process)** but at a high level, they all refer to a sequence of instructions that run in order.

I like to think of them as different trains of thought. Each one can be stopped at certain points, and the CPU or brain that is processing them can switch to a different one. The state of each one is saved so it can be restarted right where it was interrupted.

You might wonder why Python uses different words for the same concept. It turns out that threads, tasks, and processes are only the same if you view them from a high level. Once you start digging into the details, they all represent slightly different things. You’ll see more of how they are different as you progress through the examples.

Now let’s talk about the simultaneous part of that definition. You have to be a little careful because, when you get down to the details, **only multiprocessing actually runs these trains of thought at literally the same time**. **Threading and asyncio both run on a single processor and therefore only run one at a time**. They just cleverly find ways to take turns to speed up the overall process. Even though they don’t run different trains of thought simultaneously, we still call this concurrency.


The way the threads or tasks take turns is the big difference between threading and asyncio. **In threading, the operating system actually knows about each thread and can interrupt it at any time to start running a different thread. This is called pre-emptive multitasking since the operating system can pre-empt your thread to make the switch.**

Pre-emptive multitasking is handy in that the code in the thread doesn’t need to do anything to make the switch. It can also be difficult because of that “at any time” phrase. This switch can happen in the middle of a single Python statement, even a trivial one like x = x + 1.
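
To make the “at any time” risk concrete, here is a minimal sketch that guards the x = x + 1 pattern with a threading.Lock so that concurrent updates cannot be lost. The names SafeCounter and run_counter_demo are illustrative assumptions and do not appear in the examples that follow.

```python
import threading


class SafeCounter:
    """Counter whose read-modify-write step is protected by a lock."""

    def __init__(self):
        self.value = 0
        self._lock = threading.Lock()

    def increment(self):
        # Without the lock, a thread switch between reading and writing
        # self.value could let two threads store the same new value.
        with self._lock:
            self.value = self.value + 1


def run_counter_demo(n_threads=4, increments=10_000):
    """Increment one shared counter from several threads; return the total."""
    counter = SafeCounter()

    def work():
        for _ in range(increments):
            counter.increment()

    threads = [threading.Thread(target=work) for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return counter.value


if __name__ == "__main__":
    print(run_counter_demo())
```

With the lock in place, the total always equals n_threads * increments. Removing the with block makes the result depend on when the operating system happens to switch threads.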

**Asyncio, on the other hand, uses cooperative multitasking. The tasks must cooperate by announcing when they are ready to be switched out. That means that the code in the task has to change slightly to make this happen.**

The **benefit of doing this extra work up front is that you always know where your task will be swapped out**. It will not be swapped out in the middle of a Python statement unless that statement is marked with await. You’ll see later how this can simplify parts of your design.
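
As a small sketch of cooperative switching, the two tasks below hand control back to the event loop only at the await line, so their steps interleave in a predictable order. The names worker and interleave_demo are illustrative assumptions, and asyncio.sleep(0) stands in for a real I/O wait.

```python
import asyncio


async def worker(name, steps, log):
    """Record each step, then explicitly give other tasks a turn."""
    for i in range(steps):
        log.append(f"{name}-{i}")
        # The only point where this task can be swapped out.
        await asyncio.sleep(0)


async def interleave_demo():
    """Run two workers concurrently and return the order of their steps."""
    log = []
    await asyncio.gather(worker("a", 2, log), worker("b", 2, log))
    return log


if __name__ == "__main__":
    # In a notebook with a running event loop, use `await interleave_demo()`.
    print(asyncio.run(interleave_demo()))
```

Because nothing between two await points can be interrupted by another task, the append call needs no lock here.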


### **What Is Parallelism?**
So far, you’ve looked at concurrency that happens on a single processor. What about all of those CPU cores your cool, new laptop has? How can you make use of them? The multiprocessing module is the answer.

With multiprocessing, Python creates new processes. A process here can be thought of as almost a completely different program, though technically they’re usually defined as a collection of resources where the resources include memory, file handles and things like that. One way to think about it is that each process runs in its own Python interpreter.

Because they are different processes, each of your trains of thought in a multiprocessing program can run on a different core. Running on a different core means that they actually can run at the same time, which is fabulous. There are some complications that arise from doing this, but Python does a pretty good job of smoothing them over most of the time.
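
One way to see that the work really lands in separate processes is to ask each worker for its process ID. This is a minimal sketch; report_pid is an illustrative name, and the pool size of 2 is an arbitrary assumption.

```python
import multiprocessing
import os


def report_pid(label):
    """Return the label together with the ID of the process running this call."""
    return label, os.getpid()


if __name__ == "__main__":
    print("parent", os.getpid())
    # Each worker is a separate process with its own interpreter state.
    with multiprocessing.Pool(processes=2) as pool:
        for label, pid in pool.map(report_pid, ["a", "b", "c", "d"]):
            print(label, pid)
```

The IDs printed for the labels should differ from the parent's ID, since the calls run inside the pool's worker processes.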

Now that you have an idea of what concurrency and parallelism are, let’s review their differences, and then we can look at why they can be useful:

### **When Is Concurrency Useful?**
Concurrency can make a big difference for two types of problems. These are generally called CPU-bound and I/O-bound.

I/O-bound problems cause your program to slow down because it frequently must wait for input/output (I/O) from some external resource. They arise frequently when your program is working with things that are much slower than your CPU.

Examples of things that are slower than your CPU are legion, but your program thankfully does not interact with most of them. The slow things your program will interact with most frequently are the file system and network connections.

In the diagram above, the blue boxes show time when your program is doing work, and the red boxes are time spent waiting for an I/O operation to complete. This diagram is not to scale because requests on the internet can take several orders of magnitude longer than CPU instructions, so your program can end up spending most of its time waiting. This is what your browser is doing most of the time.

On the flip side, there are classes of programs that do significant computation without talking to the network or accessing a file. These are the CPU-bound programs, because the resource limiting the speed of your program is the CPU, not the network or the file system.

Here’s a corresponding diagram for a CPU-bound program:

As you work through the examples in the following section, you’ll see that different forms of concurrency work better or worse with CPU-bound and I/O-bound programs. Adding concurrency to your program adds extra code and complications, so you’ll need to decide if the potential speed up is worth the extra effort. By the end of this article, you should have enough info to start making that decision.

| I/O-Bound Process | CPU-Bound Process |
| --- | --- |
| Your program spends most of its time talking to a slow device, like a network connection, a hard drive, or a printer. | Your program spends most of its time doing CPU operations. |
| Speeding it up involves overlapping the times spent waiting for these devices. | Speeding it up involves finding ways to do more computations in the same amount of time. |
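
The idea of overlapping waits can be sketched without any network access. In the example below, fake_io is a hypothetical stand-in for a slow device that simply sleeps, and the helper names are assumptions made for illustration.

```python
import concurrent.futures
import time


def fake_io(delay):
    """Stand-in for a slow device: sleep instead of doing real I/O."""
    time.sleep(delay)
    return delay


def timed(func, *args):
    """Run func(*args) and return (result, elapsed seconds)."""
    start = time.perf_counter()
    result = func(*args)
    return result, time.perf_counter() - start


def run_sequential(delays):
    """Wait for each simulated request one after another."""
    return [fake_io(d) for d in delays]


def run_overlapped(delays):
    """Wait for all simulated requests at once, one thread per request."""
    with concurrent.futures.ThreadPoolExecutor(max_workers=len(delays)) as executor:
        return list(executor.map(fake_io, delays))


if __name__ == "__main__":
    delays = [0.2] * 5
    _, sequential = timed(run_sequential, delays)
    _, overlapped = timed(run_overlapped, delays)
    print(f"sequential: {sequential:.2f}s, overlapped: {overlapped:.2f}s")
```

The sequential run takes roughly the sum of the delays, while the overlapped run takes roughly the longest single delay, because the threads spend their time waiting rather than computing.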

### **How to Speed Up an I/O-Bound Program**

In [None]:
# Synchronous Version

import requests
import time

def download_site(url, session):
    with session.get(url) as response:
        print(f"Read {len(response.content)} from {url}")


def download_all_sites(sites):
    with requests.Session() as session:
        for url in sites:
            download_site(url, session)


if __name__ == "__main__":
    sites = [
        "https://www.jython.org",
        "http://olympus.realpython.org/dice",
    ] * 80
    start_time = time.time()
    download_all_sites(sites)
    duration = time.time() - start_time
    print(f"Downloaded {len(sites)} in {duration} seconds")

Read 10490 from https://www.jython.org
Read 278 from http://olympus.realpython.org/dice
Read 10490 from https://www.jython.org
Read 278 from http://olympus.realpython.org/dice
Read 10490 from https://www.jython.org
Read 278 from http://olympus.realpython.org/dice
Read 10490 from https://www.jython.org
Read 278 from http://olympus.realpython.org/dice
Read 10490 from https://www.jython.org
Read 278 from http://olympus.realpython.org/dice
Read 10490 from https://www.jython.org
Read 278 from http://olympus.realpython.org/dice
Read 10490 from https://www.jython.org
Read 278 from http://olympus.realpython.org/dice
Read 10490 from https://www.jython.org
Read 278 from http://olympus.realpython.org/dice
Read 10490 from https://www.jython.org
Read 278 from http://olympus.realpython.org/dice
Read 10490 from https://www.jython.org
Read 278 from http://olympus.realpython.org/dice
Read 10490 from https://www.jython.org
Read 278 from http://olympus.realpython.org/dice
Read 10490 from https://www.jyth

### **Threading**

In [1]:
# Threading Version

import concurrent.futures
import requests
import threading
import time

thread_local = threading.local()


def get_session():
    if not hasattr(thread_local, "session"):
        thread_local.session = requests.Session()
    return thread_local.session


def download_site(url):
    session = get_session()
    with session.get(url) as response:
        print(f"Read {len(response.content)} from {url}")


def download_all_sites(sites):
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        executor.map(download_site, sites)


if __name__ == "__main__":
    sites = [
        "https://www.jython.org",
        "http://olympus.realpython.org/dice",
    ] * 80

    start_time = time.time()
    download_all_sites(sites)
    duration = time.time() - start_time

    print(f"Downloaded {len(sites)} in {duration} seconds")


Read 278 from http://olympus.realpython.org/dice
Read 278 from http://olympus.realpython.org/dice
Read 10490 from https://www.jython.org
Read 278 from http://olympus.realpython.org/dice
Read 10490 from https://www.jython.org
Read 10490 from https://www.jython.org
Read 10490 from https://www.jython.org
Read 10490 from https://www.jython.org
Read 278 from http://olympus.realpython.org/dice
Read 278 from http://olympus.realpython.org/dice
Read 278 from http://olympus.realpython.org/dice
Read 10490 from https://www.jython.org
Read 10490 from https://www.jython.org
Read 10490 from https://www.jython.org
Read 278 from http://olympus.realpython.org/dice
Read 10490 from https://www.jython.org
Read 278 from http://olympus.realpython.org/dice
Read 10490 from https://www.jython.org
Read 278 from http://olympus.realpython.org/dice
Read 10490 from https://www.jython.org
Read 278 from http://olympus.realpython.org/dice
Read 278 from http://olympus.realpython.org/dice
Read 10490 from https://www.jyth

In [None]:
!pip install aiohttp

Collecting aiohttp
  Downloading aiohttp-3.7.4.post0-cp37-cp37m-manylinux2014_x86_64.whl (1.3 MB)
Collecting multidict<7.0,>=4.5
  Downloading multidict-5.1.0-cp37-cp37m-manylinux2014_x86_64.whl (142 kB)
Collecting async-timeout<4.0,>=3.0
  Downloading async_timeout-3.0.1-py3-none-any.whl (8.2 kB)
Collecting yarl<2.0,>=1.0
  Downloading yarl-1.6.3-cp37-cp37m-manylinux2014_x86_64.whl (294 kB)
Installing collected packages: multidict, yarl, async-timeout, aiohttp
Successfully installed aiohttp-3.7.4.post0 async-timeout-3.0.1 multidict-5.1.0 yarl-1.6.3


### **Asyncio**

In [None]:
import asyncio
import time
import aiohttp


async def download_site(session, url):
    async with session.get(url) as response:
        print("Read {0} from {1}".format(response.content_length, url))


async def download_all_sites(sites):
    async with aiohttp.ClientSession() as session:
        tasks = []
        for url in sites:
            task = asyncio.ensure_future(download_site(session, url))
            tasks.append(task)
        await asyncio.gather(*tasks, return_exceptions=True)


if __name__ == "__main__":
    sites = [
        "https://www.jython.org",
        "http://olympus.realpython.org/dice",
    ] * 80

    start_time = time.time()
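    # asyncio.run() is a plain function call; awaiting it outside a
    # coroutine is a SyntaxError.
    asyncio.run(download_all_sites(sites))
    # In a notebook where an event loop is already running and top-level
    # await is supported, use this instead:
    # await download_all_sites(sites)
    duration = time.time() - start_time

    print(f"Downloaded {len(sites)} sites in {duration} seconds")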

### **Multiprocessing**

In [2]:
import requests
import multiprocessing
import time

session = None


def set_global_session():
    global session
    if not session:
        session = requests.Session()


def download_site(url):
    with session.get(url) as response:
        name = multiprocessing.current_process().name
        print(f"{name}:Read {len(response.content)} from {url}")


def download_all_sites(sites):
    with multiprocessing.Pool(initializer=set_global_session) as pool:
        pool.map(download_site, sites)


if __name__ == "__main__":
    sites = [
        "https://www.jython.org",
        "http://olympus.realpython.org/dice",
    ] * 80
    start_time = time.time()
    download_all_sites(sites)
    duration = time.time() - start_time
    print(f"Downloaded {len(sites)} in {duration} seconds")

ForkPoolWorker-1:Read 10490 from https://www.jython.org
ForkPoolWorker-2:Read 10490 from https://www.jython.org
ForkPoolWorker-2:Read 278 from http://olympus.realpython.org/dice
ForkPoolWorker-1:Read 278 from http://olympus.realpython.org/dice
ForkPoolWorker-2:Read 10490 from https://www.jython.org
ForkPoolWorker-1:Read 10490 from https://www.jython.org
ForkPoolWorker-2:Read 278 from http://olympus.realpython.org/dice
ForkPoolWorker-1:Read 278 from http://olympus.realpython.org/dice
ForkPoolWorker-2:Read 10490 from https://www.jython.org
ForkPoolWorker-1:Read 10490 from https://www.jython.org
ForkPoolWorker-2:Read 278 from http://olympus.realpython.org/dice
ForkPoolWorker-1:Read 278 from http://olympus.realpython.org/dice
ForkPoolWorker-2:Read 10490 from https://www.jython.org
ForkPoolWorker-1:Read 10490 from https://www.jython.org
ForkPoolWorker-2:Read 278 from http://olympus.realpython.org/dice
ForkPoolWorker-1:Read 278 from http://olympus.realpython.org/dice
ForkPoolWorker-2:Read 10

## **How to Speed Up a CPU-Bound Program**

Let’s shift gears here a little bit. The examples so far have all dealt with an I/O-bound problem. Now, you’ll look into a CPU-bound problem. As you saw, an I/O-bound problem spends most of its time waiting for external operations, like a network call, to complete. A CPU-bound problem, on the other hand, does few I/O operations, and its overall execution time depends on how fast it can process the required data.

For the purposes of our example, we’ll use a somewhat silly function to create something that takes a long time to run on the CPU. This function computes the sum of the squares of each number from 0 up to, but not including, the passed-in value:

In [3]:
def cpu_bound(number):
    return sum(i * i for i in range(number))

You’ll be passing in large numbers, so this will take a while. Remember, this is just a placeholder for your code that actually does something useful and requires significant processing time, like computing the roots of equations or sorting a large data structure.
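
As a sanity check on what cpu_bound computes, the sketch below compares it with the standard closed form for a sum of squares, (n - 1) * n * (2n - 1) / 6 for the range 0 to n - 1. The helper name sum_of_squares_closed_form is an illustrative assumption. The fact that a formula exists underlines that the loop is only a placeholder workload.

```python
def cpu_bound(number):
    return sum(i * i for i in range(number))


def sum_of_squares_closed_form(number):
    """Sum of i*i for i in range(number), via (n - 1) * n * (2n - 1) / 6."""
    n = number
    return (n - 1) * n * (2 * n - 1) // 6


if __name__ == "__main__":
    # The slow loop and the formula should agree for every input.
    for n in (0, 1, 10, 1000):
        assert cpu_bound(n) == sum_of_squares_closed_form(n)
    print(cpu_bound(10))  # 0 + 1 + 4 + ... + 81 = 285
```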

In [4]:
# Synchronous Version 

import time

def cpu_bound(number):
    return sum(i * i for i in range(number))


def find_sums(numbers):
    for number in numbers:
        cpu_bound(number)


if __name__ == "__main__":
    numbers = [5_000_000 + x for x in range(20)]

    start_time = time.time()
    find_sums(numbers)
    duration = time.time() - start_time
    print(f"Duration {duration} seconds")

Duration 11.141467571258545 seconds


### **Threading and Asyncio versions**
How much do you think rewriting this code using threading or asyncio will speed this up?

If you answered “Not at all,” give yourself a cookie. If you answered, “It will slow it down,” give yourself two cookies.

Here’s why: In your I/O-bound example above, much of the overall time was spent waiting for slow operations to finish. threading and asyncio sped this up by allowing you to overlap the times you were waiting instead of doing them sequentially.

On a CPU-bound problem, however, there is no waiting. The CPU is cranking away as fast as it can to finish the problem. In standard CPython builds, threads and tasks run in the same process, and the global interpreter lock (GIL) lets only one thread execute Python bytecode at a time. That means that one CPU is doing all of the work of the non-concurrent code plus the extra work of setting up threads or tasks. It takes more than 10 seconds:

In [3]:
# Threading 

import concurrent.futures
import time


def cpu_bound(number):
    return sum(i * i for i in range(number))


def find_sums(numbers):
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        executor.map(cpu_bound, numbers)


if __name__ == "__main__":
    numbers = [5_000_000 + x for x in range(20)]

    start_time = time.time()
    find_sums(numbers)
    duration = time.time() - start_time
    print(f"Duration {duration} seconds")

Duration 10.699211597442627 seconds


### **CPU-Bound multiprocessing Version**
Now you’ve finally reached where multiprocessing really shines. Unlike the other concurrency libraries, multiprocessing is explicitly designed to share heavy CPU workloads across multiple CPUs. Here’s what its execution timing diagram looks like:

In [1]:
import multiprocessing
import time

def cpu_bound(number):
    return sum(i * i for i in range(number))


def find_sums(numbers):
    with multiprocessing.Pool() as pool:
        pool.map(cpu_bound, numbers)


if __name__ == "__main__":
    numbers = [5_000_000 + x for x in range(20)]

    start_time = time.time()
    find_sums(numbers)
    duration = time.time() - start_time
    print(f"Duration {duration} seconds")

Duration 12.872897863388062 seconds


Little of this code had to change from the non-concurrent version. You had to import multiprocessing and then just change from looping through the numbers to creating a multiprocessing.Pool object and using its .map() method to send individual numbers to worker processes as they become free.

This was just what you did for the I/O-bound multiprocessing code, but here you don’t need to worry about the Session object.

The optional processes parameter of the multiprocessing.Pool() constructor deserves some attention. You can specify how many Process objects you want created and managed in the Pool. By default, it will determine how many CPUs are in your machine and create a process for each one. While this works great for our simple example, you might want to have a little more control in a production environment.
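
A minimal sketch of taking that control: the variant of find_sums below accepts a processes argument and passes it to the Pool. The extra parameter, the smaller input numbers, and the cap of two workers are assumptions chosen for illustration, not values from the original example.

```python
import multiprocessing
import os


def cpu_bound(number):
    return sum(i * i for i in range(number))


def find_sums(numbers, processes=None):
    """Map cpu_bound over numbers; processes=None lets Pool pick a default."""
    with multiprocessing.Pool(processes=processes) as pool:
        return pool.map(cpu_bound, numbers)


if __name__ == "__main__":
    numbers = [100_000 + x for x in range(8)]
    print("CPUs reported by os.cpu_count():", os.cpu_count())
    results = find_sums(numbers, processes=2)  # cap the pool at two workers
    print(len(results))
```

Capping the pool can be useful when the machine is shared with other work, at the cost of leaving some cores idle.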

Also, the multiprocessing.Pool code is built upon building blocks like Queue and Semaphore that will be familiar to those of you who have written multithreaded and multiprocessing code in other languages.

### **Why the multiprocessing Version Rocks**

The multiprocessing version of this example is great because it’s relatively easy to set up and requires little extra code. It also takes full advantage of the CPU power in your computer.

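Hey, that’s exactly what I said the last time we looked at multiprocessing. The big difference is that this time it is clearly the best option, provided several CPU cores are available. It takes 2.5 seconds on my machine, while the notebook run recorded above, at about 12.9 seconds, was no faster than the synchronous version.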