# Programmation concurrente

In [57]:
from concurrent.futures import (
    Future,
    ThreadPoolExecutor, 
    ProcessPoolExecutor,
    as_completed,
)
import os
import requests
import time # sleep
import random
import math

# async programming
import asyncio
import aiohttp

## Module: concurrent.futures
- ordonnanceur de tâches utilisant un pool de processus ou de threads (taille configurable)
- objet `Future` gérant le job et son futur résultat

In [12]:
# Logical CPUs in the machine (os.cpu_count() may return None).
cpu_count = os.cpu_count()
# CPUs usable by *this* process: unlike cpu_count it honours CPU affinity,
# so it can be lower. os.process_cpu_count only exists since Python 3.13;
# fall back to os.cpu_count on older interpreters instead of crashing.
process_cpu_count = getattr(os, 'process_cpu_count', os.cpu_count)()
# Default max_workers of ThreadPoolExecutor:
#   Python 3.8-3.12: min(32, (os.cpu_count() or 1) + 4)
#   Python 3.13+   : min(32, (os.process_cpu_count() or 1) + 4)
default_workers = min(32, (process_cpu_count or 1) + 4)
print('Nb coeurs:', cpu_count)
print('Nb workers:', default_workers)

Nb coeurs: 16
Nb workers: 20


In [25]:
# A 5-thread pool created outside any `with` block: nothing releases it
# automatically, it has to be closed explicitly with pool.shutdown().
pool = ThreadPoolExecutor(5)  # first positional parameter is max_workers
pool

<concurrent.futures.thread.ThreadPoolExecutor at 0x1f5cae60690>

In [29]:
pool.shutdown(wait=True)  # wait=True is the default: block until submitted jobs finish

In [32]:
# Pages to download; the last entry uses a fake host and is expected to fail.
urls = [
    "https://docs.python.org/3/library/concurrent.futures.html",
    "https://numpy.org/doc/stable/reference/index.html",
    "https://pandas.pydata.org/docs/reference/index.html",
    "https://nourl.nourl",
]
    

In [36]:
def get_url_v0(url, timeout=10):
    """Fetch *url* and print the outcome (first, printing-only version).

    Prints 'OK:' followed by the first 50 characters of the body on HTTP 200,
    'Error: <status code>' for any other status, and 'Error: exception (...)'
    with the exception class name when the request itself fails.

    Args:
        url: address to fetch.
        timeout: seconds to wait for the server before giving up. Without
            a timeout, requests.get may block indefinitely.
    """
    try:
        r = requests.get(url, timeout=timeout)
    except requests.RequestException as exc:
        # Only network/HTTP-library failures (DNS, refused connection,
        # timeout, invalid URL...). A bare `except:` would also swallow
        # KeyboardInterrupt and genuine programming errors.
        print(f'Error: exception ({type(exc).__name__})')
        return
    if r.status_code == 200:
        print('OK:', r.text[:50])
    else:
        print(f'Error: {r.status_code}')

In [37]:
def get_url(
    url: str,
    timeout: float = 10,
    min_delay: int = 2,
    max_delay: int = 10,
) -> str | None:
    """Simulate a slow download: sleep a random delay, then fetch *url*.

    The sleep lasts random.randint(min_delay, max_delay) seconds (both bounds
    inclusive). It makes completion order unpredictable when several calls
    run concurrently.

    Args:
        url: address to fetch.
        timeout: seconds to wait for the server; without it requests.get
            can block forever and hold a pool worker indefinitely.
        min_delay, max_delay: bounds of the artificial delay, in seconds.

    Returns:
        The response body as text on HTTP 200, otherwise None. None is also
        returned when the request fails (DNS error, refused connection,
        timeout...).
    """
    time.sleep(random.randint(min_delay, max_delay))
    try:
        r = requests.get(url, timeout=timeout)
    except requests.RequestException:
        # Report network failures as "no result", but let unrelated errors
        # (KeyboardInterrupt, bugs) propagate instead of a bare `except:`.
        return None
    if r.status_code != 200:
        return None
    return r.text

In [38]:
# Runs in the main thread; each get_url call below runs in a worker thread.
with ThreadPoolExecutor(max_workers=5) as pool:
    # submit() returns a Future right away; the download happens in the pool.
    jobs = [pool.submit(get_url, address) for address in urls]
    print(jobs)  # just after submission: futures still running (or pending)

    # Explicit wait, in submission order: Future.result() blocks until that
    # job is done (it also accepts a timeout= argument).
    results = list(map(Future.result, jobs))

# Leaving the `with` block calls pool.shutdown(), which waits for every job
# by default, so all futures are finished at this point.
print(jobs)
# Here job.result() would return immediately: there is nothing left to wait for.
['KO' if text is None else text[:10] for text in results]

[<Future at 0x17e5d84e8d0 state=running>, <Future at 0x17e5d84cfd0 state=running>, <Future at 0x17e5d84e4d0 state=running>, <Future at 0x17e5d84e850 state=running>]
[<Future at 0x17e5d84e8d0 state=finished returned str>, <Future at 0x17e5d84cfd0 state=finished returned str>, <Future at 0x17e5d84e4d0 state=finished returned str>, <Future at 0x17e5d84e850 state=finished returned NoneType>]


['<!DOCTYPE ', '\n<!DOCTYPE', '\n<!DOCTYPE', 'KO']

In [39]:
# Executor.map yields results in input order; unpacking it into a list
# blocks until every download has finished.
with ThreadPoolExecutor(max_workers=5) as pool:
    results = [*pool.map(get_url, urls)]
