# Traitements parallèles et MapReduce avec `dask`

Dans cette séance vous allez utiliser le module `dask` pour effectuer des traitements parallèles dans le cadre du modèle MapReduce.

## Exercice #1 : fonction de type Map avec `dask`

Le module `dask` ne fournit pas de fonction de type `map` utilisable directement sur les structures de données itérables de base de Python (le module fournit cependant ce type de fonction pour des types de données propres au module tel que les *Dask Bags* ou les *Dask Dataframes*).

Le but de l'exercice est de créer et tester une fonction `dask_map()` qui soit interchangeable avec la fonction `map()` de Python mais qui permette d'avoir une parallélisation des calculs. 

*Remarque : pour mettre en évidence l'avantage des calculs parallèles, pour cet exercice certaines fonction de traitement sont rendues volontairement lentes.*

**Question 1.1**

Proposer une fonction `dask_map()` qui, dans son utilisation la plus simple, prend en premier argument le nom d'une fonction, en second argument un itérable et qui applique la fonction à chaque élément de l'itérable. La fonction renverra une liste de résultats, similaire à l'itérateur retourné par la fonction `map()`.

Les calculs indépendants devront se faire en parallèle grâce aux fonctionnalités du module `dask`.

In [86]:
import dask

def dask_map(f, iterable):
    f_delayed = dask.delayed(f)
    return dask.delayed([f_delayed(x) for x in iterable]).compute()


**Question 1.2**

Tester votre fonction sur le calcul "lent" de la norme d'un vecteur en utilisant la fonction `carre_slow()` fournie ci-dessous. Afficher les temps de calcul avec `%%time` et comparer avec l'utilisation de la fonction `map()` (en utilisant également `carre_slow()`).

In [99]:
import time
from math import sqrt
from operator import add
from functools import reduce

def freeze(t):
    st = time.time()
    et = st
    while et - st < t:
        et = time.time()

def carre_slow(x1):
    freeze(1)
    return x1**2

vecteur1 = [1, 2, 3, 4]

In [90]:
%%time
# code pour tester avec dask_map()
norm1_dask = sqrt(reduce(add, dask_map(carre_slow, vecteur1)))
print(norm1_dask)

5.477225575051661
CPU times: total: 1.17 s
Wall time: 1.18 s


In [91]:
%%time
# code pour tester avec map()
res_map = sqrt(reduce(add, map(carre_slow, vecteur1)))
print(res_map)


5.477225575051661
CPU times: total: 3.98 s
Wall time: 4 s


**Question 1.3** 

