In [0]:
#load data from fact table
df = spark.table("goldwind.fact")


+-------------+----------+--------+----------+---------+----------+--------------+---------------+
|production_id|   date_id| time_id|turbine_id|status_id|wind_speed|wind_direction|energy_produced|
+-------------+----------+--------+----------+---------+----------+--------------+---------------+
|            1|2024-06-01|00-00-00|         1|        4|  18.44936|            SE|     1786.91843|
|            2|2024-06-01|00-00-00|         2|        4|  16.34197|             E|     1156.38082|
|            3|2024-06-01|00-00-00|         3|        4|  24.16424|             E|     1216.49768|
|            4|2024-06-01|00-10-00|         1|        4|  13.03947|            NE|     1888.39496|
|            5|2024-06-01|00-10-00|         2|        4|   4.00397|            SE|     1052.36231|
|            6|2024-06-01|00-10-00|         3|        6|   5.22807|             N|            0.0|
|            7|2024-06-01|00-20-00|         1|        6|  20.38065|            SE|            0.0|
|         

In [0]:

# 📘 Notebook ML - Prédiction de l'énergie produite par les turbines (version enrichie avec toutes les dimensions)

# 1. Chargement des données GOLD et dimensions
fact = spark.table("goldwind.fact")
dim_turbine = spark.table("goldwind.dim_turbine")
dim_date = spark.table("goldwind.dim_date")
dim_time = spark.table("goldwind.dim_time")
dim_status = spark.table("goldwind.dim_status")

# 2. Jointures pour enrichir les données
fact_joined = fact \
    .join(dim_turbine, "turbine_id", "left") \
    .join(dim_date, fact.date_id == dim_date.date_id, "left") \
    .join(dim_time, fact.time_id == dim_time.time_id, "left") \
    .join(dim_status, "status_id", "left")

# 3. Ajout de la colonne saison (facultatif mais utile) je devais normalement le mettre dans mon fichier silver lol
from pyspark.sql.functions import when, col

fact_joined = fact_joined.withColumn(
    "season",
    when(col("month").isin(12, 1, 2), "winter")
    .when(col("month").isin(3, 4, 5), "spring")
    .when(col("month").isin(6, 7, 8), "summer")
    .when(col("month").isin(9, 10, 11), "autumn")
)

# 4. Nettoyage et filtrage des données
fact_filtered = fact_joined.filter(col("energy_produced") > 0).dropna(subset=[
    "wind_speed", "wind_direction", "turbine_id", "status_id", "energy_produced",
    "hour", "month", "capacity", "responsible_department"
])
display(fact_filtered)



'\n# 1. Chargement des données GOLD et dimensions\nfact = spark.table("goldwind.fact")\ndim_turbine = spark.table("goldwind.dim_turbine")\ndim_date = spark.table("goldwind.dim_date")\ndim_time = spark.table("goldwind.dim_time")\ndim_status = spark.table("goldwind.dim_status")\n\n# 2. Jointures pour enrichir les données\nfact_joined = fact     .join(dim_turbine, "turbine_id", "left")     .join(dim_date, fact.date_id == dim_date.date_id, "left")     .join(dim_time, fact.time_id == dim_time.time_id, "left")     .join(dim_status, "status_id", "left")\n\n# 3. Ajout de la colonne saison (facultatif mais utile) je devais normalement le mettre dans mon fichier silver lol\nfrom pyspark.sql.functions import when, col\n\nfact_joined = fact_joined.withColumn(\n    "season",\n    when(col("month").isin(12, 1, 2), "winter")\n    .when(col("month").isin(3, 4, 5), "spring")\n    .when(col("month").isin(6, 7, 8), "summer")\n    .when(col("month").isin(9, 10, 11), "autumn")\n)\n\n# 4. Nettoyage et filtr

In [0]:


# 4. Nettoyage et filtrage des données
fact_filtered = fact_joined.filter(col("energy_produced") > 0).dropna(subset=[
    "wind_speed", "wind_direction", "turbine_id", "status_id", "energy_produced",
    "hour", "month", "capacity", "responsible_department"
])

# 5. Encodages nécessaires
from pyspark.ml.feature import StringIndexer, VectorAssembler

