# **Comprendre MapReduce**

## **Map**

```map``` prend en argument une fonction et une collection et retourne une collection. La fonction étant appliquée sur chaque élément de la collection.

**Exo1:** Utiliser la fonction ```map``` pour multiplier par 2 les éléments d'une liste. Faire une version en définissant une fonction et une seconde version en utilisant une fonction anonyme ```lambda```

In [1]:
a = [1,2,3,5,7]

b = map(lambda x : x * 2, a)
list(b)

[2, 4, 6, 10, 14]

**Exo2:** Créer une liste de 1000 entiers aléatoires. À l'aide de la fonction `map` retourner une collection qui contient `True` si le nombre était pair et `False` sinon 

In [2]:
import numpy as np
l = np.random.randint(10**5, size=10) #size=1000
print(l)
print(list(map(lambda x: x%2 == 0, l)))

[43603 52571 51354 16311 47282 96796 89059 27850 20111 39560]
[False, False, True, False, True, True, False, True, False, True]


#### **Pour faire simple, la seule différence entre le ```map``` de python et celui de spark c'est que le ```map``` de ce dernier découpe le calul sur plusieurs machines pour paralléliser le traitement.**

## **Reduce**

La fonction ```reduce``` prend en entrée une collection et retourne une réduction de celle ci en lui appliquant une fonction d'agrégation itérative, c'est-à-dire, une fonction qui lit les valeurs de la liste de gauche à droite et ne renvoie qu'une seule valeur agrégée.  
La fonction d'agrégation doit donc prendre 2 arguments et ne renvoyer qu'une seule valeur.

Par exemple, ```reduce``` permet de calculer la somme des éléments d'une liste, ce qu'on va faire (enfin vous allez faire) de suite.

**Exo3:** calculer la somme de la liste ```a``` :
>1. en utilisant une boucle ```for```
>2. en utilisant la fonction ```reduce``` du module ```functools```

In [3]:
a = [1, 2, 3, 4, 5]

In [4]:
tot = 0
for x in a:
    tot += x
tot

15

In [5]:
from functools import reduce
tot = reduce(lambda x,y: x+y, a)
tot

15

**Exo4:** générer une liste de 20 entiers entre 0 et 1000 et calculer le maximum de cette liste à l'aide de la fonction ```reduce```

In [6]:
l = np.random.randint(10**3, size=20)
print(l)
print(reduce(lambda x,y : x if x>y else y, l))

[879 158 787  92  53 698 987 707 120 266 953  55 506  61 990   1   0 767
 461 259]
990


