# Concurrency

In [12]:
import time
import pandas as pd
import requests # make HTTP requests

In [13]:
# Create url list for IO-bound task
def webcrawl(url):
    header = {
        "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko)\
        Chrome/50.0.2661.75 Safari/537.36", "X-Requested-With": "XMLHttpRequest"
    }
    req = requests.get(url, headers=header)
    
    return pd.read_html(req.text)[0]

sites = webcrawl('https://moz.com/top500')

toplist = []
# select 50 random websites from list
for url in sites['Root Domain'].iloc[0:50]: #sample(n=50):
    toplist.append('http://' + url)

    
# CPO-bound task
def cpu_task(number):
    return sum(i * i for i in range(number))

numbers = [5_000_000 + x for x in range(20)]

## Synchronous (standard)

### IO-bound

In [29]:
def download_site(url, session):
    try:
        with session.get(url) as response:
            print(f"Read {len(response.content)} characters from {url}")
            
            return response
        
    except Exception as e:
        print(e)

def download_all_sites(sites):
    site_responses = []
    with requests.Session() as session:
        for url in sites:
            site_responses.append(download_site(url, session))
    
    return site_responses


# Standard IO-bound task
start_time = time.time()
site_content = download_all_sites(toplist)

duration_s1 = round(time.time() - start_time, 3)
print(f"\nDownloaded {len(toplist)} sites in {duration_s1} seconds")

