Mulitprocessing è il modulo che permette di creare e gestire più processi in python.
Le classi principali sono:
- Process
- Queue
- Lock

In [49]:
import multiprocessing as mp

In [50]:
print(f"The number of cores is {mp.cpu_count()}")

The number of cores is 8


## Process
permette di creare un processo figlio del processo principale.

Le due funzioni principali sono:
- start() : Il processo parte e ritorna il suo risultato
- join() : Per terminare il processo dopo che ha fatto il suo lavoro (se non usato il processo rimane IDLE a vita)

se si vuole passare degli argomenti al processo bisogna usare la keyword args.

**Si noti come non sempre i processi vengono usati nell'ordine impartito**
Ricorda che noi generiamo solo diversi processi ma è lo scheduler del sistema operativo che li distribuisce sulle cpu. Potrebbero comunque essere lanciati tutti sulla stessa cpu ma tipicamente gli scheduler sono furbi

In [3]:
def stampa(string="ciao"):
    print(f"La stringa è {string}")

nomi=["pippo", "pluto", "paperino"]

#Istanzio classe process (non so perchè ci vuole virgola in args ma ci vuole)
procs=[mp.Process(target=stampa, args=(str(i),)) for i in range(40)]
for i in procs:
    i.start()
for i in procs:
    i.join()

La stringa è 0
La stringa è 1La stringa è 2
La stringa è 3

La stringa è 4La stringa è 5

La stringa è 6
La stringa è 7
La stringa è 8
La stringa è 9
La stringa è 10
La stringa è 11
La stringa è 12
La stringa è 13
La stringa è 14
La stringa è 15
La stringa è 16
La stringa è 17
La stringa è 18
La stringa è 19
La stringa è 20
La stringa è 21
La stringa è 22
La stringa è 23
La stringa è 24
La stringa è 25La stringa è 27La stringa è 26
La stringa è 29
La stringa è 28


La stringa è 30La stringa è 31
La stringa è 32

La stringa è 33La stringa è 34

La stringa è 36La stringa è 35La stringa è 37


La stringa è 38
La stringa è 39


## Queue
Una queue è una struttura dati First in First out (FIFO) process e thread safe che può contenere qualsiasi oggetto python (in realtà solo quelli pickleable).

Questa struttura è molto utile per condividere dati tra i processi soprattutto se passata come paramentro a un Process

Le funzioni principali sono:
- put() : Inserisce un oggetto nella coda
- get() : Preleva un oggetto dalla coda

Nell'esempio qui sotto la versione seriale è più veloce in quanto il task è molto semplice e c'è un overhead enorme nella creazione dei processi e causa un effort più grande dell' operazione stessa

In [11]:
import time
def somma_2(num,queue):
    res=num+2
    queue.put(res)
N=100
nums=[i for i in range(N)]

def parallel_sum():
    queue=mp.Queue()
    procs=[mp.Process(target=somma_2, args=(num,queue)) for num in nums]
    for p in procs:
        p.start()
    for p in procs:
        p.join()

    results=[queue.get() for p in procs]
    print(results)

def serial_sum():
    queue=mp.Queue()
    for i in nums:
        somma_2(i,queue)
    results=[queue.get() for i in nums]
    print(results)

start_time=time.time()
parallel_sum()
print(f"Time for parallel version: {time.time()-start_time}")
start_time=time.time()
serial_sum()
print(f"Time for serial version: {time.time()-start_time}")



[2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 25, 26, 24, 29, 27, 30, 31, 28, 32, 33, 34, 35, 36, 37, 38, 40, 39, 41, 43, 42, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101]
Time for parallel version: 0.9140706062316895
[2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101]
Time for serial version: 0.005970954895019531


## Pool
La classe pool può essere usata per runnare una funzione in parallelo con input multipli.

NB: pool.map ritorna e ha come argomento solo iterabili

La differenza tra map e map_async è che map_async non aspetta che tutti i processi finiscano prima di ritornare. Nel caso asincrono quando si chiama .get se i processi non sono terminati il programma attende la fine dell'esecuzione

Oltre le funzioni map e map_async si possono usare anche le funzioni apply e apply_async. Su queste funzioni si possono passare anche una funzione di callback che viene eseguita quando il processo termina. Inoltre ci sono alcune differenze sui tempi di esecuzione quando vengono passate strutture più complesse come dizionari

In [15]:
# L'esempio di prima può essere effettuato con i pool
#Il numero in pool è il numero di processi
nums=[i for i in range(10)]
p=mp.Pool(2)
def somma_2_2(num):
    time.sleep(0.5)
    return num+2

results=p.map(somma_2_2,nums)
print("Sincrono")
print(results)
results=p.map_async(somma_2_2,nums)
## Avendo usato map async results è un oggetto asincrono che continua a runnare finchè non è completato.
## Il comando .get ritorna il risulatato.
print("Asincrono")
print(results.get())