**Exo5:** importer la fonction ```accumulate``` du module ```itertools```, comprendre comment elle marche, la tester et la comparer avec ```reduce``` sur les 2 exemples précédents (somme et maximum d'une liste). Quelles sont les différences ?

In [7]:
from itertools import accumulate
l = np.random.randint(10**3, size=20)
print(l)
print(list(accumulate(l, lambda x,y : x+y)))
print(list(accumulate(l, lambda x,y : x if x>y else y)))

[ 39  79   0 512 583 536 829 637 703 494 386 386 461 919 513 829 136  81
 325 338]
[39, 118, 118, 630, 1213, 1749, 2578, 3215, 3918, 4412, 4798, 5184, 5645, 6564, 7077, 7906, 8042, 8123, 8448, 8786]
[39, 79, 79, 512, 583, 583, 829, 829, 829, 829, 829, 829, 829, 919, 919, 919, 919, 919, 919, 919]


#### **Comme pour la fonction ```map``` la seule différence entre le ```reduce``` de python et celui de spark c'est que celui de spark découpe en plusieurs morceaux et parallélise le traitement.**

## **Petit bonus : Filter**


**Exo6**: créer une liste avec les valeurs suivantes : [-1, 3, 2, -1, 6, 8] puis utiliser la fonction ```filter``` pour récupérer uniquement les valeurs positives.

In [8]:
a = [-1, 3, 2, -1, 6, 8]
list(filter(lambda x: x>0, a))

[3, 2, 6, 8]

## **Map et Reduce**

On considère le mega big dataset suivant : 

In [9]:
a = ["ceci n'est pas du big data", 'Bonjour voilà du texte',
     "il est en retard tous les jours", "une bonne auberge", "elle est en Grèce"]

**Exo7:** utiliser la fonction ```map``` pour séparer chaque phrase en en mots

In [10]:
def sep(ph):
    return ph.split()

#list(map(sep, a))
list(map(lambda ph: ph.split(), a))

[['ceci', "n'est", 'pas', 'du', 'big', 'data'],
 ['Bonjour', 'voilà', 'du', 'texte'],
 ['il', 'est', 'en', 'retard', 'tous', 'les', 'jours'],
 ['une', 'bonne', 'auberge'],
 ['elle', 'est', 'en', 'Grèce']]

**Exo8:** à l'aide de ```map``` et/ou ```reduce``` renvoyer le nombre total de mots dans ```a```

In [11]:
mapped = map(lambda ph : len(ph.split()), a)
reduce(lambda x,y : x+y, mapped)
#reduce(lambda x,y : x+y, map(lambda ph : len(ph.split()), a))

24

## **WordCount : le "hello world" du MapReduce**

**Exo9:** on va illustrer le fonctionnement de MapReduce pour compter le nombre d'occurence de chaque mot dans la variable ```text``` définie ci-dessous. Il faut donc éxecuter les 4 étapes suivantes (le preprocessing étant un petit supplément) :
>1. SPLIT: découpage du texte en 4 sous-parties
>2. petite étape de preprocessing avec :
>>- suppression de la ponctuation,
>>- passage en minuscules
>>- suppression des mots de 1, 2 ou 3 lettres
>3. MAP: avec la fonction ```map``` renvoyer une liste de (clé, valeur) : ici clé=mot et valeur=occurence=1
>4. SHUFFLE (& SORT): regroupement des résultats : on doit avoir pour chaque clé le couple (clé, [val, val, val,...]) : ici (mot, [1,1,1,...])
>5. REDUCE: sommer les occurences uniques de chaque mot pour obtenir le nombre d'occurrences totales

In [12]:
text = "Si vous voulez mon avis concernant la morosité conjoncturelle, \
je n'exclus pas de réorganiser la simultanéité des hypothèses réalisables, \
avec toute la prudence requise. Eu égard à la fragilité actuelle, il ne faut \
pas s'interdire de se remémorer précisément les organisations matricielles \
opportunes, avec beaucoup de recul. Afin de circonvenir à cette inflexion de \
l'époque actuelle, je recommande d'essayer la somme des stratégies envisageables, \
même si ce n'est pas facile. Vu la dualité de la situation conjoncturelle, il ne \
faut pas négliger de gérer certaines synergies optimales, parce que nous le valons \
bien. Nonobstant la dualité de la situation observée, je n'exclus pas d'inventorier \
la somme des modalités réalisables, parce qu'il est temps d'agir. Si vous voulez mon \
avis concernant la baisse de confiance présente, je n'exclus pas d'inventorier la \
globalité des améliorations pertinentes, très attentivement."

In [13]:
#SPLIT
mots = text.split()
cuts = np.linspace(0, len(mots), 5, dtype=int)
gp_mots = [mots[cuts[i]:cuts[i+1]] for i in range(4)]
text_split = [" ".join(gp_mots[i]) for i in range(4)]
text_split

["Si vous voulez mon avis concernant la morosité conjoncturelle, je n'exclus pas de réorganiser la simultanéité des hypothèses réalisables, avec toute la prudence requise. Eu égard à la fragilité actuelle, il ne faut pas",
 "s'interdire de se remémorer précisément les organisations matricielles opportunes, avec beaucoup de recul. Afin de circonvenir à cette inflexion de l'époque actuelle, je recommande d'essayer la somme des stratégies envisageables, même si ce n'est",
 "pas facile. Vu la dualité de la situation conjoncturelle, il ne faut pas négliger de gérer certaines synergies optimales, parce que nous le valons bien. Nonobstant la dualité de la situation observée, je n'exclus",
 "pas d'inventorier la somme des modalités réalisables, parce qu'il est temps d'agir. Si vous voulez mon avis concernant la baisse de confiance présente, je n'exclus pas d'inventorier la globalité des améliorations pertinentes, très attentivement."]

In [14]:
#PREPROCESSING
import string

def preprocess_text(s):
    s = s.lower()
    s = s.replace("'", " ")
    s = s.translate(str.maketrans("", "", string.punctuation))
    gros_mots = filter(lambda x: len(x)>3, s.split())
    return " ".join(gros_mots)

preprocess_text(text_split[0])

'vous voulez avis concernant morosité conjoncturelle exclus réorganiser simultanéité hypothèses réalisables avec toute prudence requise égard fragilité actuelle faut'

In [15]:
import re

def preprocess_text2(s):
    return " ".join(re.findall(r"\b\w{4,}\b", s.lower()))

preprocess_text2(text_split[0])

'vous voulez avis concernant morosité conjoncturelle exclus réorganiser simultanéité hypothèses réalisables avec toute prudence requise égard fragilité actuelle faut'

In [16]:
#MAP
def mapper(s):
    return [(mot, 1) for mot in preprocess_text(s).split()]

mapped = list(map(mapper, text_split))
mapped

[[('vous', 1),
  ('voulez', 1),
  ('avis', 1),
  ('concernant', 1),
  ('morosité', 1),
  ('conjoncturelle', 1),
  ('exclus', 1),
  ('réorganiser', 1),
  ('simultanéité', 1),
  ('hypothèses', 1),
  ('réalisables', 1),
  ('avec', 1),
  ('toute', 1),
  ('prudence', 1),
  ('requise', 1),
  ('égard', 1),
  ('fragilité', 1),
  ('actuelle', 1),
  ('faut', 1)],
 [('interdire', 1),
  ('remémorer', 1),
  ('précisément', 1),
  ('organisations', 1),
  ('matricielles', 1),
  ('opportunes', 1),
  ('avec', 1),
  ('beaucoup', 1),
  ('recul', 1),
  ('afin', 1),
  ('circonvenir', 1),
  ('cette', 1),
  ('inflexion', 1),
  ('époque', 1),
  ('actuelle', 1),
  ('recommande', 1),
  ('essayer', 1),
  ('somme', 1),
  ('stratégies', 1),
  ('envisageables', 1),
  ('même', 1)],
 [('facile', 1),
  ('dualité', 1),
  ('situation', 1),
  ('conjoncturelle', 1),
  ('faut', 1),
  ('négliger', 1),
  ('gérer', 1),
  ('certaines', 1),
  ('synergies', 1),
  ('optimales', 1),
  ('parce', 1),
  ('nous', 1),
  ('valons', 1),
 

In [17]:
#SHUFFLE & SORT
liste = [couple for subset in mapped for couple in subset]
liste.sort()

prev_wd = None
wd_occur = list()
shuffled_sorted = list()

for k, v in liste:
    if k != prev_wd:
        if prev_wd != None:
            shuffled_sorted.append((prev_wd, wd_occur))
        prev_wd, wd_occur = k, [v]
    else:
        wd_occur.append(v)

shuffled_sorted.append((prev_wd, wd_occur))
shuffled_sorted

[('actuelle', [1, 1]),
 ('afin', [1]),
 ('agir', [1]),
 ('améliorations', [1]),
 ('attentivement', [1]),
 ('avec', [1, 1]),
 ('avis', [1, 1]),
 ('baisse', [1]),
 ('beaucoup', [1]),
 ('bien', [1]),
 ('certaines', [1]),
 ('cette', [1]),
 ('circonvenir', [1]),
 ('concernant', [1, 1]),
 ('confiance', [1]),
 ('conjoncturelle', [1, 1]),
 ('dualité', [1, 1]),
 ('envisageables', [1]),
 ('essayer', [1]),
 ('exclus', [1, 1, 1]),
 ('facile', [1]),
 ('faut', [1, 1]),
 ('fragilité', [1]),
 ('globalité', [1]),
 ('gérer', [1]),
 ('hypothèses', [1]),
 ('inflexion', [1]),
 ('interdire', [1]),
 ('inventorier', [1, 1]),
 ('matricielles', [1]),
 ('modalités', [1]),
 ('morosité', [1]),
 ('même', [1]),
 ('nonobstant', [1]),
 ('nous', [1]),
 ('négliger', [1]),
 ('observée', [1]),
 ('opportunes', [1]),
 ('optimales', [1]),
 ('organisations', [1]),
 ('parce', [1, 1]),
 ('pertinentes', [1]),
 ('prudence', [1]),
 ('précisément', [1]),
 ('présente', [1]),
 ('recommande', [1]),
 ('recul', [1]),
 ('remémorer', [1])

In [18]:
#SHUFFLE & SORT
liste = [couple for subset in mapped for couple in subset]
liste.sort()

from collections import defaultdict
dico = defaultdict(list)

for k,v in liste :
    dico[k].append(v)

shuffled_sorted = list(dico.items())

In [19]:
#REDUCE
def reducer(key, values):
    return (key, sum(values))

reduced = list(map(lambda couple: reducer(couple[0], couple[1]), shuffled_sorted))
reduced

[('actuelle', 2),
 ('afin', 1),
 ('agir', 1),
 ('améliorations', 1),
 ('attentivement', 1),
 ('avec', 2),
 ('avis', 2),
 ('baisse', 1),
 ('beaucoup', 1),
 ('bien', 1),
 ('certaines', 1),
 ('cette', 1),
 ('circonvenir', 1),
 ('concernant', 2),
 ('confiance', 1),
 ('conjoncturelle', 2),
 ('dualité', 2),
 ('envisageables', 1),
 ('essayer', 1),
 ('exclus', 3),
 ('facile', 1),
 ('faut', 2),
 ('fragilité', 1),
 ('globalité', 1),
 ('gérer', 1),
 ('hypothèses', 1),
 ('inflexion', 1),
 ('interdire', 1),
 ('inventorier', 2),
 ('matricielles', 1),
 ('modalités', 1),
 ('morosité', 1),
 ('même', 1),
 ('nonobstant', 1),
 ('nous', 1),
 ('négliger', 1),
 ('observée', 1),
 ('opportunes', 1),
 ('optimales', 1),
 ('organisations', 1),
 ('parce', 2),
 ('pertinentes', 1),
 ('prudence', 1),
 ('précisément', 1),
 ('présente', 1),
 ('recommande', 1),
 ('recul', 1),
 ('remémorer', 1),
 ('requise', 1),
 ('réalisables', 2),
 ('réorganiser', 1),
 ('simultanéité', 1),
 ('situation', 2),
 ('somme', 2),
 ('stratégie

## **Pour : finir moyenne de liste et temps d'éxecution**

**Exo10:** à l'aide des fonctions ```map``` et ```reduce```, on va calculer la moyenne des éléments d'une liste. Et sans utiliser ```len()```...
>1. créer une liste d'entiers aléatoires de taille 10^7
>2. utiliser ```map``` et ```reduce``` pour calculer la moyenne sur cette liste
>3. utiliser ```%%time``` pour mesurer le temps d'éxecution de ce calcul
>4. découper la liste en 5 sous-listes de tailles égales
>5. importer la fonction ```Pool``` de la libraire ```multiprocessing``` et l'utiliser pour paralléliser les calculs sur chaque sous-liste. L'idée est de définir par exemple une fonction MapReduce_average qui reprend votre méthode utilisée pour le calcul de la moyenne et utiliser pool.map(MapReduce_average, liste_splitée)
>6. regarder avec %%time les différences de temps d'éxecution des 2 méthodes

In [20]:
a =  np.random.randint(1000, size=10**7)

In [21]:
%%time
a_mapped = map(lambda val : (val, 1), a)
a_reduced = reduce(lambda x,y : (x[0]+y[0], x[1]+y[1]), a_mapped)
a_reduced[0]/a_reduced[1]

CPU times: user 3.31 s, sys: 3.55 ms, total: 3.32 s
Wall time: 3.31 s


499.5265801

In [45]:
nb_cores = 8
cuts = np.linspace(0, 10**7, nb_cores+1, dtype=int)
a_split = [a[cuts[k]:cuts[k+1]] for k in range(nb_cores)]
a_split

[array([718, 511, 109, ..., 409, 709, 305]),
 array([718,  43,  36, ..., 843, 650,  83]),
 array([344,  90, 785, ..., 536, 188, 433]),
 array([297, 642, 384, ..., 477, 304, 945]),
 array([672, 808, 260, ..., 693, 658, 838]),
 array([ 69, 752, 693, ..., 564, 210,  71]),
 array([287, 852, 212, ..., 282, 611, 123]),
 array([159, 919, 216, ..., 693, 153, 399])]

In [46]:
from multiprocess import Pool
pool = Pool(nb_cores)

In [47]:
def mr_avg(l):
    mapped = map(lambda i : (i,1), l)
    return(reduce(lambda x,y : (x[0]+y[0], x[1]+y[1]), mapped))

In [48]:
%%time
mapped = pool.map(mr_avg, a_split)
reduced = reduce(lambda x,y : (x[0]+y[0], x[1]+y[1]), mapped)
a_reduced[0]/a_reduced[1]

CPU times: user 52.8 ms, sys: 36.7 ms, total: 89.4 ms
Wall time: 1.19 s


499.5265801

In [49]:
pool.close()
pool.terminate()

## **Supplément**

**Exo11:** Calculer en utilisant le paradigme MapReduce le produit d'une matrice M avec un vecteur V.  
Ça peut paraître inutile mais cette opération est derrière l'algorithme du PageRank de Google, c'est d'ailleurs en partie pour ce calcul que MapReduce a été conçu. Dans ce cas, la dimension du problème est le nombre de pages web indexées, soit clairement un problème de Big Data.  
Par ailleurs, on l'a vu dans la régression linéaire notamment mais pas uniquement, ce produit $matrice*vecteur$ est omniprésent dans les problèmes d'optimisation aussi.
>*Quelques indications:*

>0. commencer par vous remettre dans le bain en revoyant comment on calcul un produit matriciel et plus exactement un produit $matrice*vecteur$
>1. générer une matrice aléatoire $M$ de taille (5,6) par exemple et un vecteur de taille 6
>2. transformer votre matrice $M$ en une liste de triplet $(i,j,m_{ij})$
>3. étape map : on peut choisir comme clé le numéro de ligne et l'étape map consistera donc à passer du triplet $(i,j,m_{ij})$ au couple $(i,m_{ij}*v_j)$
>4. étape shuffle&sort : il faut regrouper les couples (clé, valeur) en (clé, liste_de_valeurs), la clé étant ici l'indice de ligne $i$
>5. étape réduce : agréger les résultats en les sommant

*Pour aller plus loin:* cette solution marche convenablement lorsque chaque noeud a la capacité de stocker $V$ localement mais que faire si $V$ est trop grand ? Vous pouvez mettre en pratique votre solution avec les mêmes $M$ et $V$ que précédemment

In [33]:
# génération matrice
M = np.random.randint(10, size=(5,6))
V =  np.random.randint(10, size=6)
print(M, V)

[[7 2 2 7 9 5]
 [5 2 4 0 6 2]
 [0 6 0 5 1 5]
 [7 5 3 7 1 9]
 [8 1 7 0 2 8]] [1 9 8 1 6 7]


In [34]:
#transformation en liste de triplets
M_liste = [(i,j,M[i,j]) for i in range(M.shape[0]) for j in range(M.shape[1])]
M_liste

[(0, 0, 7),
 (0, 1, 2),
 (0, 2, 2),
 (0, 3, 7),
 (0, 4, 9),
 (0, 5, 5),
 (1, 0, 5),
 (1, 1, 2),
 (1, 2, 4),
 (1, 3, 0),
 (1, 4, 6),
 (1, 5, 2),
 (2, 0, 0),
 (2, 1, 6),
 (2, 2, 0),
 (2, 3, 5),
 (2, 4, 1),
 (2, 5, 5),
 (3, 0, 7),
 (3, 1, 5),
 (3, 2, 3),
 (3, 3, 7),
 (3, 4, 1),
 (3, 5, 9),
 (4, 0, 8),
 (4, 1, 1),
 (4, 2, 7),
 (4, 3, 0),
 (4, 4, 2),
 (4, 5, 8)]

In [35]:
#MAP
def prod(triplet):
    i,j,Mij = triplet
    return i, Mij*V[j]

mapped = list(map(prod, M_liste))
mapped

[(0, 7),
 (0, 18),
 (0, 16),
 (0, 7),
 (0, 54),
 (0, 35),
 (1, 5),
 (1, 18),
 (1, 32),
 (1, 0),
 (1, 36),
 (1, 14),
 (2, 0),
 (2, 54),
 (2, 0),
 (2, 5),
 (2, 6),
 (2, 35),
 (3, 7),
 (3, 45),
 (3, 24),
 (3, 7),
 (3, 6),
 (3, 63),
 (4, 8),
 (4, 9),
 (4, 56),
 (4, 0),
 (4, 12),
 (4, 56)]

In [42]:
#SHUFFLE AND SORT
from collections import defaultdict
dico = defaultdict(list)
for k,v in mapped :
    dico[k].append(v)
shuffled = list(dico.items())
shuffled

[(0, [7, 18, 16, 7, 54, 35]),
 (1, [5, 18, 32, 0, 36, 14]),
 (2, [0, 54, 0, 5, 6, 35]),
 (3, [7, 45, 24, 7, 6, 63]),
 (4, [8, 9, 56, 0, 12, 56])]

In [43]:
#REDUCE
def reducer(key, values):
    return (key, sum(values))

reduced = list(map(lambda tup: reducer(tup[0], tup[1]), shuffled))
reduced

[(0, 137), (1, 105), (2, 100), (3, 152), (4, 141)]

In [44]:
#si on veut représenter le vecteur sous forme de liste:
[x[1] for x in reduced]

[137, 105, 100, 152, 141]

*Pour aller plus loin:* cas où $V$ ne peut pas être stocké entièrement dans les noeuds

La solution est assez triviale en théorie: puisque $V$ ne rentre pas dans un noeud, il faut le découper de sorte que chaque noeud s'occupe d'une partie du calcul.  
**Attention** derrière la simplicité de cette solution, il y a toutefois une petite subtilité. En effet, puisqu'on découpe $V$, il faut aussi découper $M$ de manière cohérente.

Dans notre exemple $M$ est de taille (5,6) et $V$ et de taille 6 c'est-à-dire en fait de taille (6,1).  
On découpe $V$ horizontalement en trois vecteurs de taille (2,1).  
Il faut donc pour pouvoir faire le produit découper $M$ "verticalement" en trois matrices de tailles (5,2).
Ensuite on aura plus qu'à regrouper les résultats.

In [51]:
print(M)
print(V)

[[7 2 2 7 9 5]
 [5 2 4 0 6 2]
 [0 6 0 5 1 5]
 [7 5 3 7 1 9]
 [8 1 7 0 2 8]]
[1 9 8 1 6 7]


In [52]:
M_splitted = [M[:,0:2], M[:,2:4],  M[:,4:6]]
V_splitted = [V[0:2], V[2:4], V[4:6]]

#on peut recréer un couple sous-matrice/sous-vecteur qui correspond à l'information stockée dans chaque noeud du cluster
MV = [(M_splitted[k], V_splitted[k]) for k in range(3)]
MV

[(array([[7, 2],
         [5, 2],
         [0, 6],
         [7, 5],
         [8, 1]]),
  array([1, 9])),
 (array([[2, 7],
         [4, 0],
         [0, 5],
         [3, 7],
         [7, 0]]),
  array([8, 1])),
 (array([[9, 5],
         [6, 2],
         [1, 5],
         [1, 9],
         [2, 8]]),
  array([6, 7]))]

In [53]:
#transformation en liste de triplets
def triplets(block):
    M = block[0]
    V = block[1]
    return [(i,j,M[i,j]) for i in range(M.shape[0]) for j in range(M.shape[1])], list(V)

MV_triplets = list(map(triplets, MV))
MV_triplets

[([(0, 0, 7),
   (0, 1, 2),
   (1, 0, 5),
   (1, 1, 2),
   (2, 0, 0),
   (2, 1, 6),
   (3, 0, 7),
   (3, 1, 5),
   (4, 0, 8),
   (4, 1, 1)],
  [1, 9]),
 ([(0, 0, 2),
   (0, 1, 7),
   (1, 0, 4),
   (1, 1, 0),
   (2, 0, 0),
   (2, 1, 5),
   (3, 0, 3),
   (3, 1, 7),
   (4, 0, 7),
   (4, 1, 0)],
  [8, 1]),
 ([(0, 0, 9),
   (0, 1, 5),
   (1, 0, 6),
   (1, 1, 2),
   (2, 0, 1),
   (2, 1, 5),
   (3, 0, 1),
   (3, 1, 9),
   (4, 0, 2),
   (4, 1, 8)],
  [6, 7])]

In [54]:
#MAP
def mapper(block):
    M = block[0]
    V = block[1]
    return [(triplet[0], triplet[2]*V[triplet[1]]) for triplet in M]
    
mapped = list(map(mapper, MV_triplets))
mapped

[[(0, 7),
  (0, 18),
  (1, 5),
  (1, 18),
  (2, 0),
  (2, 54),
  (3, 7),
  (3, 45),
  (4, 8),
  (4, 9)],
 [(0, 16),
  (0, 7),
  (1, 32),
  (1, 0),
  (2, 0),
  (2, 5),
  (3, 24),
  (3, 7),
  (4, 56),
  (4, 0)],
 [(0, 54),
  (0, 35),
  (1, 36),
  (1, 14),
  (2, 6),
  (2, 35),
  (3, 6),
  (3, 63),
  (4, 12),
  (4, 56)]]

In [55]:
# SHUFFLE AND SORT
def shfld(block):
    dico = defaultdict(list)
    list(map(lambda tup : dico[tup[0]].append(tup[1]), block))
    return list(dico.items())

shuffled = list(map(shfld, mapped))
shuffled

[[(0, [7, 18]), (1, [5, 18]), (2, [0, 54]), (3, [7, 45]), (4, [8, 9])],
 [(0, [16, 7]), (1, [32, 0]), (2, [0, 5]), (3, [24, 7]), (4, [56, 0])],
 [(0, [54, 35]), (1, [36, 14]), (2, [6, 35]), (3, [6, 63]), (4, [12, 56])]]

In [56]:
#REDUCE
def reducer(block):
    return list(map(lambda tup: (tup[0], sum(tup[1])), block))

reduced = list(map(reducer, shuffled))
reduced

[[(0, 25), (1, 23), (2, 54), (3, 52), (4, 17)],
 [(0, 23), (1, 32), (2, 5), (3, 31), (4, 56)],
 [(0, 89), (1, 50), (2, 41), (3, 69), (4, 68)]]

In [57]:
#On réitère SHUFFLE AND SORT + REDUCE sur la liste qui regroupe les résultats de chaque noeud
liste = [item for sublist in reduced for item in sublist]
reducer(shfld(liste))

[(0, 137), (1, 105), (2, 100), (3, 152), (4, 141)]

In [58]:
#pour avoir le vecteur final
[c[1] for c in reducer(shfld(liste))]

[137, 105, 100, 152, 141]