# Map / Reduce

Le but de cet exercice est de simuler l'exécution d'un traitement Map / Reduce, en fournissant les programmes "mapper" et "reducer" à une fonction Python qui va émuler une plateforme big data. L'intérêt est de comprendre comment écrire ces mappers et reducers.

Concrètement, il s'agit de traiter les fichiers de séries temporelles de machines du TP 1. Pour rappel, chaque machine a un ID (ex. `NV_1`) et plusieurs fichiers (ex. `NV_1.csv` à `NV_26.csv`) qui mis bout à bout constituent 1 semaine de séries temporelles variées.

Chaque fichier a la structure suivante :
```
timestamp,valeur1,valeur2,...
```

Le traitement dans son ensemble doit produire, pour chaque machine, les timestamps minimum et maximum rencontrés dans tous les fichiers de la machine. Il y a donc une clef de regroupement : l'ID de machine.

![](Flot.png)

Le notebook vous guide dans l'écriture de ce traitement.

# Import de packages

In [3]:
import glob
import os
import itertools as it
import pandas as pd

On récupère la liste des fichiers de séries temporelles (cf. TP 1)

In [19]:
ts_filenames = glob.glob('data/ts/*.csv')
print(f'{len(ts_filenames)} fichiers de séries')

364 fichiers de séries


# Ecriture des "programmes" de Map et de Reduce
On écrit ces "programmes" sous forme de fonctions, et on laisse ensuite la "plateforme" (ici simulée) les exécuter sous forme de tâches.

Le programme de Map ("mapper") va s'exécuter sur **chaque fichier** indépendamment des autres, et produire en sortie un couple `(clef, valeur)`. Ce sont les résultats intermédiaires. Il prend en paramètre le nom d'un fichier unique.

Le programme de Reduce ("reducer") va s'exécuter sur **chaque ensemble de résultats de Map identifié par une clef** pour l'agréger. Il va travailler de manière indépendante pour chaque valeur de la clef en sortie du Map. Il produit lui aussi un résultat, final cette fois, sous la forme `(clef, valeur)`.

Entre les deux, une étape "cachée" par la plateforme regroupe les résultats intermédiaires qui ont la même valeur de clef, pour les présenter au Reduce. Ce dernier prend donc en entrée 2 paramètres : une valeur de clef, et une liste de valeurs intermédiaires.

Un aspect important de la conception d'un programme Map / Reduce est le choix des clefs, il va déterminer le succès du traitement. Dans notre cas :
- en sortie de Map, la clef sera l'ID de machine, et la valeur le couple `(timestamp mini, timestamp maxi)` du fichier lu
- en sortie de Reduce, la clef sera aussi l'ID de machine, et la valeur le couple `(timestamp mini, timestamp maxi)` de tous les fichiers de la machine

In [30]:
# 1.Programme de Map ("mapper")
#
# La fonction doit :
# - extraire l'ID de machine du nom du fichier (voir TP 1 pour le morceau de code)
# - lire le fichier sous forme de dataframe, et extraire les timestamps min et max
# - renvoyer un tuple Python (machine_id, (min_timestamp, max_timestamp))
#
# Attention aux parenthèses : (min_timestamp, max_timestamp) est un sous-tuple dans le tuple global
#
# Voir la cellule ci-dessous pour un exemple d'entrée et de sortie attendue

def mapper(ts_filename):
    machine_id = os.path.basename(ts_filename).split('_')[0]
    machine_dataframe = pd.read_csv(ts_filename, parse_dates=['timestamp'])
    return (machine_id, (machine_dataframe['timestamp'].min(), machine_dataframe['timestamp'].max()))

Vérification du fonctionnement. On prend le premier fichier de la liste et on le traite unitairement pour inspecter le résultat.

Attendu : `('NA1', (Timestamp('2024-01-24 08:00:00'), Timestamp('2024-01-24 15:59:47')))`

In [31]:
mapper(sorted(ts_filenames)[0])

('NA1', (Timestamp('2024-01-24 08:00:00'), Timestamp('2024-01-24 15:59:47')))

