<a href="https://colab.research.google.com/github/mdessolis/pythontps/blob/main/Threading.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Programmazione Multithreading e Multiprocessing in Python

## Differenza tra multithread e multiprocessing 

Per **thread** si intende un blocco di istruzioni (di solito una funzione o un metodo) eseguito all'interno di un processo in parallelo con altre istruzioni.
  
Pensiamo ad esempio ad un browser che deve mostrare una pagina web: se questa contiene diverse immagini/fogli di stile o altri elementi esterni il browser dovrà scaricare tutti questi file. Ebbene, di solito questo lo fa in parallelo facendo partire una procedura di download (thread) per ciascun file in modo da ottimizzare i tempi.

Oppure pensiamo ad un qualunque sistema di videoscrittura: mentre digitiamo il testo questo automaticamente viene impaginato e spesso anche controllato come ortografia. Ebbene, queste operazioni sono probabilmente eseguite da thread paralleli alla digitazione.

Nel caso del thread quindi si lavora sempre all'interno di un singolo processo di esecuzione che ha al suo interno delle diramazioni di esecuzione che lavorano in parallelo. Però, essendo un singolo processo, l'area dati e i registri di esecuzione del codice sono comuni, quindi i thread condividono tutti gli stessi dati.

Per **processo** si intende invece un blocco di codice che viene eseguito in genere in modo autonomo sul computer e al quale viene quindi assegnato un proprio spazio di memoria. Ad esempio se avviamo un programma di videoscrittura verrà creato un processo di esecuzione e assegnato a questo un codice univoco (detto PID). Nulla ci impedisce però di lanciare due esecuzioni contemporanee del programma, creando però due processi che non condividono tra loro nessuno spazio di memoria.

E' però possibile programmare un codice per istruirlo a dividere l'esecuzione del processo in più sottoprocessi da avviare in parallelo, sfruttando in questo modo le caratteristiche dei moderni processori, composti da più core che possono tranquillamente eseguire operazion in parallelismo reale.

A differenza però di una esecuzione multithread, in quella multiprocess non vi è condivisione della memoria per cui bisogna prevedere, nel caso se ne abbia bisogno, strumenti per far comunicare tra loro i sottoprocessi.

Vediamo ora il tutto con degli esempi.



## Esercizio guidato: conta numeri primi

Si voglia contare quanti numeri primi sono presenti in un certo intervallo fornito in input.

Innanzitutto dovremo definire una funzione per stabilire se un numero è primo o no:


In [2]:
def is_prime(x):
    """ restituisce True se x è primo, False altrimenti """
    if x<2: return False
    i = 2
    while i*i <= x:
        if x % i == 0: return False
        i += 1
    return True

# Esempio di uso:
print(f"5 is prime? {is_prime(5)}")
print(f"6 is prime? {is_prime(6)}")

5 is prime? True
6 is prime? False


Successivamente definiamo una funzione per contare quanti primi troviamo in un dato intervallo:

In [7]:
def count_primes(a,b):
    """ Conta i numeri primi trovati all'interno dell'intervallo [a,b[ """
    count = 0
    for i in range(a, b):
        if is_prime(i):
            count +=1
    print(f"In [{a}, {b}] ho trovato {count} numeri primi")
    return count

# Esempio di uso
count_primes(1,10000)


In [1, 10000] ho trovato 1229 numeri primi


1229

La funzione count_primes si può scrivere in tanti modi differenti in Python. Sfruttando alcune funzioni e strutture del linguaggio potremmo per esempio scriverla anche così:

In [8]:
def count_primesV2(a,b):
    """ Conta i numeri primi trovati all'interno dell'intervallo [a,b[ """
    """ memorizza nella variabile count la somma di una serie di 1 corrispondenti ad ogni primo che trova nell'intervallo """
    count = sum(1 for i in range(a, b) if is_prime(i))
    print(f"In [{a}, {b}] ho trovato {count} numeri primi")
    return count

# Esempio di uso

count_primesV2(1,10000)


In [1, 10000] ho trovato 1229 numeri primi


1229

Proviamo adesso a calcolare il tempo esatto di esecuzione della funzione cont_primes per un intervallo abbastanza grande. Per far questo richiamiamo la libreria **time** che ci mette a disposizione la funzione time() che restituisce un float con il tempo attuale in secondi calcolato a partire dall'1/1/70 (Epoch). Volendo ci sono anche funzioni differenti per il calcolo del tempo ma questa è generica e la possiamo applicare bene a tanti contesti.

In [None]:
from time import time

start = time() # memorizziamo in start l'ora corrente
count_primes(1, 1_000_000) # eseguiamo la funzione
tempo = time() - start # calcoliamo in tempo il tempo trascorso

print(f"Tempo impiegato: {tempo:.2f} secondi")

# Facciamo la stessa prova con count_primesV2 e vediamo se cambiano i tempi

start = time() # memorizziamo in start l'ora corrente
count_primesV2(1, 1_000_000) # eseguiamo la funzione
tempo = time() - start # calcoliamo in tempo il tempo trascorso

print(f"Tempo impiegato: {tempo:.2f} secondi")

In [1, 1000000] ho trovato 78498 numeri primi
Tempo impiegato: 11.19 secondi
In [1, 1000000] ho trovato 78498 numeri primi
Tempo impiegato: 11.13 secondi


Bene, quello che abbiamo appena fatto è un algoritmo sviluppato con la programmazione tradizionale impostata pensando di far svolgere i calcoli ad un solo processore.

Però potremmo chiederci: ma anziché fare un unico ciclo in count_primes per tutto l'intervallo, non potremmo dividere il problema su più intervalli da far eseguire il più possibile in parallelo?

Qui intervengono i **thread**.

Andiamo però per passi: prima dividiamo l'unico intervallo in n intervalli che eseguiamo uno di seguito all'altro:


In [None]:
inizio = 1
fine   = 1_000_000
n      = 5 # numero di intervalli
step   = (fine-inizio+1) // n # ampiezza di ogni intervallo

start = time()

conta = 0
for i in range(n):
    conta += count_primes(inizio + i*step, inizio + (i+1) * step)

print(f"In [{inizio}, {fine}] ho trovato {conta} numeri primi")

tempo = time() - start # calcoliamo in tempo il tempo trascorso

print(f"Tempo impiegato: {tempo:.2f} secondi")

In [1, 200001] ho trovato 17984 numeri primi
In [200001, 400001] ho trovato 15876 numeri primi
In [400001, 600001] ho trovato 15238 numeri primi
In [600001, 800001] ho trovato 14853 numeri primi
In [800001, 1000001] ho trovato 14547 numeri primi
In [1, 1000000] ho trovato 78498 numeri primi
Tempo impiegato: 11.12 secondi


Ora dobbiamo istruire Python ad eseguire le funzioni count_primes in parallelo mediante thread. 

Per farlo Python mette a disposizione il modulo **threading** con la classe **Thread** che ci consente proprio di avviare una funzione in un nuovo thread.
 

In [None]:
from threading import Thread

inizio = 1
fine   = 1_000_000
n      = 5 # numero di intervalli
step   = (fine-inizio+1) // n # ampiezza di ogni intervallo

start = time()

# creiamo un array in cui memorizzare i vari thread
t = []

for i in range(n):

    # creiamo un instanza della classe Thread e associamo la funzione da eseguire
    thread = Thread(target = count_primes, args=(inizio + i*step, inizio + (i+1) * step))
    
    # facciamo partire il thread
    thread.start()
    
    # aggiungiamo il thread all'array
    t.append(thread)

