# Partie 1 : Ingestion et nettoyage avec Spark (4-5h)

## Etape 1.1 : Exploration initiale

### Charger les donnees de consommation avec PySpark

In [18]:
#Etape 1 – Exploration et chargement Spark

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
builder: SparkSession.Builder = SparkSession.builder
from pyspark.sql.functions import udf, col, lit, min, max, avg, desc, round, count, when
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType, DoubleType

# Creation de la session spark
spark = builder.master('local').appName("demo_rdd").getOrCreate()

df_raw_consommations = spark.read\
            .option("header", "true")\
            .option("inferSchema", "true")\
            .option("sep", ",")\
            .csv("../data/consommations_raw.csv")


df_raw_consommations.show(5)


+-----------+-------------------+------------+------------+-----+
|batiment_id|          timestamp|type_energie|consommation|unite|
+-----------+-------------------+------------+------------+-----+
|    BAT0141|2023-12-21 13:00:00|         gaz|      342.34|  kWh|
|    BAT0080|   08/08/2023 13:00|         gaz|     1256.73|  kWh|
|    BAT0122|06/13/2024 11:00:00|         eau|      133.57|   m3|
|    BAT0033|2023-06-25 00:00:00|         eau|        0.23|   m3|
|    BAT0064|11/29/2024 04:00:00|         gaz|       12.26|  kWh|
+-----------+-------------------+------------+------------+-----+
only showing top 5 rows



### Analyser le schema infere et identifier les problemes de typage

In [22]:
df_raw_consommations.printSchema()
df_raw_consommations.dtypes

# problème de typage :
# - Timestamp est de type string avec pluiseurs formats de dates
# - consommation est de type string avec valuers abérrantes, valleur nulles, manquantes ou mal formatés avec une virgule
df_non_numeric = df_raw_consommations.filter(
    ~F.col("consommation").rlike("^-?[0-9]+[.,]?[0-9]*$")
)

# consomamtion
print(f"Nombre de valeurs non numeriques: {df_non_numeric.count():,}")

# date
print("Exemples de formats de timestamp:")
df_raw_consommations.select("timestamp").distinct().show(20, truncate=False)

root
 |-- batiment_id: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- type_energie: string (nullable = true)
 |-- consommation: string (nullable = true)
 |-- unite: string (nullable = true)

Nombre de valeurs non numeriques: 38,975
Exemples de formats de timestamp:
+-------------------+
|timestamp          |
+-------------------+
|12/08/2024 04:00:00|
|05/03/2023 05:00   |
|08/15/2024 17:00:00|
|2023-11-25 02:00:00|
|2023-03-11T15:00:00|
|08/08/2023 03:00   |
|01/09/2023 04:00   |
|2023-01-18T22:00:00|
|12/03/2023 16:00:00|
|2024-04-25 23:00:00|
|01/22/2024 05:00:00|
|06/01/2024 20:00:00|
|2024-10-11 03:00:00|
|2023-07-25 15:00:00|
|07/19/2023 23:00:00|
|2023-06-12T07:00:00|
|2024-01-23T02:00:00|
|2024-04-08 19:00:00|
|2023-07-24 19:00:00|
|20/10/2024 17:00   |
|2023-02-13T11:00:00|
|2023-12-07T19:00:00|
|2023-11-04 06:00:00|
|14/06/2023 11:00   |
|2023-02-21 07:00:00|
|2024-06-23 03:00:00|
|2024-09-25T10:00:00|
|02/28/2024 12:00:00|
|07/10/2024 14:00:00|
|01/04

### Calculer les statistiques descriptives par type d'energie

In [28]:
df_air_numeric = df_raw_consommations.withColumn(
    "value_clean",
    F.regexp_replace(F.col("consommation"), ",", ".").cast("double")
)

df_air_numeric.show(5)

# Statistiques par type d'énergie (en ignorant les valeurs nulles)
stats_by_type_energy = df_air_numeric.filter(F.col("value_clean").isNotNull()) \
    .groupBy("type_energie") \
    .agg(
        F.count("*").alias("count"),
        F.round(F.mean("value_clean"), 2).alias("mean"),
        F.round(F.stddev("value_clean"), 2).alias("stddev"),
        F.round(F.min("value_clean"), 2).alias("min"),
        F.round(F.max("value_clean"), 2).alias("max"),
        F.round(F.expr("percentile(value_clean, 0.5)"), 2).alias("median")
    ) \
    .orderBy("type_energie")

