# TD6 - Concurrence et parallélisme
Source des explications et des images : https://realpython.com/python-concurrency/#how-to-speed-up-a-cpu-bound-program


Les programmes sont généralement exécutés sur un seul coeur de l'ordinateur à la fois, et de manière successive. Cependant, il se peut que l'exécution de ces programmes prennent plus de temps que nécessaire, en raison de facteurs tels que :

* le programme attend la réponse d'un autre programme, e.g. requête dans une base de donnée
* la tâche à effectuer est intensive, e.g. création de fichiers, calculs, etc. 



In [1]:
# le programme ci-dessous peut prendre du temps
import requests
import time


def download_site(url, session):
    with session.get(url) as response:
        print(f"Read {len(response.content)} from {url}")


def download_all_sites(sites):
    with requests.Session() as session:
        for url in sites:
            download_site(url, session)


if __name__ == "__main__":
    sites = [
        "https://www.jython.org",
        "http://olympus.realpython.org/dice",
    ] * 80
    # ici on télécharge 80 fois ces URLs
    start_time = time.time()
    download_all_sites(sites)
    duration = time.time() - start_time
    print(f"Downloaded {len(sites)} in {duration} seconds")

Read 10782 from https://www.jython.org
Read 277 from http://olympus.realpython.org/dice
Read 10782 from https://www.jython.org
Read 277 from http://olympus.realpython.org/dice
Read 10782 from https://www.jython.org
Read 277 from http://olympus.realpython.org/dice
Read 10782 from https://www.jython.org
Read 277 from http://olympus.realpython.org/dice
Read 10782 from https://www.jython.org
Read 277 from http://olympus.realpython.org/dice
Read 10782 from https://www.jython.org
Read 277 from http://olympus.realpython.org/dice
Read 10782 from https://www.jython.org
Read 277 from http://olympus.realpython.org/dice
Read 10782 from https://www.jython.org
Read 277 from http://olympus.realpython.org/dice
Read 10782 from https://www.jython.org
Read 277 from http://olympus.realpython.org/dice
Read 10782 from https://www.jython.org
Read 277 from http://olympus.realpython.org/dice
Read 10782 from https://www.jython.org
Read 277 from http://olympus.realpython.org/dice
Read 10782 from https://www.jyth

## Types de blocages


On peut identifier deux types de blocages : 
* liés à l'entrée/sortie (Input-Output bound)
* liés au temps de calcul du processeur (CPU-bound)

### Input-Output bound

Le programme est ralenti car il attend un signal (input-output) d'un autre programme, ressource extérieure, qui est plus lent que le processeur (ex : fichiers, réseau...). Dans l'image ci-dessous, en bleu le temps que le programme effectue une action, et en rouge le temps que le programme attend une réponse d'une source externe. Pour accélerer le programme, il faudrait pouvoir le débloquer pendant qu'il attend une ressource de la source externe. Il s'agit alors d'un problème de **concurrence**.

<img src="io.png">

### CPU-bound

Le programme est ralenti, ou surtout, ne peut pas aller plus vite, puisqu'il est déjà occupé par une opération, qui demande beaucoup de ressources (ex: calculs). Ici, la limite est bien celle du processeur lui-même. Pour accélerer le programme, il faudrait pouvoir faire en sorte de lui allouer plus de ressources, par exemple en exécutant plusieurs tâches en même temps. Il s'agit alors d'un problème de **parallélisme**.

<img src="cpu.png">


## Concurrence

### Threading 

En Python, la concurrence est réalisable via les threads. Les threads en Python sont des processus qui se déclenchent les uns à la suite des autres, dès qu'un autre thread est mis en pause. Les différentes tâches du programme peuvent alors sembler se dérouler en même temps, même si ça n'est en fait pas le cas : ils ne font que ce succéder les uns aux autres. 

En Python, les threads sont réalisables via le module "threading".

Documentation sur les threads en Python : https://realpython.com/intro-to-python-threading/ 

In [2]:
import concurrent.futures
import requests
import threading
import time


thread_local = threading.local()

# thread_local permet de séparer l'accès à la Session 

