# TP3 - Market Basket Analysis
INF8111 - Fouille de données, Eté 2024
### Membres de l'équipe
    - Mohamed Ali LAJNEF  (2404991)
    - Yannis YAHYA (1886365)


In [1]:
import os

In [2]:
ROOT = 'gs://bucket_inf8111_2/data/'

In [3]:
ROOT

'gs://bucket_inf8111_2/data/'

## Date et directives de remise
Vous remettrez l'ensemble des fichiers fichiers nécessaires à la réalisation de TP au sein d'une archive compressée ZIP nommée TP3\_NomDuMembre1\_NomDuMembre2\_NomDuMembre3

Cette archive devra, au minimum, inclure ce notebook, nommé TP3\_NomDuMembre1\_NomDuMembre2\_NomDuMembre3.ipynb

**Date limite: dd mm à hh:mm**.

## 1. Configuration de Spark

#### Exécution avec le notebook Google Colab (fortement conseillé)

In [4]:
import os
!apt-get install openjdk-17-jdk-headless -qq > /dev/null
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-17-openjdk-amd64"
!pip install pyspark

[0m

#### Exécution en local



Spark fonctionne sur les systèmes Windows et UNIX (par exemple, Linux, Mac OS). Il est facile d'exécuter Spark localement sur une seule machine - tout ce dont vous avez besoin est d'avoir Java installé sur votre système PATH, ou la variable d'environnement JAVA_HOME pointant vers une installation Java. Il est obligatoire que le **JDK v8** soit installé sur votre système, car Spark ne prend actuellement en charge que cette version. Si ce n'est pas le cas, accédez à [la page Web de Java](https://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html) pour télécharger et installer une machine virtuelle Java. N'oubliez pas de définir la variable d'environnement JAVA_HOME pour utiliser JDK v8 si votre installation ne le fait pas automatiquement.

L'interface entre Python et Spark se fait via **PySpark**, qui peut être installé en exécutant `pip install pyspark` ou configuré en suivant la séquence ci-dessous:

1. D'abord, allez sur http://spark.apache.org/downloads
2. Sélectionnez la dernière version de Spark et le package pré-construit pour Apache Hadoop 2.7
3. Cliquez pour télécharger **spark-2.4.5-bin-hadoop2.7.tgz** et décompressez-le dans le dossier de votre choix.
4. Ensuite, exportez les variables suivantes pour lier PYSPARK (l'interface python de Spark) à votre distribution python dans votre fichier `~/.bash_profile`.

``
export SPARK_HOME=/chemin/ vers / spark-2.4.5-bin-hadoop2.7
export PYTHONPATH="$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.7-src.zip:$SPARK_HOME/python/lib/pyspark.zip:$ PYTHONPATH"
export PYSPARK_PYTHON=/chemin/vers/votre/python3
``

5. Exécutez `source ~./bash_profile` pour effectuer les modifications et redémarrer cette session de notebook jupyter.

#### Test de l'installation
À l'aide du code suivant, vous pouvez tester si Spark est correctement installé.

In [None]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import SparkSession
from pyspark.sql.functions import monotonically_increasing_id

spark = SparkSession.builder.getOrCreate()
df = spark.sql("select 'spark' as hello ")
df.show()

                                                                                

+-----+
|hello|
+-----+
|spark|
+-----+


### Chargement des données

In [14]:
df_symptoms = spark.read.csv(ROOT + 'SYMPTOMS.csv', header=True, sep=',', inferSchema=True)
print('SYMPTOMS.csv')
df_symptoms.show(5)

                                                                                

SYMPTOMS.csv
+----------+--------------------+
|SYMPTOM_ID|        SYMPTOM_NAME|
+----------+--------------------+
|         0|Cardiac vein diss...|
|         1|Vaccination site ...|
|         2|Blood bicarbonate...|
|         3|  Judgement impaired|
|         4|Sinus node dysfun...|
+----------+--------------------+
only showing top 5 rows


In [15]:
df_observations = spark.read.csv(ROOT + 'OBSERVATIONS_extract.csv', header=True, sep=',', inferSchema=True)
print('OBSERVATIONS.csv')
df_observations.show(5)

                                                                                

OBSERVATIONS.csv
+--------------+--------+----------+
|OBSERVATION_ID|VAERS_ID|SYMPTOM_ID|
+--------------+--------+----------+
|             0|   25002|      9465|
|             0|   25002|      2581|
|             1|   25010|     14978|
|             2|   25011|      2581|
|             2|   25011|      6620|
+--------------+--------+----------+
only showing top 5 rows


In [16]:
df_data = spark.read.csv(ROOT + 'DATA.csv', header=True, sep=',', inferSchema=True)
print('DATA.csv')
df_data.show(5)

                                                                                

DATA.csv
+--------+-----+-------+---+
|VAERS_ID|STATE|AGE_YRS|SEX|
+--------+-----+-------+---+
|   25002|   FR|   82.0|  M|
|   25010|   FR|    1.7|  M|
|   25011|   FR|    5.0|  F|
|   25064|   FR|   NULL|  F|
|   25055|   FR|   54.0|  F|
+--------+-----+-------+---+
only showing top 5 rows


In [17]:
df_vax = spark.read.csv(ROOT + 'VAX.csv', header=True, sep=',', inferSchema=True)
print('VAX.csv')
df_vax.show(5)

VAX.csv
+--------+--------+------------------+--------------------+
|VAERS_ID|VAX_TYPE|          VAX_MANU|            VAX_NAME|
+--------+--------+------------------+--------------------+
|   25002|     PPV|  MERCK & CO. INC.|  PNEUMO (PNEUMOVAX)|
|   25010|     MMR|  MERCK & CO. INC.|MEASLES + MUMPS +...|
|   25011|     HEP|SMITHKLINE BEECHAM|   HEP B (ENGERIX-B)|
|   25055|     HEP|SMITHKLINE BEECHAM|   HEP B (ENGERIX-B)|
|   25056|     HEP|SMITHKLINE BEECHAM|   HEP B (ENGERIX-B)|
+--------+--------+------------------+--------------------+
only showing top 5 rows


In [18]:
df_observations.createOrReplaceTempView("OBSERVATIONS") # creates table 'observations'
df_symptoms.createOrReplaceTempView("SYMPTOMS") # creates table 'symptoms'
df_data.createOrReplaceTempView("DATA") # creates table 'data'
df_vax.createOrReplaceTempView("VAX") # creates table 'vax'

## 4. MBA pour le dataset complet (20 points)


Comme vous l'avez probablement remarqué, même pour un ensemble de données moins volumineux, l'algorithme MBA est coûteux en calcul. Pour cette raison, cette fois, nous allons répéter le processus, mais en utilisant maintenant Google Cloud Platform (GCP) pour créer un grand cluster. Toutes les instructions pour créer un cluster avec spark et comment soumettre un travail seront expliquées dans le laboratoire. Dans tous les cas, vous devez lire les instructions données dans le ``Instruction_GCP.pdf``.

Cette fois, nous travaillerons avec le fichier ``OBSERVATIONS.csv``, qui contient plusieurs millions d'observations.

**PRODUCTION ATTENDUE**

Après avoir exécuté le MBA pour la plus grande collection d'observation, sélectionnez au hasard UN symptôme observé dans ``OBSERVATIONS`` et affichez les règles d'association (nom du symptôme et valeur d'association) de ce symptôme, c'est-à-dire lorsque le symptôme est seul. La sortie doit être formatée dans un tableau, où chaque ligne contenant les informations d'un symptôme associé.
Afin d'accélérer son exécution, vous pouvez créer un second notebook uniquement dédié à cette partie et l'insérer dans le rendu final.

- Affichez l'identifiant (SYMPTOM_ID) et le nom du symptôme (SYMPTOM_NAME) sélectionné au hasard.
- Signalez le temps d'exécution.

**Remarque importante : joignez des captures d'écran de votre sortie et de votre configuration de cluster.**

##### captures d'écran de notre configuration de cluster  :


![Configuration_cluster_1](Configuration_cluster_1.png)
![Configuration_cluster_2](Configuration_cluster_2.png)
![Configuration_cluster_3](Configuration_cluster_3.png)

##### functions  

In [None]:
from itertools import combinations
from copy import deepcopy


def format_tuples(pattern):
    """
    Used for visualizition.
    Transforms tuples to a string since Dataframe does not support column of tuples with different sizes
    (a,b,c) -> '(a,b,c)'
    """
    return (str(pattern[0]), str(pattern[1]))

def map_to_patterns(row):
    products = row.transaction
    products = products.split(',')
    products.sort()
    for i in range(1, 4):
        for pattern in combinations(products, i):
          yield (pattern, 1)
            
def reduce_patterns(v1, v2):
  return v1 + v2

            
            
def map_to_subpatterns(pattern):
    products, count = pattern
    products_list = list(products)
    num_products = len(products_list)

    for i in range(num_products):
        subpattern = deepcopy(products_list)
        removed_product = subpattern.pop(i)
        yield (tuple(subpattern), (removed_product, count))

    yield (products, (None, count))
    
    
def map_to_assoc_rules(rule):
    subpattern, combined_rule = rule ;
    total = 1
    for value in combined_rule :
      if value[0] == None :
        total = value[1]

    new_rule = [(value , count/total) for value,count in combined_rule if value != None]
    yield(subpattern , new_rule)


### algorithm 

In [21]:
%%time
"""
TODO: run the MBA algorithm and print the requested output
"""

df_observations_all = spark.read.csv(ROOT + 'OBSERVATIONS.csv', header=True, sep=',', inferSchema=True)
print('OBSERVATIONS_ALL.csv')
df_observations_all.createOrReplaceTempView("OBSERVATIONS_ALL")
df_observations_all.show(5)



OBSERVATIONS_ALL.csv
+--------------+--------+----------+
|OBSERVATION_ID|VAERS_ID|SYMPTOM_ID|
+--------------+--------+----------+
|             0|   25002|      9465|
|             0|   25002|      2581|
|             1|   25010|     14978|
|             2|   25011|      2581|
|             2|   25011|      6620|
+--------------+--------+----------+
only showing top 5 rows

CPU times: user 3.95 ms, sys: 3.95 ms, total: 7.91 ms
Wall time: 2.56 s


                                                                                

In [22]:
list_symptoms = spark.sql("""
  SELECT
    data.VAERS_ID as VAERS_id,
    CONCAT_WS(', ', COLLECT_LIST(symp.SYMPTOM_NAME)) as transaction
  FROM DATA
  INNER JOIN OBSERVATIONS_ALL observ ON observ.VAERS_ID = data.VAERS_ID
  INNER JOIN SYMPTOMS symp ON observ.SYMPTOM_ID = symp.SYMPTOM_ID
  GROUP BY data.VAERS_ID ORDER BY data.VAERS_ID
""")

In [23]:
import time
start_time = time.time()

symptoms_rdd = list_symptoms.rdd
patterns_rdd = symptoms_rdd.flatMap(map_to_patterns)
patterns_rdd.map(format_tuples).toDF(['patterns', 'occurrences']).sort(['patterns'])

combined_patterns_rdd = patterns_rdd.reduceByKey(reduce_patterns)
combined_patterns_rdd.map(format_tuples).toDF(['patterns', 'combined_occurrences']).sort(['patterns'])

subpatterns_rdd = combined_patterns_rdd.flatMap(map_to_subpatterns)
subpatterns_rdd.map(format_tuples).toDF(['subpatterns', 'rules']).sort(['subpatterns'])

combined_rules = subpatterns_rdd.map(lambda x:(x[0] , [x[1]])).reduceByKey(lambda x,y : x+y) 
combined_rules.map(format_tuples).toDF(['subpatterns', 'combined_rules']).sort(['subpatterns'])

assoc_rules = combined_rules.flatMap(map_to_assoc_rules)
assoc_rules.map(format_tuples).toDF(['patterns', 'association_rules']).sort(['patterns']).show(5, truncate=False)


end_time = time.time()
# Afficher le temps d'exécution
execution_time = end_time - start_time




+----------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|patterns                                                                                

                                                                                

In [None]:
hours = int(execution_time // 3600)  
minutes = int((execution_time % 3600) // 60)  
seconds = execution_time % 60  

print(f"Temps d'exécution pour le grand jeu de donnée : {hours} heures, {minutes} minutes, {seconds:.2f} secondes")

Temps d'exécution pour le grand jeu de donnée : 0 heures, 41 minutes, 7.31 secondes


In [None]:
random_symptom_id = spark.sql("""
    SELECT symptoms.SYMPTOM_ID , symptoms.SYMPTOM_NAME
    FROM OBSERVATIONS_ALL
    INNER JOIN symptoms ON OBSERVATIONS_ALL.symptom_id = symptoms.symptom_id
    ORDER BY RAND()
    LIMIT 1
""")

random_symptom_id_value = random_symptom_id.collect()[0]["SYMPTOM_ID"]
random_symptom_name = " " +random_symptom_id.collect()[0]["SYMPTOM_NAME"]

                                                                                

In [None]:
random_symptom_name

' Product administered to patient of inappropriate age'

In [None]:
single_rule_list = assoc_rules.filter(lambda rule: len(rule[0]) == 1  and rule[0][0] == random_symptom_name)
df = single_rule_list.toDF(["symptom", "rules"])

row = single_rule_list.toDF(["symptoms", "association_rule"]).first()
rules = row["association_rule"]


                                                                                

In [None]:
rules_values = [(rule["_1"], rule["_2"]) for rule in row["association_rule"]]

In [35]:
print(f'association rules for the symptom  : {random_symptom_name} of ID {random_symptom_id_value}')
rules_df = spark.createDataFrame(rules_values, ["rules", "value"])
sorted_rules_df = rules_df.orderBy("value", ascending=False)
row_count = rules_df.count()

sorted_rules_df.show(row_count,  truncate  = False )


association rules for the symptom  :  Product administered to patient of inappropriate age of ID 11414
+------------------------------------------------------------------+---------------------+
|rules                                                             |value                |
+------------------------------------------------------------------+---------------------+
|No adverse event                                                  |0.58504944207566     |
| Wrong product administered                                       |0.09090084369046539  |
|Incorrect dose administered                                       |0.08990293023677765  |
| No adverse event                                                 |0.07756509117300191  |
|Extra dose administered                                           |0.030209561825274425 |
|Inappropriate schedule of product administration                  |0.028576612537421754 |
| Pyrexia                                                          |0.02721582