<a href="https://colab.research.google.com/github/wafaabek/Big_data/blob/main/Spark_MLlib.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### 1-Stocker le contenu de tous les fichiers dans un DataFrame

In [None]:
!pip install pyspark
from pyspark.sql import SparkSession

# Créez une session Spark
spark = SparkSession.builder.appName("TP4_Spark_MLlib").getOrCreate()



# Étape 2 : Afficher le schéma du DataFrame

In [None]:
# Location of the CSV data to load.
data_path = "/content/data"

# Use the first line of the CSV input as column names. No schema is supplied
# or inferred here, so every column comes back as a string.
csv_reader = spark.read.option("header", "true")
df = csv_reader.csv(data_path)

# Print the schema of the loaded DataFrame.
df.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: string (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: string (nullable = true)
 |-- CustomerID: string (nullable = true)
 |-- Country: string (nullable = true)



# Étape 3 : Remplir les valeurs manquantes avec 0

In [None]:
from pyspark.sql.functions import col

# Every column was read from CSV as a string, and `na.fill(0)` only replaces
# nulls in numeric columns, so on the raw frame it silently changes nothing.
# Cast the numeric measures to double first so their missing values really
# are replaced by 0. The remaining string columns are left untouched.
for numeric_column in ("Quantity", "UnitPrice"):
    df = df.withColumn(numeric_column, col(numeric_column).cast("double"))
df = df.na.fill(0)

# Print the updated schema.
df.printSchema()


root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: string (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: string (nullable = true)
 |-- CustomerID: string (nullable = true)
 |-- Country: string (nullable = true)



# Étape 4 : Ajouter une colonne "day_of_week"

In [None]:
from pyspark.sql.functions import date_format, unix_timestamp

# Parse the invoice date string, convert it to a timestamp, then format it
# with the "EEEE" pattern to obtain the weekday name (e.g. "Monday").
invoice_seconds = unix_timestamp(df["InvoiceDate"], "yyyy-MM-dd HH:mm:ss")
invoice_timestamp = invoice_seconds.cast("timestamp")
df = df.withColumn("day_of_week", date_format(invoice_timestamp, "EEEE"))
df.show()

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+-----------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|day_of_week|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+-----------+
|   537226|    22811|SET OF 6 T-LIGHTS...|       6|2010-12-06 08:34:00|     2.95|   15987.0|United Kingdom|     Monday|
|   537226|    21713|CITRONELLA CANDLE...|       8|2010-12-06 08:34:00|      2.1|   15987.0|United Kingdom|     Monday|
|   537226|    22927|GREEN GIANT GARDE...|       2|2010-12-06 08:34:00|     5.95|   15987.0|United Kingdom|     Monday|
|   537226|    20802|SMALL GLASS SUNDA...|       6|2010-12-06 08:34:00|     1.65|   15987.0|United Kingdom|     Monday|
|   537226|    22052|VINTAGE CARAVAN G...|      25|2010-12-06 08:34:00|     0.42|   15987.0|United Kingdom|     Monday|
|   537226|    22705|   WRAP GREEN PEARS

# Étape 5 : Diviser les données en ensemble d'apprentissage et de test

In [None]:
from pyspark.sql.functions import col

# Rows whose InvoiceDate is below the cutoff literal go to the training set;
# all remaining rows go to the test set.
invoice_date = col("InvoiceDate")
train_df = df.where(invoice_date < '2010-12-13')
test_df = df.where(invoice_date >= '2010-12-13')

# Report the number of rows in each split.
print("Nombre de lignes dans l'ensemble d'apprentissage :", train_df.count())
print("Nombre de lignes dans l'ensemble de test :", test_df.count())

Nombre de lignes dans l'ensemble d'apprentissage : 26732
Nombre de lignes dans l'ensemble de test : 18676


# Étape 6 : Créer un StringIndexer pour la colonne "day_of_week"

In [None]:
from pyspark.ml.feature import StringIndexer

# Index the weekday names into the numeric "day_of_week_encoded" column.
day_of_week_indexer = StringIndexer(
    inputCol="day_of_week",
    outputCol="day_of_week_encoded",
)
day_of_week_indexer_model = day_of_week_indexer.fit(df)
indexed_df = day_of_week_indexer_model.transform(df)

# Show the DataFrame with the new 'day_of_week_encoded' column.
indexed_df.show()

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+-----------+-------------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|day_of_week|day_of_week_encoded|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+-----------+-------------------+
|   537226|    22811|SET OF 6 T-LIGHTS...|       6|2010-12-06 08:34:00|     2.95|   15987.0|United Kingdom|     Monday|                2.0|
|   537226|    21713|CITRONELLA CANDLE...|       8|2010-12-06 08:34:00|      2.1|   15987.0|United Kingdom|     Monday|                2.0|
|   537226|    22927|GREEN GIANT GARDE...|       2|2010-12-06 08:34:00|     5.95|   15987.0|United Kingdom|     Monday|                2.0|
|   537226|    20802|SMALL GLASS SUNDA...|       6|2010-12-06 08:34:00|     1.65|   15987.0|United Kingdom|     Monday|                2.0|
|   537226|    22052

# Étape 7 : Utiliser OneHotEncoder pour éviter l'ordre implicite

In [None]:
from pyspark.ml.feature import OneHotEncoder

# One-hot encode the weekday index so that no ordering between days is implied.
encoder = OneHotEncoder(
    inputCol="day_of_week_encoded",
    outputCol="day_of_week_onehot",
)
encoder_model = encoder.fit(indexed_df)
encoded_df = encoder_model.transform(indexed_df)

# Show the weekday next to its index and its one-hot vector, untruncated.
weekday_columns = ["day_of_week", "day_of_week_encoded", "day_of_week_onehot"]
encoded_df.select(*weekday_columns).show(truncate=False)

+-----------+-------------------+------------------+
|day_of_week|day_of_week_encoded|day_of_week_onehot|
+-----------+-------------------+------------------+
|Monday     |2.0                |(5,[2],[1.0])     |
|Monday     |2.0                |(5,[2],[1.0])     |
|Monday     |2.0                |(5,[2],[1.0])     |
|Monday     |2.0                |(5,[2],[1.0])     |
|Monday     |2.0                |(5,[2],[1.0])     |
|Monday     |2.0                |(5,[2],[1.0])     |
|Monday     |2.0                |(5,[2],[1.0])     |
|Monday     |2.0                |(5,[2],[1.0])     |
|Monday     |2.0                |(5,[2],[1.0])     |
|Monday     |2.0                |(5,[2],[1.0])     |
|Monday     |2.0                |(5,[2],[1.0])     |
|Monday     |2.0                |(5,[2],[1.0])     |
|Monday     |2.0                |(5,[2],[1.0])     |
|Monday     |2.0                |(5,[2],[1.0])     |
|Monday     |2.0                |(5,[2],[1.0])     |
|Monday     |2.0                |(5,[2],[1.0])

# Étape 8 : Créer un VectorAssembler avec UnitPrice, Quantity et day_of_week_onehot

In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType

# Cast the two numeric measures to double before assembling them.
for numeric_column in ("UnitPrice", "Quantity"):
    encoded_df = encoded_df.withColumn(
        numeric_column, col(numeric_column).cast(DoubleType())
    )

# Combine the price, the quantity and the one-hot weekday vector into a
# single "features" vector column.
assembler = VectorAssembler(
    inputCols=["UnitPrice", "Quantity", "day_of_week_onehot"],
    outputCol="features",
)
assembled_df = assembler.transform(encoded_df)

# Display the inputs next to the assembled vector to check the result.
assembled_df.select("UnitPrice", "Quantity", "day_of_week_onehot", "features").show(5)


+---------+--------+------------------+--------------------+
|UnitPrice|Quantity|day_of_week_onehot|            features|
+---------+--------+------------------+--------------------+
|     2.95|     6.0|     (5,[2],[1.0])|(7,[0,1,4],[2.95,...|
|      2.1|     8.0|     (5,[2],[1.0])|(7,[0,1,4],[2.1,8...|
|     5.95|     2.0|     (5,[2],[1.0])|(7,[0,1,4],[5.95,...|
|     1.65|     6.0|     (5,[2],[1.0])|(7,[0,1,4],[1.65,...|
|     0.42|    25.0|     (5,[2],[1.0])|(7,[0,1,4],[0.42,...|
+---------+--------+------------------+--------------------+
only showing top 5 rows



# Étape 9 : Créer un pipeline

In [None]:
from pyspark.ml import Pipeline

# Chain the three feature stages built in the previous steps, in order:
# weekday indexing, one-hot encoding, then vector assembly.
feature_stages = [day_of_week_indexer, encoder, assembler]
pipeline = Pipeline(stages=feature_stages)


# Étape 11 : Transformer les données de l’ensemble d’apprentissage en se basant sur les étapes (stages) du pipeline

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer

# Index the weekday names into a numeric "day_of_week_indexed" column.
string_indexer = StringIndexer(
    inputCol="day_of_week",
    outputCol="day_of_week_indexed",
)

# NOTE(review): this rebinds `pipeline`, replacing the indexer / encoder /
# assembler pipeline built in step 9 with a single-stage one — confirm that
# dropping the one-hot and assembly stages is intended.
pipeline = Pipeline(stages=[string_indexer])

# Fit the pipeline on the training split.
fittedPipeline = pipeline.fit(train_df)

In [None]:
# Reuse the pipeline model that the previous cell already fitted on the
# training split, instead of fitting the same pipeline on the same data again.
pipeline_model = fittedPipeline
train_transformed = pipeline_model.transform(train_df)


# Étape 12 : Créer une instance de KMeans

In [None]:
from pyspark.ml.clustering import KMeans

# K-means initialisation is randomised; pinning the seed makes the cluster
# assignments, and the silhouette score derived from them, reproducible
# across kernel restarts.
kmeans = KMeans(k=20, featuresCol="features", predictionCol="prediction", seed=42)


# Étape 13 : Lancer l’apprentissage de KMeans

In [None]:
from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType

# Cast the two clustering inputs of the transformed training data to double.
train_transformed = (
    train_transformed
    .withColumn("Quantity", col("Quantity").cast(DoubleType()))
    .withColumn("UnitPrice", col("UnitPrice").cast(DoubleType()))
)

In [None]:
from pyspark.ml.feature import VectorAssembler

# Columns fed to the clustering model.
feature_columns = ['Quantity', 'UnitPrice']

# Pack the feature columns into the single "features" vector column that
# `kmeans` was configured to read.
vector_assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
cluster_input_data = vector_assembler.transform(train_transformed)

# Train the KMeans model on the assembled training features.
model = kmeans.fit(cluster_input_data)

# Assign every training row to a cluster and display the result.
clusteredData = model.transform(cluster_input_data)
clusteredData.show()

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+-----------+-------------------+------------+----------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|day_of_week|day_of_week_indexed|    features|prediction|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+-----------+-------------------+------------+----------+
|   537226|    22811|SET OF 6 T-LIGHTS...|     6.0|2010-12-06 08:34:00|     2.95|   15987.0|United Kingdom|     Monday|                4.0|  [6.0,2.95]|         0|
|   537226|    21713|CITRONELLA CANDLE...|     8.0|2010-12-06 08:34:00|      2.1|   15987.0|United Kingdom|     Monday|                4.0|   [8.0,2.1]|         0|
|   537226|    22927|GREEN GIANT GARDE...|     2.0|2010-12-06 08:34:00|     5.95|   15987.0|United Kingdom|     Monday|                4.0|  [2.0,5.95]|         0|
|   537226|    2

# 14. Effectuer des prédictions sur l’ensemble de test

In [None]:
# 'test_df' is the test split created in step 5

# Cast the numeric columns of the test split to double, as was done for the
# training data, then build the feature vector with the same VectorAssembler.
test_df = (
    test_df
    .withColumn("Quantity", col("Quantity").cast(DoubleType()))
    .withColumn("UnitPrice", col("UnitPrice").cast(DoubleType()))
)
test_cluster_input_data = vector_assembler.transform(test_df)

# Predict a cluster for every test row with the trained KMeans model.
test_clusteredData = model.transform(test_cluster_input_data)

# Display the test rows with their assigned cluster.
test_clusteredData.show()

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+-----------+-----------+----------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|day_of_week|   features|prediction|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+-----------+-----------+----------+
|   539325|    22720|SET OF 3 CAKE TIN...|     3.0|2010-12-17 08:20:00|     4.95|   13004.0|United Kingdom|     Friday| [3.0,4.95]|         0|
|   539325|    22722|SET OF 6 SPICE TI...|     4.0|2010-12-17 08:20:00|     3.95|   13004.0|United Kingdom|     Friday| [4.0,3.95]|         0|
|   539325|    22915|ASSORTED BOTTLE T...|    12.0|2010-12-17 08:20:00|     0.42|   13004.0|United Kingdom|     Friday|[12.0,0.42]|         5|
|   539325|    22922|FRIDGE MAGNETS US...|    12.0|2010-12-17 08:20:00|     0.85|   13004.0|United Kingdom|     Friday|[12.0,0.85]|         5|

# 15. Calculer le coefficient de silhouette

In [None]:
from pyspark.ml.evaluation import ClusteringEvaluator

# Score the clustering of the test set with the silhouette coefficient.
# The column and metric names are spelled out here; they are the same values
# the evaluator uses when constructed without arguments.
evaluator = ClusteringEvaluator(
    featuresCol="features",
    predictionCol="prediction",
    metricName="silhouette",
)
silhouette_score = evaluator.evaluate(test_clusteredData)

print(f"Silhouette Score: {silhouette_score}")

Silhouette Score: 0.7478770959778697
