# Recherche d'Information et traitement de données massives

# Lab 5 : Recherche d’information sur le Web : passage à l'échelle

<div class="alert alert-block alert-danger">
<b>IMPORTANT:</b> Assurez-vous que votre notebook utilise un kernel python 3 !
</div>

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


  + Ce Lab s'intéresse au problème de passage à l'échelle des données du Web, à l'aide d'une approche reposant sur le paradigme de MapReduce. Vous allez notamment écrire un algorithme dans le cadre de MapReduce.
  + Plusieurs exercices d'approfondissement facultatifs vous sont aussi proposés.


## EXERCICES : écriture d'algorithmes en MapReduce
### Avant propos : un bref rappel de MapReduce
Comme nous l'avons vu dans le cours 5, Map Reduce est un **modèle de programmation** (ou patron de programmation) qui fournit un cadre pour automatiser le calcul distribué sur des données massives. Ce cadre propose d'écrire tout traitement à l'aide de deux opérations `map` et  `reduce` et de la représentation des données sous la forme de paires `(clé,valeur)`.
MapReduce est aussi un **framework d'exécution** qui permet l'éxécution distribuée des programmes écrits selon ce cadre, et cela de manière totalement transparente selon le schéma rappelé ci-dessous.


<img src="https://zupimages.net/up/21/18/iohi.png" width="500" height="500" />

1.  Un certain nombre de tâches *Map* sont alimentées par une ou
    plusieurs partitions de données en provenance d'un système de
    fichiers distribués (par exemple GFS, HDFS, S3).  Ces tâches *Map* transforment ces données en une séquence de paires clés-valeurs. C'est le développeur qui détermine comment sont calculées les paires
    clés-valeurs en fonction des données en entrée en écrivant le code dans la fonction `map()`.
2.  Les paires clés-valeurs sont collectées par un contrôleur maître et triées par clés.
    Les paires sont redirigées vers les tâches *Reduce* de façon à ce que toutes les paires
    qui ont la même clé soient redirigées vers la même tâche *Reduce*.
3.  Les tâches *Reduce* traitent les clés une par une. Elles agrègent/combinent les valeurs associées
    aux clés selon le code spécifié dans la fonction `reduce()`.



Dans le cours, nous avons vu comment écrire en MapReduce un programme permettant de compter le nombre d'occurences des mots d'une collection donnée (programme **WordCount**). C'est une tâche qui peut sembler très simple à écrire quand on travaille sur une collection de petite taille. Vous avez d'ailleurs proposé une solution non-distribuée pour cette tâche pour la collection TIME (étape de filtrage par mots fréquents ou calcul de la pondération TF). En python, nous pouvons écrire cela très simplement, par exemple comme dans le programme ci-dessous.


In [None]:
def count_frequency(collection):
    tokens_count={}
    for doc_id in collection:
        for token in collection[doc_id]:
            if token in tokens_count.keys():
                tokens_count[token]+=1
            else:
                tokens_count[token]=+1
    return tokens_count

Dans le cas où l'on considère une grosse collection dont le stokage et les traitements nécéssitent d'être distribués, alors cette tâche devient plus diffile à écrire, notamment car il faut prendre en compte la distribution des données et des traitements. Le modèle MapReduce apporte une solution à cela en proposant d'écrire ce type de programme selon le principe suivant :

+ On considère que le document ou la partie du document est donné sous la forme d'une paire (clé,valeur) avec comme clé, l'identifiant du document et comme valeur le contenu textuel du document.

+ Étant donnée une collection d’items, appliquer à chaque item un processus de transformation individuelle (étape `MAP`) qui produit des valeurs intermédiaires étiquetées. Dans le cas du WordCount, il s'agit juste de prendre chaque token du document ou de la partie du document et de la transformer en la paire (mot,1) comme illustré ci-dessous.

<img src="https://zupimages.net/up/21/18/5cu8.jpg" width="500" height="500" />


+ Regrouper ces valeurs intermédiaires par étiquette (étape faite par le framework `SHUFFLE AND SORT`). On aura dans le cas du WordCount en sortie de cette étape en ensemble de paires (mot, [1,1,1,1..]) avec comme clés les différents mots du documents et comme valeur une liste des 1-occurence des mots dans le document considéré.
+ Appliquer une fonction d'agrégation à chaque groupe (étape `REDUCE`).Dans le cas du Wordcount il s'agit juste de sommer la liste des occurences comme illustré ci-dessous.

