# Concurrent Programming

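In the Python standard library:
- asynchronous programming: `asyncio` with async/await (syntax available since Python 3.5)
    - Example: FastAPI
- multi-threading: `threading`, `concurrent.futures.ThreadPoolExecutor` (limited by the GIL for CPU-bound code)
- multi-processing: `multiprocessing`, `concurrent.futures.ProcessPoolExecutor`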

https://docs.python.org/3/library/concurrency.html

External libraries for numeric computing, data science and AI offer further concurrency options (on CPU and GPU).

## Example with a thread pool and futures

In [38]:
import concurrent.futures
import urllib.request

# Sample pages fetched by the examples below. The last host is not expected
# to resolve, so its job exercises the error-handling path.
URLS = [
    'http://www.foxnews.com/',
    'http://www.cnn.com/',
    'http://europe.wsj.com/',
    'http://www.bbc.co.uk/',
    'http://nonexistent-subdomain.python.org/',
]

def load_url(url, timeout):
    """Download *url* and return the raw response body.

    ``timeout`` (in seconds) is passed straight to ``urllib.request.urlopen``.
    The body is returned undecoded, as ``bytes``. Errors raised by ``urlopen``
    or ``read`` (e.g. ``URLError`` / ``HTTPError``) propagate to the caller.
    The response is closed before returning.
    """
    response = urllib.request.urlopen(url, timeout=timeout)
    with response:
        body = response.read()
    return body

# A with statement shuts the pool down (waiting for its threads) on exit
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL.
    # NB: jobs are tracked in a dictionary
    # - key: job (future)
    # - value: URL for this job
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}

    # Handle each job in completion order (not submission order)
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            # result() re-raises any exception raised inside load_url
            data = future.result()
        except Exception as exc:
            # Demo boundary: report the per-URL failure and keep going
            # with the remaining URLs
            print(f'{url!r} generated an exception: {exc}')
        else:
            print(f'{url!r} page is {len(data)} bytes')

'http://nonexistent-subdomain.python.org/' generated an exception: <urlopen error [Errno 11001] getaddrinfo failed>
'http://www.bbc.co.uk/' page is 568581 bytes
'http://europe.wsj.com/' generated an exception: HTTP Error 403: Forbidden
'http://www.foxnews.com/' page is 701473 bytes
'http://www.cnn.com/' page is 3432124 bytes


In [12]:
# Fetch one page directly (no pool) to see what load_url returns
page = load_url(url=URLS[1], timeout=60)
print(len(page))     # size of the body in bytes
print(page[:50])     # first 50 bytes of the raw body

3447255
b'  <!DOCTYPE html>\n<html lang="en" data-uri="cms.cn'


In [16]:
# pool of 5 worker threads, reused by the following cells; it is not managed
# by a with statement, so it has to be shut down explicitly
executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)
executor  # last expression of the cell: the notebook displays its repr

<concurrent.futures.thread.ThreadPoolExecutor at 0x1a3b2675fd0>

In [20]:
# submit 1 job: submit() returns a Future immediately, without waiting
# for load_url to finish
job = executor.submit(load_url, URLS[1], 60)
job  # displayed right after submission -> state=pending

<Future at 0x1a3b4754560 state=pending>

In [22]:
# inspect the same Future again once the download has completed
job  # -> state=finished returned bytes

<Future at 0x1a3b4754560 state=finished returned bytes>

In [24]:
# Block until the job is done and get load_url's return value
# (Future.result re-raises any exception raised inside the job).
page = job.result(timeout=None)
print(page[:50])

b'  <!DOCTYPE html>\n<html lang="en" data-uri="cms.cn'


In [28]:
# URLs that did not fail in the first example (WSJ returned 403 and the
# last host did not resolve)
urls_ok = [URLS[0], URLS[1], URLS[3]]

# submit all jobs: each submit() returns a pending Future without blocking
jobs = [executor.submit(load_url, url=url, timeout=60) for url in urls_ok]
print('Jobs submitted:', jobs)
print()

# wait for all results, in submission order (result() blocks per job)
pages = [job.result() for job in jobs]
print('Results:')
for page in pages:
    print(page[:200])
    print()
    
    

Jobs submitted: [<Future at 0x1a3b46ecda0 state=pending>, <Future at 0x1a3b5f22330 state=pending>, <Future at 0x1a3b5f20e00 state=pending>]

Results:
b'<!DOCTYPE html><html lang="en"><head>\n\n  <meta charset="utf-8">\n<meta http-equiv="X-UA-Compatible" content="IE=edge">\n<meta name="viewport" content="width=device-width, minimum-scale=1, initial-scale='

b'  <!DOCTYPE html>\n<html lang="en" data-uri="cms.cnn.com/_pages/clg35wfph000047qb0ndy7s77@published" data-layout-uri="cms.cnn.com/_layouts/layout-homepage/instances/homepage-international@published" >\n'

b'<!DOCTYPE html><html lang="en-GB" class="no-js"><head><meta charSet="utf-8" /><meta name="viewport" content="width=device-width, initial-scale=1" /><title data-rh="true">BBC - Home</title><meta data-r'



In [34]:
# Executor.map yields results in input order. The second iterable supplies
# the timeout argument of every call, so no lambda is needed.
pages = executor.map(load_url, urls_ok, [60] * len(urls_ok))

for page in pages:
    print(page[:200], end="\n\n")

b'<!DOCTYPE html><html lang="en"><head>\n\n  <meta charset="utf-8">\n<meta http-equiv="X-UA-Compatible" content="IE=edge">\n<meta name="viewport" content="width=device-width, minimum-scale=1, initial-scale='

b'  <!DOCTYPE html>\n<html lang="en" data-uri="cms.cnn.com/_pages/clg35wfph000047qb0ndy7s77@published" data-layout-uri="cms.cnn.com/_layouts/layout-homepage/instances/homepage-international@published" >\n'

b'<!DOCTYPE html><html lang="en-GB" class="no-js"><head><meta charSet="utf-8" /><meta name="viewport" content="width=device-width, initial-scale=1" /><title data-rh="true">BBC - Home</title><meta data-r'



In [None]:
# Release the pool created without a with statement; wait for its threads
executor.shutdown(wait=True)

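## GIL (Global Interpreter Lock)
In CPython, the GIL is a single lock protecting the interpreter's shared state (including the object heap). Only one thread at a time can execute Python bytecode, so threads do not speed up CPU-bound pure-Python code. Python 3.12 added a per-interpreter GIL for sub-interpreters (PEP 684), and Python 3.13 offers an optional free-threaded build without the GIL (PEP 703).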

In [45]:
# 100k consecutive ints: the workload for the sum benchmarks below
n = 10**5
data = list(range(0, n))

In [49]:
# Sum `data` (length n) in nb_job contiguous chunks on a 5-thread pool.
# Chunk k covers data[k*n//nb_job : (k+1)*n//nb_job], so every element is
# counted exactly once, even when n is not a multiple of nb_job. A fixed
# chunk_size = n // nb_job would drop the last n % nb_job items.
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    nb_job = 10
    bounds = [k * n // nb_job for k in range(nb_job + 1)]
    # submit jobs: (lo, hi) are passed as arguments, so each job sums its
    # own slice (no late-binding closure over comprehension variables)
    jobs = [
        executor.submit(lambda lo, hi: sum(data[lo:hi]), lo, hi)
        for lo, hi in zip(bounds, bounds[1:])
    ]
    # wait for results, in submission (= chunk) order
    results = [job.result() for job in jobs]
print(results)

[49995000, 149995000, 249995000, 349995000, 449995000, 549995000, 649995000, 749995000, 849995000, 949995000]


In [51]:
def concurrent_sum(data, *, nb_job=10, max_workers=5):
    """Sum ``data`` in ``nb_job`` contiguous chunks on a thread pool.

    Returns the list of per-chunk partial sums, in chunk order, so that
    ``sum(concurrent_sum(data)) == sum(data)``. ``data`` must support
    ``len()`` and slicing (e.g. a list).

    Chunk boundaries are ``k * len(data) // nb_job`` for ``k = 0..nb_job``.
    Every element therefore lands in exactly one chunk, including the
    ``len(data) % nb_job`` trailing items that a fixed chunk size of
    ``len(data) // nb_job`` would drop. When ``len(data) < nb_job``, some
    chunks are empty and contribute 0.

    Because of the GIL the threads do not sum in parallel, so this is
    expected to be slower than a plain ``sum(data)``.

    Raises:
        ValueError: if ``nb_job < 1`` (``ThreadPoolExecutor`` itself raises
            ``ValueError`` for ``max_workers <= 0``).
    """
    if nb_job < 1:
        raise ValueError("nb_job must be >= 1")
    n = len(data)
    # nb_job + 1 non-decreasing boundaries, from 0 up to n
    bounds = [k * n // nb_job for k in range(nb_job + 1)]

    def _chunk_sum(lo, hi):
        # slicing and summing both run in the worker thread
        return sum(data[lo:hi])

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # submit jobs
        jobs = [executor.submit(_chunk_sum, lo, hi)
                for lo, hi in zip(bounds, bounds[1:])]
        # wait for results (the with block then shuts the pool down)
        return [job.result() for job in jobs]

In [53]:
# IPython magic: benchmark the threaded chunked sum (returns partial sums)
%timeit concurrent_sum(data)

3.29 ms ± 644 μs per loop (mean ± std. dev. of 7 runs, 100 loops each)


In [55]:
# IPython magic: baseline, single-threaded built-in sum over the same list
%timeit sum(data)

1.95 ms ± 283 μs per loop (mean ± std. dev. of 7 runs, 100 loops each)


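The concurrent version is slower. The GIL lets only one thread run Python bytecode at a time, so the chunks are not summed in parallel. Creating the pool and dispatching the jobs also adds overhead.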

## async/await
Example: FastAPI

https://fastapi.tiangolo.com/async/