# Quelques Exercises faits avec Pyspark

### a) Implémentez une fonction pour générer une liste aléatoire de triplet. Le premier élément doit être un int allant de 0 à n sans doublons. Le second élément est un entier variant de 0 à 50 et le dernier un entier entre 0 et 5.

In [18]:
from pyspark.sql import SparkSession, DataFrame 
from pyspark.sql.functions import min, max, col, coalesce, lit


import random

def generer_liste_triplets(n):
    

    ids = list(range(n+1))          
    random.shuffle(ids)             

    triplets = []

    for i in ids:
        value1 = random.randint(0, 50)
        value2 = random.randint(0, 5)
        triplets.append((i, value1, value2))

    return triplets

liste_triplets = generer_liste_triplets(10)

print(liste_triplets)


[(8, 46, 2), (3, 26, 1), (5, 14, 4), (0, 4, 1), (7, 7, 3), (9, 2, 1), (10, 4, 5), (6, 48, 2), (1, 23, 1), (4, 27, 1), (2, 21, 4)]


### b) Implémentez la fonction init_df(nb_rows:int) -> DataFrame permettant de générer un DataFrame aléatoire de taille nb_rows et ayant trois colonnes : id, value1, value2.

In [19]:
spark = SparkSession.builder \
    .appName("PySparkHomework") \
    .master("local[*]") \
    .getOrCreate()

def init_df(nb_rows:int) -> DataFrame:

    data = generer_liste_triplets(nb_rows-1)
    df = spark.createDataFrame(data,["id", "value1", "value2"])     #Génération de Dataframe aléatoire 

    return df

df_aleatoire = init_df(10)  

df_aleatoire.show()


+---+------+------+
| id|value1|value2|
+---+------+------+
|  9|    48|     0|
|  1|    25|     0|
|  3|    30|     4|
|  8|     4|     5|
|  0|    27|     0|
|  7|     3|     5|
|  5|    38|     4|
|  2|    19|     3|
|  4|    33|     3|
|  6|    41|     4|
+---+------+------+



### c) Implémentez la fonction transformation(df:DataFrame) -> DataFrame qui
### • normalise value1 entre 0 et 1 ;
### • ajoute une colonne ratio = value1 / value2 ;
### • filtre les colonnes dont le ratio est inférieur au ratio median.
### • retourne le résultat

In [20]:
def transformation(df: DataFrame) -> DataFrame:
    # Normalisation de value1
    stats = df.agg(
        min("value1").alias("min_v"),
        max("value1").alias("max_v")
    ).collect()[0]

    min_v = stats["min_v"]
    max_v = stats["max_v"]

    df = df.withColumn(
        "value1",
        (col("value1") - min_v) / (max_v - min_v)
    )

    # Suppression des value2 = 0
    df = df.filter(col("value2") != 0)

    # Ajout du ratio
    df = df.withColumn(
        "ratio",
        col("value1") / col("value2")
    )

    # Médiane du ratio
    median_ratio = df.approxQuantile("ratio", [0.5], 0.0)[0]

    # Filtrage 
    df = df.filter(col("ratio") >= median_ratio)

    return df

df_transfomation = transformation(df_aleatoire)  

df_transfomation.show()

+---+------------------+------+-------------------+
| id|            value1|value2|              ratio|
+---+------------------+------+-------------------+
|  3|               0.6|     4|               0.15|
|  5|0.7777777777777778|     4|0.19444444444444445|
|  4|0.6666666666666666|     3| 0.2222222222222222|
|  6|0.8444444444444444|     4| 0.2111111111111111|
+---+------------------+------+-------------------+



### d) Implémentez la fonction merge(df1, df2) prenant 2 DataFrame un DataFrame représentant la fusion des deux :
### • Les lignes de même IDs voient leurs valeurs ajoutées (pensez à mettre le ratio à jour)
### • Les lignes sans équivalent d’ID sont ajoutées telles quelles.

In [21]:

def merge(df1: DataFrame, df2: DataFrame) -> DataFrame:

    # Jointure complète sur id
    joined = df1.alias("df1").join(
        df2.alias("df2"),
        on="id",
        how="outer"
    )

    # Addition des colonnes (null -> 0)
    result = joined.select(
        col("id"),

        (
            coalesce(col("df1.value1"), lit(0)) +
            coalesce(col("df2.value1"), lit(0))
        ).alias("value1"),

        (
            coalesce(col("df1.value2"), lit(0)) +
            coalesce(col("df2.value2"), lit(0))
        ).alias("value2")
    )

    # Recalcul du ratio
    result = result.withColumn(
        "ratio",
        col("value1") / col("value2")
    )

    return result

x = transformation(init_df(10))

y= merge(init_df(10), x) #fusion du Dataframe init_df(10) et du dataframe x
y.show()

+---+------+------+------------------+
| id|value1|value2|             ratio|
+---+------+------+------------------+
|  0|   0.0|     5|               0.0|
|  1|  32.0|     4|               8.0|
|  2|18.525|     2|            9.2625|
|  3|  49.0|     0|              NULL|
|  4|  38.0|     2|              19.0|
|  5|  43.0|     3|14.333333333333334|
|  6|50.325|     7| 7.189285714285715|
|  7|  47.0|    10|               4.7|
|  8| 11.75|     5|              2.35|
|  9|  28.0|     1|              28.0|
+---+------+------+------------------+