<img src="https://zupimages.net/up/21/18/dabe.jpg" width="500" height="500" />


### Concevoir un programme selon le cadre MapReduce : un peu de méthodologie

Pour faciliter l'écriture de programmes selon le cadre MapReduce, il est souvent nécessaire de se poser d'abord  les questions ci-dessous.

 + De quelle nature sont les documents en entrée ? Comment les représenter sous une forme `(clé, valeur)` ?
 + Quelles sont les groupes visés ? Quelles sont les valeurs intermédiaires que je cherche à produire depuis mon document d'entrée ?
 + Quelle est la valeur finale ? Quelle est la nature de l'agrégation pour produire cette valeur finale ? 

Une fois les réponses à ces questions claires, il suffira ensuite :
 + D'écrire la fonction de `map` qui prend en entrée un document et qui produit une séquence de paires `(clé, valeur)`.
 + D'avoir en tête que ces différentes paires sont collectées par le framework et triées par clés pour donner une entrée à la tâche d'agrégation l'ensemble des paires qui ont la même clé. 
 + D'écrire la fonction `reduce` qui prend en entrée une paire `(clé, liste(valeurs))` en spécifiant le traitement d'agrégation voulu. 

Le schéma d'éxécution et sa trace sur le problème du WordCount vous est donné ci-dessous, pour rappel.

<img src="https://zupimages.net/up/21/18/otvx.jpg" width="500" height="500" />

<img src="https://zupimages.net/up/21/18/lsoz.jpg" width="500" height="500" />

Nous allons maintenant appliquer le cadre MapReduce au calcul de la pondération `TF-IDF`.

Dans le cas où l'on considère une grosse collection dont le stokage et les traitements nécéssitent d'être distribués, alors cette tâche devient plus diffile à écrire, notamment car il faut prendre en compte la distribution des données et des traitements. Le modèle MapReduce apporte une solution à cela en proposant d'écrire ce type de programme selon le principe suivant :

+ On considère que le document ou la partie du document est donné sous la forme d'une paire (clé,valeur) avec comme clé, l'identifiant du document et comme valeur le contenu textuel du document.

+ Étant donnée une collection d’items, appliquer à chaque item un processus de transformation individuelle (étape `MAP`) qui produit des valeurs intermédiaires étiquetées. Dans le cas du WordCount, il s'agit juste de prendre chaque token du document ou de la partie du document et de la transformer en la paire (mot,1) comme illustré ci-dessous.

<img src="https://zupimages.net/up/21/18/5cu8.jpg" width="500" height="500" />


+ Regrouper ces valeurs intermédiaires par étiquette (étape faite par le framework `SHUFFLE AND SORT`). On aura dans le cas du WordCount en sortie de cette étape en ensemble de paires (mot, [1,1,1,1..]) avec comme clés les différents mots du documents et comme valeur une liste des 1-occurence des mots dans le document considéré.
+ Appliquer une fonction d'agrégation à chaque groupe (étape `REDUCE`).Dans le cas du Wordcount il s'agit juste de sommer la liste des occurences comme illustré ci-dessous.

<img src="https://zupimages.net/up/21/18/dabe.jpg" width="500" height="500" />


### Concevoir un programme selon le cadre MapReduce : un peu de méthodologie

Pour faciliter l'écriture de programmes selon le cadre MapReduce, il est souvent nécessaire de se poser d'abord  les questions ci-dessous.

 + De quelle nature sont les documents en entrée ? Comment les représenter sous une forme `(clé, valeur)` ?
 + Quelles sont les groupes visés ? Quelles sont les valeurs intermédiaires que je cherche à produire depuis mon document d'entrée ?
 + Quelle est la valeur finale ? Quelle est la nature de l'agrégation pour produire cette valeur finale ? 

Une fois les réponses à ces questions claires, il suffira ensuite :
 + D'écrire la fonction de `map` qui prend en entrée un document et qui produit une séquence de paires `(clé, valeur)`.
 + D'avoir en tête que ces différentes paires sont collectées par le framework et triées par clés pour donner une entrée à la tâche d'agrégation l'ensemble des paires qui ont la même clé. 
 + D'écrire la fonction `reduce` qui prend en entrée une paire `(clé, liste(valeurs))` en spécifiant le traitement d'agrégation voulu. 