def get_session():

    # dans ce code, on utilise requests.Session() pour faire des requêtes HTTP. 
    # Chaque thread a besoin de créer sa propre Session
    # La session est créée la première fois que le Thread appelle cette fonction. Ensuite, il la recherche dans la variable 
    # thread_local
    if not hasattr(thread_local, "session"):
        thread_local.session = requests.Session()
    return thread_local.session

def download_site(url):
    # 
    session = get_session()
    with session.get(url) as response:
        print(f"Read {len(response.content)} from {url}")


def download_all_sites(sites):
    # ThreadPoolExecutor crée une piscine (Pool) de Threads, chacun étant associé à une tâche. 
    # l'Executor se charge de contrôler quand et comment chaque Thread se déclenche
    # max_workers indique le nombre de Threads à créer

    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:

        # la fonction map permet d'appliquer la fonction à chaque élement de l'iterable (comme la fonction map())
        executor.map(download_site, sites)


if __name__ == "__main__":
    sites = [
        "https://www.jython.org",
        "http://olympus.realpython.org/dice",
    ] * 80
    start_time = time.time()
    # on donne la liste de sites à télécharger
    download_all_sites(sites)
    duration = time.time() - start_time
    print(f"Downloaded {len(sites)} in {duration} seconds")

Read 10782 from https://www.jython.orgRead 10782 from https://www.jython.orgRead 10782 from https://www.jython.org


Read 10782 from https://www.jython.org
Read 10782 from https://www.jython.org
Read 277 from http://olympus.realpython.org/diceRead 277 from http://olympus.realpython.org/dice

Read 277 from http://olympus.realpython.org/dice
Read 277 from http://olympus.realpython.org/diceRead 277 from http://olympus.realpython.org/diceRead 277 from http://olympus.realpython.org/dice

Read 10782 from https://www.jython.org

Read 10782 from https://www.jython.org
Read 10782 from https://www.jython.org
Read 10782 from https://www.jython.org
Read 10782 from https://www.jython.org
Read 277 from http://olympus.realpython.org/dice
Read 277 from http://olympus.realpython.org/dice
Read 10782 from https://www.jython.org
Read 277 from http://olympus.realpython.org/dice
Read 10782 from https://www.jython.org
Read 277 from http://olympus.realpython.org/dice
Read 10782 from https://www.jython.org
Rea

### Asyncio 

La concurrence est également réalisables via le module "asyncio", qui permet de déclencher les programmes de manières asynchrone. 

Le concept général d'asyncio est qu'un seul objet Python, appelé boucle d'événements, contrôle comment et quand chaque tâche est exécutée. La boucle d'événements est consciente de chaque tâche et sait dans quel état elle se trouve. En réalité, il existe de nombreux états dans lesquels les tâches peuvent se trouver.

L'état "prêt" indique qu'une tâche a du travail à faire et qu'elle est prête à être exécutée, tandis que l'état "en attente" signifie que la tâche attend qu'un élément extérieur se termine, par exemple une opération réseau.

Votre boucle d'événements simplifiée maintient deux listes de tâches, une pour chacun de ces états. Elle sélectionne l'une des tâches prêtes et la remet en marche. Cette tâche a le contrôle total jusqu'à ce qu'elle remette le contrôle à la boucle d'événements de manière coopérative.

Lorsque la tâche en cours d'exécution redonne le contrôle à la boucle d'événements, celle-ci place cette tâche dans la liste des tâches prêtes ou en attente, puis passe en revue chacune des tâches de la liste d'attente pour voir si elle est devenue prête à la suite de l'achèvement d'une opération d'entrée-sortie. Elle sait que les tâches de la liste des tâches prêtes sont toujours prêtes car elle sait qu'elles n'ont pas encore été exécutées.

Une fois que toutes les tâches ont été à nouveau triées dans la bonne liste, la boucle d'événements choisit la tâche suivante à exécuter, et le processus se répète. Votre boucle d'événements simplifiée choisit la tâche qui a attendu le plus longtemps et l'exécute. Ce processus se répète jusqu'à ce que la boucle d'événements soit terminée.

Un point important de de asyncio est que les tâches n'abandonnent jamais le contrôle sans le faire intentionnellement. Elles ne sont jamais interrompues au milieu d'une opération. Cela nous permet de partager les ressources un peu plus facilement avec asyncio qu'avec threading. Vous n'avez pas à vous soucier de rendre votre code sûr pour les threads.