In [35]:
# 2.Programme de Reduce ("reducer")
#
# La fonction reçoit un tuple (machine_id, ranges) en entrée, composé de :
# - machine_id : l'ID d'une machine
# - ranges : une liste de couples (min_timestamp, max_timestamp) qui proviennent tous de la même machine
#
# En sortie, elle doit produire un de tuple (machine_id, (timestamp_min, timestamp_max)) agrégée pour la machine donnée
# timestamp_min = minimum des min_timestamp
# timestamp_max = maximum des max_timestamp
#
# Attention aux parenthèses : (timestamp_min, timestamp_max) est un sous-tuple dans le tuple global
#
# Voir la cellule ci-dessous pour un exemple d'entrée et de sortie attendue

def reducer(machine_and_ranges):
    machine_id, ranges = machine_and_ranges
    
    timestamp_min = min([timestamps[0] for timestamps in ranges])
    timestamp_max = max([timestamps[1] for timestamps in ranges])

    return (machine_id, (timestamp_min, timestamp_max))

On vérifie le fonctionnement sur une entrée simulée (par simplicité, en utilisant des entiers au lieu des dates).

Attendu : `('MACHINE', (1, 12))`

In [37]:
# Cette entrée de Reduce suppose que le mapper a produit les résultats suivants :
# - (MACHINE, (5, 8))
# - (MACHINE, (1, 4))
# - (MACHINE, (3, 12))
# - (MACHINE, (2, 9))
# C'est l'étape cachée qui les met en ordre différemment

fake_input = ('MACHINE', [(5, 8), (1, 4), (3, 12), (2, 9)])

reducer(fake_input)

('MACHINE', (1, 12))

# Exécution globale
On simule ici un moteur de calcul basé sur Map / Reduce, en utilisant les programmes du mapper et du reducer, pour produire le résultat attendu.

Cette fonction est prédéfinie, pas besoin de la retoucher, elle est utilisée à la fin.

In [38]:
# La fonction prend en paramètres :
# - la liste des données d'entrée du traitement
# - la fonction du mapper
# - la fonction du reducer
#
# Elle retourne une liste de résultats du reducer

def appel_plateforme_map_reduce(data, mapper, reducer):
    get_key = lambda x: x[0]
    return [
        reducer((k, [y for _, y in v]))
        for k, v in it.groupby(
            sorted(map(mapper, data), key=get_key),
            get_key
        )
    ]

Si tout s'est bien passé, le résultat doit être similaire à ceci :
```
[('NA1', (Timestamp('2024-01-24 08:00:00'), Timestamp('2024-02-01 23:59:02'))),
 ('NA2', (Timestamp('2024-01-24 08:00:00'), Timestamp('2024-02-01 23:59:43'))),
 ('NP1', (Timestamp('2024-01-24 08:00:00'), Timestamp('2024-02-01 23:59:41'))),
 ('NU1', (Timestamp('2024-01-24 08:00:00'), Timestamp('2024-02-01 23:59:55'))),
 ('NU2', (Timestamp('2024-01-24 08:00:00'), Timestamp('2024-02-01 23:59:57'))),
 ('NU3', (Timestamp('2024-01-24 08:00:00'), Timestamp('2024-02-01 23:59:55'))),
 ('VA1', (Timestamp('2024-01-24 08:00:00'), Timestamp('2024-02-01 23:59:47'))),
 ('VA2', (Timestamp('2024-01-24 08:00:00'), Timestamp('2024-02-01 23:59:22'))),
 ('VF1', (Timestamp('2024-01-24 08:00:00'), Timestamp('2024-02-01 23:59:50'))),
 ('VM1', (Timestamp('2024-01-24 08:00:00'), Timestamp('2024-02-01 23:59:18'))),
 ('VP1', (Timestamp('2024-01-24 08:00:00'), Timestamp('2024-02-01 23:59:39'))),
 ('VP2', (Timestamp('2024-01-24 08:00:00'), Timestamp('2024-02-01 23:59:34'))),
 ('VU1', (Timestamp('2024-01-24 08:00:00'), Timestamp('2024-02-01 23:59:58'))),
 ('VU2', (Timestamp('2024-01-24 08:00:00'), Timestamp('2024-02-01 23:59:57')))]
```

In [39]:
appel_plateforme_map_reduce(ts_filenames, mapper, reducer)

