Ceci est un code po comment créer un modèle Spark ML pour prédire si une station Velib sera pleine ou non.

In [0]:
#Import des bibliothèques nécessaires : Nous aurons besoin de Spark SQL pour charger les données à partir d'un fichier CSV, de Spark ML pour construire notre modèle et de PySpark pour accéder à toutes les fonctionnalités Spark

from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.sql.functions import col
from pyspark.sql.functions import when



In [0]:
# File location and type
file_location = "/FileStore/tables/velib_data_ML.csv"
file_type = "csv"

# CSV options
infer_schema = "false"
first_row_is_header = "true"
delimiter = ","

#Créez une session Spark : nous allons créer une session Spark pour charger les données à partir d'un fichier CSV.
spark = SparkSession.builder.appName('velib_prediction').getOrCreate()

# Chargement des données dans un DataFrame
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(df)

station_id,name,latitude,longitude,capacity,stationCode,numBikesAvailable,num_bikes_available_types_mechanical,num_bikes_available_types_ebike,numDocksAvailable,last_reported
213688169,Benjamin Godard - Victor Hugo,48.865983,2.275725,35,16107,2,0,2,33,1677426881
653222953,Mairie de Rosny-sous-Bois,48.871256519012,2.4865807592869,30,31104,24,12,12,6,1677427268
36255,Toudouze - Clauzel,48.87929591733507,2.3373600840568547,21,9020,1,0,1,20,1677427220
37815204,Mairie du 12ème,48.840855311763,2.3875549435616,30,12109,7,7,0,21,1677427156
17278902806,Rouget de L'isle - Watteau,48.778192750803,2.3963020229163,0,44015,0,0,0,0,1675935337
251039991,Cassini - Denfert-Rochereau,48.837525839067,2.3360354080796,25,14111,5,1,4,20,1677427036
85002689,Jourdan - Stade Charléty,48.819428333369,2.3433353751898,60,14014,15,9,6,44,1677427307
2515829865,Basilique,48.93626891059109,2.3588666820200914,22,32017,17,8,9,3,1677427276
516709288,Charonne - Robert et Sonia Delaunay,48.855907555969,2.3925706744194,20,11104,2,0,2,18,1677427053
120827885,Messine - Place Du Pérou,48.87544803396074,2.315508019010038,12,8026,3,1,2,9,1677427292


In [0]:
# Create a view or table
temp_table_name = "velib_data_ML_csv"
df.createOrReplaceTempView(temp_table_name)

In [0]:
## Prétraitement des données

# Ici nous avons un DataFrame nommé "df" et  nous souhaitons afficher toutes les données de la ligne de la station avec l'id 213688169
# Nous utiliserons la méthode filter() pour filtrer le DataFrame en fonction de la condition "id = 213688169"
# Filtrer le DataFrame pour obtenir les données de la ligne 213688169
df_filtré = df.filter(df["station_id"] == 213688169)

#supprimer les valeurs manquantes et les valeurs aberrantes
df_filtré = df_filtré.filter(col("numDocksAvailable").isNotNull())
df_filtré= df_filtré.filter(col("last_reported") > 0)
df_filtré= df_filtré.filter(col("capacity") > 0)

# Afficher les données de la ligne filtrée
display(df_filtré)

station_id,name,latitude,longitude,capacity,stationCode,numBikesAvailable,num_bikes_available_types_mechanical,num_bikes_available_types_ebike,numDocksAvailable,last_reported
213688169,Benjamin Godard - Victor Hugo,48.865983,2.275725,35,16107,2,0,2,33,1677426881
213688169,Benjamin Godard - Victor Hugo,48.865983,2.275725,35,16107,2,0,2,33,1677426881
213688169,Benjamin Godard - Victor Hugo,48.865983,2.275725,35,16107,2,0,2,33,1677426881
213688169,Benjamin Godard - Victor Hugo,48.865983,2.275725,35,16107,2,0,2,33,1677426881
213688169,Benjamin Godard - Victor Hugo,48.865983,2.275725,35,16107,2,0,2,33,1677426881
213688169,Benjamin Godard - Victor Hugo,48.865983,2.275725,35,16107,2,0,2,33,1677426881
213688169,Benjamin Godard - Victor Hugo,48.865983,2.275725,35,16107,2,0,2,33,1677426881
213688169,Benjamin Godard - Victor Hugo,48.865983,2.275725,35,16107,2,0,2,33,1677426881
213688169,Benjamin Godard - Victor Hugo,48.865983,2.275725,35,16107,2,0,2,33,1677426881
213688169,Benjamin Godard - Victor Hugo,48.865983,2.275725,35,16107,2,0,2,33,1677426881


In [0]:
# Check the data types of the columns
df_filtré.printSchema()

root
 |-- station_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- capacity: string (nullable = true)
 |-- stationCode: string (nullable = true)
 |-- numBikesAvailable: string (nullable = true)
 |-- num_bikes_available_types_mechanical: string (nullable = true)
 |-- num_bikes_available_types_ebike: string (nullable = true)
 |-- numDocksAvailable: string (nullable = true)
 |-- last_reported: string (nullable = true)