Modifier la fonction `dask_map()` pour pouvoir également gérer le cas ou l'on fournit une liste d'itérables séparés par des virgules (en nombre quelconque) après le nom de la fonction (la fonction doit alors avoir plusieurs arguments d'entrée). Dans ce cas, la fonction fournie est appliquée à chaque groupe d'éléments pris position par position dans les différents itérables, comme le fait la fonction `map()`.

*Indication 1 : on rappelle que l'on peut récupérer tout ou partie des arguments passés à une fonction sous forme d'un tuple en utilisant l'opérateur "splat" (ou "étoile" : `*` ) dans la définition de la fonction.*

*Tester le code ci-dessous :*

In [92]:
def ma_fonction(x, *args):
    print(f"x={x}")
    print(f"args={args}")

ma_fonction(1, [2,3], (4,5))

x=1
args=([2, 3], (4, 5))


*Indication 2 : d'autre part, on rappelle que l'on peut décompacter ("unpack") les éléments d'un itérable en utilisant le même opérateur `*` devant le nom de l'itérable. Cela permet notamment de fournir les éléments de l'itérable comme arguments individuels d'une fonction lors de son appel.*

*Tester le code ci-dessous :*

In [93]:
def ma_fonction2(x, ma_liste, mon_tuple):
    print(f"x={x}")
    print(f"ma_liste={ma_liste}")
    print(f"mon_tuple={mon_tuple}")

param = [1, [2,3], (4,5)]

ma_fonction2(*param)


x=1
ma_liste=[2, 3]
mon_tuple=(4, 5)


In [95]:
# cellule pour le code de votre nouvelle fonction dask_map()
def dask_map(*args):
    f, iterables = args[0], args[1:]
    f_delayed = dask.delayed(f)
    return dask.delayed([f_delayed(*x) for x in zip(*iterables)]).compute()

**Question 1.4**

Tester cette nouvelle fonction `dask_map()` sur le calcul "lent" du produit scalaire de deux vecteurs en utilisant la fonction `produit_slow()` fournie ci-dessous. Afficher les temps de calcul avec `%%time` et comparer avec l'utilisation de la fonction `map()` (en utilisant également `produit_slow()`).

In [100]:
def produit_slow(x,y):
    freeze(1)
    return x*y

vecteur2 = [4, 3, 2, 1]

In [101]:
%%time
# code pour tester avec dask_map()
prod_dask = reduce(add, dask_map(produit_slow, vecteur1, vecteur2))
print(prod_dask)

20
CPU times: total: 1.06 s
Wall time: 1.06 s


In [102]:
%%time
# code pour tester avec map()
prod_map = reduce(add, map(produit_slow, vecteur1, vecteur2))
print(prod_map)

20
CPU times: total: 4.02 s
Wall time: 4.01 s


**Question 1.5**

Ecrire une fonction `sim_cos_dask_map()` qui calcule et renvoie la similarité cosinus entre deux vecteurs en utilisant des processus MapReduce impliquant les fonctions `dask_map()`, `reduce()`, `carre_slow()`et `produit_slow()`.

In [103]:
def sim_cos_dask_map(v1, v2):
 
    norm1 = reduce(add, dask_map(carre_slow,v1))
    norm2 = reduce(add, dask_map(carre_slow,v2))
    prod = reduce(add, dask_map(produit_slow, v1, v2))

    return prod/sqrt(norm1*norm2)

**Question 1.6**

Tester cette fonction `sim_cos_dask_map()` sur les vecteurs `vecteur1` et `vecteur2` en affichant les temps de calcul avec `%%time` et comparer aux temps de calcul obtenus avec la fonction `sim_cos_map()` fournie ci-dessous qui utilise la fonction classique `map()`.

In [104]:
def sim_cos_map(v1, v2):
    norm1 = reduce(add, map(carre_slow,v1))
    norm2 = reduce(add, map(carre_slow,v2))
    prod = reduce(add, map(produit_slow, v1, v2))

    return prod/sqrt(norm1*norm2)

In [105]:
%%time
# code pour le test de sim_cos_dask_map()
sim_cos_dask_map(vecteur1,vecteur2)

CPU times: total: 3.33 s
Wall time: 3.34 s


0.6666666666666666

In [106]:
%%time
# code pour le test de sim_cos_dask_map()
sim_cos_map(vecteur1,vecteur2)

CPU times: total: 12 s
Wall time: 12 s


0.6666666666666666

**Question 1.7**

Expliquez pourquoi l'implémentation du calcul de la similarité cosinus proposée dans `sim_cos_dask_map()` n'est pas optimale vis-à-vis des fonctionnalités de parallélisation et d'ordonnacement des calculs de  `dask`.

Proposer une nouvelle implémentation de `dask_map()` (à renommer) et de `sim_cos_dask_map()` (à renommer également) qui soient optimales. Tester et comparer les temps de calcul des deux implémentations.

In [107]:
def dask_map_delayed(*args):
    f, iterables = args[0], args[1:]
    f_delayed = dask.delayed(f)
    return dask.delayed([f_delayed(*x) for x in zip(*iterables)])


def sim_cos_dask_delayed(v1, v2):
    reduce_delayed = dask.delayed(reduce)
    sqrt_delayed = dask.delayed(sqrt)
    norm1_delayed = reduce_delayed(add, dask_map_delayed(carre_slow,v1))
    norm2_delayed = reduce_delayed(add, dask_map_delayed(carre_slow,v2))
    prod_delayed = reduce_delayed(add, dask_map_delayed(produit_slow, v1, v2))
    res_delayed = prod_delayed/sqrt_delayed(norm1_delayed*norm2_delayed)
    return res_delayed.compute()


In [108]:
%%time

sim_cos_dask_delayed(vecteur1,vecteur2)

CPU times: total: 1.92 s
Wall time: 1.92 s


0.6666666666666666

## Exercice #2 : wordcount en MapReduce parallélisé

Reprendre l'exercice du wordcount qui utilise la fonction générique `map_reduce()` mais cete fois-ci en parallélisant les calculs des étapes de type Map grâce à la fonction `dask_map()` écrite précédemment. On ajoutera les instructions nécessaires pour ralentir artificiellement les fonctions impliquées dans les étapes Map (par exemple : 2 secondes d'attente pour le `mapper_word()` et 0.5 seconde d'attente pour le `reducer_word()`). Utiliser les fichiers d'entrée `sample0.txt`, `sample1.txt` et `sample2.txt` générés lors de l'exercice précédent sur wordcount.

Pour pouvoir facilement comparer le gain de temps apporté par la parallélisation des calculs, on pourra ajouter à la fonction `map_reduce()` un paramètre booléen `parallelise` (valeur par défaut à `False`) permettant de basculer entre l'utilisation de la fonction `map()` classique (i.e. sans parallélisation) et de la fonction `dask_map()` parallélisant les cacluls.

Les codes de l'exercie de base du wordcount en MapReduce vous sont fournis ci-dessous (à modifier pour l'exercice).