# comunichiamo al programma di attendere la fine di tutti i thread prima di proseguire
for thread in t: thread.join()

tempo = time() - start # calcoliamo in tempo il tempo trascorso

print(f"Tempo impiegato: {tempo:.2f} secondi")

In [1, 200001] ho trovato 17984 numeri primi
In [200001, 400001] ho trovato 15876 numeri primi
In [400001, 600001] ho trovato 15238 numeri primi
In [600001, 800001] ho trovato 14853 numeri primi
In [800001, 1000001] ho trovato 14547 numeri primi
Tempo impiegato: 11.30 secondi


Notare che in queste ultime istruzioni è stato rimosso il contatore complessivo dei numeri trovati. Questo perché quando si eseguono le funzioni come thread non si può ottenere il valore restituito dalla funzione stessa, quindi occorrerà modificare il codice della funzione in modo che possa modificare dal suo interno il contatore.

Dall'esecuzione su questo notebook si può notare che non ci sono particolari vantaggi nei tempi. Provando però ad eseguirlo, anche più volte, sul proprio pc si può notare un piccolo miglioramento. 

A dire il vero in Python i thread non sono gestiti come in C++, dove il tempo di esecuzione verrebbe apprezzato di più. 
Di fatto in Python i thread, a causa di come è implementato l'interprete dei comandi, sono eseguiti uno alla volta e il parallelismo è solo simulato.
Vi sono però diversi problemi che anche con Python forniscono dei risultati migliori di un'esecuzione sequenziale. Pensiamo ad esempio a tutti quelli che devono lavorare con i file, dove un'eventuale attesa nell'accesso ad una risorsa può essere sfruttato da altri thread.  

In questo esempio di carattere matematico apprezzeremo di più l'esecuzione parallela se impostiamo il programma in versione non multi-thread ma multi-process.

Provare sul proprio pc la seguente versione:

In [None]:
from multiprocessing import Process

if __name__ == "__main__":
    """ Questa if serve per far eseguire le istruzioni seguenti solo al processo principale """
    inizio = 1
    fine   = 1_000_000
    n      = 5 # numero di intervalli
    step   = (fine-inizio+1) // n # ampiezza di ogni intervallo

    start = time()

    # creiamo un array in cui memorizzare i vari thread
    t = []

    for i in range(n):

        # creiamo un instanza della classe Process e associamo la funzione da eseguire
        p = Process(target = count_primes, args=(inizio + i*step, inizio + (i+1) * step))
        
        # facciamo partire il processo
        p.start()
        
        # aggiungiamo il processo all'array
        t.append(p)

    # comunichiamo al programma di attendere la fine di tutti i processi prima di proseguire
    for p in t: p.join()

    tempo = time() - start # calcoliamo in tempo il tempo trascorso

    print(f"Tempo impiegato: {tempo:.2f} secondi")

In [1, 200001] ho trovato 17984 numeri primi
In [200001, 400001] ho trovato 15876 numeri primi
In [400001, 600001] ho trovato 15238 numeri primi
In [600001, 800001] ho trovato 14853 numeri primi
In [800001, 1000001] ho trovato 14547 numeri primi
Tempo impiegato: 10.59 secondi


Come si può vedere da quest'ultimo codice, le differenze sintattiche nell'uso dei processi e non dei thread sono lievi, basta richiamare il modulo **multiprocessing** e non **threading** e usare la classe **Process** anziché **Thread**.
Inoltre bisogna far eseguire tutto il codice del processo principale solo appunto al main process. Questo lo si impone con l'istruzione

> if \_\_name\_\_ == "\_\_main\_\_":

Le differenze però terminano qui, ricordiamo che eseguendo il codice in multiprocessing ogni processo ha il suo spazio di memoria separato quindi la gestione/scambio di dati comuni cambia completamente. Infatti è stato rimosso dal codice il calcolo del numero totale di valori trovati.


## Esercizi

