<a href="https://colab.research.google.com/github/lsteffenel/hpc-python/blob/master/dask/introduction_dask.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

![fig_dask](https://miro.medium.com/max/1000/1*D6mSsdWECFLn6wJne4VTjg.png)

# <font color="red">Que représente Dask ?</font>

- Une bibliothèque flexible pour le calcul parallèle en Python qui facilite la construction de workflows intuitifs pour l'ingestion et l'analyse de grands ensembles de données distribués.
- Un outil d'analyse parallèle natif conçu pour s'intégrer parfaitement avec NumPy, Pandas et Scikit-Learn.
- Une bibliothèque de parallélisation *out-of-core* (les données sont lues en mémoire depuis le disque au fur et à mesure des besoins) qui s'intègre parfaitement aux structures de données NumPy et Pandas existantes pour répondre aux besoins suivants :
     * **L'ensemble de données disponible ne tient pas en mémoire d'une seule machine.**
     * **La tâche de traitement des données est chronophage et doit être mise à l'échelle et accélérée.**
- Orchestre des threads ou processus parallèles pour nous et accélère les temps de traitement.
   - Fonctionne en distribuant les gros calculs et en les décomposant en plus petits calculs via un ordonnanceur de tâches et des travailleurs de tâches.

Dask est composé de plusieurs composants et API différents, qui peuvent être catégorisés en trois couches : l'ordonnanceur, les API de bas niveau et les API de haut niveau.

- Dask fournit quelques constructions de haut niveau appelées Dask Bags, Dask DataFrames et Dask Arrays. Elles offrent une interface facile à utiliser pour paralléliser de nombreuses transformations de données typiques dans les workflows d'apprentissage automatique (ML).
- Dask permet la création de graphes d'exécution de tâches hautement personnalisés grâce à leur API Python étendue (par ex., `dask.delayed`) et à l'intégration avec les structures de données existantes.

![fig_layers](http://bicortex.com/bicortex/wp-content/post_content//2019/06/Dask_APIs_Architecture.png)  
**Source de l'image** : bicortex.com

Le diagramme ci-dessous décrit les étapes suivies par Dask pour manipuler les données.

- L'opération est décomposée en une séquence d'opérations sur des partitions plus petites de nos données (sans avoir à charger tout l'ensemble de données en mémoire).
- Dask lit chaque partition au fur et à mesure des besoins et calcule les résultats intermédiaires.
- Les résultats intermédiaires sont agrégés dans le résultat final.
- Dask gère toute cette séquence en interne pour nous.
- Sur une seule machine, Dask peut utiliser des threads ou des processeurs pour paralléliser ces opérations.

![fig_proc](https://www.nvidia.com/content/dam/en-zz/Solutions/glossary/data-science/dask/dask-pic1.png)  


**Avantages de l'utilisation de Dask**

- Entièrement implémenté en Python et met nativement à l'échelle NumPy, Pandas et scikit-learn.
- Peut être utilisé efficacement pour travailler avec des ensembles de données moyens sur une seule machine et de grands ensembles de données sur un cluster.
- Peut être utilisé comme un framework général pour paralléliser la plupart des objets Python.
- Présente une configuration et un coût de maintenance très faibles.

>Dask fournit des collections de haut niveau Array, Bag et DataFrame qui imitent NumPy, les listes et Pandas mais peuvent opérer en parallèle sur des ensembles de données qui ne rentrent pas en mémoire principale. Les collections de haut niveau de Dask sont des alternatives à NumPy et Pandas pour les grands ensembles de données.


**Rappel sur les processus et les threads**

- Un **processus** est une exécution de programme.
- Un **thread** est une séquence d'exécution unique au sein du processus.
- Un processus peut contenir plusieurs threads.
- Les threads sont utilisés pour des tâches légères, tandis que les processus sont utilisés pour des tâches plus « lourdes ».


**Python classique ne peut exécuter qu'un seul thread à la fois.**

>Dask offre une méthode facile et cohérente pour paralléliser les calculs qui s'étend d'un simple ordinateur portable à des clusters de milliers de cœurs. Il repose sur un ordonnanceur de tâches qui distribue les appels de fonctions Python sur plusieurs threads, processus ou nœuds de cluster.


![threads](https://pediaa.com/wp-content/uploads/2018/07/Difference-Between-Process-and-Thread-Comparison-Summary-684x1024.jpg)  
**Source de l'image** : pediaa.com


### Installation / importation de modules


In [None]:
!python -m pip install dask[complete]

In [None]:
!pip install memory_profiler

In [None]:
import warnings
warnings.filterwarnings('ignore')

In [None]:
%matplotlib inline
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import os
import dask
import dask.array as da
import dask.dataframe as dd
from dask.diagnostics import ProgressBar

In [None]:
print(f"Numpy version:  {np.__version__}")
print(f"Pandas version: {pd.__version__}")
print(f"Dask   version: {dask.__version__}")

In [None]:
from memory_profiler import memory_usage
import memory_profiler
%load_ext memory_profiler

## Détermination des informations du système

In [None]:
import math
def convert_size(size):
    """
      Convert from KB to another unit.
    """
    if (size == 0):
       return '0B'
    size_name = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")
    i = int(math.floor(math.log(size,1024)))
    p = math.pow(1024,i)
    s = round(size/p,2)
    return " ".join([str(s),size_name[i]])

In [None]:
import platform
import psutil

print("="*20, "System Information", "="*20)
uname = platform.uname()
print(f"           System: {uname.system}")
print(f"        Node Name: {uname.node}")
print(f"          Release: {uname.release}")
print(f"          Version: {uname.version}")
print(f"          Machine: {uname.machine}")
print(f"        Processor: {uname.processor}")
print("="*20, "CPU Information", "="*20)
cpufreq = psutil.cpu_freq()
print("# logical cores = # physical cores times # threads ")
print("                    that can run on each physical core.")
print(f"   Physical cores: {psutil.cpu_count(logical=False)}")
print(f"    Logical cores: {psutil.cpu_count(logical=True)}")
print(f"Current frequency: {psutil.cpu_freq().current}")
print(f"    Min frequency: {psutil.cpu_freq().min}")
print(f"    Max frequency: {psutil.cpu_freq().max}")
print("="*20, "Memory Information", "="*20)
svmem = psutil.virtual_memory()
print(f"     Total memory: {convert_size(svmem.total)}")
print(f" Available memory: {convert_size(svmem.available)}")
svmem = psutil.virtual_memory()
print("="*60)

Sur Colab on a généralement 1 coeur physique (2 coeurs logiques). Ça permettra de faire quelques tests de parallélisme, mais limités. Si vous lancez le code sur votre propre machine, vous pouvez obtenir des résultats bien plus intéressants.

### Configuration de la barre de progression

- Vous pouvez utiliser la barre de progression intégrée de Dask pour suivre l'avancement de n'importe quel appel `get()` ou `compute()`.
- Ici, nous utiliserons l'enregistrement global où la barre de progression s'affichera pour tous les calculs.


In [None]:
from dask.diagnostics import ProgressBar
pbar = ProgressBar()
pbar.register()

# <font color="red">Paralléliser le code avec `dask.delayed`</font>

- Une méthode simple pour paralléliser le code.
- Permet aux utilisateurs de retarder les appels de fonctions dans un graphe de tâches avec dépendances.
- Des systèmes comme `dask.dataframe` sont construits avec `dask.delayed`.


**Exemple Simple**

Considerez les fonctions suivantes :

In [None]:
import time

def increment(x):
    time.sleep(1.0)
    return x + 1

def double(x):
    time.sleep(1.0)
    return 2 * x

def add(x, y):
    time.sleep(1.0)
    return x + y

In [None]:
%%time

x = increment(1)
y = increment(2)
z = add(x, y)

Regardez surtout le temps d'exécution (**Wall time**). Nous allons utiliser ça comme paramètre pour les comparaisons futures.

Pour paralléliser ce code en Dask, nous allons utiliser le décorateur `dask.delayed` sur les fonctions `increment` et `add`.
- En décorant les fonctions, nous enregistrons ce que nous voulons calculer sous forme de tâches dans des graphes qui seront exécutés plus tard sur du matériel parallèle.


In [None]:
xd = dask.delayed(increment)(1)
yd = dask.delayed(increment)(2)
zd = dask.delayed(add)(xd, yd)
zd

- Quand nous appelons la version retardée en passant les arguments, exactement comme avant, la fonction originale n'est pas encore exécutée.
- Un objet *delayed* est créé, qui garde la trace de la fonction à appeler et des arguments à lui passer.
- Nous utilisons la méthode `visualize` (qui repose sur le package `graphviz`) qui fournit une représentation visuelle des opérations effectuées.

**ATTENTION** : la création des *delayed* ne lance pas automatiquement leur exécution.

In [None]:
zd.visualize(rankdir='LR')

- Notez que nous n'avons pas encore calculé **total** physiquement.
- Nous devons appliquer la méthode `compute` pour obtenir le résultat.
- <font color="red">C'est seulement à ce moment que les données sont chargées en mémoire pour les calculs</font>.
- Les calculs sont effectués en utilisant un pool de threads local.

In [None]:
%%time
dask.compute(zd)

**Utilisation de `delayed` dans des boucles**

Considérez le code séquentiel avec deux boucles *for* :


In [None]:
%%time

n = 10
data = [i+1 for i in range(n)]

out = list()
for x in data:
    y = increment(x)
    z = double(y)
    out.append(z)

total = 0
for z in out:
    total = add(total, z)

total

Nous pouvons paralléliser le code ci-dessus en utilisant le décorateur `delayed` :


In [None]:
n = 10
data = [i+1 for i in range(n)]

out = list()
for x in data:
    y = dask.delayed(increment)(x)
    z = dask.delayed(double)(y)
    out.append(z)

totald = 0
for z in out:
    totald = dask.delayed(add)(totald, z)

totald

Nous pouvons aussi obtenir la représentation visuelle via un graphe de tâches.


In [None]:
totald.visualize()

In [None]:
%%time
dask.compute(totald)

### Exercice 1

Utilisez le décorateur `delayed` pour paralléliser le code ci-dessous :


In [None]:
def is_odd(x):
    return x%2

In [None]:
%%time

n = 10
data = [i+1 for i in range(n)]

results = list()

for x in data:
    if is_odd(x):
        y = double(x)
    else:
        y = increment(x)
    results.append(y)

total = sum(results)
print(total)

In [None]:
#VOTRE CODE ICI

### Exemple : Mots palindromiques

- Un mot palindromique est un mot dont les caractères se lisent de la même façon à l'envers qu'à l'endroit.
- Quelques exemples de palindromes sont `redivider`, `deified`, `civic`, `radar`, `level`, `rotor`, `kayak`, `reviver`, `racecar`, `madam`, et `refer`.

Nous voulons trouver le nombre de palindromes dans une liste de mots.


In [None]:
def is_palindrome(s):
    return s.upper() == s.upper()[::-1]

In [None]:
list_words = [
    'complete', 'abstraction', 'from', 'compass', 'sights', 'sounds',
    'Human', 'shapes', 'interferences', 'troubles', 'joys', 'were',
    'they', 'were', 'there', "man", 'seemed', 'shaded', 'hemisphere',
    'globe', 'sentient', 'being', 'save', 'himself', "rather",
    "Abba", "Aibohphobia", "Bib", "Bob", "Civic", "Deified",
    "Detartrated", "Dewed", "Eve", "Hannah", "Kayak", "Level",
    "Madam", "Malayalam", "Minim", "Mom", "Murdrum", "Noon", "Nun",
    "Otto", "Peep", "Pop", "Racecar", "Radar", "Redder", "Refer",
    "Repaper", "Rotator", "Rotavator", "Rotor", "Sagas",
    "Sis", "Solo", "Stats", "Tattarrattat", "Tenet",
    'redivider', 'deified', 'civic', 'radar', 'level',
    'Being', 'not', 'without', 'frequent', 'consciousness',
    'that', 'there', 'was', 'some', 'charm', 'this', 'life', 'stood',
    'still', 'after', 'looking', 'sky', 'useful', 'instrument',
    'regarded', 'appreciative', 'spirit', 'work', 'art',
    'superlatively', 'beautiful', 'moment', 'seemed',
    'impressed', 'with', 'speaking', 'loneliness', 'scene',
    "brother", "system", "SISteR", "TEXT", "paREnts", "python",
    "Numpy", "Dask", "PanDaS"
]

len(list_words)

**Code simple en Python**

In [None]:
%%time
palindromes_py = [is_palindrome(s) for s in list_words]
total_py = sum(palindromes_py)
total_py

**Avec Dask delayed**

In [None]:
palindromes_da = [dask.delayed(is_palindrome)(s) for s in list_words]
total_da = dask.delayed(sum)(palindromes_da)

In [None]:
total_da.visualize()

In [None]:
%%time
result = total_da.compute()
result

Quelle horreur, Dask semble bien plus lent !!!
La raison est que le traitement avec `delayed` n'a pas la bonne granularité.

On peut essayer d'utiliser une structure de données propre à Dask (les `bag`), our voir si ça peut s'améliorer.

Si nous utilisons Dask Bag, nous effectuons les mêmes calculs plus rapidement :


In [None]:
import dask.bag as db
bag = db.from_sequence(list_words)
bag.map(is_palindrome).visualize()

In [None]:
%time
result= sum(bag.map(is_palindrome).compute())
result

**<font color="red">Leçons à retenir</font>**

- Le décorateur `delayed` ajoute une surcharge.
- Il est préférable de ne pas l'utiliser quand une tâche nécessite peu de temps.
- Appelez `delayed` sur la fonction et non sur le résultat.
- Décomposez les calculs en de nombreuses pièces. Vous obtenez le parallélisme en ayant de nombreux appels *delayed*, et non en utilisant un seul : Dask ne regardera pas à l'intérieur d'une fonction décorée avec `delayed` pour paralléliser ce code en interne.


### Exercice 2

Utilisez Dask pour paralléliser le code ci-dessous (calculs de `pi`) :


In [None]:
%%time

import random

def approximate_pi(num_samples):
    num_points_circ = 0

    for i in range(num_samples):
        # Select an arbitrary point in [-1,1]x[-1,1]
        x = random.uniform(-1, 1)
        y = random.uniform(-1, 1)

        # Check if the point is inside the circle
        if x**2 + y**2 < 1.0:
            num_points_circ += 1

    return 4 * num_points_circ / num_samples

def mean(*args):
    return sum(args) / len(args)

number_samples = 10**6
number_experiments = 10

pi_approx = mean(*[approximate_pi(number_samples) for i in range(number_experiments)])

print("Approximation of Pi: {}".format(pi_approx))

In [None]:
#VOTRE CODE ICI

# <font color="red">Dask Array</font>

- Les tableaux Dask coordonnent de nombreux tableaux NumPy, organisés en **chunks** dans une grille.
    - **Parallèle** : Utilise tous les cœurs de votre ordinateur
    - **Plus grand que la mémoire** : Permet de travailler sur des ensembles de données plus grands que la mémoire disponible en décomposant votre tableau en de nombreuses petites pièces, en opérant sur ces pièces dans un ordre qui minimise l'empreinte mémoire de votre calcul, et en fluxant efficacement les données depuis le disque.
    - **Algorithmes par blocs** : Effectue de gros calculs en réalisant de nombreux plus petits calculs
- Ils supportent un large sous-ensemble de l'API NumPy.

![fig_array](https://miro.medium.com/max/1388/1*JfQnXJ5_R104bPyE8_XhwQ.png)


**Créer un tableau Dask**

- Créez un tableau 20000x20000 de nombres aléatoires, représenté par de nombreux tableaux NumPy de taille 1000x1000 (ou plus petits si le tableau ne peut pas être divisé uniformément).
- Il y a 400 (20x20) tableaux NumPy de taille 1000x1000.


In [None]:
x = da.random.random((10000, 40000), chunks=(1000, 1000))
x

Le tableau :
- Fait 2,98 Gb
- Est organisé en 400 **chunks** de tableaux NumPy `1000x1000`.
- Chaque chunk fait 7,64 Mb

Des informations similaires peuvent être obtenues via :


In [None]:
print(f"     Type: {type(x)}")
print(f"    Shape: {x.shape}")
print(f"     Size: {x.size}")
print(f"Num bytes: {x.nbytes} B or {convert_size(x.nbytes)}")
print(f"   Chunks: {x.chunks}")

Nous pouvons utiliser la syntaxe NumPy :


In [None]:
y = 2.0 + x.T
y.shape

In [None]:
mu = x.mean(axis=0)
mu

In [None]:
z = y[::2, 5000:].mean(axis=1)
z

In [None]:
z.visualize(rankdir="LR")

Utilisez la fonction **`compute()`** si vous voulez votre résultat sous forme de tableau NumPy.


In [None]:
mu[0].compute()

In [None]:
w = z.compute()
print(type(w), w.shape )

**Faire persister les données en mémoire**

- Si vous disposez de la RAM disponible pour votre jeu de données, vous pouvez persister les données en mémoire.
- Cela permet aux calculs futurs d'être beaucoup plus rapides.


In [None]:
%time y.sum().compute()

In [None]:
y = y.persist()

In [None]:
%time y[0, 0].compute()

In [None]:
%time y.sum().compute()

**Usage mémoire Numpy versus Dask**

In [None]:
def f_numpy():
    x = np.random.normal(10, 0.1, size=(20000, 20000))
    y = x.mean(axis=0)[::100]

`%%memit`

- Mesure l'utilisation mémoire d'une seule instruction.
- Fournit la mémoire maximale et la croissance incrémentielle de la mémoire


In [None]:
%%memit
f_numpy()

In [None]:
%%time
f_numpy()

In [None]:
def f_dask():
    x = da.random.normal(10, 0.1, size=(20000, 20000),
                         chunks=(1000, 1000))
    y = x.mean(axis=0)[::100].compute()

In [None]:
%%memit
f_dask()

In [None]:
%%time
f_dask()

On voit que Dask utilise moins de mémoire, mais son temps d'exécution n'est pas si impressionnant.

Redimensionner la taille des chunks peut améliorer les performances :


In [None]:
def f_dask2():
    x = da.random.normal(10, 0.1, size=(20000, 20000),
                         chunks=(2000, 500))
    y = x.mean(axis=0)[::100].compute()

In [None]:
%%time
f_dask2()

**Dask a terminé plus rapidement, mais a utilisé plus de temps CPU total car Dask a pu paralléliser de manière transparente le calcul grâce à la taille des chunks.**


**<font color="red">Points à considérer</font>**

- Si vos données tiennent en RAM et que vous n'êtes pas limité par les performances, alors utiliser NumPy peut être le bon choix. Dask ajoute une couche de complexité supplémentaire qui peut gêner.
- **Si vous cherchez seulement des accélérations plutôt que de la scalabilité, envisagez d'utiliser Numba pour manipuler les tableaux NumPy.**
- Comment choisir la taille des chunks ?
     - Trop petits : surcharge importante.
     - Mal alignés avec les données : lecture inefficace.
     - Il est recommandé d'avoir une taille de chunk d'au moins 100 Mb.
     - Choisissez une taille de chunk suffisamment grande pour réduire le nombre de chunks que Dask doit gérer (ce qui affecte la surcharge) mais aussi assez petite pour que plusieurs puissent tenir en mémoire simultanément. Dask aura souvent autant de chunks en mémoire que deux fois le nombre de threads actifs.

**Éviter la sur-souscription des threads**
     
- Par défaut, Dask exécute autant de tâches concurrentes que vous avez de cœurs logiques.
- Il suppose que chaque tâche consomme environ un cœur.
- Beaucoup de bibliothèques de calcul (utilisées par Dask) sont elles-mêmes multi-threadées, ce qui peut causer des conflits et de mauvaises performances.
- Pour de meilleures performances, nous devons spécifier explicitement l'utilisation d'un seul thread :

```bash
   export OMP_NUM_THREADS=1
   export MKL_NUM_THREADS=1
   export OPENBLAS_NUM_THREADS=1


## <font color="red">Profilage mémoire</font>

- Nous utilisons le package `memory_profiler` pour suivre l'utilisation mémoire.
- Il est entièrement écrit en Python et surveille le processus qui exécute du code Python ainsi que l'utilisation mémoire ligne par ligne.
- Nous utilisons la fonction `memory_usage()` et passons le paramètre `interval` pour la fréquence de mesure de l'utilisation mémoire.


In [None]:
def sum_with_numpy():
    # Serial implementation
    np.arange(10**8).sum()

def sum_with_dask():
    # Parallel implementation
    work = da.arange(10**8).sum()
    work.compute()

memory_numpy = memory_usage(sum_with_numpy, interval=0.01)
memory_dask = memory_usage(sum_with_dask, interval=0.01)

# Plot results
plt.plot(memory_numpy, label='numpy')
plt.plot(memory_dask, label='dask')
plt.xlabel('Time step')
plt.ylabel('Memory / MB')
plt.legend(loc='best')
plt.show()

Vous pouvez aussi utiliser les options de profilage de Dask :


In [None]:
from dask.diagnostics import Profiler, ResourceProfiler
work = da.arange(10**8).sum()
with Profiler() as prof, ResourceProfiler(dt=0.001) as rprof:
    result2 = work.compute()

from bokeh.plotting import output_notebook
from dask.diagnostics import visualize
visualize([prof,rprof], output_notebook())

In [None]:
with ResourceProfiler(dt=0.001) as rprof2:
    result = np.arange(10**8).sum()
visualize([rprof2], output_notebook())

# <font color="red">Dask DataFrames</font>

- Pandas est excellent pour les ensembles de données tabulaires qui tiennent en mémoire.
- Dask devient utile quand l'ensemble de données à analyser est plus grand que la RAM de votre machine.
- Dask DataFrames :
     - Coordonnent de nombreux DataFrames Pandas, partitionnés le long d'un index.
     - Supportent un large sous-ensemble de l'API Pandas.
- Une opération sur un Dask DataFrame déclenche de nombreuses opérations Pandas sur les DataFrames Pandas constitutifs, de manière à prendre en compte le parallélisme potentiel et les contraintes mémoire.
- Parmi les opérations très rapides avec les Dask DataFrames :
     - Opérations arithmétiques (multiplication ou addition à une Series)
     - Agrégations courantes (`mean`, `min`, `max`, `sum`, etc.)
     - Appel de `apply`
     - Appel de `value_counts()`, `drop_duplicates()` ou `corr()`
     - Filtrage avec `loc`, `isin`, et sélection ligne par ligne



### <font color="green">Exemple : Dataset des vols NYC</font>

Ce dataset contient des données concernant des vols (années 1990) au départ des trois aéroports de la région de New York.


télécharger les données :

In [None]:
import urllib.request

print("\t Downloading NYC dataset...", end="\n", flush=True)

url = "https://storage.googleapis.com/dask-tutorial-data/nycflights.tar.gz"
filename, header = urllib.request.urlretrieve(url, "nycflights.tar.gz")

print("\t Done!", flush=True)

In [None]:
!ls -lrt

Extraire les fichiers `.csv` du fichier tar.gz :

In [None]:
import tarfile

with tarfile.open(filename, mode="r:gz") as flights:
     flights.extractall("data/")

In [None]:
!ls -lrt data/nycflights

Charger tous les fichiers dans le dataset, d'un seul coup:

In [None]:
import os

df = dd.read_csv(os.path.join("data", "nycflights", "*.csv"),
                parse_dates={"Date": [0, 1, 2]})
df

- La représentation de l'objet DataFrame (aussi appelé *schéma*) ne contient pas de données.
- `pandas.read_csv` lit l'intégralité du fichier avant d'inférer les types de données.
- `dask.dataframe.read_csv` ne lit qu'un échantillon du début du fichier (ou du premier fichier). Ces types de données inférés sont ensuite appliqués lors de la lecture de toutes les partitions.


Nous pouvons essayer d'afficher les premières lignes :

In [None]:
df.head()

Par contre, si on essaye d'afficher les dernières lignes, on peut tomber sur une erreur :

In [None]:
df.tail()

En effet, il y a un problème avec les types de données de quelques colonnes.
- Les types de données inférés à partir de l'échantillon sont incorrects.
- Nous pouvons le corriger en relisant les fichiers et en spécifiant les types de données appropriés.


In [None]:
df = dd.read_csv(os.path.join("data", "nycflights", "*.csv"),
                parse_dates={"Date": [0, 1, 2]},
                dtype={'TailNum': str,
                       'CRSElapsedTime': float,
                       'Cancelled': bool})

In [None]:
df.tail()

### <font color="blue">Effectuer des opérations comme avec les `Pandas DataFrames`</font>


**Valeur maximale d'une colonne** :

- Nous voulons maintenant calculer le maximum de la colonne `DepDelay`.
- Avec `Pandas`, nous ferions une boucle sur chaque fichier pour trouver les maximums individuels, puis trouver le maximum final sur tous les maximums individuels.
- `dask.dataframe` nous permet d'écrire du code semblable à Pandas qui opère sur des ensembles de données plus grands que la mémoire, en parallèle.


In [None]:
df.DepDelay.max().visualize()

In [None]:
%time df.DepDelay.max().compute()

Si nous faisons la même chose en `Pandas`, nous obtiendrons :


In [None]:
%%time

import glob

list_files = glob.glob("data/nycflights/*csv")

maxes = list()
for file_name in list_files:
    pddf = pd.read_csv(file_name)
    maxes.append(pddf.DepDelay.max())

final_max = max(maxes)

print("Final Maximum: ", max(maxes))

**Plotting**

In [None]:
df[df.Dest == 'PIT'].compute().plot(kind='scatter',
                                    x="DayOfWeek",
                                    y="DepDelay")

**Autres Operations et statistiques sur les vols**

Nombre de vols non annulés:

In [None]:
len(df[~df.Cancelled])

Nombre de vols non-annulés au départ de chaque aéroport :

In [None]:
df[~df.Cancelled].groupby('Origin').Origin.count().compute()

Retard moyen au départ pour chaque jour de la semaine :

In [None]:
df.groupby("DayOfWeek").DepDelay.mean().compute()

Regroupement par destinations et comptage :


In [None]:
df.groupby("Dest").count().compute()

Moyenne des retards par destination.

In [None]:
df.groupby("Dest")["ArrDelay"].mean().compute()

Total des vols en retard supérieur à 30 minutes, par destination.

In [None]:
df[df.ArrDelay+df.DepDelay>30.0].groupby("Dest").Dest.count().compute()

**Partage des résultats intermédiaires**

- Nous effectuons parfois la même opération plusieurs fois.
- Pour la plupart des opérations, `dask.dataframe` hache les arguments, permettant aux calculs dupliqués d'être partagés et ne calculés qu'une seule fois.


In [None]:
non_cancelled = df[~df.Cancelled]
mean_delay = non_cancelled.DepDelay.mean()
std_delay = non_cancelled.DepDelay.std()

Ici, on appelle chaque transformation séparemment.

In [None]:
%%time
mean_delay_res = mean_delay.compute()
std_delay_res = std_delay.compute()

Maintenant, nous passons les deux transformations à un seul appel `compute` :

In [None]:
%%time

mean_delay_res, std_delay_res = da.compute(mean_delay, std_delay)

Les graphes de tâches des deux résultats sont fusionnés lors de l'appel à `dask.compute`, permettant aux opérations partagées d'être effectuées une seule fois au lieu de deux.


### Exercice 3

- Considérez le code ci-dessous qui calcule le retard moyen au départ par aéroport.
- Parallélisez le code en utilisant Dask.


In [None]:
%%time

sum_delays = list()
count_delays = list()

for file_name in list_files:
    pddf = pd.read_csv(file_name)
    by_origin = pddf.groupby('Origin')
    loc_total = by_origin.DepDelay.sum()
    loc_count = by_origin.DepDelay.count()
    sum_delays.append(loc_total)
    count_delays.append(loc_count)

total_delays = sum(sum_delays)
n_flights = sum(count_delays)
mean_delays = total_delays / n_flights
print("Mean delays: {}".format(mean_delays))

In [None]:
#VOTRE CODE ICI

# <font color="red">Ordonnanceurs de tâches</font>

- Après que Dask a généré les graphes de tâches, il doit les exécuter sur du matériel parallèle.
- C'est le rôle d'un ordonnanceur de tâches.
- Il existe différents ordonnanceurs de tâches. Chacun consomme un graphe de tâches et calcule le même résultat, mais avec différentes caractéristiques de performance.

![schedulers](https://docs.dask.org/en/latest/_images/dask-overview.svg)

**Source de l'image** : [https://docs.dask.org/en/latest/](https://docs.dask.org/en/latest/)

Les réseaux Dask sont composés de trois éléments :
- **Ordonnanceur centralisé** : Gère les travailleurs et assigne les tâches qu'ils doivent accomplir.
- **Travailleurs** : Sont des threads, processus, ou machines séparées dans un cluster. Ils exécutent les calculs du graphe de calcul : effectuent les calculs, conservent les résultats, et communiquent les résultats entre eux.
- **Un ou plusieurs clients** : notebooks Jupyter ou scripts qui interagissent avec les utilisateurs et soumettent le travail à l'ordonnanceur pour exécution sur les travailleurs.

![networks](https://miro.medium.com/max/700/0*9JHQAjTVoKbm2f4X.png)  
**Source de l'image** : [Steven Gon](https://gongster.medium.com/dask-an-introduction-and-tutorial-b42f901bcff5)

Pour exécuter les graphes de tâches, il existe deux types d'ordonnanceurs :
* **Machine unique** : Fournit des fonctionnalités de base sur un processus local ou un pool de threads. Il est simple et économique à utiliser, mais ne peut être utilisé que sur une seule machine et ne scale pas.
* **Distribué** : Offre plus de fonctionnalités, mais nécessite un peu plus d'effort pour la configuration. Il peut fonctionner localement ou distribué sur un cluster.


## <font color="blue">Ordonnanceurs pour une machine unique</font>

Considérez l'exemple suivant :


In [None]:
n = 10
data = [i+1 for i in range(n)]

out = list()
for x in data:
    y = dask.delayed(increment)(x)
    z = dask.delayed(double)(y)
    out.append(z)

totald = 0
for z in out:
    totald = dask.delayed(add)(totald, z)

**Synchonous**

- L'ordonnanceur `synchrous` est une exécution mono-thread de tous les calculs dans le thread local, sans aucun parallélisme.
- Il est utile pour le débogage ou le profilage.


In [None]:
%time totald.compute(scheduler='synchronous')

**Threads locaux**

Utilise `multiprocessing.pool.ThreadPool`


Utiliser tous les coeurs.

In [None]:
%time totald.compute(scheduler='threads')

Utiliser une partie des coeurs (NB : sur Colab vous n'avez que 2 coeurs, donc ça ne changera pas grand chose)

In [None]:
%time totald.compute(scheduler='threads', num_workers=2)

Il y a aussi le mot clé `single-threaded`, équivalent à `num_workers = 1`.

In [None]:
%time totald.compute(scheduler='single-threaded')

**Processus locaux**

- L'ordonnanceur multiprocessing exécute les calculs avec un `multiprocessing.Pool` local.
- Chaque tâche et toutes ses dépendances sont envoyées à un processus local, exécutées, puis leur résultat est renvoyé au processus principal.
- Le transfert de données vers les processus distants et en retour peut introduire des pénalités de performance, particulièrement lorsque les données transférées entre processus sont volumineuses.
- L'ordonnanceur multiprocessing est un excellent choix lorsque les workflows sont relativement linéaires, sans transferts significatifs de données inter-tâches, et lorsque les entrées et sorties sont petites, comme des noms de fichiers et des comptes.


In [None]:
import multiprocessing
print (multiprocessing.cpu_count())

Utiliser tous les coeurs:

In [None]:
%time result = totald.compute(scheduler='processes')

Utiliser un nombre spécifique de coeurs :

In [None]:
%time result = totald.compute(scheduler='processes', num_workers=2)

### Threads ou Processus ?

- **Utilisez l'ordonnanceur threadé** si votre calcul est dominé par du code non-Python, comme principalement le cas lors de l'utilisation de données numériques dans des tableaux NumPy, DataFrames Pandas, ou tout autre projet basé sur C/C++/Cython de l'écosystème.
   - Il est léger.
   - Peu de surcharge.
   - Le transfert de données entre tâches n'est pas coûteux car tout se passe dans le même processus.
- **Utilisez l'ordonnanceur multiprocessing** si votre calcul est dominé par le traitement d'objets Python purs comme des chaînes, dictionnaires ou listes.
   - Il est léger.
   - Chaque tâche et toutes ses dépendances sont envoyées à un processus local, exécutées, puis leur résultat est renvoyé au processus principal.
   - Le transfert de données vers les processus distants et en retour peut introduire des pénalités de performance, particulièrement lorsque les données transférées entre processus sont volumineuses.
   - Excellent choix quand les workflows sont relativement linéaires, sans transferts significatifs de données inter-tâches, et quand les entrées et sorties sont petites, comme des noms de fichiers et des comptes.


## <font color="blue">Ordonnanceur distribué</font>

- L'ordonnanceur distribué Dask peut être configuré sur un cluster ou exécuté localement sur une machine personnelle.
- C'est un ordonnanceur de tâches dynamique, distribué et géré de manière centralisée.
     - Le processus central `dask-scheduler` coordonne les actions de plusieurs processus `dask-worker` répartis sur plusieurs machines et les requêtes concurrentes de plusieurs clients.
     - L'ordonnanceur est asynchrone et piloté par événements, répondant simultanément aux requêtes de calcul de multiples clients et suivant l'avancement de multiples travailleurs.
     - La nature asynchrone et pilotée par événements le rend flexible pour gérer concurrentement une variété de charges de travail provenant de multiples utilisateurs tout en gérant une population de travailleurs fluide avec des pannes et ajouts.
     - Les travailleurs communiquent entre eux pour le transfert massif de données via TCP.
- Pour configurer `dask.distributed`, nous devons créer une instance client en appelant la classe `Client` depuis `dask.distributed`.
- Cela créera en interne un ordonnanceur Dask et des travailleurs Dask.
- Nous obtiendrons le **lien du tableau de bord** où nous pouvons analyser les tâches s'exécutant en parallèle.
- Nous pouvons passer un nombre de travailleurs (via l'argument `n_workers`) et le nombre de threads par processus travailleur (via `threads_per_worker`).
- Dès la création d'un client, Dask commence automatiquement à l'utiliser.


In [None]:
from dask.distributed import Client
client = Client()
client = Client(n_workers=3, threads_per_worker=4)
client.cluster

Si vous êtes dans Google Colab, nous devons créer un tunnel pour rediriger le tableau de bord. Nous utiliserons le service [localXpose](https://localxpose.io/signup)

C'est un service gratuit mais qui nécessite une inscription. Créez un compte et allez dans le menu "Access" pour copier votre propre token. Collez-le dans le code ci-dessous.

**Si vous ne voulez pas créer un compte, vous pouvez sauter cette partie (ou alors executez Dask dans votre propre machine).**


In [None]:
!pip install loclx-colab

In [None]:
import loclx_colab.loclx as lx
port = 8787 # The service port that you want to expose
access_token = 'YOUR TOKEN' # Your LocalXpose token here
url = lx.http_tunnel_start(port, access_token)
print(f"Your service is exposed to this URL: https://{url}")

Parfois, l'URL ne s'affiche pas du premier coup. Vous pouvez imprimer la liste des adresses avec la commande ci-dessous. Copiez l'URL et collez-la dans un nouvel onglet de navigateur web.


In [None]:
# To list all live created tunels
print(lx.http_tunnel_status())

Avec le dashboard, vous pouvez suivre l'éxécution des appels suivants :

In [None]:
import random

def random_slow_add(x, y):
    time.sleep(random.randrange(3,10))
    return x + y

In [None]:
results = list()

for x in data:
    y = dask.delayed(random_slow_add)(x, 1)
    results.append(y)

total = dask.delayed(sum)(results)

In [None]:
%time result = total.compute()
result

Eteindre le cluster:

In [None]:
client.close()

**<font color="red">Points à considérer</font>**

- Chaque tâche Dask a une surcharge (environ 1 ms). Si vous avez beaucoup de tâches, cette surcharge peut s'accumuler. Il est judicieux de donner à chaque tâche plus de quelques secondes de travail.
- Pour mieux comprendre les performances de votre programme, consultez la documentation [Dask Performance Diagnostics](https://distributed.dask.org/en/latest/diagnosing-performance.html). Vous pouvez aussi visionner la [vidéo](https://docs.dask.org/en/stable/diagnostics-distributed.html) pour apprendre à grouper votre travail en moins de tâches plus substantielles. Cela peut signifier appeler les opérations paresseuses d'un coup au lieu individuellement. Cela peut aussi impliquer une repartition de vos DataFrame(s).
- Une bonne règle empirique pour choisir le nombre de threads par travailleur Dask est la racine carrée du nombre de cœurs par nœud.
     - En général, plus de threads par travailleur sont bons pour un programme qui passe la plupart de son temps dans NumPy, SciPy, Numba, etc., et moins de threads par travailleur sont meilleurs pour des programmes plus simples qui passent la plupart de leur temps dans l'interpréteur Python.
- L'ordonnanceur Dask s'exécute sur un seul thread, donc lui assigner son propre nœud est un gaspillage.
- Il n'y a pas de limite stricte à l'évolutivité de Dask. La surcharge des tâches finira toutefois par submerger votre calcul selon la durée de chaque tâche.


## <font color="red">Documents de Référence</font>

- <a href="https://docs.dask.org/en/latest/why.html">Pourquoi Dask ?</a>
- <a href="https://github.com/dask/dask-tutorial">Tutoriel</a>
- <a href="https://www.manning.com/books/data-science-with-python-and-dask">Data Science with Python and Dask</a>
- <a href="https://www.manifold.ai/dask-and-machine-learning-preprocessing-tutorial">Dask and Machine Learning: Preprocessing Tutorial</a>
- <a href="https://carpentries-incubator.github.io/lesson-parallel-python/aio/index.html">Parallel Programming in Python</a>