indexer_dir = StringIndexer(inputCol="wind_direction", outputCol="wind_dir_indexed")
indexer_dept = StringIndexer(inputCol="responsible_department", outputCol="responsible_dept_indexed")
indexer_season = StringIndexer(inputCol="season", outputCol="season_indexed")

indexed = indexer_dir.fit(fact_filtered).transform(fact_filtered)
indexed = indexer_dept.fit(indexed).transform(indexed)
indexed = indexer_season.fit(indexed).transform(indexed)

# 🔧 Conversion de types pour VectorAssembler
indexed = indexed.withColumn("hour", col("hour").cast("int"))
indexed = indexed.withColumn("month", col("month").cast("int"))
indexed = indexed.withColumn("capacity", col("capacity").cast("int"))

# 6. Assemblage des features enrichies
assembler = VectorAssembler(
    inputCols=[
        "wind_speed",
        "wind_dir_indexed",
        "turbine_id",
        "status_id",
        "hour",
        "month",
        "capacity",
        "responsible_dept_indexed",
        "season_indexed"
    ],
    outputCol="features"
)
final_data = assembler.transform(indexed).select("features", "energy_produced")
display(final_data)


'\n# 4. Nettoyage et filtrage des données\nfact_filtered = fact_joined.filter(col("energy_produced") > 0).dropna(subset=[\n    "wind_speed", "wind_direction", "turbine_id", "status_id", "energy_produced",\n    "hour", "month", "capacity", "responsible_department"\n])\n\n# 5. Encodages nécessaires\nfrom pyspark.ml.feature import StringIndexer, VectorAssembler\n\nindexer_dir = StringIndexer(inputCol="wind_direction", outputCol="wind_dir_indexed")\nindexer_dept = StringIndexer(inputCol="responsible_department", outputCol="responsible_dept_indexed")\nindexer_season = StringIndexer(inputCol="season", outputCol="season_indexed")\n\nindexed = indexer_dir.fit(fact_filtered).transform(fact_filtered)\nindexed = indexer_dept.fit(indexed).transform(indexed)\nindexed = indexer_season.fit(indexed).transform(indexed)\n\n# 🔧 Conversion de types pour VectorAssembler\nindexed = indexed.withColumn("hour", col("hour").cast("int"))\nindexed = indexed.withColumn("month", col("month").cast("int"))\nindexed =

In [0]:
# 7. Split en train/test
train_data, test_data = final_data.randomSplit([0.8, 0.2], seed=42)


In [0]:

# 8. Entraînement et comparaison de plusieurs modèles
from pyspark.ml.regression import LinearRegression, DecisionTreeRegressor, RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline
import mlflow
import mlflow.spark

models = {
    "LinearRegression": LinearRegression(labelCol="energy_produced", featuresCol="features"),
    "DecisionTree": DecisionTreeRegressor(labelCol="energy_produced", featuresCol="features"),
    "RandomForest": RandomForestRegressor(labelCol="energy_produced", featuresCol="features")
}

results = []

for name, algo in models.items():
    with mlflow.start_run(run_name=name):
        pipeline = Pipeline(stages=[algo])
        model = pipeline.fit(train_data)
        predictions = model.transform(test_data)

        evaluator = RegressionEvaluator(labelCol="energy_produced", predictionCol="prediction", metricName="rmse")
        rmse = evaluator.evaluate(predictions)

        mlflow.spark.log_model(model, f"model_{name}")
        mlflow.log_metric("rmse", rmse)

        results.append((name, rmse))
        print(f"📊 {name} RMSE: {rmse:.4f}")

# 9. Sélection du meilleur modèle
best_model_name, best_rmse = sorted(results, key=lambda x: x[1])[0]
print(f"\n✅ Meilleur modèle : {best_model_name} avec RMSE = {best_rmse:.4f}")

2025/04/22 12:26:53 INFO mlflow.spark: Inferring pip requirements by reloading the logged model from the databricks artifact repository, which can be time-consuming. To speed up, explicitly specify the conda_env or pip_requirements when calling log_model().


📊 LinearRegression RMSE: 430.6956