1.   Calcolare in modalità multithread le coppie di numeri amici presenti in un intervallo [a,b[. Ricordiamo che sono amici due numeri a e b se la somma dei divisori propri di a è uguale a b e viceversa.
2.   Scrivere un programma per cercare all'interno di diversi file di testo salvati in una cartella una parola fornita in input. La ricerca deve essere fatta mediante l'attivazione di un thread per ogni file.



## Scambio dati tra thread

Supponiamo ora di voler eseguire del codice su più thread ma di aver bisogno di scambiare dati tra questi. Ad esempio nell'esercizio sui numeri primi vogliamo calcolare il numero totale di valori trovati che è presente nella versione sequenziale.

Per risolvere questo problema si possono usare diversi metodi in Python.

Vediamoli brevemente:


### Variabili globali

Un primo metodo consiste nell'usare delle variabili globali. Ricordiamo che nell'istanziazione dell'oggetto Thread passiamo la funzione da eseguire ma non possiamo usare di questa un eventuale valore di return. Possiamo però usare una variabile globale e modificarla da dentro la funzione eseguita nel thread. Siccome i thread condividono lo stesso spazio di memoria possono anche accedere alle stesse variabili globali.
Una prima soluzione può quindi essere la seguente:

In [None]:
from threading import Thread
from time import time

def is_prime(x):
    """ restituisce True se x è primo, False altrimenti """
    if x<2: return False
    i = 2
    while i*i <= x:
        if x % i == 0: return False
        i += 1
    return True

def count_primes(a,b):
    """ Conta i numeri primi trovati all'interno dell'intervallo [a,b[ """
    global conta # variabile definita alla riga 24
    count = 0
    for i in range(a, b):
        if is_prime(i):
            count +=1
    print(f"In [{a}, {b}] ho trovato {count} numeri primi")
    conta += count
    return count

conta  = 0 # VARIABILE GLOBALE

inizio = 1
fine   = 1_000_000
n      = 5 # numero di intervalli
step   = (fine-inizio+1) // n # ampiezza di ogni intervallo

start = time()

# creiamo un array in cui memorizzare i vari thread
t = []

for i in range(n):

    # creiamo un instanza della classe Thread e associamo la funzione da eseguire
    thread = Thread(target = count_primes, args=(inizio + i*step, inizio + (i+1) * step))
    
    # facciamo partire il thread
    thread.start()
    
    # aggiungiamo il thread all'array
    t.append(thread)

# comunichiamo al programma di attendere la fine di tutti i thread prima di proseguire
for thread in t: thread.join()

tempo = time() - start # calcoliamo in tempo il tempo trascorso

print(f"In [{inizio}, {fine}] ho trovato {conta} numeri primi")
print(f"Tempo impiegato: {tempo:.2f} secondi")

In [1, 200001] ho trovato 17984 numeri primi
In [400001, 600001] ho trovato 15238 numeri primi
In [200001, 400001] ho trovato 15876 numeri primi
In [800001, 1000001] ho trovato 14547 numeri primi
In [600001, 800001] ho trovato 14853 numeri primi
In [1, 1000000] ho trovato 78498 numeri primi
Tempo impiegato: 11.26 secondi


Questa soluzione, apparentemente molto semplice, potrebbe creare dei problemi se eseguita in un ambiente multithread reale (ricordiamo che nelle versioni standard di Python di fatto i thread sono eseguiti uno alla volta per come è progettato l'interprete).

Infatti che accadrebbe se l'istruzione *conta += count* venisse eseguita in contemporanea da due thread? Ognuno potrebbe prendere il valore iniziale di conta e incrementarlo del valore locale di count sovrascrivendo la modifica fatta dall'altro thread.

Per evitare questo tipo di problemi vedremo più avanti anche il meccanismo del lock.


### Uso di classi Thread

Un secondo metodo, più elegante dell'uso delle variabili globali, consiste nello sfruttare la programmazione ad oggetti e la classe Thread.

Si potrebbe creare una classe derivata da Thread contenente una funzione **run** che gestirà i calcoli da eseguire nel thread. Questa funzione però avrà accesso anche agli attributi della classe che possono anche essere accessibili dal main, quindi potremo salvare i conteggi come attrobui e sommarli tutti nel main.


In [None]:
from threading import Thread
from time import time

def is_prime(x):
    """ restituisce True se x è primo, False altrimenti """
    if x<2: return False
    i = 2
    while i*i <= x:
        if x % i == 0: return False
        i += 1
    return True

def count_primes(a,b):
    """ Conta i numeri primi trovati all'interno dell'intervallo [a,b[ """
    count = 0
    for i in range(a, b):
        if is_prime(i):
            count +=1
    print(f"In [{a}, {b}] ho trovato {count} numeri primi")
    return count

class MyThread(Thread):
    """ Questa classe sarà istanziata nel main per attivare i thread """
    conta = 0
    a = 0
    b = 0

    def __init__(self, a, b):
        super().__init__()
        self.a = a
        self.b = b

    def run(self):
        self.conta = count_primes(self.a, self.b)

inizio = 1
fine   = 1_000_000
n      = 5 # numero di intervalli
step   = (fine-inizio+1) // n # ampiezza di ogni intervallo

start = time()

# creiamo un array in cui memorizzare i vari thread
t = []

for i in range(n):

    # creiamo un instanza della classe Thread e associamo la funzione da eseguire
    thread = MyThread(inizio + i*step, inizio + (i+1) * step)
    
    # facciamo partire il thread
    thread.start()
    
    # aggiungiamo il thread all'array
    t.append(thread)

# comunichiamo al programma di attendere la fine di tutti i thread prima di proseguire
for thread in t: thread.join()

# Ora possiamo raccogliere da ogni oggetto thread i vari conteggi e sommarli
conta = sum(th.conta for th in t)

tempo = time() - start # calcoliamo in tempo il tempo trascorso


print(f"In [{inizio}, {fine}] ho trovato {conta} numeri primi")
print(f"Tempo impiegato: {tempo:.2f} secondi")

In [1, 200001] ho trovato 17984 numeri primi
In [200001, 400001] ho trovato 15876 numeri primi
In [400001, 600001] ho trovato 15238 numeri primi
In [600001, 800001] ho trovato 14853 numeri primi
In [800001, 1000001] ho trovato 14547 numeri primi
In [1, 1000000] ho trovato 78498 numeri primi
Tempo impiegato: 11.21 secondi


Questo metodo ha il vantaggio di dare maggiore flessibilità al codice e di permetterci in altri problemi di salvare più informazioni in modo semplice.

### Uso di code (queue)

Un altro metodo molto usato al posto delle variabili globali e che può essere applicato anche in contesti di thread Produttore-Consumatore, consiste nel condividere i dati tra thread mediante code.

Python mette a disposizione un modulo queue che implementa una classe Queue strutturata in modo thread-safe, quindi con inclusi meccanismi di lock per garantire l'accesso corretto ai dati.

Nell'esempio dei numeri primi potremo ad esempio usare un oggetto globale come coda in cui salvare i conteggi, oppure passarlo come argomento alla funzione count_primes.


In [None]:
from threading import Thread
from time import time
from queue import Queue

def is_prime(x):
    """ restituisce True se x è primo, False altrimenti """
    if x<2: return False
    i = 2
    while i*i <= x:
        if x % i == 0: return False
        i += 1
    return True

def count_primes(a,b, q):
    """ Conta i numeri primi trovati all'interno dell'intervallo [a,b[ """
    count = 0
    for i in range(a, b):
        if is_prime(i):
            count +=1
    print(f"In [{a:8}, {b:8}] ho trovato {count} numeri primi")
    q.put(count)
    return count

coda  = Queue() # VARIABILE GLOBALE

inizio = 1
fine   = 1_000_000
n      = 5 # numero di intervalli
step   = (fine-inizio+1) // n # ampiezza di ogni intervallo

start = time()

# creiamo un array in cui memorizzare i vari thread
t = []

for i in range(n):

    # creiamo un instanza della classe Thread e associamo la funzione da eseguire
    thread = Thread(target = count_primes, args=(inizio + i*step, inizio + (i+1) * step, coda))
    
    # facciamo partire il thread
    thread.start()
    
    # aggiungiamo il thread all'array
    t.append(thread)

# comunichiamo al programma di attendere la fine di tutti i thread prima di proseguire
for thread in t: thread.join()

# calcoliamo il conteggio finale
conta = sum(coda.queue) # tenere presente che la coda non viene svuotata

tempo = time() - start # calcoliamo in tempo il tempo trascorso

print(f"In [{inizio:8}, {fine:8}] ho trovato {conta} numeri primi")
print(f"Tempo impiegato: {tempo:.2f} secondi")

In [       1,   200001] ho trovato 17984 numeri primi
In [  200001,   400001] ho trovato 15876 numeri primi
In [  400001,   600001] ho trovato 15238 numeri primi
In [  600001,   800001] ho trovato 14853 numeri primi
In [  800001,  1000001] ho trovato 14547 numeri primi
In [       1,  1000000] ho trovato 78498 numeri primi
Tempo impiegato: 11.22 secondi


### Uso di oggetti o classi condivise

Quando si ha necessità di scambiare diverse informazioni, in modo anche bidirezionale tra i vari thread, si potrebbe definire una classe contentente tutte le informazioni che i thread si devono scambiare e passare un'istanza di questa classe come argomento all'avvio dei thread oppure come variabile globale.

Supponiamo ad esempio di voler calcolare i primi N numeri amici dividendo l'elaborazione tra più thread che analizzino diversi intervalli in parallelo.

Vogliamo però interrompere i calcoli quando troviamo l'ennesima coppia. Per fare ciò è necessario che il contatore dei numeri sia condiviso tra i vari thread. Poi per leggibilità potremmo avere bisogno di una variabile booleana che segni la fine dell'elaborazione.

Una possibile soluzione potrebbe quindi essere la seguente:


In [None]:
from threading import Thread

def sum_dividers(x):
    s = 1
    i = 2
    while i*i<x:
        if x % i == 0: s += i + x//i
        i += 1
    if i*i==x: s += i
    return s

def have_a_friend(a):
    b = sum_dividers(a)
    return b if a == sum_dividers(b) else None

def search_friends(a,b):
    print(f"Searching in [{a:7}:{b:7}]")
    for x in range(a,b):
        if Share.end: break
        y = have_a_friend(x)
        if y != None and x<y:
            Share.count += 1
            if Share.count>=Share.number_of_friends: 
                Share.end = True
            print(f"{Share.count:2}) {x:7} - {y:7} are friends")
            

class Share:
    end = False
    count = 0
    number_of_friends = 20


n = 5
inizio = 2_000_001
fine   = 3_000_000
step   = (fine-inizio+1)//n
print(f"Step: {step}")
t = []
for i in range(n):
    thread = Thread(target = search_friends, args = (inizio+step*i, inizio+step*(i+1)))
    t.append(thread)

for thread in t: thread.start()
for thread in t: thread.join()


Step: 200000
Searching in [2000001:2200001]
Searching in [2200001:2400001]
Searching in [2400001:2600001]
Searching in [2600001:2800001]Searching in [2800001:3000001]

 1) 2802416 - 2947216 are friends
 2) 2803580 - 3716164 are friends
 3) 2236570 - 2429030 are friends
 4) 2652728 - 2941672 are friends
 5) 2082464 - 2090656 are friends
 6) 2723792 - 2874064 are friends
 7) 2728726 - 3077354 are friends
 8) 2739704 - 2928136 are friends