### async et await

asyncio repose notamment sur les mots clés async et await. await permet à la fonction de rendre le contrôle à la boucle d'événements. async est utilisé avant la définition d'une fonction, et sert principalement à indiquer à Python que await sera utilisé (mais pas que).

Note : le code ci-dessous ne peut pas fonctionner dans un notebook Jupyter. Créez un fichier "asynchrone.py", dans lequel vous copiez-collerez ce code. Executez ensuite ce fichier. 

Documentation d'asyncio : 
* https://realpython.com/async-io-python/#where-does-async-io-fit-in
* https://docs.python.org/3/library/asyncio.html


In [2]:
!pip install nest-asyncio
!pip install aiohttp



In [1]:
import asyncio
import time
import aiohttp


async def download_site(session, url):
    async with session.get(url) as response:
        print("Read {0} from {1}".format(response.content_length, url))


async def download_all_sites(sites):
    # contrairement à Threading, on peut partager la Session avec les différentes tâches
    # on la crée donc via un context manager (with)
    async with aiohttp.ClientSession() as session:

        tasks = []
        # asyncio.ensure_future crée une liste de tâche à réaliser, et s'assure de les déclencher 
        for url in sites:
            task = asyncio.ensure_future(download_site(session, url))
            tasks.append(task)
        # asyncio.gather attend que toutes les tâches soient terminées, avant de ferme la Session
        await asyncio.gather(*tasks, return_exceptions=True)


if __name__ == "__main__":
    sites = [
        "https://www.jython.org",
        "http://olympus.realpython.org/dice",
    ] * 80
    start_time = time.time()
    # déclenche la boucle d'événements, qui tournera jusqu'à ce que la tâche soit accomplie
    asyncio.get_event_loop().run_until_complete(download_all_sites(sites))
    duration = time.time() - start_time
    print(f"Downloaded {len(sites)} sites in {duration} seconds")

RuntimeError: This event loop is already running

## Code asynchrone avec Flask

Source : https://progressstory.com/tech/python/async-requests-with-flask/ 

Un des intérêts principaux de l'asynchrone est sont emploi avec le Web. Par défaut, Flask est synchrone, mais l'extension "async" lui permet d'utiliser async et await.

Installez Flask[async], créez un dossier "app", puis un fichier "hello.py" dans ce dossier, avant de copier-coller le code ci-dessous dedans. Executez ce code et accédez à l'URL "/comic"

In [5]:
!pip install "Flask[async]"

Collecting asgiref>=3.2
  Downloading asgiref-3.7.2-py3-none-any.whl (24 kB)
Installing collected packages: asgiref
Successfully installed asgiref-3.7.2


In [None]:
import time
from random import randint
import requests as requests
from flask import Flask


app = Flask(__name__)


def get_xkcd_image():
    # choisi un identifiant au hasard et collecte une image sur une API
    random = randint(0, 300)
    response = requests.get(f'http://xkcd.com/{random}/info.0.json')
    return response.json()['img']


@app.get('/comic')
def hello():
    # on démarre le compteur, pour voir le temps que prend le process
    start = time.perf_counter()

    url = get_xkcd_image()
    end = time.perf_counter()
    return f"""
        Time taken: {end-start}<br><br>
        <img src="{url}"></img>
    """


if __name__ == '__main__':
    app.run(debug=True)

Ajoutez le code ci-dessous à "hello.py", puis accédez à l'URL "/comic2". Le chargement des images prend significativement plus de temps. 

In [None]:
def get_multiple_images(number):
    return [get_xkcd_image() for _ in range(number)]


@app.get('/comic2')
def hello2():
    start = time.perf_counter()
    # ici on télécharge 5 images, ce qui va prendre beaucoup plus de temps
    urls = get_multiple_images(5)
    end = time.perf_counter()

    markup = f"Time taken: {end-start}<br><br>"
    for url in urls:
        markup += f'<img src="{url}"></img><br><br>'

    return markup


La librairie request n'est pas pas asynchrone. Pour cela, il nous faut des librairies adaptées, telle que "httpx" ou "aiohttp"
Installez httpx

Copiez-collez le code ci-dessous dans un fichier "hello2.py" dans le dossier app, puis exécutez ce fichier. 

In [None]:
!pip install httpx