print("Statistiques par type d'energie:")
stats_by_type_energy.show()

+-----------+-------------------+------------+------------+-----+-----------+
|batiment_id|          timestamp|type_energie|consommation|unite|value_clean|
+-----------+-------------------+------------+------------+-----+-----------+
|    BAT0141|2023-12-21 13:00:00|         gaz|      342.34|  kWh|     342.34|
|    BAT0080|   08/08/2023 13:00|         gaz|     1256.73|  kWh|    1256.73|
|    BAT0122|06/13/2024 11:00:00|         eau|      133.57|   m3|     133.57|
|    BAT0033|2023-06-25 00:00:00|         eau|        0.23|   m3|       0.23|
|    BAT0064|11/29/2024 04:00:00|         gaz|       12.26|  kWh|      12.26|
+-----------+-------------------+------------+------------+-----+-----------+
only showing top 5 rows

Statistiques par type d'energie:
+------------+-------+------+-------+--------+--------+------+
|type_energie|  count|  mean| stddev|     min|     max|median|
+------------+-------+------+-------+--------+--------+------+
|         eau|2573156|204.36|2398.57| -657.01|49999

### Identification des batiments avec le plus de mesures

In [34]:
builder_with_most_measurements = (
    df_raw_consommations
        .groupBy("batiment_id")
        .count()
        .orderBy("count", ascending=False)
)
builder_with_most_measurements.show()

+-----------+-----+
|batiment_id|count|
+-----------+-----+
|    BAT0086|53275|
|    BAT0002|53257|
|    BAT0145|53255|
|    BAT0117|53254|
|    BAT0047|53254|
|    BAT0051|53246|
|    BAT0093|53242|
|    BAT0078|53235|
|    BAT0146|53235|
|    BAT0046|53233|
|    BAT0097|53233|
|    BAT0052|53231|
|    BAT0094|53225|
|    BAT0095|53224|
|    BAT0120|53223|
|    BAT0059|53217|
|    BAT0061|53209|
|    BAT0027|53208|
|    BAT0080|53208|
|    BAT0057|53205|
+-----------+-----+
only showing top 20 rows



### Produire un rapport d'audit de qualite des donnees

In [35]:
# Resume des problemes
total = df_raw_consommations.count()

# Valeurs non numeriques
non_numeric = df_raw_consommations.filter(
    ~F.col("consommation").rlike("^-?[0-9]+[.,]?[0-9]*$")
).count()

# Valeurs avec virgule
with_comma = df_raw_consommations.filter(F.col("consommation").contains(",")).count()

# Valeurs negatives (apres conversion)
negative = df_air_numeric.filter(F.col("value_clean") < 0).count()

# Valeurs aberrantes > 1000
outliers = df_air_numeric.filter(F.col("value_clean") > 1000).count()

# Doublons
duplicates = total - df_raw_consommations.dropDuplicates(["batiment_id", "timestamp", "type_energie"]).count()


print(f"Total enregistrements: {total:,}")
print()
print(f"Problemes identifies:")
print(f"  - Valeurs non numeriques: {non_numeric:,} ({non_numeric/total*100:.2f}%)")
print(f"  - Valeurs avec virgule decimale: {with_comma:,} ({with_comma/total*100:.2f}%)")
print(f"  - Valeurs negatives: {negative:,} ({negative/total*100:.2f}%)")
print(f"  - Valeurs aberrantes (>1000): {outliers:,} ({outliers/total*100:.2f}%)")
print(f"  - Doublons: {duplicates:,} ({duplicates/total*100:.2f}%)")
print(f"  - Formats de dates multiples: 4 formats differents detectes")

Total enregistrements: 7,758,868

Problemes identifies:
  - Valeurs non numeriques: 38,975 (0.50%)
  - Valeurs avec virgule decimale: 925,392 (11.93%)
  - Valeurs negatives: 38,910 (0.50%)
  - Valeurs aberrantes (>1000): 507,037 (6.53%)
  - Doublons: 152,134 (1.96%)
  - Formats de dates multiples: 4 formats differents detectes