Nelle righe 28-31 è stata definita la class Share. Questa non ha bisogno di metodi ma solamente di definire quali variabili vogliamo condividere tra i thread, quindi la booleana **end** per segnalare la fine dell'elebaorazione, il contatore **count** che verrà incrementato di 1 da ogni thread ogni qualvolta si trova una coppia di numeri amici e che non dovrà superare il valore memorizzato in **number_of_friends**, che volendo si potrebbe anche chiedere in input anziché impostarlo in modo statico (idem per le variabili inizio e fine).

A nostra discrezione possiamo poi decidere se istanziare la classe e passare l'oggetto come argomento della funzione del thread, oppure, come fatto nell'esempio, accedere direttamente alle proprietà della classe senza peraltro aver bisogno di dichiarare variabili globali. Di fatto in questo caso la classe è statica.

## Meccanismi di lock

Nell'esecuzione di codice in multithreading si pone spesso il problema dell'accesso contemporaneo a variabili condivise oppure a risorse condivise (ad esempio un file su cui vogliono accedere in contemporanea più thread).

In questi casi bisogna definire dei metodi che gestiscano quelle che in gergo informatico si chiamano *race conditions*, ovvero situazioni in cui l'accesso contemporaneo alla stessa risorsa può creare effetti indesiderati o compromettere la risorsa stessa.

In Python il modulo **threading** mette a disposizione la classe **Lock** proprio per gestire queste situazioni. Mediante questa classe è possibile bloccare un thread sino a che un altro thread non rilasci una risorsa, e quando questa viene rilasciata è possibile per il thread in attesa bloccarla e prendere il controllo.

Vediamo il tutto con un esempio che opera su file di testo. Nella cartella *data* sono presenti tre file con il testo completo della Divina Commedia. Vogliamo realizzare delle funzioni che operino sui file.


### Esercizio svolto: Statistiche di testo

Vogliamo definire in Python delle funzioni che leggano un file di testo ed eseguano delle statistiche e ricerche:

1.   Conteggio righe
2.   Conteggio lettere
3.   Conteggio parole
4.   Ricerca e conteggio delle righe in cui si trova una parola 

Per ciascuna statistica vogliamo sviluppare una funzione apposita.


**NOTA**: prima di eseguire gli esempi di codice proposti, caricare i file di testo nel runtime di questo notebook con i seguenti comandi:


In [None]:
!mkdir data
!wget https://gist.githubusercontent.com/mdessolis/6b2a02b525761b7f16fcde5827c5c833/raw/e019b3c833dcf52fafd5464669a5233006fcfa12/inferno.txt -O data/inferno.txt
!wget https://gist.githubusercontent.com/mdessolis/66d965c2b83f4880639ba77c73d10c99/raw/d6b86aa65ef1f6fc06350b22e3a9f33abaf9b165/purgatorio.txt -O data/purgatorio.txt
!wget https://gist.githubusercontent.com/mdessolis/7d996ab4ba95488b29b724405c54cd00/raw/2110e2ced145de322f932e7c0f665a3c04e0833f/paradiso.txt -O data/paradiso.txt