Le schéma d'éxécution et sa trace sur le problème du WordCount vous est donné ci-dessous, pour rappel.

<img src="https://zupimages.net/up/21/18/otvx.jpg" width="500" height="500" />

<img src="https://zupimages.net/up/21/18/lsoz.jpg" width="500" height="500" />

Nous allons maintenant appliquer le cadre MapReduce au calcul de la pondération `TF-IDF`.

Dans le cas où l'on considère une grosse collection dont le stokage et les traitements nécéssitent d'être distribués, alors cette tâche devient plus diffile à écrire, notamment car il faut prendre en compte la distribution des données et des traitements. Le modèle MapReduce apporte une solution à cela en proposant d'écrire ce type de programme selon le principe suivant :

+ On considère que le document ou la partie du document est donné sous la forme d'une paire (clé,valeur) avec comme clé, l'identifiant du document et comme valeur le contenu textuel du document.

+ Étant donnée une collection d’items, appliquer à chaque item un processus de transformation individuelle (étape `MAP`) qui produit des valeurs intermédiaires étiquetées. Dans le cas du WordCount, il s'agit juste de prendre chaque token du document ou de la partie du document et de la transformer en la paire (mot,1) comme illustré ci-dessous.

<img src="https://zupimages.net/up/21/18/5cu8.jpg" width="500" height="500" />


+ Regrouper ces valeurs intermédiaires par étiquette (étape faite par le framework `SHUFFLE AND SORT`). On aura dans le cas du WordCount en sortie de cette étape en ensemble de paires (mot, [1,1,1,1..]) avec comme clés les différents mots du documents et comme valeur une liste des 1-occurence des mots dans le document considéré.
+ Appliquer une fonction d'agrégation à chaque groupe (étape `REDUCE`).Dans le cas du Wordcount il s'agit juste de sommer la liste des occurences comme illustré ci-dessous.

<img src="https://zupimages.net/up/21/18/dabe.jpg" width="500" height="500" />


### Concevoir un programme selon le cadre MapReduce : un peu de méthodologie

Pour faciliter l'écriture de programmes selon le cadre MapReduce, il est souvent nécessaire de se poser d'abord  les questions ci-dessous.

 + De quelle nature sont les documents en entrée ? Comment les représenter sous une forme `(clé, valeur)` ?
 + Quelles sont les groupes visés ? Quelles sont les valeurs intermédiaires que je cherche à produire depuis mon document d'entrée ?
 + Quelle est la valeur finale ? Quelle est la nature de l'agrégation pour produire cette valeur finale ? 

Une fois les réponses à ces questions claires, il suffira ensuite :
 + D'écrire la fonction de `map` qui prend en entrée un document et qui produit une séquence de paires `(clé, valeur)`.
 + D'avoir en tête que ces différentes paires sont collectées par le framework et triées par clés pour donner une entrée à la tâche d'agrégation l'ensemble des paires qui ont la même clé. 
 + D'écrire la fonction `reduce` qui prend en entrée une paire `(clé, liste(valeurs))` en spécifiant le traitement d'agrégation voulu. 

Le schéma d'éxécution et sa trace sur le problème du WordCount vous est donné ci-dessous, pour rappel.

<img src="https://zupimages.net/up/21/18/otvx.jpg" width="500" height="500" />

<img src="https://zupimages.net/up/21/18/lsoz.jpg" width="500" height="500" />

Nous allons maintenant appliquer le cadre MapReduce au calcul de la pondération `TF-IDF`.

##  Exercice 1 : Tf-IDF en MapReduce

L'objectif est de calculer `Tf-IDF` pour un ensemble de documents en utilisant le modèle MapReduce. Pour rappel, comme vu dans le cours 1, `Tf-IDF (Term frequency-Inverse Document Frequency)` est une statistique qui traduit le niveau d'importance d'un terme $t$ pour un document $d$ appartenant à une collection (ou un corpus) de taille $N$. Dans cet exercice, on considèrera la formulation mathématique suivante :

$$TF-IDF(t_i,d) = ( \frac{tf_{t_i,d}}{\sum_{t_k \in d} tf_{t_k,d}}  ) \times \log \left( \frac{N}{df_t}\right)$$