In [None]:
from itertools import groupby

def partitionner(list_of_list):
    list_of_tuples = sorted([(cle, val) for l in list_of_list for (cle, val) in l], key=lambda x : x[0])
    list_int = []
    for cle, group in groupby(list_of_tuples, key=lambda x : x[0]):
        list_int.append((cle, [g[1] for g in group]))
    return list_int

def map_reduce(data, mapper, reducer, parallelise=False):
    my_map = map
    if parallelise:
        my_map = dask_map
    list_of_list_inter = my_map(mapper, data)
    list_inter = partitionner(list_of_list_inter)
    list_red = my_map(reducer, list_inter)

    return [(cle, val) for l in list_red for (cle, val) in l]

In [117]:
def tokenise(text):
    return text.lower().replace(".", "").split()

import lorem

data_wordcount = ["sample0.txt","sample1.txt", "sample2.txt"]


def mapper_words(cle_val):
    freeze(2)
    filename = cle_val[1]
    with open(filename, 'rt', encoding='utf-8') as ifile:
        content = ifile.read()
    return [(w,1) for w in tokenise(content)]

def reducer_words(cle_val):
    freeze(0.5)
    return [(cle_val[0], len(cle_val[1]))]

data_mr_wordcount = list(enumerate(data_wordcount))


In [119]:
%%time
print(sorted(map_reduce(data_mr_wordcount, mapper_words, reducer_words,parallelise = False),key=lambda x:x[1], reverse=True))

[('labore', 68), ('voluptatem', 61), ('modi', 58), ('amet', 55), ('porro', 54), ('neque', 53), ('eius', 52), ('numquam', 52), ('ipsum', 50), ('adipisci', 48), ('tempora', 48), ('dolor', 47), ('quaerat', 47), ('quiquia', 47), ('aliquam', 46), ('etincidunt', 45), ('dolore', 44), ('est', 44), ('velit', 41), ('consectetur', 40), ('ut', 40), ('dolorem', 39), ('sed', 39), ('non', 37), ('quisquam', 35), ('magnam', 28), ('sit', 26), ('consecteturdolorem', 1), ('nequeconsectetur', 1), ('temporanumquam', 1)]
CPU times: total: 21.1 s
Wall time: 21.1 s


## Exercice #3 : PageRank en MapReduce parallélisé

Reprendre l'exercice du PageRank en utilisant la fonction générique `map_reduce()` modifiée lors de l'exercice 2 ci-dessus. Ajouter un paramètre `parallelise` à la fonction `mr_pagerank()` qui sera transmis dircetement à la fonction `map_reduce()` pour pouvoir basculer d'un traitement sans parallélisation (par défaut) à un traitement avec parallélisation.