['KO' if text is None else text[:10] for text in results]

['<!DOCTYPE ', '\n<!DOCTYPE', '\n<!DOCTYPE', 'KO']

## Traitement au fil de l'eau
itérateur : `as_completed` (renvoie chaque job dès qu'il se termine, quel que soit l'ordre de soumission)

In [40]:
# Documentation name -> page to download ("dummy" uses a fake host and is expected to fail).
doc_sources = dict([
    ("python", "https://docs.python.org/3/library/concurrent.futures.html"),
    ("numpy", "https://numpy.org/doc/stable/reference/index.html"),
    ("pandas", "https://pandas.pydata.org/docs/reference/index.html"),
    ("dummy", "https://nourl.nourl"),
    ("requests", "https://pypi.org/project/requests/"),
    ("beautiful soup", "https://tedboy.github.io/bs4_doc"),
    ("scrapy", "https://docs.scrapy.org/en/latest/"),
])

In [46]:
with ThreadPoolExecutor(max_workers=3) as pool:
    # Submit one download per source (in dict order) and remember which
    # documentation name each Future belongs to.
    job_dict: dict[Future, str] = {
        pool.submit(get_url, address): name
        for name, address in doc_sources.items()
    }
    print('Jobs started:', job_dict)
    print()

    # as_completed yields each Future as soon as it finishes, regardless of
    # the order in which the jobs were submitted.
    for job in as_completed(job_dict):
        source = job_dict[job]
        print(f'* Finished [{source}]: {job}')
        result = job.result()  # job is already finished: no blocking here
        print('Result (extract):')
        print('KO' if result is None else result[:100])
        print()

Jobs started: {<Future at 0x17e5d838650 state=running>: 'python', <Future at 0x17e5d84e5d0 state=running>: 'numpy', <Future at 0x17e5d84db50 state=running>: 'pandas', <Future at 0x17e5d84ca50 state=pending>: 'dummy', <Future at 0x17e5d84d250 state=pending>: 'requests', <Future at 0x17e5d84dad0 state=pending>: 'beautiful soup', <Future at 0x17e5d84f6d0 state=pending>: 'scrapy'}

* Finished [pandas]: <Future at 0x17e5d84db50 state=finished returned str>
Result (extract):

<!DOCTYPE html>


<html lang="en" data-content_root="../" >

  <head>
    <meta charset="utf-8" />


* Finished [numpy]: <Future at 0x17e5d84e5d0 state=finished returned str>
Result (extract):

<!DOCTYPE html>


<html lang="en" data-content_root="../" data-theme="light">

  <head>
    <meta c

* Finished [python]: <Future at 0x17e5d838650 state=finished returned str>
Result (extract):
<!DOCTYPE html>

<html lang="en" data-content_root="../">
  <head>
    <meta charset="utf-8" />
    

* Finished [dummy]: <Future at 0x17

## Async programming
Voir le programme Python `async_downloader.py`.

## Calcul concurrent

In [2]:
# Large odd numbers to test for primality (CPU-heavy workload).
# Note: the first value appears twice in the list.
PRIMES = [
    112_272_535_095_293,
    112_582_705_942_171,
    112_272_535_095_293,
    115_280_095_190_773,
    115_797_848_077_099,
    1_099_726_899_285_419,
]

def is_prime(n):
    """Return True if the integer *n* is prime, using trial division.

    Values below 2 are not prime, 2 is prime and other even numbers are not.
    For odd n, every odd divisor from 3 up to the integer square root of n
    is tried. Pure-Python and CPU-bound: a good benchmark for comparing
    threads and processes.
    """
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False

    # math.isqrt is exact for any int. floor(sqrt(n)) goes through a float
    # and, for very large n (beyond 2**53), can land one below the true
    # root, skipping the divisor p of n = p * p.
    limit = math.isqrt(n)
    for i in range(3, limit + 1, 2):
        if n % i == 0:
            return False
    return True

In [4]:
def calcul_monothread(primes):
    """Test every number of *primes* sequentially, in the calling thread.

    Returns a list of booleans (one per input number, in input order).
    """
    return list(map(is_prime, primes))

In [10]:
# Sequential baseline: -n1 = 1 loop per run, -r7 = 7 runs.
%timeit -n1 -r7 calcul_monothread(PRIMES)

2.39 s ± 60.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [19]:
def calcul_multithread(primes):
    """Test every number of *primes* in a default-sized pool of threads.

    Returns a list of booleans in input order (Executor.map preserves it).
    """
    executor = ThreadPoolExecutor()
    with executor:
        return [*executor.map(is_prime, primes)]

In [20]:
# No speed-up over the sequential version: is_prime is pure-Python, CPU-bound
# work, and on a standard CPython build the GIL lets only one thread execute
# Python bytecode at a time.
%timeit -n1 -r7 calcul_multithread(PRIMES)

2.56 s ± 186 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [24]:
def calcul_multiprocess(primes):
    """Test every number of *primes* in a default-sized pool of processes.

    Each worker process runs its own interpreter, so the CPU-bound is_prime
    calls are not serialized by a single GIL. Returns a list of booleans in
    input order.
    """
    executor = ProcessPoolExecutor()
    with executor:
        return [*executor.map(is_prime, primes)]

# Run this from a real Python script, under `if __name__ == "__main__":`.
# With the "spawn" start method (default on Windows and macOS), each worker
# re-imports the main module to find is_prime by name: functions defined
# interactively in a notebook cannot be found that way, and the guard keeps
# workers from re-running the script's top-level code.