où $tf_{t,d}$ est le nombre d'occurrence du terme $t$ dans le document $d$, $N$ le nombre de documents de la collection et $df_t$ le nombre de documents dans lesquels $t$ est présent.

Vous pouvez prendre le temps de regarder la correction du Lab2 dans lequel nous avons proposé des solutions pour calculer cette statistique sur la collection TIME.

Pour calculer cette statistique sur une très grosse collection de documents, il est nécéssaire de distribuer son calcul. Nous allons pour cela découper le travail en 3 étapes :

 + **1- Le calcul, pour chaque mot, de son nombre d'occurences par document.** Nous appelerons cette étape `WordFrequenceInDocs`.
 + **2- Le calcul du nombre de mots par documents**. Nous appelerons cette étape `WordCountsForDocs`.
 + **3- La combinaison des 2 informations précédentes pour calculer le TF_IDF**. Nous appelerons cette étape `WordsInCorpusTFIDF`.
 
#### Etape 1 : WordFrequenceInDocs

Il s'agit donc ici de formuler le problème du calcul du nombre d'occurences des mots d'un document avec le cadre MapReduce. C'est un problème très proche du problème du WordCount vu en cours et rappelé ci-dessus et vous devriez pouvoir y répondre rapidement.

**Données d'entrée**

Votre premier travail est de proposer une représentation adéquate de vos données d'entrée (une collection de document) pour le cadre MapReduce. On considère que chaque tâche `MAP` traitera un seul document (ou partie de document) et c'est donc ce document (ou cette partie) qui sera pris comme entrée de la fonction `MAP`.
Que proposez-vous ?

In [None]:
# A compléter 

Input  : ...  #ce qu'on a
Output : ...   #ce qu'on veut


**Fonction MAP**

En vous inspirant du WordCount, écrire en pseudo-code, la fonction MAP pour cette étape. Attention, il faudra pouvoir garder l'information relative à l'identifiant du document considéré.


In [None]:
# A compléter 

Fonction MAP:

... pseudo code ....

**Fonction REDUCE**

L'ensemble des paires (clé,valeur) provenant des différents noeuds `MAP` sont collectées et triées par clés intermédiaires et donc fournies sous cette forme à la tâche `REDUCE`. Ecrire, en pseudo-code la fonction `REDUCE` pour `WordFrequenceInDocs`.



In [None]:
# A compléter 

Fonction REDUCER:

Input : ... #ce qu'on a
Output: ... #ce qu'on veut 

... pseudo code ....

#### Etape 2: WordCountsForDocs

Il s'agit maintenant de calculer le nombre de mots par documents. Deux petites indications :
+ L'entrée de cette tâche sera la sortie de la tâche précédente soit une paire de type 
`((word,doc_id),n)` avec `n` le nombre d'occurence du terme word dans le document `doc_id` soit le tf.
+ Pour cette tâche, il pourrait être intéressant de pouvoir avoir accès aux documents par le mécanisme (clé,valeur).

Ecrire, en pseudo-code, la fonction `MAP`, puis la fonction `REDUCE` vous permettant de faire cette étape.

In [None]:
# A compléter

Fonction MAP:
Input: ...
Output: ...
    
... pseudo code ....
    
    
Fonction REDUCER:
Input: ...
Output: ...
    
... pseudo code ....

#### Etape 3: WordsInCorpusTFIDF

Il s'agit maintenant de combiner les informations précédentes pour calculer le TF-IDF pour chaque terme. On considère à nouveau ici que l'entrée est la sortie de la tâche précédente soit une paire de type 
`((word,doc_id),n/N)` avec `n` le nombre d'occurence du terme word dans le document `doc_id` et `N` le nombre total de mots dans le document `doc_id`. On supposera aussi que le nombre de documents $D$ dans la collection est passé au système sous la forme d'une constante.

Ecrire, en pseudo-code, la fonction `MAP`, puis la fonction `REDUCE` vous permettant de faire cette étape.


In [None]:
# A compléter

Fonction MAP:
Input: ...
Output: ...

... pseudo code ....
    
    
Fonction Reducer:
Input: ...
Output: ...

... pseudo code ....

## Exercice 2 : Construction d'un index inversé en MapReduce

Il s'agit ici de refléchir à comment un index inversé de documents peut être construit de manière distribuée à l'aide de MapReduce. 

1. Ecrivez en pseudo-code la fonction `MAP` en précisant bien le type des données d'entrée.