2025/04/22 12:27:36 INFO mlflow.spark: Inferring pip requirements by reloading the logged model from the databricks artifact repository, which can be time-consuming. To speed up, explicitly specify the conda_env or pip_requirements when calling log_model().


📊 DecisionTree RMSE: 437.4546


2025/04/22 12:28:24 INFO mlflow.spark: Inferring pip requirements by reloading the logged model from the databricks artifact repository, which can be time-consuming. To speed up, explicitly specify the conda_env or pip_requirements when calling log_model().


📊 RandomForest RMSE: 431.5084

✅ Meilleur modèle : LinearRegression avec RMSE = 430.6956


In [0]:
from pyspark.ml.regression import LinearRegression, DecisionTreeRegressor, RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline
models = {
    "LinearRegression": LinearRegression(labelCol="energy_produced", featuresCol="features"),
    "DecisionTree": DecisionTreeRegressor(labelCol="energy_produced", featuresCol="features"),
    "RandomForest": RandomForestRegressor(labelCol="energy_produced", featuresCol="features")
}

results = []

for name, algo in models.items():
    pipeline = Pipeline(stages=[algo])
    model = pipeline.fit(train_data)
    predictions = model.transform(test_data)
    
    evaluator = RegressionEvaluator(labelCol="energy_produced", predictionCol="prediction", metricName="rmse")
    rmse = evaluator.evaluate(predictions)
    
    results.append((name, rmse))
    print(f"📊 {name} RMSE: {rmse:.4f}")
best_model_name, best_rmse = sorted(results, key=lambda x: x[1])[0]
print(f"\n✅ Meilleur modèle : {best_model_name} avec RMSE = {best_rmse:.4f}")


📊 LinearRegression RMSE: 430.8846
📊 DecisionTree RMSE: 426.2068
📊 RandomForest RMSE: 428.5398

✅ Meilleur modèle : DecisionTree avec RMSE = 426.2068


In [0]:

# 10. Réentraînement du meilleur modèle sur toutes les données
best_algo = models[best_model_name]
best_pipeline = Pipeline(stages=[best_algo])
best_model = best_pipeline.fit(final_data)

# 11. Prédictions complètes (batch)
full_predictions = best_model.transform(final_data)

# 12. Sauvegarde des prédictions en Delta table
full_predictions.select("features", "prediction").write.format("delta").mode("overwrite").saveAsTable("goldwind.predictions")

In [0]:
#vue lisble 
# 13. Vue lisible des prédictions
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, DoubleType

def extract_features(vector):
    return vector.toArray().tolist()

extract_udf = udf(extract_features, ArrayType(DoubleType()))

df = full_predictions.withColumn("features_array", extract_udf("features"))
df_lisible = df.selectExpr(
    "features_array[0] as wind_speed",
    "features_array[1] as wind_dir_indexed",
    "features_array[2] as turbine_id",
    "features_array[3] as status_id",
    "features_array[4] as hour",
    "features_array[5] as month",
    "features_array[6] as capacity",
    "features_array[7] as responsible_dept_indexed",
    "features_array[8] as season_indexed",
    "prediction"
)

# Affichage des prédictions lisibles
df_lisible.show()


+----------+----------------+----------+---------+----+-----+--------+------------------------+--------------+------------------+
|wind_speed|wind_dir_indexed|turbine_id|status_id|hour|month|capacity|responsible_dept_indexed|season_indexed|        prediction|
+----------+----------------+----------+---------+----+-----+--------+------------------------+--------------+------------------+
|  18.44936|             3.0|       1.0|      4.0| 0.0|  6.0|  2200.0|                     0.0|           0.0|1277.2685464523315|
|  13.03947|             1.0|       1.0|      4.0| 0.0|  6.0|  2200.0|                     0.0|           0.0|1287.5236234046488|
|   6.28564|             6.0|       1.0|      4.0| 0.0|  6.0|  2200.0|                     0.0|           0.0|  1271.95273040984|
|  12.05173|             1.0|       1.0|      4.0| 0.0|  6.0|  2200.0|                     0.0|           0.0|1288.0139630972706|
|   2.88378|             3.0|       1.0|      4.0| 0.0|  6.0|  2200.0|                    