--2023-04-12 09:42:36--  https://gist.githubusercontent.com/mdessolis/6b2a02b525761b7f16fcde5827c5c833/raw/e019b3c833dcf52fafd5464669a5233006fcfa12/inferno.txt
Resolving gist.githubusercontent.com (gist.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to gist.githubusercontent.com (gist.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 181467 (177K) [text/plain]
Saving to: ‘data/inferno.txt’


2023-04-12 09:42:37 (11.3 MB/s) - ‘data/inferno.txt’ saved [181467/181467]

--2023-04-12 09:42:37--  https://gist.githubusercontent.com/mdessolis/66d965c2b83f4880639ba77c73d10c99/raw/d6b86aa65ef1f6fc06350b22e3a9f33abaf9b165/purgatorio.txt
Resolving gist.githubusercontent.com (gist.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to gist.githubusercontent.com (gist.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting r

In [None]:
def count_lines(file):
    """ stampa il numero di righe presenti nel file """
    c=sum(1 for riga in open(file))
    print(f"{file}: {c} righe")

def count_chars(file):
    """ stampa il numero di caratteri presenti nel file """
    c=sum(len(riga) for riga in open(file))
    print(f"{file}: {c} caratteri")

def count_words(file):
    """ stampa il numero di parole presenti nel file """
    c=0
    for riga in open(file):
        w = riga.split()
        words = len(w)
        c += words
    print(f"{file}: {c} parole")

def search_word(word, file):
    """ elenca e conta le righe in cui si trova una parola """
    c = 0
    i = 0
    for riga in open(file):
        i += 1
        if word in riga:
            c += 1
            print(f"{file}: riga {i:4}: {riga}", end="")
    print(f"{file}: {word} trovata in {c} righe\n")

files = ['data/inferno.txt', 'data/purgatorio.txt', 'data/paradiso.txt']

for file in files:
    count_lines(file)
    count_chars(file)
    count_words(file)
    search_word("stelle", file)


data/inferno.txt: 4894 righe
data/inferno.txt: 179452 caratteri
data/inferno.txt: 32413 parole
data/inferno.txt: riga   46: e 'l sol montava 'n sù con quelle stelle
data/inferno.txt: riga  319: risonavan per l'aere sanza stelle,
data/inferno.txt: riga 2188: e torni a riveder le belle stelle,
data/inferno.txt: riga 2716: per sua dimora; onde a guardar le stelle
data/inferno.txt: riga 3693:   Tutte le stelle già de l'altro polo
data/inferno.txt: riga 4894:   E quindi uscimmo a riveder le stelle.
data/inferno.txt: stelle trovata in 6 righe

data/purgatorio.txt: 4926 righe
data/purgatorio.txt: 180271 caratteri
data/purgatorio.txt: 32504 parole
data/purgatorio.txt: riga   29: a l'altro polo, e vidi quattro stelle
data/purgatorio.txt: riga  820:   giusto giudicio da le stelle caggia
data/purgatorio.txt: riga 1103: pur là dove le stelle son più tarde,
data/purgatorio.txt: riga 1108:   Ond'elli a me: "Le quattro chiare stelle
data/purgatorio.txt: riga 2432: che le stelle apparivan da più lati.

Proviamo adesso ad eseguire in parallelo le tre funzioni, definendone una nuova, stats, che verrà eseguita in thread per ciascun file:

In [None]:
from threading import Thread

def count_lines(file):
    """ stampa il numero di righe presenti nel file """
    c=sum(1 for riga in open(file))
    print(f"{file:25}: {c} righe")

def count_chars(file):
    """ stampa il numero di caratteri presenti nel file """
    c=sum(len(riga) for riga in open(file))
    print(f"{file:25}: {c} caratteri")

def count_words(file):
    """ stampa il numero di parole presenti nel file """
    c=0
    for riga in open(file):
        w = riga.split() # isolo le parole
        words = len(w)
        c += words
    print(f"{file:25}: {c} parole")

def search_word(word, file):
    """ elenca e conta le righe in cui si trova una parola """
    c = 0
    i = 0
    for riga in open(file):
        i += 1
        if word in riga:
            c += 1
            print(f"{file:25}: riga {i:4}: {riga}", end="")
    print(f"{file:25}: {word} trovata in {c} righe\n")

def stats(word, file):
    count_lines(file)
    count_chars(file)
    count_words(file)
    search_word(word, file)

files = ['data/inferno.txt', 'data/purgatorio.txt', 'data/paradiso.txt']

t = []
for file in files:
    t.append(Thread(target=stats, args=("stelle",file)))

for thread in t: thread.start()
for thread in t: thread.join()



data/purgatorio.txt      : 4926 righe
data/purgatorio.txt      : 180271 caratteridata/inferno.txt         : 4894 righe
data/paradiso.txt        : 4928 righe

data/purgatorio.txt      : 32504 parole
data/purgatorio.txt      : riga   29: a l'altro polo, e vidi quattro stelle
data/inferno.txt         : 179452 caratteri
data/paradiso.txt        : 177366 caratteri
data/purgatorio.txt      : riga  820:   giusto giudicio da le stelle caggia
data/inferno.txt         : 32413 parole
data/purgatorio.txt      : riga 1103: pur là dove le stelle son più tarde,
data/paradiso.txt        : 31853 paroledata/inferno.txt         : riga   46: e 'l sol montava 'n sù con quelle stelle
data/purgatorio.txt      : riga 1108:   Ond'elli a me: "Le quattro chiare stelle
data/purgatorio.txt      : riga 2432: che le stelle apparivan da più lati.

data/inferno.txt         : riga  319: risonavan per l'aere sanza stelle,
data/purgatorio.txt      : riga 2581: facea le stelle a noi parer più rade,
data/inferno.txt       

Come si può notare dall'output, il risultato è corretto ma, lavorando in multithreading alcune righe potrebbero non essere stampate bene e l'output di search_word viene spesso inframmezzato dagli output delle altre funzioni.

Se si prova lo stesso esercizio in C++ si noterà come addirittura le righe potrebbero essere interrotte al loro interno da altre righe. Questo perché in C++ il multithreading è reale e i comandi di output funzionerebbero in contemporanea.

Come già detto prima, in Python, a causa di come è progettato l'interprete, il multithreading non è di fatto tale, ma nelle future versioni potrebbe esserlo e in ogni caso già in questo esempio dobbiamo intervenire.

Aggiungiamo quindi al codice un oggetto di classe **Lock** e impostiamo nelle funzioni che quando si deve stampare qualcosa se ne chieda l'accesso esclusivo: 

In [None]:
from threading import Thread, Lock

def count_lines(file):
    """ stampa il numero di righe presenti nel file """
    global lock
    c=sum(1 for riga in open(file))
    lock.acquire()
    print(f"{file:25}: {c} righe")
    lock.release()

def count_chars(file):
    """ stampa il numero di caratteri presenti nel file """
    global lock
    c=sum(len(riga) for riga in open(file))
    lock.acquire()
    print(f"{file:25}: {c} caratteri")
    lock.release()

def count_words(file):
    """ stampa il numero di parole presenti nel file """
    global lock
    c=0
    for riga in open(file):
        w = riga.split() # isolo le parole
        words = len(w)
        c += words
    lock.acquire()
    print(f"{file:25}: {c} parole")
    lock.release()

def search_word(word, file):
    """ elenca e conta le righe in cui si trova una parola """
    c = 0
    i = 0
    for riga in open(file):
        i += 1
        if word in riga:
            c += 1
            print(f"{file:25}: riga {i:4}: {riga}", end="")
    print(f"{file:25}: {word} trovata in {c} righe\n")

def stats(word, file):
    global lock # in alternativa la si può anche passare come argomento
    count_lines(file)
    count_chars(file)
    count_words(file)
    lock.acquire()
    search_word(word, file)
    lock.release()

lock = Lock() # definiamo una variabile globale per gestire il lock dei thread

files = ['data/inferno.txt', 'data/purgatorio.txt', 'data/paradiso.txt']

t = []
for file in files:
    t.append(Thread(target=stats, args=("stelle",file)))

for thread in t: thread.start()
for thread in t: thread.join()

data/inferno.txt         : 4894 righe
data/inferno.txt         : 179452 caratteri
data/purgatorio.txt      : 4926 righe
data/paradiso.txt        : 4928 righe
data/purgatorio.txt      : 180271 caratteri
data/inferno.txt         : 32413 parole
data/inferno.txt         : riga   46: e 'l sol montava 'n sù con quelle stelle
data/inferno.txt         : riga  319: risonavan per l'aere sanza stelle,
data/inferno.txt         : riga 2188: e torni a riveder le belle stelle,
data/inferno.txt         : riga 2716: per sua dimora; onde a guardar le stelle
data/inferno.txt         : riga 3693:   Tutte le stelle già de l'altro polo
data/inferno.txt         : riga 4894:   E quindi uscimmo a riveder le stelle.
data/inferno.txt         : stelle trovata in 6 righe

data/paradiso.txt        : 177366 caratteri
data/purgatorio.txt      : 32504 parole
data/purgatorio.txt      : riga   29: a l'altro polo, e vidi quattro stelle
data/purgatorio.txt      : riga  820:   giusto giudicio da le stelle caggia
data/purga

Con il lock ora l'output è più ordinato, avendo imposto che quando search_word è in funzione, nessun altro thread può stampare.

In ogni funzione è stato quindi inserito un comando **lock.acquire()** per bloccare l'esecuzione della funzione sino a che il lock sia libero. Una volta ottenuto il lock l'esecuzione può proseguire e, terminato il blocco di istruzioni vincolate, bisogna ricordarsi *assolutamente* di rilasciare il lock con **lock.release()**, altrimenti gli altri thread potrebbero rimanere in attesa per tempo indefinito.

Volendo si può usare anche una sintassi più breve ed elegante della variabile lock tramite il comando **with**. Con questo viene esplicitato meglio il blocco di istruzioni da porre in lock e viene gestito in automatico sia **acquire** che **release**.

```
with lock:
    # istruzioni da eseguire con lock attivo
    # sia acquire() che release() vengono gestiti in automatico
```

Ecco la nuova versione del codice usando with:

In [None]:
from threading import Thread, Lock

def count_lines(file):
    """ stampa il numero di righe presenti nel file """
    global lock
    c=sum(1 for riga in open(file))
    with lock:
        print(f"{file:25}: {c} righe")

def count_chars(file):
    """ stampa il numero di caratteri presenti nel file """
    global lock
    c=sum(len(riga) for riga in open(file))
    with lock:
        print(f"{file:25}: {c} caratteri")

def count_words(file):
    """ stampa il numero di parole presenti nel file """
    global lock
    c=0
    for riga in open(file):
        w = riga.split() # isolo le parole
        words = len(w)
        c += words
    with lock:
        print(f"{file:25}: {c} parole")
    

def search_word(word, file):
    """ elenca e conta le righe in cui si trova una parola """
    c = 0
    i = 0
    for riga in open(file):
        i += 1
        if word in riga:
            c += 1
            print(f"{file:25}: riga {i:4}: {riga}", end="")
    print(f"{file:25}: {word} trovata in {c} righe\n")

def stats(word, file):
    global lock # in alternativa la si può anche passare come argomento
    count_lines(file)
    count_chars(file)
    count_words(file)
    with lock:
        search_word(word, file)
    

lock = Lock() # definiamo una variabile globale per gestire il lock dei thread

files = ['data/inferno.txt', 'data/purgatorio.txt', 'data/paradiso.txt']

t = []
for file in files:
    t.append(Thread(target=stats, args=("stelle",file)))

for thread in t: thread.start()
for thread in t: thread.join()

data/inferno.txt         : 4894 righe
data/inferno.txt         : 179452 caratteri
data/inferno.txt         : 32413 parole
data/inferno.txt         : riga   46: e 'l sol montava 'n sù con quelle stelle
data/inferno.txt         : riga  319: risonavan per l'aere sanza stelle,
data/inferno.txt         : riga 2188: e torni a riveder le belle stelle,
data/inferno.txt         : riga 2716: per sua dimora; onde a guardar le stelle
data/inferno.txt         : riga 3693:   Tutte le stelle già de l'altro polo
data/inferno.txt         : riga 4894:   E quindi uscimmo a riveder le stelle.
data/inferno.txt         : stelle trovata in 6 righe

data/purgatorio.txt      : 4926 righe
data/paradiso.txt        : 4928 righe
data/purgatorio.txt      : 180271 caratteri
data/paradiso.txt        : 177366 caratteri
data/purgatorio.txt      : 32504 parole
data/purgatorio.txt      : riga   29: a l'altro polo, e vidi quattro stelle
data/purgatorio.txt      : riga  820:   giusto giudicio da le stelle caggia
data/purga

## Pattern Produttore-Consumatore

Nell'ambito della programmazione multithreading c'è un tipo di problema che compare di frequente.

E' quello in cui uno o più thread generano dei dati e uno o più altri thread invece li elaborano.

Pensiamo ad esempio ad un programma di videoscrittura: ci può essere un thread che gestisce l'input da tastiera (produttore) e un thread (consumatore) che controlla l'ortografia, un altro che gestisce l'impaginazione sullo schermo, un terzo che fa il salvataggio automatico dei dati, e così via.

Oppure un programma in cui si ricevono dei testi contemporaneamente (produttori) che vengono elaborati da un unico thread per farne delle statistiche (consumatore).

In tutti questi casi di solito si deve gestire una struttura dati, di solito una coda, per consentire lo scambio dei dati tra thread produttore e thread consumatore, e bisogna definire una qualche strategia per stabilire quando e come far terminare i thread.

La struttura base del codice sarà quindi di questo tipo:

```
from threading import Thread
from queue import Queue

def Producer(q):
    global fine
    while not fine:
        ...
        data = ...
        q.put(data)
        if ... :
            fine = True

def Consumer(q):
    global fine
    while not fine or not q.empty():
        if not q.empty():
            data = q.get()
            ...

coda = Queue()
fine = False

producer = Thread(target=Producer, args=(coda,))
consumer = Thread(target=Consumer, args=(coda,))

producer.start()
consumer.start()

producer.join()
consumer.join()
```

La funzione thread Producer crea un ciclo che prevederà un controllo per stabilire quando terminarlo, ponendo la variabile globale fine a True.

In parallelo la funzione Consumer farà partire un altro ciclo che si interromperà quando rileverà che la variabile globale fine sia diventata False (e sarà il producer a determinarlo, per esempio quando termina la lettura di un file).

Il producer inoltre nelle istruzioni del ciclo avrà anche il compito di inserire nella coda q gli elementi che produrrà, elementi che verranno poi letti, in ordine di arrivo, dal consumer.

Anziché usare delle funzioni per i thread, volendo si possono definire delle classi derivate da Thread, molto più convenienti se si hanno da gestire diversi tipi di dati:

```
from threading import Thread
from queue import Queue

class Producer(Thread):
    ... attributi di classe
    q = None

    def __init__(self, q):
        super().__init__()
        self.q = q

    def run(self):
        global fine
        while not fine:
            ...
            data = ...
            self.q.put(data)
            if ... :
                fine = True

class Consumer(Thread):
    ... attributi di classe
    q = None

    def __init__(self, q):
        super().__init__()
        self.q = q

    def run(self):
        global fine
        while not fine or not self.q.empty():
            if not self.q.empty():
                data = self.q.get()
                ...

coda = Queue()
fine = False

producer = Producer(coda)
consumer = Consumer(coda)

producer.start()
consumer.start()

producer.join()
consumer.join()
```

### Esercizio svolto: Conta parole e righe

Vogliamo realizzare un programma che ci consenta di ricevere in input un testo e calcolarne il numero di righe e di parole contenute in esso.

Questo è un problema che potrebbe tranquillamente essere risolto in forma sequenziale, senza uso di thread. Però lo vogliamo risolvere in mutithreading per renderlo più flessibile con piccole modifiche, ad esempio per ricevere contemporaneamente il testo da più file o da un url, oppure in input da tastiera.

Sviluppiamo quindi una funzione produttore che si occupi solamente di ricevere il testo riga per riga e inserirlo in coda.

Una funzione consumatore, in parallelo, riceverà le righe ed effettuerà i calcoli richiesti. Il consumatore però non è tenuto a sapere la sorgente delle righe, può essere un file, un url, un input da tastiera, è completamente autonoma dall'input.



In [None]:
from threading import Thread
from queue import Queue

def Producer(q, file):
    """ legge le righe di un file di testo e le inserisce in una coda """
    global fine
    for riga in  open(file):
        q.put(riga)
    fine = True

def Consumer(q):
    """ legge le righe presenti in coda e le elabora """
    global fine
    rows = 0
    words = 0
    while not fine or not q.empty():
        if not q.empty():
            rows += 1
            riga = q.get()
            w = riga.split() # w è una lista con le parole separate
            words += len(w)  # len(w) ci dice quante parole ci sono nella riga
    print(f"Rows : {rows:6}")
    print(f"Words: {words:6}")

coda = Queue()
fine = False
file = "sample_data/mnist_test.csv"

producer = Thread(target=Producer, args=(coda, file))
consumer = Thread(target=Consumer, args=(coda,))

producer.start()
consumer.start()

producer.join()
consumer.join()

Rows :  10000
Words:  10000


Nel caso in cui volessimo analizzare i testi provenienti da più file potremmo eseguire più produttori lasciando inalterato il codice del consumatore, in quanto questo accede solo alla coda.

Si porrebbe però in questo caso un problema legato all'interruzione dell'elaborazione, poiché il primo produttore che termina la lettura del proprio file metterebbe la variabile globale *fine* a True facendo terminare il consumatore prima che gli altri produttori finiscano la loro elaborazione.

Per ovviare a questo ci sono diverse soluzioni, tra cui anche una classe apposita Barrier presente nel modulo *threading*.

Qui però per semplicità ci basterà trasformare la variabile fine in una lista di booleane ciascuna associata ad un produttore e modificheremo di conseguenza il consumatore per interrompere l'elaborazione solo quando tutti i produttori hanno terminato e quindi tutte le booleane della lista sono True.


In [None]:
from threading import Thread
from queue import Queue

def Producer(id, q, file):
    """ legge le righe di un file di testo e le inserisce in una coda """
    global fine
    for riga in  open(file):
        q.put(riga)
    fine[id] = True # tramite l'id sappiamo quale boolean di fine impostare

def Consumer(q):
    """ legge le righe presenti in coda e le elabora """
    global fine
    rows = 0
    words = 0
    while not all(fine) or not q.empty(): 
        if not q.empty():
            rows += 1
            riga = q.get()
            w = riga.split() # w è una lista con le parole separate
            words += len(w)  # len(w) ci dice quante parole ci sono nella riga
    print(f"Rows : {rows:6}")
    print(f"Words: {words:6}")

coda = Queue()
fine = [False, False]
file = ["sample_data/mnist_test.csv", "sample_data/california_housing_test.csv"]

producer0 = Thread(target=Producer, args=(0, coda, file[0]))
producer1 = Thread(target=Producer, args=(1, coda, file[1]))
consumer = Thread(target=Consumer, args=(coda,))

producer0.start()
producer1.start()
consumer.start()

producer0.join()
producer1.join()
consumer.join()

Rows :  13001
Words:  13001


Questa versione ci consente di far funzionare il codice correttamente sia con un solo produttore che con più produttori.

Vediamo ora, per finire, la stessa soluzione svolta però con l'uso di classi derivate da Thread anziché funzioni thread.

In [None]:
from threading import Thread
from queue import Queue

class Producer(Thread):
    id = 0
    q  = None
    file = ""

    def __init__(self, id, q, file):
        super().__init__()
        self.id = id
        self.q  = q
        self.file = file

    def run(self):
        """ legge le righe di un file di testo e le inserisce in una coda """
        global fine
        for riga in  open(self.file):
            self.q.put(riga)
        fine[self.id] = True # tramite l'id sappiamo quale boolean di fine impostare

class Consumer(Thread):
    q = None

    def __init__(self, q):
        super().__init__()
        self.q = q 

    def run(self):
        """ legge le righe presenti in coda e le elabora """
        global fine
        rows = 0
        words = 0
        while not all(fine) or not self.q.empty(): 
            if not self.q.empty():
                rows += 1
                riga = self.q.get()
                w = riga.split() # w è una lista con le parole separate
                words += len(w)  # len(w) ci dice quante parole ci sono nella riga
        print(f"Rows : {rows:6}")
        print(f"Words: {words:6}")

coda = Queue()
fine = [False, False]
file = ["sample_data/mnist_test.csv", "sample_data/california_housing_test.csv"]

producer0 = Producer(0, coda, file[0])
producer1 = Producer(1, coda, file[1])
consumer = Consumer(coda)

producer0.start()
producer1.start()
consumer.start()

producer0.join()
producer1.join()
consumer.join()

Rows :  13001
Words:  13001


## Esercizi

1.   Modificare l'esercizio precedente di lettura testi per calcolare anche il numero di caratteri alfabetici trovati
2.   Aggiungere all'esercizio precedente di lettura testi una classe o funzione ProducerKeyboard che riceva le righe di testo in input da tastiera anziché da file. L'input termina quando si inserisce una riga composta solo del carattere "."
3.   Scrivere un programma produttore-consumatore in cui il produttore genera una sequenza di numeri interi random compresi tra 1 e 100 e il consumatore deve determinare minimo, massimo e valore medio.
4.   Scrivere un programma produttore-consumatore in cui il produttore genera una sequenza di numeri interi random compresi tra 1 e 100 e il consumatore deve determinare il numero che compare con maggior frequenza.



### Esercizio svolto: simulazione della coda a uno o più sportelli

Si voglia simulare per un certo periodo di tempo la coda che si formerebbe agli sportelli di un supermercato/ufficio.

Il programma deve simulare l'ingresso di una persona ogni x (random) secondi e assegnare a questa un numero e metterla in coda.

Ci sono poi una o più casse che, quando libere, ricevono una persona dalla coda e la servono per un tempo random di y secondi.

Il programma deve mostrare ad ogni aggiornamento la situazione delle casse e della coda.

Questo è un problema che si presta molto bene ad essere gestito in multithreading o multiprocessing in quanto l'attività svolta dai movimenti all'ingresso e dai singoli sportelli avviene in contemporanea.

E' inoltre un classico problema di produttore/consumatore. Il thread produttore è quello che simula gli ingressi in coda, il/i thread consumatore sono invece quelli che gestiscono ogni singola cassa. 

In questo caso abbiamo, a differenza degli esercizi precedenti sull'analisi di testi, un solo produttore e uno o più consumatori, quindi possiamo anche usare una variabile semplice booleana fine per terminare le operazioni.



Per iniziare definiamo gli import di cui abbiamo bisogno:

In [None]:
from threading import Thread, Lock
from time import time, sleep
from random import random
from queue import Queue

Il modulo random ci metterà a disposizione la funzione **random()** che restituisce un float compreso tra 0 e 1. Useremo questa funzione per calcolare ogni quanto tempo far entrare un cliente in coda e per quanto tempo ogni cliente verrà servito allo sportello (ad esempio potremo assegnare un tempo casuale di 5 secondi nel primo caso e 10 secondi per gli sportelli).

La funzione sleep ci servirà per sospendere il thread prima di far entrare un cliente o passare al cliente successivo.

La coda (classe Queue) invece conterrà i numeri assegnati a ciascun cliente che entra.

Poiché ci sono diversi parametri da impostare e variabili da gestire, questo esercizio è forse più opportuno svolgerlo con l'uso delle classi derivate da Thread.

Vediamo la classe produttore, che potremo ora chiamare Ingressi:



In [None]:
class Ingressi(Thread):      # thread produttore
    tempo_totale        = 60 # facciamo eseguire la simulazione per 60 secondi di default
    numero_assegnato    = 0  # numero assegnato all'ultimo cliente entrato
    tempo_max_ingresso  = 5  # numero max di secondi entro cui entra un cliente
    coda                = None # oggetto coda condiviso con il consumer
    
    def __init__(self, coda, tempo_max_ingresso=5, tempo_totale=60):
        super().__init__() # ricordare di richiamare sempre il costruttore della classe padre (Thread)
        self.tempo_totale = tempo_totale
        self.tempo_max_ingresso = tempo_max_ingresso
        self.coda = coda

    def run(self):
        global FINE
        global lock
        start = time() # segniamo il tempo iniziale per calcolare il tempo_totale assegnato
        while time()-start < self.tempo_totale: # eseguiamo per tempo_totale secondi
            sleep(random()*self.tempo_max_ingresso+1) # sospendiamo il thread in attesa di un cliente
            self.numero_assegnato += 1 # assegniamo un numero al nuovo cliente
            self.coda.put(self.numero_assegnato) # mettiamo in coda il cliente
            with lock:
                print(f"Ingresso nuovo cliente al secondo {time()-start:2.0f}: {self.numero_assegnato:3}. Clienti in coda {self.coda.qsize():3}")
        print(f"Chiusura ingresso")
        FINE = True


Il nucleo della classe Ingressi è tutto nel metodo run() che viene esguito quando facciamo partire il Thread con start().

Tutto quello che deve fare è simulare per un certo periodo di tempo l'ingresso dei clienti e il loro inserimento in coda.

Esguendo solo questa classe si avrà un output simile al seguente:

In [None]:
FINE = False
coda = Queue()
ingressi = Ingressi(coda)
ingressi.start()
ingressi.join()


Ingresso nuovo cliente al secondo  3:   1. Clienti in coda   1
Ingresso nuovo cliente al secondo  8:   2. Clienti in coda   2
Ingresso nuovo cliente al secondo 13:   3. Clienti in coda   3
Ingresso nuovo cliente al secondo 15:   4. Clienti in coda   4
Ingresso nuovo cliente al secondo 20:   5. Clienti in coda   5
Ingresso nuovo cliente al secondo 24:   6. Clienti in coda   6
Ingresso nuovo cliente al secondo 28:   7. Clienti in coda   7
Ingresso nuovo cliente al secondo 31:   8. Clienti in coda   8
Ingresso nuovo cliente al secondo 34:   9. Clienti in coda   9
Ingresso nuovo cliente al secondo 39:  10. Clienti in coda  10
Ingresso nuovo cliente al secondo 45:  11. Clienti in coda  11
Ingresso nuovo cliente al secondo 46:  12. Clienti in coda  12
Ingresso nuovo cliente al secondo 48:  13. Clienti in coda  13
Ingresso nuovo cliente al secondo 53:  14. Clienti in coda  14
Ingresso nuovo cliente al secondo 58:  15. Clienti in coda  15
Ingresso nuovo cliente al secondo 60:  16. Clienti in c

Passando ora alla classe Consumatore, il lavoro che questa dovrà svolgere sarà di estrarre un elemento dalla coda (il cliente) e servirlo per un tempo sempre casuale. Dovrà fare questa operazione sino a che non riceverà il segnale True dalla variabile globale FINE e non si sarà esaurita la coda.

Questa classe potremo chiamarla Sportello, proprio perché deve gestire le operazioni di un singolo sportello. Se decidiamo di attivare più sportelli basterà attivare più istanze della classe.

In [None]:
class Sportello(Thread): # thread consumatore
    numero_sportello = 0
    max_tempo_sportello = 10
    coda = None

    def __init__(self, numero_sportello, coda, max_tempo_sportello=10 ):
        super().__init__()
        self.numero_sportello = numero_sportello
        self.coda = coda
        self.max_tempo_sportello = max_tempo_sportello

    def run(self):
        global FINE
        global lock
        while not FINE or not self.coda.empty():
            if not self.coda.empty():
                numero_assegnato = self.coda.get() # estraiamo il primo cliente in coda
                tempo_assegnato = random() * self.max_tempo_sportello +1 # gli assegniamo un tempo allo sportello
                with lock:
                    print(f"Sportello {self.numero_sportello}: Servo il cliente {numero_assegnato:3} per {tempo_assegnato:2.0f}sec. Clienti in coda: {self.coda.qsize():2}")
                sleep(tempo_assegnato) # sospendiamo il thread in modo che possa completare il servizio al cliente
        print(f"Chiusura sportello {self.numero_sportello}")

A questo punto siamo pronti per far partire la simulazione completa:

In [None]:
FINE = False
lock = Lock() # questo lo usiamo per gestire meglio l'output
coda = Queue()
ingressi = Ingressi(coda)
sportello = Sportello(1, coda)

ingressi.start()
sportello.start()

ingressi.join()
sportello.join()

Ingresso nuovo cliente al secondo  4:   1. Clienti in coda   1
Sportello 1: Servo il cliente   1 per  8sec. Clienti in coda:  0
Ingresso nuovo cliente al secondo 10:   2. Clienti in coda   1
Sportello 1: Servo il cliente   2 per  4sec. Clienti in coda:  0
Ingresso nuovo cliente al secondo 15:   3. Clienti in coda   1
Sportello 1: Servo il cliente   3 per 11sec. Clienti in coda:  0
Ingresso nuovo cliente al secondo 20:   4. Clienti in coda   1
Ingresso nuovo cliente al secondo 22:   5. Clienti in coda   2
Sportello 1: Servo il cliente   4 per  3sec. Clienti in coda:  1
Ingresso nuovo cliente al secondo 28:   6. Clienti in coda   2
Sportello 1: Servo il cliente   5 per  9sec. Clienti in coda:  1
Ingresso nuovo cliente al secondo 32:   7. Clienti in coda   2
Ingresso nuovo cliente al secondo 37:   8. Clienti in coda   3
Sportello 1: Servo il cliente   6 per  5sec. Clienti in coda:  2
Ingresso nuovo cliente al secondo 42:   9. Clienti in coda   3
Ingresso nuovo cliente al secondo 44:  10. 

Eseguendo il codice possiamo notare che con un solo sportello aperto si può formare una coda piuttosto lunga di clienti in attesa di essere serviti, tanto che allo scadere dei 60 secondi di simulazione nell'esempio mostrato rimagono diversi clienti che essendo già in coda e dovendo essere serviti, allungano i tempi totali di esecuzione bel oltre i 60 secondi.

Possiamo quindi provare a simulare la presenza di più sportelli, istanziando più classi Sportello e verificando se le coda migliora:

In [None]:
FINE = False
lock = Lock()
coda = Queue()
ingressi = Ingressi(coda)
sportelli = []

for i in range(3): # attiviamo 3 sportelli
    sportello = Sportello(i+1, coda)
    sportelli.append(sportello)

ingressi.start()
for s in sportelli: s.start()

ingressi.join()
for s in sportelli: s.join()

Ingresso nuovo cliente al secondo  5:   1. Clienti in coda   1
Sportello 1: Servo il cliente   1 per  5sec. Clienti in coda:  0
Ingresso nuovo cliente al secondo  8:   2. Clienti in coda   1
Sportello 3: Servo il cliente   2 per  2sec. Clienti in coda:  0
Ingresso nuovo cliente al secondo 13:   3. Clienti in coda   1
Sportello 2: Servo il cliente   3 per  3sec. Clienti in coda:  0
Ingresso nuovo cliente al secondo 18:   4. Clienti in coda   1
Sportello 3: Servo il cliente   4 per 10sec. Clienti in coda:  0
Ingresso nuovo cliente al secondo 20:   5. Clienti in coda   1
Sportello 2: Servo il cliente   5 per  7sec. Clienti in coda:  0
Ingresso nuovo cliente al secondo 25:   6. Clienti in coda   1
Sportello 1: Servo il cliente   6 per  1sec. Clienti in coda:  0
Ingresso nuovo cliente al secondo 27:   7. Clienti in coda   1
Sportello 1: Servo il cliente   7 per  6sec. Clienti in coda:  0
Ingresso nuovo cliente al secondo 30:   8. Clienti in coda   1
Sportello 2: Servo il cliente   8 per 10s

Con 3 sportelli attivi la simulazione mostra una coda decisamente migliorata, con pochissimi clienti in attesa e uno sforamento del tempo totale di pochi secondi.

### Esercizio Sportello: varianti

Modificare l'esercizio dello sportello con le seguenti varianti/aggiunte:

1.   Stampare alla fine il numero massimo di cienti in coda
2.   Stampare alla fine il tempo medio di attesa dei clienti in coda, prima di essere serviti allo sportello. Per fare questo si potrebbe registrare in coda non solo il numero ma anche il minuto di ingresso. Al momento del servizio si calcola quanto tempo il cliente ha atteso e si procede con i calcoli della media.
3.   Modificare il programma in modo che possa gestire più code (immaginate all'Ufficio Postale in cui c'è una coda per i pacchi e una per i servizi finanziari). Il produttore deve quindi assegnare il numero ad una coda scelta a caso tra quelle presenti e lo sportello deve avere assegnata una coda specifica.


