In [None]:
!pip install pyspark numpy pandas py4j

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_date, dayofweek, dayofmonth, month, year
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql import functions as F

In [None]:
# Create (or reuse) the Spark session attached to the master at
# spark://spark-master:7077, with executor memory capped at 512m.
spark = (
    SparkSession.builder
    .appName("pyspark-notebook")
    .master("spark://spark-master:7077")
    .config("spark.executor.memory", "512m")
    .getOrCreate()
)

In [None]:
# Load the flight-price extract: the first line holds the column names and
# Spark infers each column's type from the data.
flightprices = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("./extrait_flight.csv")
)

In [None]:
# Keep only the first 15 columns of the raw CSV and preview 10 rows.
first15_columns = list(flightprices.columns[:15])
flightprices = flightprices.select(first15_columns)
flightprices.show(10)

In [None]:
# Column groups used by the feature-engineering cells: boolean flags, date
# strings, numeric values and string columns.
# NOTE: numeric_columns includes "totalFare", which is also the regression
# label; it must not be fed to the model as an input feature.
bool_columns = ["isBasicEconomy", "isRefundable", "isNonStop"]
date_columns = ["searchDate", "flightDate"]
numeric_columns = [
    "elapsedDays",
    "baseFare",
    "totalFare",
    "seatsRemaining",
    "totalTravelDistance",
]
string_columns = [
    "legId",
    "startingAirport",
    "destinationAirport",
    "fareBasisCode",
    "travelDuration",
]

In [None]:
# Cast each boolean flag column to an integer column, replacing it in place.
# NOTE(review): assumes inferSchema read these columns as booleans — confirm.
for flag in bool_columns:
    flightprices = flightprices.withColumn(flag, flightprices[flag].cast("int"))

In [None]:
# For each date column: parse the 'yyyy-MM-dd' string into a date (in place),
# then append its year, month, day-of-month and day-of-week as new columns
# named <column>_year, <column>_month, <column>_dayOfMonth, <column>_dayOfWeek.
date_parts = [
    ("_year", year),
    ("_month", month),
    ("_dayOfMonth", dayofmonth),
    ("_dayOfWeek", dayofweek),
]
for name in date_columns:
    flightprices = flightprices.withColumn(name, to_date(F.col(name), 'yyyy-MM-dd'))
    for suffix, extract in date_parts:
        flightprices = flightprices.withColumn(name + suffix, extract(F.col(name)))

In [None]:
# Build the feature-engineering stages and the list of input columns for the
# vector assembler.
label_column = "totalFare"

# Categorical string columns go through StringIndexer -> OneHotEncoder.
# NOTE(review): legId looks like a per-itinerary identifier — TODO confirm.
# One-hot encoding it would add roughly one feature per distinct row, so it is
# left out.
# NOTE(review): travelDuration is treated as a category here; parsing it into a
# numeric duration may make a better feature.
categorical_columns = [c for c in string_columns if c != "legId"]

# handleInvalid="keep" maps categories unseen in the training split (and nulls)
# to an extra index instead of raising when the test split is transformed.
indexers = [
    StringIndexer(inputCol=c, outputCol=c + "_index", handleInvalid="keep")
    for c in categorical_columns
]
encoders = [
    OneHotEncoder(inputCol=c + "_index", outputCol=c + "_vec")
    for c in categorical_columns
]

date_features = [name + "_" + feature for name in date_columns for feature in [
    "year", "month", "dayOfMonth", "dayOfWeek"]]

# The label must not also be an input feature, otherwise the regression just
# copies it and the test RMSE is meaningless.
# NOTE(review): baseFare is kept as a feature; confirm it is known at
# prediction time, since it may be closely tied to totalFare.
numeric_features = [c for c in numeric_columns if c != label_column]

# The integer-cast boolean flags are used as 0/1 features too.
assembler_inputs = ([encoder.getOutputCol() for encoder in encoders]
                    + date_features + bool_columns + numeric_features)

In [None]:
# Assemble every feature column into a single vector column named "features".
# handleInvalid="skip" drops rows that have a null/NaN in any input column
# instead of failing the whole fit (the default "error" behaviour). Such rows
# are also absent from the test predictions used for evaluation.
assembler = VectorAssembler(
    inputCols=assembler_inputs, outputCol="features", handleInvalid="skip")

# Linear regression predicting totalFare from the assembled feature vector.
lr = LinearRegression(featuresCol="features", labelCol="totalFare")

# Stages run in order: string indexing, one-hot encoding, assembly, regression.
pipeline = Pipeline(stages=indexers + encoders + [assembler, lr])

In [None]:
# Split the rows roughly 80% train / 20% test. A fixed seed makes the split,
# and therefore the reported RMSE, reproducible across Restart & Run All
# (given the same input data and partitioning).
SPLIT_SEED = 42
train_data, test_data = flightprices.randomSplit([0.8, 0.2], seed=SPLIT_SEED)

In [None]:
# Fit every pipeline stage on the training split only, then use the fitted
# PipelineModel to add a "prediction" column to the held-out test split.
model = pipeline.fit(train_data)
predictions = model.transform(test_data)

In [None]:
# Evaluate the test-split predictions against the true totalFare using the
# root mean squared error metric.
evaluator = (
    RegressionEvaluator()
    .setLabelCol("totalFare")
    .setPredictionCol("prediction")
    .setMetricName("rmse")
)
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE) on test data = {rmse}")