# Calcolo parallelo

# Indice

1. [Web scraping](#scraping)<br>
2. [Programmazione sequenziale](#sequenziale)<br>
3. [Programmazione concorrente - *multithreading*](#multithreading)<br>
    3.1 [La classe `Thread`](#thread)<br>
    3.2 [Confrontare i tempi di esecuzione](#tempi)<br>
4. [Programmazione concorrente - *multiprocessing*](#multiprocessing)<br>
    4.1 [La classe `Process`](#process)<br>
    4.2 [La classe `Pool`](#pool)<br>
5. [Scegliere gli iperparametri ottimali](#iperparametri)<br>

In [None]:
import inspect
import multiprocessing as mp
import numpy as np
import os
import pandas as pd
import threading
import time


%load_ext autoreload
%autoreload 2

# 1. Web scraping <a id=scraping> </a>

In [None]:
from msbd.scraping import ottieni_contenuto_url

print(inspect.getsource(ottieni_contenuto_url))

### Estratte il contenuto di una pagina

In [None]:
contenuto = ottieni_contenuto_url("https://www.didattica.unipd.it/off/2016/LT/SC/SC2094/000ZZ/SCP4063754/N0")

In [None]:
N = 5

righe_non_vuote = [c for c in contenuto.split("\n") if c.strip()]

print("Prime {} righe non vuote:\n".format(N) + ("\n{}" * N).format(*righe_non_vuote[:N]))

# 2. Programmazione sequenziale <a id=sequenziale> </a>

In [None]:
URLS = [
    "https://www.didattica.unipd.it/off/2016/LT/SC/SC2094/000ZZ/SCP4063754/N0",
    "https://stackoverflow.com/questions/3044580/multiprocessing-vs-threading-python",
    "https://docs.python.org/3.6/library/threading.html",
    "https://docs.python.org/3.6/library/multiprocessing.html",
]

### Esercizio

Utilizzare *list comprehension* (o un ciclo *for*) e la funzione `ottieni_contenuto_url()` per completare la funzione `ottieni_contenuto_urls_sequenziale()` in `msbd/modello_lineare/scraping.py`. Il risultato della funzione deve essere la lista dei contenuti degli URL passati come argomento.

In [None]:
from msbd.scraping import ottieni_contenuto_urls_sequenziale

print(inspect.getsource(ottieni_contenuto_urls_sequenziale))

In [None]:
inizio = time.time()

contenuti = ottieni_contenuto_urls_sequenziale(URLS)

fine = time.time()
print("Durata: {:.2f}s".format(fine - inizio))

# 3. Programmazione concorrente - *multithreading* <a id=multithreading> </a>

## 3.1 La classe [Thread](https://docs.python.org/3.6/library/threading.html#threading.Thread) <a id=thread> </a>

In [None]:
from msbd.scraping import ottieni_contenuto_urls_threading

print(inspect.getsource(ottieni_contenuto_urls_threading))

In [None]:
inizio = time.time()

contenuti = ottieni_contenuto_urls_threading(URLS)

fine = time.time()
print("Durata: {:.2f}s".format(fine - inizio))

### Esercizio

Descrivere cosa è cambiato rispetto all'esecuzione sequenziale.

## 3.2 Confrontare i tempi di esecuzione <a id=tempi> </a>

### Tempo di esecuzione approccio sequenziale

In [None]:
%timeit -r 7 -n 1 ottieni_contenuto_urls_sequenziale(URLS, verboso=False)

#### Alternativa:

In [None]:
import timeit

REPEAT = 7
NUMBER = 1

tempi = timeit.repeat("ottieni_contenuto_urls_sequenziale(URLS, verboso=False)", 
                      setup="from __main__ import ottieni_contenuto_urls_sequenziale, URLS",
                      repeat=REPEAT, number=NUMBER)

print("{:.2f} s ± {:.0f} ms per loop (mean ± std. dev. of {} runs, {} loop each)".format(
    np.mean(tempi), 1000 * np.std(tempi), REPEAT, NUMBER))

### Tempo di esecuzione approccio multithreading

In [None]:
%timeit -r 7 -n 1 ottieni_contenuto_urls_threading(URLS, verboso=False)

# 4. Programmazione concorrente - *multiprocessing* <a id=multiprocessing> </a>

In [None]:
N_CPU = mp.cpu_count()

print("# CPU: {}".format(N_CPU))

## 4.1 La classe [Process](https://docs.python.org/3.6/library/multiprocessing.html?highlight=process#multiprocessing.Process) <a id=process> </a>

### Esercizio

Completare la funzione `ottieni_contenuto_urls_multiprocessing()` in `msbd/modello_lineare/scraping.py`.

> Suggerimento: imitare la funzione `ottieni_contenuto_urls_threading()` sostituendo alla classe `Thread` contenuta nella libreria `threading` la classe `Process` contenuta nella libreria `multiprocessing` (già importata in `scraping.py` come `mp`).

In [None]:
from msbd.scraping import ottieni_contenuto_urls_multiprocessing

print(inspect.getsource(ottieni_contenuto_urls_multiprocessing))

In [None]:
inizio = time.time()

contenuti = ottieni_contenuto_urls_multiprocessing(URLS)

fine = time.time()
print("Durata: {:.2f}s".format(fine - inizio))

### Esercizio

1. Descrivere le differenze rispetto al risultato ottenuto con l'approccio sequenziale;
2. Descrivere le differenze rispetto al risultato ottenuto con l'approccio multi thread.

### Esercizio

Utilizzare il comando magico `%timeit` per calcolare il tempo di esecuzione di `ottieni_contenuto_urls_multiprocessing(URLS, verboso=False)`

In [None]:
# ============== YOUR CODE HERE ==============
raise NotImplementedError
# ============================================

## 4.2 La classe [Pool](https://docs.python.org/3.6/library/multiprocessing.html?highlight=process#multiprocessing.pool.Pool) <a id=pool> </a>

In [None]:
inizio = time.time()

pool = mp.Pool(processes=N_CPU)
contenuti = [pool.map(ottieni_contenuto_url, URLS)]
pool.close()
pool.join()

fine = time.time()
print("Durata: {:.2f}s".format(fine - inizio))

In [None]:
from nltk.tokenize import TweetTokenizer
from nltk.stem.snowball import SnowballStemmer

In [None]:
# leggere il data set
tweets = pd.read_csv("datasets/twitter/train.csv", encoding="latin")["SentimentText"].tolist()
# creare il tokenizer
tokenizer = TweetTokenizer(preserve_case=False, reduce_len=True, strip_handles=True)
# creare lo stemmer
stemmer = SnowballStemmer("english")
# creare una funzione per dividere il tweet in token ridotti alla radice
def tweet_analyzer(tweet): return [stemmer.stem(t) for t in tokenizer.tokenize(tweet)]

### Approccio sequenziale

In [None]:
inizio = time.time()

tweets_preproc = [tweet_analyzer(tweet) for tweet in tweets]

fine = time.time()
print("Durata: {:.2f}s".format(fine - inizio))

print(tweets_preproc[42])

### Approccio parallelo

### Esercizio

Utilizzare la classe `Pool`per parallelizzare l'analisi dei tweet.

In [None]:
inizio = time.time()

# ============== YOUR CODE HERE ==============
raise NotImplementedError
tweets_preproc = []
# ============================================

fine = time.time()
print("Durata: {:.2f}s".format(fine - inizio))

print(tweets_preproc[42])

### Linee guida generali sulla scelta della classe più appropriata

>1. **Thread**: numero di task medio-basso, molte operazioni di I/O, utilizzo della CPU relativamente basso;
>2. **Process**: numero di task medio-basso, utilizzo intensivo della CPU;
>3. **Pool**: numero di task alto, utilizzo intensivo della CPU.

# 5. Scegliere gli iperparametri ottimali <a id=iperparametri> </a>

In [None]:
from msbd.preprocessamento import OttenereDummy
from sklearn.impute import SimpleImputer
from sklearn.metrics import accuracy_score
from sklearn.model_selection import ParameterGrid
from sklearn.model_selection import ShuffleSplit
from sklearn.pipeline import Pipeline
from sklearn.tree import DecisionTreeClassifier

In [None]:
# leggere il data set
dati = pd.read_csv("datasets/titanic/train.csv")
# dividere la X dalla y
X, y = dati.drop(columns="Survived").copy(), dati["Survived"].copy()
# definire una pipeline di classificazione
clf = Pipeline([
    ("ottenere_dummy", OttenereDummy(drop_first=True)),
    ("imputer", SimpleImputer(strategy="mean")), 
    ("tree", DecisionTreeClassifier())
])
# griglia su cui eseguire la ricerca
griglia = {
    'tree__max_depth': np.arange(1, 18),
    'tree__min_samples_leaf': 2 ** np.arange(9),
}
# dividiere i dati in training e test
splitter = ShuffleSplit(n_splits=1, test_size=0.25, random_state=42)
train_indices, val_indices = next(splitter.split(X, y))
X_train, y_train = X.iloc[train_indices], y.iloc[train_indices]
X_val, y_val = X.iloc[val_indices], y.iloc[val_indices]

### Approccio sequenziale

### Esercizio

Effetturare una *grid search* come visto nel notebook [Alberi di decisione](11_alberi_di_decisione.ipynb).

In [None]:
inizio = time.time()

risultati = []

# ============== YOUR CODE HERE ==============
raise NotImplementedError
# ============================================

fine = time.time()
print("Durata: {:.2f}s".format(fine - inizio))

risultati = pd.DataFrame(risultati)
risultati.sort_values("accuracy_score", ascending=False, inplace=True)
risultati.reset_index(drop=True, inplace=True)

risultati.head(5)

### Approccio parallelo

In [None]:
from sklearn.model_selection import GridSearchCV

In [None]:
inizio = time.time()

gscv = GridSearchCV(
    estimator=clf, 
    param_grid=griglia,
    scoring="accuracy",
    cv=splitter, 
    n_jobs=N_CPU, 
    return_train_score=False
)

gscv.fit(X, y)

fine = time.time()
print("Durata: {:.2f}s".format(fine - inizio))

risultati = pd.DataFrame(gscv.cv_results_)
risultati.sort_values("split0_test_score", ascending=False, inplace=True)
risultati.reset_index(drop=True, inplace=True)

risultati.head(5)