In [None]:
import asyncio
import time
from random import randint
import httpx
from flask import Flask


app = Flask(__name__)

# version asynchrone de la fonction
async def get_xkcd_image(session):
    random = randint(0, 300)
    # ici on redonne le contrôle à la boucle d'événement le temps que l'API donne une réponse
    result = await session.get(f'http://xkcd.com/{random}/info.0.json') # dont wait for the response of API
    return result.json()['img']

# function converted to coroutine
async def get_multiple_images(number):
    # on ouvre la Session, qui cette fois est asynchrone
    async with httpx.AsyncClient() as session: 
        # on crée les tâches 
        tasks = [get_xkcd_image(session) for _ in range(number)]
        # on attend que les tâches soient réalisées
        result = await asyncio.gather(*tasks, return_exceptions=True)
    return result


@app.get('/comic')
async def hello():
    start = time.perf_counter()
    urls = await get_multiple_images(5)
    end = time.perf_counter()
    markup = f"Time taken: {end-start}<br><br>"
    for url in urls:
        markup += f'<img src="{url}"></img><br><br>'

    return markup


if __name__ == '__main__':
    app.run(debug=True)

### Note

Bien que Flask puisse gérer l'asynchrone, ce n'est pas le framework le plus adapté pour ce genre de tâches. D'autres le sont plus, tels que :
* Quart (réimplémentation de Flask qui intégère l'asynchrone)
* Django
* FastAPI

## Parallélisme 

Le parallélisme permet notamment de résoudre des problèmes liés au processeur (CPU-bound). Le threading et asyncio ne peuvent résoudre ce type de problème puisqu'ils n'emploient qu'un seul processeur à la fois, tandis que le parallélisme profite des nombreux coeurs de l'ordinateur. Pour cela, un nouvel interpréteur Python est ouvert pour chaque coeur. Chaque interpréteur se charge d'une tâche, indépendamment des autres. 

<img src="multiprocessing.png">


In [3]:
# ce code est très long a exécuter de manière non-concurrente
import time


def cpu_bound(number):
    return sum(i * i for i in range(number))


def find_sums(numbers):
    for number in numbers:
        cpu_bound(number)


numbers = [5000000 + x for x in range(20)]

start_time = time.time()
find_sums(numbers)
duration = time.time() - start_time
print(f"Duration {duration} seconds")

Duration 6.995326042175293 seconds


En Python, le parallélisme se fait par le module "multiprocessing". Bien qu'il existe des usages plus avancés, il suffit d'indiquer dans un context manager le nombre de coeur à employer, et à assigner la fonction à l'itérable. 

Note : le code ci-dessous ne fonctionne pas dans un notebook. Copiez-collez le dans un fichier 'multiproc.py', que vous exécuterez.

Documentation : https://docs.python.org/3/library/multiprocessing.html 

In [3]:
import time
import multiprocessing

def cpu_bound(number):
    return sum(i * i for i in range(number))


def find_sums(numbers):
    # ici on crée une piscine (Pool), qui assignera une tâche à un nombre donné de coeur
    # on peut préciser le nombre de coeur à employer. Par défaut, Pool() utilisera le nombre
    # maximal possible
    with multiprocessing.Pool() as pool:
        # map assigne la fonction a chaque valeur de l'iterable
        # chaque coeur réalise ensuite la tâche, puis en commence une autre
        pool.map(cpu_bound, numbers)


if __name__ == "__main__":
    numbers = [5000000 + x for x in range(20)]

    start_time = time.time()
    find_sums(numbers)
    duration = time.time() - start_time
    print(f"Duration {duration} seconds")

Process SpawnPoolWorker-29:
Process SpawnPoolWorker-30:
Traceback (most recent call last):
  File "/Users/nicolasgutehrle/opt/anaconda3/envs/cours/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/Users/nicolasgutehrle/opt/anaconda3/envs/cours/lib/python3.9/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/nicolasgutehrle/opt/anaconda3/envs/cours/lib/python3.9/multiprocessing/pool.py", line 114, in worker
    task = get()
  File "/Users/nicolasgutehrle/opt/anaconda3/envs/cours/lib/python3.9/multiprocessing/queues.py", line 368, in get
    return _ForkingPickler.loads(res)
AttributeError: Can't get attribute 'cpu_bound' on <module '__main__' (built-in)>
Traceback (most recent call last):
  File "/Users/nicolasgutehrle/opt/anaconda3/envs/cours/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/Users/nicolasgutehrle/opt/anaconda3/envs/cours/lib/python3.

KeyboardInterrupt: 

La fonction map() ne peut prendre que la fonction et un seul argument. Afin d'employer une fonction à plusieurs arguments, il faut soit :
* utiliser la fonction partial() de la librairie functools
* utiliser une fonction lambda

In [None]:
import time
import multiprocessing
from functools import partial

def cpu_bound(number, number2):
    return sum(i * i + number2 for i in range(number))


def find_sums(numbers):
    # ici on crée une piscine (Pool), qui assignera une tâche à un nombre donné de coeur
    # on peut préciser le nombre de coeur à employer. Par défaut, Pool() utilisera le nombre
    # maximal possible
    with multiprocessing.Pool() as pool:
        # partial prend d'abord la fonction qui sera appelée, puis tous les arguments
        # autres que le 1er argument
        partial_func = partial(cpu_bound, number2=10)
        # on donne à map() la fonction partial() et l'itérable qui contient le premier argument de la fonction
        pool.map(partial_func, numbers)


if __name__ == "__main__":
    numbers = [5000000 + x for x in range(20)]

    start_time = time.time()
    find_sums(numbers)
    duration = time.time() - start_time
    print(f"Duration {duration} seconds")

Process SpawnPoolWorker-29:
Process SpawnPoolWorker-30:
Traceback (most recent call last):
  File "/Users/nicolasgutehrle/opt/anaconda3/envs/cours/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/Users/nicolasgutehrle/opt/anaconda3/envs/cours/lib/python3.9/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/nicolasgutehrle/opt/anaconda3/envs/cours/lib/python3.9/multiprocessing/pool.py", line 114, in worker
    task = get()
  File "/Users/nicolasgutehrle/opt/anaconda3/envs/cours/lib/python3.9/multiprocessing/queues.py", line 368, in get
    return _ForkingPickler.loads(res)
AttributeError: Can't get attribute 'cpu_bound' on <module '__main__' (built-in)>
Traceback (most recent call last):
  File "/Users/nicolasgutehrle/opt/anaconda3/envs/cours/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/Users/nicolasgutehrle/opt/anaconda3/envs/cours/lib/python3.

KeyboardInterrupt: 

In [None]:
import time
import multiprocessing
from functools import partial

def cpu_bound(number, number2):
    return sum(i * i + number2 for i in range(number))


def find_sums(numbers):
    # ici on crée une piscine (Pool), qui assignera une tâche à un nombre donné de coeur
    # on peut préciser le nombre de coeur à employer. Par défaut, Pool() utilisera le nombre
    # maximal possible
    with multiprocessing.Pool() as pool:
        # on crée une fonction lambda, dans laquelle on appelle la fonction cpu_bound(), dans la fonction map.
        #  Le x correspond aux valeurs de l'itérable
        pool.map(lambda x: cpu_bound(x, number2=10), numbers)


if __name__ == "__main__":
    numbers = [5000000 + x for x in range(20)]

    start_time = time.time()
    find_sums(numbers)
    duration = time.time() - start_time
    print(f"Duration {duration} seconds")

Process SpawnPoolWorker-29:
Process SpawnPoolWorker-30:
Traceback (most recent call last):
  File "/Users/nicolasgutehrle/opt/anaconda3/envs/cours/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/Users/nicolasgutehrle/opt/anaconda3/envs/cours/lib/python3.9/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/nicolasgutehrle/opt/anaconda3/envs/cours/lib/python3.9/multiprocessing/pool.py", line 114, in worker
    task = get()
  File "/Users/nicolasgutehrle/opt/anaconda3/envs/cours/lib/python3.9/multiprocessing/queues.py", line 368, in get
    return _ForkingPickler.loads(res)
AttributeError: Can't get attribute 'cpu_bound' on <module '__main__' (built-in)>
Traceback (most recent call last):
  File "/Users/nicolasgutehrle/opt/anaconda3/envs/cours/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/Users/nicolasgutehrle/opt/anaconda3/envs/cours/lib/python3.

KeyboardInterrupt: 