Sincrono
[2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
Asincrono
[2, 3, 4, 5, 6, 7, 8, 9, 10, 11]


## Comunicazione tra i processi
**Bisogna ricordare che processi diversi hanno spazi di memoria separati quindi non possiamo far comunicare i processi (o anche solo scrivere i risultati) su una variabile globale**

Ci sono diversi modi per far comunicare i processi:
- Queue: come abbiamo gia viso. Un processo fa il put e inserisce i dati un altro processo fa il get
- Shared Memory: Possiamo usare una memoria condivisa tra i processi (anche se in questi spazi di memoria possiamo definire solo degli array o delle variabili singole).
  **NOTA che esiste un modulo shared_memory in multiprocessing. Potrebbe essere molto utile a risolvere questo problema e a diminuire lo spazio di allocazione**
- Server process: Un server process è un processo che gestisce la memoria di tutti i sotto processi. Il vantaggio è che può operare su qualsiasi struttura dati e i processi possono essere anche distribuiti su una rete
- Pipe: Una pipe è semplicemente una connessione tra 2 oggetti. Non è una struttura dati ed è l'elemento più semplice da usare quindi preferibile quando possibile (più veloce)

Questi esempi sono abbastanza inutile poichè delego un compito semplice a un solo processo quindi non c'è neanche parallelismo ma è per far capire la sintassi

In [20]:
#SHARED MEMORY
def sum_2(num_list,result,result_sum):
    for idx,num in enumerate(num_list):
        result[idx]=num+2
    result_sum=sum(num_list)

nums=[1,2,3,4,5,6.4,6.8,5.3]
##Si crea un array nella shared memory: bisogna specificare tipo e size (array  inizializzato a 0)
result=mp.Array('f',len(nums))
# Per il singolo valore funziona nello stesso modo
result_sum=mp.Value('f')

p=mp.Process(target=sum_2, args=(nums,result,result_sum))
p.start()
p.join()

#Per richiamare i valori serve uno slicing per gli array e un .value per i valori
print(f"Array:{result[:]}, sum of array:{result_sum.value}")


Array:[3.0, 4.0, 5.0, 6.0, 7.0, 8.399999618530273, 8.800000190734863, 7.300000190734863], sum of array:0.0


In [31]:
# Server Process
def sum_2(num_list,ns):
    for nums in num_list:
        (ns.result).append(nums+2)
manager=mp.Manager()

#E' possibile creare manualmente dei namespace
ns=manager.Namespace()
#Tutte le strutture dati vanno create in queso modo (manager.list, manager.dict ,etc...)
ns.result=manager.list()

p=mp.Process(target=sum_2, args=(nums,ns))
p.start()
p.join()
print(ns.result)

[3, 4, 5, 6, 7, 8.4, 8.8, 7.3]


In [41]:
#Pipe. Si può mandare l'informazione all'altra connessione con .send() e ruceverla con .recv()
conn1,conn2=mp.Pipe()

#mando a conn la stringa
def print_1(nome,conn):
    conn.send(nome)
    print(f"Ciao {nome} questa funzione è print_1")
def print_2(conn):
    nome=conn.recv()
    print(f"Attenzione: E' stato eseguito l'accesso da {nome}")
p1=mp.Process(target=print_1, args=("Piero",conn1))
p2=mp.Process(target=print_2, args=(conn2,))
p1.start()
p2.start()
p2.join()
p1.join()

Ciao Piero questa funzione è print_1
Attenzione: E' stato eseguito l'accesso da Piero


## Sincronizzazione tra i processi
Potrebbero accadere grandi disastri se processi diversi vogliono modificare gli stessi dati contemporaneamente. Se i processi devono agire sugli stessi dati devono essere sincronizzati altrimenti avviene quello che si chiama race condition

Questo è possibile tramite un lock. Un lock è una variabile condivisa che permette di bloccare un processo per permettere l'accesso ai dati.

Quando il lock viene acquisito da un processo gli altri processi non eseguono operazioni. Quando il lock viene rilasciato gli altri processi possono acquisire il lock ed eseguire le operazioni

In [51]:
#Esempio con e senza lock.
def withdraw(balance,lock,lock_bool=True):
    for i in range(10000):
        if lock_bool:
            lock.acquire()
            balance.value-=1
            lock.release()
        else:
            balance.value-=1
def deposit(balance,lock,lock_bool=True):
    for i in range(10000):
        if lock_bool:
            lock.acquire()
            balance.value+=1
            lock.release()
        else:
            balance.value+=1
def loop(N,lock_bool=True):
    for i in range(N):
        balance=mp.Value("i",100)
        lock=mp.Lock()
        p1=mp.Process(target=withdraw, args=(balance,lock,lock_bool))
        p2=mp.Process(target=deposit, args=(balance,lock,lock_bool))
        p1.start()
        p2.start()
        p1.join()
        p2.join()
        print(f"Balance={balance.value} con lock={lock_bool}")
loop(10,lock_bool=False)
loop(10,lock_bool=True)

Balance=545 con lock=False
Balance=100 con lock=False
Balance=662 con lock=False
Balance=10 con lock=False
Balance=-53 con lock=False
Balance=184 con lock=False
Balance=-92 con lock=False
Balance=-250 con lock=False
Balance=128 con lock=False
Balance=342 con lock=False
Balance=100 con lock=True
Balance=100 con lock=True
Balance=100 con lock=True
Balance=100 con lock=True
Balance=100 con lock=True
Balance=100 con lock=True
Balance=100 con lock=True
Balance=100 con lock=True
Balance=100 con lock=True
Balance=100 con lock=True


# Multithreading
I thread runnano all'interno dello stesso processo. PERO' non possono essere killati


Il GIL in python impedisce l'esecuzione di thread in parallelo.

Il modulo per gestire i thread è threading e la sintassi per istanziare i thread è molto simile a quella dei processi in Process



In [1]:
#Runnare diverse funzioni in thread diversi
import threading as td
def func1():
    print("thread 1")
def func2():
    print("thread 2")
t1=td.Thread(target=func1)
t2=td.Thread(target=func2)
t1.start()
t2.start()
t1.join()
t2.join()


thread 1
thread 2


La comunicazione tra diversi thread è più semplice da gestire in quanto tutti i thread vivono nello stesso spazio di memoria quindi volendo si può tranquillamente usare una variabile globale.

Però anche per i thread bisogna porre attenzione al sincronismo dei thread. Anche se non runnano in parallelo.
Questo succede perchè il GIL funziona in modo particolare: preveiene l'accesso simultaneo dei thread all'interprete di python ma i thread vengono comunque schedulati e il GIL ogni 10 operazioni di bytecode rilascia il thread e lascia al sistema operativo la scelta di quale thread eseguire.
Questo significa che un thread può essere interrotto in qualsiasi momento nonostante non abbia concluso tutte le operazioni impartite dall'utente.

Per prevenire questo comportamento anche in questo caso è necessario l'uso dei lock

In [34]:
balance=0
import sys
def increment():
    global balance
    balance+=1
def loop(N,lock,lock_bool):
    if lock_bool:
        for _ in range(N):
            lock.acquire()
            increment()
            lock.release()
    else:
        for _ in range(N):
            increment()
def threads(N=100000,lock_bool=True):
    global balance
    balance=0
    lock=td.Lock()
    t1=td.Thread(target=loop, args=(N,lock,lock_bool))
    t2=td.Thread(target=loop, args=(N,lock,lock_bool))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print(f"Balance={balance} con lock={lock_bool}")

for i in range(10):
    threads(lock_bool=False)
for i in range(10):
    threads(lock_bool=True)


Balance=200000 con lock=False
Balance=200000 con lock=False
Balance=200000 con lock=False
Balance=200000 con lock=False
Balance=200000 con lock=False
Balance=200000 con lock=False
Balance=200000 con lock=False
Balance=200000 con lock=False
Balance=200000 con lock=False
Balance=200000 con lock=False
Balance=200000 con lock=True
Balance=200000 con lock=True
Balance=200000 con lock=True
Balance=200000 con lock=True
Balance=200000 con lock=True
Balance=200000 con lock=True
Balance=200000 con lock=True
Balance=200000 con lock=True
Balance=200000 con lock=True
Balance=200000 con lock=True


**NB NON so se questo problema è stato risolto nelle ultime versioni di python, è strano che senza lock il risultato sia giusto. Forse hanno cambiato comportamento gil ma non ci sperare troppo**

I thread quindi sono utili in 2 casi:
- Quando si vuole runnare codice in C (Il GIL viene rilasciato e i thread vanno in parallelo)
- Per operazioni I/O l'interprete rilascia il GIL

In [37]:
# ESEMPIO COPIATO DALLE SLIDES PER SEMPLICITA'
import threading as thr
import requests
import os
from time import perf_counter
buffer_size=1024
#define a function to manage the download
def download(url):
    response = requests.get(url, stream=True)
    filename = url.split("/")[-1]
    with open(filename,"wb") as f:
        for data in response.iter_content(buffer_size):
            f.write(data)
#MAIN

urls= [
"http://cds.cern.ch/record/2690508/files/201909-262_01.jpg",
"http://cds.cern.ch/record/2274473/files/05-07-2017_Calorimeters.jpg",
"http://cds.cern.ch/record/2274473/files/08-07-2017_Spectrometer_magnet.jpg",
"http://cds.cern.ch/record/2127067/files/_MG_3944.jpg",
"http://cds.cern.ch/record/2274473/files/08-07-2017_Electronics.jpg",
]
#define 5 threads
threads = [thr.Thread(target=download, args=(urls[x],)) for x in range(4)]
t = perf_counter()
#start threads
for thread in threads:
    thread.start()
#join threads
for thread in threads:
    thread.join()
print("Time: "+str(perf_counter()-t))

Time: 17.484549723998498