In [None]:
# A compléter

Input: ...
Output: ...

Fonction MAP:

... pseudo code ....
    


2. Ecrivez en pseudo-code la fonction `REDUCE`.

In [None]:
# A compléter

Fonction REDUCER:

Input : ...
Output: ...


... pseudo code ....

3. Un test sur machine

Pour vous permettre de mettre en oeuvre ce mécanisme de manière concrète, vous allez appliquer cela à l'indexation d'une collection [books.json](../Data/books.json).
On cherche à produire un fichier inversé qui indique pour chaque mot, la liste des livres dans lesquels il apparaît en utilisant le cadre MapReduce.

Pour cela, nous vous fournissons dans le répertoire [Utils](./Utils) un fichier [Lab4.py](./Utils/Lab4.py) qui contient un ensemble de fonctions qui vous seront utiles.




Importer les fonctions utiles de ce module python à l'aide de la commande ci-dessous.

In [None]:
from Utils.Lab4 import *

A l'aide de la fonction `readData(filename)` de ce module, charger et afficher le contenu du fichier [books.json](../Data/books.json).

In [None]:
# A compléter

...


Ecrire la fonction `mapper` nécessaire à la construction de l'index inversé.

In [None]:
# A compléter

def mapper(data):
    ... code pyhton ...


Ecrire la fonction `reducer` nécessaire à la construction de l'index inversé.

In [None]:
# A compléter

def reducer(data): 
    ... code pyhton ...
    

Tester vos deux fonctions en appliquant le code ci-dessous

In [None]:
def invertedIndexExample(filename):
    m = MapReduce(mapper, reducer)
    results = m(readData(filename))
    for w, b in results:
        print("mot : ", w, "livres : ", b)
    return
    
invertedIndexExample('./Data/books.json')

## Exercice 3 : Requête SQL en MapReduce (optionnel)

On cherche à reproduire la requête :

    SELECT *
    FROM Orders, LineItem
    WHERE Order.order_id = LineItem.order_id;

Le fichier de données est [records.json](./Data/records.json). Vous devez réaliser la jointure sur le numéro de commande qui aparaît en colonne 1.
Vous devez écrire les fonctions `mapper()` et `reducer()` :


Ecrire la fonction `mapper` nécessaire à la préparation des données pour la jointure


In [None]:
# A compléter

def mapper2(data):
    ... code python ...


Ecrire la foncion `reducer2` nécessaire à l'agégation des résultats :


In [None]:
# A compléter

def reducer2(data): 
    ... code pyhton ...

In [None]:
def joinExample(filename):
    m = MapReduce(mapper2, reducer2)
    results = m(readData(filename))
    results = itertools.chain.from_iterable(results)
    for r in results:
        print(r)
    return

joinExample('./Data/records.json')

doit produire les lignes suivantes :

    ['order', '1', '36901', 'O', '173665.47', '1996-01-02', '5-LOW', 'Clerk#000000951', '0', 
     'nstructions sleep furiously among ', 'line_item', '1', '155190', '7706', '1', '17', '21168.23', 
     '0.04', '0.02', 'N', 'O', '1996-03-13', '1996-02-12', '1996-03-22', 'DELIVER IN PERSON', 
     'TRUCK', 'egular courts above the']
    ['order', '1', '36901', 'O', '173665.47', '1996-01-02', '5-LOW', 'Clerk#000000951', '0', 
     'nstructions sleep furiously among ', 'line_item', '1', '67310', '7311', '2', '36', '45983.16', 
     '0.09', '0.06', 'N', 'O', '1996-04-12', '1996-02-28', '1996-04-20', 'TAKE BACK RETURN', 
     'MAIL', 'ly final dependencies: slyly bold ']
    ['order', '1', '36901', 'O', '173665.47', '1996-01-02', '5-LOW', 'Clerk#000000951', '0', 
     'nstructions sleep furiously among ', 'line_item', '1', '63700', '3701', '3', '8', '13309.60', 
     '0.10', '0.02', 'N', 'O', '1996-01-29', '1996-03-05', '1996-01-31', 'TAKE BACK RETURN', 
     'REG AIR', 'riously. regular, express dep']
    ...

Vous remarquerez que la fonction `mapper()` doit toujours retourner une liste. Ceci est dû à la structure de la classe MapReduce.