In [0]:
# Créer un nouveau DataFrame avec des colonnes pertinentes "last_reported", "numDocksAvailable"
df2 = df_filtré.select("last_reported", "numDocksAvailable")
display(df2)

last_reported,numDocksAvailable
1677426881,33
1677426881,33
1677426881,33
1677426881,33
1677426881,33
1677426881,33
1677426881,33
1677426881,33
1677426881,33
1677426881,33


In [0]:
# Change the data type of the 'last_reported' and 'numDockAvailable' columns to integer
df2 = df2.withColumn("last_reported", col("last_reported").cast("integer"))
df2 = df2.withColumn("numDocksAvailable", col("numDocksAvailable").cast("integer"))

In [0]:
# Check the data types of the columns
df2.printSchema()


root
 |-- last_reported: integer (nullable = true)
 |-- numDocksAvailable: integer (nullable = true)



In [0]:
#Créer une nouvelle colonne 'fullness' qui indique si la station est pleine ou non. Si le nombre de docks dans la station est égale à 0, nous considérons qu'elle est pleine.
df2 = df2.withColumn("fullness", when(df2.numDocksAvailable == 0, 1).otherwise(0))
#Cela créera une nouvelle colonne appelée "fullness" qui aura la valeur 1 si la colonne "numDocksAvailable" est égale à 0 et la valeur 0 sinon


Créez le modèle Spark ML : Nous allons utiliser LogisticRegression pour entraîner notre modèle.

In [0]:
# Créer un assembleur de vecteur pour regrouper les colonnes en une seule colonne
assembler = VectorAssembler(inputCols=["last_reported", "numDocksAvailable"], outputCol="features")


In [0]:
# Change the data type of the 'last_reported' and 'numDockAvailable' columns to integer
df2 = df2.withColumn("last_reported", col("last_reported").cast("integer"))
df2 = df2.withColumn("numDocksAvailable", col("numDocksAvailable").cast("integer"))

In [0]:
df2.printSchema()


root
 |-- last_reported: integer (nullable = true)
 |-- numDocksAvailable: integer (nullable = true)
 |-- fullness: integer (nullable = false)



In [0]:
# Appliquer l'assembleur de vecteur au DataFrame
df3 = assembler.transform(df2)

In [0]:
# Créer un modèle de régression logistique
lr = LogisticRegression(featuresCol='features', labelCol='fullness')

In [0]:
# Sélectionner les colonnes "features" et "fullness" pour former le nouveau DataFrame
df4 = df3.select("features", "fullness")

In [0]:
display(df3)

last_reported,numDocksAvailable,fullness,features
1677426881,33,0,"Map(vectorType -> dense, length -> 2, values -> List(1.677426881E9, 33.0))"
1677426881,33,0,"Map(vectorType -> dense, length -> 2, values -> List(1.677426881E9, 33.0))"
1677426881,33,0,"Map(vectorType -> dense, length -> 2, values -> List(1.677426881E9, 33.0))"
1677426881,33,0,"Map(vectorType -> dense, length -> 2, values -> List(1.677426881E9, 33.0))"
1677426881,33,0,"Map(vectorType -> dense, length -> 2, values -> List(1.677426881E9, 33.0))"
1677426881,33,0,"Map(vectorType -> dense, length -> 2, values -> List(1.677426881E9, 33.0))"
1677426881,33,0,"Map(vectorType -> dense, length -> 2, values -> List(1.677426881E9, 33.0))"
1677426881,33,0,"Map(vectorType -> dense, length -> 2, values -> List(1.677426881E9, 33.0))"
1677426881,33,0,"Map(vectorType -> dense, length -> 2, values -> List(1.677426881E9, 33.0))"
1677426881,33,0,"Map(vectorType -> dense, length -> 2, values -> List(1.677426881E9, 33.0))"


In [0]:
# Diviser le DataFrame en ensembles d'entraînement et de test (70/30)
trainingData, testData = df4.randomSplit([0.7, 0.3], seed=1234)

In [0]:
# Entraîner le modèle sur les données d'entraînement
lrModel = lr.fit(trainingData)

In [0]:
# Faire des prédictions sur les données de test
predictions = lrModel.transform(testData)

# Afficher les prédictions et les valeurs réelles
predictions.select("prediction", "fullness").show()

# Calculer la précision du modèle
accuracy = predictions.filter(predictions.prediction == predictions.fullness).count() / float(predictions.count())

# Afficher la précision du modèle
print("Accuracy:", accuracy)

+----------+--------+
|prediction|fullness|
+----------+--------+
|       0.0|       0|
|       0.0|       0|
|       0.0|       0|
|       0.0|       0|
|       0.0|       0|
|       0.0|       0|
|       0.0|       0|
|       0.0|       0|
|       0.0|       0|
|       0.0|       0|
|       0.0|       0|
+----------+--------+

Accuracy: 1.0