Read 163940 characters from http://microsoft.com
Read 12763 characters from http://www.google.com
Read 60157 characters from http://apple.com
Read 63669 characters from http://docs.google.com
Read 1140934 characters from http://play.google.com
Read 413847 characters from http://support.google.com
Read 94478 characters from http://www.blogger.com
Read 459659 characters from http://youtube.com
Read 63879 characters from http://sites.google.com
Read 63769 characters from http://plus.google.com
Read 108598 characters from http://adobe.com
Read 167511 characters from http://mozilla.org
Read 75672 characters from http://en.wikipedia.org
Read 63748 characters from http://accounts.google.com
Read 37649 characters from http://wordpress.org
HTTPConnectionPool(host='googleusercontent.com', port=80): Max retries exceeded with url: / (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x000001E53E080388>: Failed to establish a new connection: [Errno 11001] getaddrinfo failed

In [31]:
site_content[26].content

b'<!DOCTYPE html>\n<!--[if lt IE 7]> <html lang="en-us" class="a-no-js a-lt-ie9 a-lt-ie8 a-lt-ie7"> <![endif]-->\n<!--[if IE 7]>    <html lang="en-us" class="a-no-js a-lt-ie9 a-lt-ie8"> <![endif]-->\n<!--[if IE 8]>    <html lang="en-us" class="a-no-js a-lt-ie9"> <![endif]-->\n<!--[if gt IE 8]><!-->\n<html class="a-no-js" lang="en-us"><!--<![endif]--><head>\n<meta http-equiv="content-type" content="text/html; charset=UTF-8">\n<meta charset="utf-8">\n<meta http-equiv="X-UA-Compatible" content="IE=edge,chrome=1">\n<title dir="ltr">Robot Check</title>\n<meta name="viewport" content="width=device-width">\n<link rel="stylesheet" href="https://images-na.ssl-images-amazon.com/images/G/01/AUIClients/AmazonUI-3c913031596ca78a3768f4e934b1cc02ce238101.secure.min._V1_.css">\n<script>\n\nif (true === true) {\n    var ue_t0 = (+ new Date()),\n        ue_csm = window,\n        ue = { t0: ue_t0, d: function() { return (+new Date() - ue_t0); } },\n        ue_furl = "fls-na.amazon.com",\n        ue_mid =

<img src="https://files.realpython.com/media/IOBound.4810a888b457.png" width=500>

### CPU-bound

In [17]:
# Standard CPU-bound task
def find_sums(num):
    for number in num:
        cpu_task(number)

start_time = time.time()
find_sums(numbers)

duration_s2 = round(time.time() - start_time, 3)
print(f"Duration {duration_s2} seconds")

Duration 12.111 seconds


## Threading

###  IO-bound

In [18]:
import concurrent.futures as ccf
import threading

thread_local = threading.local()
def get_session():
    """Local storage for each individual thread needed to make session thread-safe"""
    if not hasattr(thread_local, "session"):
        thread_local.session = requests.Session()
    
    return thread_local.session


def download_site_thread(url):
    """Download urls"""
    session = get_session()
    try:
        with session.get(url) as response: 
            print(f"Read {len(response.content)} characters from {url}")        
            
            return response
        
    except Exception as e:
        print(f"Exception for {url}: {e}")
        

def download_all_sites_thread(sites):
    """Create a pool of threads and call function multiple times with map"""
    with ccf.ThreadPoolExecutor(max_workers=10) as executor:
        response = list(executor.map(download_site_thread, sites))
    
    return response


# IO-bound task using threading
start_time = time.time()

site_content = []
site_content.append(download_all_sites_thread(toplist))

duration_t1 = round(time.time() - start_time, 3)
print(f"\nDownloaded {len(toplist)} sites in {duration_t1} seconds")

Read 12767 characters from http://www.google.com
Read 413845 characters from http://support.google.com
Read 60157 characters from http://apple.com
Read 63738 characters from http://plus.google.com
Read 163940 characters from http://microsoft.com
Read 1140468 characters from http://play.google.com
Read 63677 characters from http://docs.google.com
Read 63864 characters from http://accounts.google.com
Exception for http://googleusercontent.com: HTTPConnectionPool(host='googleusercontent.com', port=80): Max retries exceeded with url: / (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x000001E538818C08>: Failed to establish a new connection: [Errno 11001] getaddrinfo failed'))
Read 63882 characters from http://sites.google.com
Read 107576 characters from http://adobe.com
Read 75641 characters from http://en.wikipedia.org
Read 104372 characters from http://creativecommons.org
Read 368793 characters from http://youtube.com
Read 44534 characters from http://maps.goo

<img src="https://files.realpython.com/media/Threading.3eef48da829e.png" width=500>

### CPU-bound

In [19]:
# CPU-bound task using threading
def find_sums_thread(num):
    with ccf.ThreadPoolExecutor(max_workers=5) as executor:
        executor.map(cpu_task, num)

start_time = time.time()
find_sums_thread(numbers)

duration_t2 = round(time.time() - start_time, 3)
print(f"Duration {duration_t2} seconds")

Duration 15.259 seconds


## Asyncio

### IO-bound (problems in jupyter)

In [20]:
import asyncio
import aiohttp


async def download_site(session, url):
    async with session.get(url) as response:
        print("Read {0} from {1}".format(response.content_length, url))


async def download_all_sites_asyn(sites):
    async with aiohttp.ClientSession() as session:
        tasks = []
        for url in sites:
            task = asyncio.ensure_future(download_site(session, url))
            tasks.append(task)
        await asyncio.gather(*tasks, return_exceptions=True)


# IO-bound task using asyncio
#start_time = time.time()
#asyncio.get_event_loop().run_until_complete(download_all_sites_asyn(toplist))

duration = round(time.time() - start_time, 3)
print(f"Downloaded {len(toplist)} sites in {duration} seconds")

Downloaded 50 sites in 145.138 seconds


<img src="https://files.realpython.com/media/Asyncio.31182d3731cf.png" width=500>

## Multiprocessing

In [21]:
import multiprocessing
import multiprocess_func as mpf

# Content of multiprocess_func, saved in separate file
# Multiprocess does not always work when objects are not defined in an imported module.
session = None
def set_global_session():
    global session
    if not session:
        session = requests.Session()


def download_site_multi(url):
    try:
        with requests.Session().get(url) as response:
            name = multiprocessing.current_process().name
            print(f"{name}:Read {len(response.content)} characters from {url}")
    except Exception as e:
        print(f"Exception for {url}: {e}")


def download_all_sites_multi(sites):
    # Determines number of cores and runs separate Python interpreter on each of them
    with multiprocessing.Pool(initializer=set_global_session) as pool:
    #with multiprocessing.Pool() as pool:
        pool.map(download_site_multi, sites)

        
# IO-bound task using multiprocessing
start_time = time.time()
mpf.download_all_sites_multi(toplist)

duration_m1 = round(time.time() - start_time, 3)
print(f"Downloaded {len(toplist)} in {duration_m1} seconds")

Downloaded 50 in 11.416 seconds


<img src="https://files.realpython.com/media/MProc.7cf3be371bbc.png" width=500>

In [23]:
# CPU-bound task with multiprocessing
start_time = time.time()
mpf.find_sums_multi(numbers)

duration_m2 = round(time.time() - start_time, 3)
print(f"Duration {duration_m2} seconds")

Duration 8.862 seconds


In [24]:
summary = pd.DataFrame(
    {"I/O" : [duration_s1, duration_t1 ,3.602, duration_m1],
     "CPU" : [duration_s2, duration_t2, "-", duration_m2]},
    index = ["Synchronous", "Threading", "Async IO", "Multithread"])
summary

Unnamed: 0,I/O,CPU
Synchronous,31.381,12.111
Threading,4.986,15.259
Async IO,3.602,-
Multithread,11.416,8.862