[('NA1', (Timestamp('2024-01-24 08:00:00'), Timestamp('2024-02-01 23:59:02'))),
 ('NA2', (Timestamp('2024-01-24 08:00:00'), Timestamp('2024-02-01 23:59:43'))),
 ('NP1', (Timestamp('2024-01-24 08:00:00'), Timestamp('2024-02-01 23:59:41'))),
 ('NU1', (Timestamp('2024-01-24 08:00:00'), Timestamp('2024-02-01 23:59:55'))),
 ('NU2', (Timestamp('2024-01-24 08:00:00'), Timestamp('2024-02-01 23:59:57'))),
 ('NU3', (Timestamp('2024-01-24 08:00:00'), Timestamp('2024-02-01 23:59:55'))),
 ('VA1', (Timestamp('2024-01-24 08:00:00'), Timestamp('2024-02-01 23:59:47'))),
 ('VA2', (Timestamp('2024-01-24 08:00:00'), Timestamp('2024-02-01 23:59:22'))),
 ('VF1', (Timestamp('2024-01-24 08:00:00'), Timestamp('2024-02-01 23:59:50'))),
 ('VM1', (Timestamp('2024-01-24 08:00:00'), Timestamp('2024-02-01 23:59:18'))),
 ('VP1', (Timestamp('2024-01-24 08:00:00'), Timestamp('2024-02-01 23:59:39'))),
 ('VP2', (Timestamp('2024-01-24 08:00:00'), Timestamp('2024-02-01 23:59:34'))),
 ('VU1', (Timestamp('2024-01-24 08:00:00

# Raffinements

## Filtre

Changer le traitement pour qu'il ne traite que les machines "Préparation" (leur nom contient un "P" : exemple "NP1").

Idéalement, faire le test dès le mapper, pour éviter des calculs inutiles sur les timestamps. Il faudra quand même que le mapper renvoie un tuple, sous la forme `(clef, valeur)` pour que la plateforme l'interprète. Vous pouvez jouer sur la clef ou la valeur (vous n'êtes pas obligés d'y mettre un ID de la machine ou des timestamps) pour signaler au reducer qu'il faut ignorer certaines données.

In [44]:
def mapper_filtre(ts_filename):
    machine_id = os.path.basename(ts_filename).split('_')[0]
    
    if 'P' in machine_id:
        machine_dataframe = pd.read_csv(ts_filename, parse_dates=['timestamp'])
        return (machine_id, (machine_dataframe['timestamp'].min(), machine_dataframe['timestamp'].max()))
    else:
        # On renvoie un tuple spécial
        return ('IGNORE', None)

In [45]:
def reducer(machine_and_ranges):
    machine_id, ranges = machine_and_ranges
    
    if machine_id == 'IGNORE':
        # On renvoie None par convention
        return None
    else:
        timestamp_min = min([timestamps[0] for timestamps in ranges])
        timestamp_max = max([timestamps[1] for timestamps in ranges])
        return (machine_id, (timestamp_min, timestamp_max))

## Agrégation différente

Pour chaque machine de type "Préparation", compter le nombre de fois que le nombre de produits scannés n'a pas augmenté d'une mesure à l'autre.

Attention à 2 choses !
- il faut trier les dataframes par timestamp à la lecture car on doit comparer des mesures successives
- le mapper reçoit un nom de fichier unique en entrée et tous les fichiers sont traités indépendamment. C'est le principe de Map/Reduce. Que se passe-t-il si le nombre de produits scannés est le même à la fin du fichier N et au début du fichier N+1, alors qu'ils sont indépendants ?

Exemple : fin de `NP1_10.csv` et début de `NP1_11.csv` :
```
# Fin de NP1_10.csv
...
2024-01-27 15:57:42,9727
2024-01-27 15:58:10,9727
2024-01-27 15:58:40,9728
2024-01-27 15:59:10,9729
2024-01-27 15:59:42,9729   <--

# Début de NP1_11.csv :
2024-01-27 16:00:10,9729   <-- même valeur
2024-01-27 16:00:37,9731
2024-01-27 16:01:04,9731
2024-01-27 16:01:31,9733
2024-01-27 16:02:02,9735
...
```

Le dernier point n'est pas facile à traiter :-)