On ajoutera les instructions nécessaires pour ralentir artificiellement les fonctions impliquées dans les étapes Map (par exemple : 0.1 secondes d'attente pour le `pr_mapper` et 0.1 seconde d'attente pour le `pr_reducer`). Utiliser les fichier `graph_web0.txt` à `graph_web5.txt` du graphe réparti fournis lors de l'exercice précédent sur PageRank..

Sachant que le graphe complet utilisé comporte 61 noeuds et en se limitant à 10 itérations de l'algorithme *power iteration*, prévoir un ordre de grandeur du temps nécessaire au calcul des PageRank dans le cas non parallélisé.
Tester et comparer les temps de traitement des deux implémentation en utilisant `%%time`.

*[Votre réponse ici]()*

*--> 10 x (61x0.1  +  61x0.1) = 122 secondes (~ 2 minutes)*

Le code de l'exercie précédent du PageRank en MapReduce vous est fourni ci-dessous (à modifier pour l'exercice).

In [80]:
def get_nodes_and_successors(filename):
    out_list = []
    with open(filename, 'rt', encoding="utf-8") as i_file:
        for line in i_file.readlines():
            line = line.strip().split()
            out_list.append((line[0], line[1:]))
    return out_list

def pr_mapper(my_tuple, alpha, N):
    freeze(0.1)
    (node, successors), pr = my_tuple
    nbs = len(successors)
    outlist = []
    outlist.append((node, (1 - alpha)/(alpha*N))) # pour traiter le cas des noeuds sans prédécesseur (question 4.12)
    contrib = pr/nbs
    for s in successors:
        outlist.append((s,contrib))
    return outlist

def pr_reducer(tuple, alpha):
    freeze(0.1)
    node, lst_pr = tuple
    pr = alpha*reduce(add, lst_pr)  # formule qui prend en compte le traitement spécial des noeuds sans prédécesseur (question 4.12)
    return [(node, pr)]


def mr_pagerank(data_mr, alpha=0.9, max_iter=100, parallelise=False):
    nodes_and_successors  = {x[0]:x[1] for l in map(get_nodes_and_successors, data_mr) for x in l}
    N = len(nodes_and_successors)
    R = [((node, successors), 1/N) for node,successors in nodes_and_successors.items()]
    my_pr_mapper = lambda x : pr_mapper(x, alpha, N)
    my_pr_reducer = lambda x : pr_reducer(x, alpha)
    for i in range(max_iter):
        print(f'iteration {i}')
        mr_res = map_reduce(R, my_pr_mapper, my_pr_reducer, parallelise=parallelise)
        R = [((node, nodes_and_successors[node]), pr) for node, pr in mr_res]

    return mr_res

data_mr = [f"graph_web_{i}.txt" for i in range(6)]

In [83]:
%%time
print(sorted(mr_pagerank(data_mr, alpha=0.9, max_iter=10, parallelise=True), key=lambda x: x[1], reverse=True))

iteration 0
iteration 1
iteration 2
iteration 3
iteration 4
iteration 5
iteration 6
iteration 7
iteration 8
iteration 9
[('PAGE_20', 0.04345946128024342), ('PAGE_01', 0.039154034134838554), ('PAGE_39', 0.039039074303924046), ('PAGE_52', 0.035192725114315655), ('PAGE_06', 0.03176441412309123), ('PAGE_10', 0.031511640152138864), ('PAGE_42', 0.03105228751230894), ('PAGE_37', 0.030036167960026387), ('PAGE_16', 0.030010298413175734), ('PAGE_38', 0.02616859598522506), ('PAGE_30', 0.024913164017164317), ('PAGE_60', 0.023506291127890742), ('PAGE_32', 0.02307078315084029), ('PAGE_45', 0.022554318776542563), ('PAGE_24', 0.02253524161091848), ('PAGE_40', 0.021929739570579816), ('PAGE_50', 0.021225351812112356), ('PAGE_05', 0.021138169846874034), ('PAGE_29', 0.021134255899218773), ('PAGE_12', 0.02029488214060633), ('PAGE_11', 0.01970598020472508), ('PAGE_25', 0.017816287636312406), ('PAGE_07', 0.01740435596969446), ('PAGE_08', 0.0160255227845255), ('PAGE_55', 0.015781064233683383), ('PAGE_59', 0.0