In [9]:
from pyspark.sql import functions as F
from pyspark.sql.functions import regexp_replace
from pyspark.sql.window import Window


# Switch Spark to its legacy datetime parser; this governs the to_timestamp(...)
# calls made later in this notebook.
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")


def _lire_csv_point_virgule(chemin):
    """Read a semicolon-delimited CSV file that has a header row into a Spark DataFrame."""
    lecteur = spark.read.option("header", True).option("sep", ";")
    return lecteur.csv(chemin)


# Raw source files (all columns are read as strings).
df_site = _lire_csv_point_virgule("Files/Site_Information_2022_8_1 (1).csv")
df_method = _lire_csv_point_virgule("Files/Methods_2022_8_1 (1).csv")
df_ltm = _lire_csv_point_virgule("Files/LTM_Data_2022_8_1 (1).csv")

# Site coordinates / elevation: turn any decimal comma into a point, then cast to float.
for colonne_numerique in ("LATDD", "LONDD", "SITE_ELEV"):
    df_site = df_site.withColumn(
        colonne_numerique,
        regexp_replace(colonne_numerique, ",", ".").cast("float"),
    )

df_site.write.mode("overwrite").saveAsTable("silver_dim_site")


StatementMeta(, 7fedccda-c039-4569-a0c0-535bfc019d03, 11, Finished, Available, Finished)

In [10]:
# Site dimension: keep a single (arbitrary) row per SITE_ID.
df_site_clean = df_site.dropDuplicates(["SITE_ID"])
df_site_clean.write.mode("overwrite").saveAsTable("silver_dim_site_clean")


def _premiere_ligne_par_partition(df, fenetre):
    """Return ``df`` reduced to the row ranked first (row_number() == 1) in each partition of ``fenetre``."""
    rang = F.row_number().over(fenetre)
    return df.withColumn("row_num", rang).where(F.col("row_num") == 1).drop("row_num")


# Methods, full history (SCD2): END_YEAR typed as int.
df_method = df_method.withColumn("END_YEAR", F.col("END_YEAR").cast("int"))
df_method.write.mode("overwrite").saveAsTable("silver_dim_method_scd2")

# Methods, current view (SCD1): one row per (PROGRAM_ID, PARAMETER), highest END_YEAR first.
# NOTE(review): a descending sort places null END_YEAR last; if a blank END_YEAR
# means "still in use", the current method loses to an ended one — confirm.
# Ties on END_YEAR are broken arbitrarily by row_number().
window_method = Window.partitionBy("PROGRAM_ID", "PARAMETER").orderBy(F.desc("END_YEAR"))
df_method_scd1 = _premiere_ligne_par_partition(df_method, window_method)
df_method_scd1.write.mode("overwrite").saveAsTable("silver_dim_method_scd1")

# LTM measurements, full history (SCD2): parse the sampling date.
df_ltm = df_ltm.withColumn("DATE_SMP", F.to_timestamp("DATE_SMP", "MM/dd/yyyy HH:mm:ss"))
df_ltm.write.format("delta").mode("overwrite").saveAsTable("silver_ltm_data_scd2")

# LTM measurements, latest sample only (SCD1): most recent DATE_SMP per SITE_ID
# (ties broken arbitrarily).
window_ltm = Window.partitionBy("SITE_ID").orderBy(F.desc("DATE_SMP"))
df_ltm_scd1 = _premiere_ligne_par_partition(df_ltm, window_ltm)
df_ltm_scd1.write.format("delta").mode("overwrite").saveAsTable("silver_ltm_data_scd1")


StatementMeta(, 7fedccda-c039-4569-a0c0-535bfc019d03, 12, Finished, Available, Finished)

In [11]:
def nettoyer_table(df, nom_table, valeurs_aberrantes=("", "NA", "null", "-1")):
    """Clean a silver DataFrame and overwrite the table ``f"{nom_table}_clean"``.

    Steps, in this order:
      1. Replace every value listed in ``valeurs_aberrantes`` with null, in all columns.
      2. Drop rows in which every column is null.
      3. Drop columns that contain no non-null value.
      4. Cast every column not listed in ``exclusions`` to float; string columns
         first have a decimal comma replaced by a decimal point (same convention
         as the site coordinates when the raw files were loaded).
      5. Apply ``to_timestamp`` to DATE_SMP when that column is present.
      6. Drop exact duplicate rows and write the result.

    Sentinel replacement runs *before* the empty-row / empty-column pruning so
    that rows or columns holding nothing but sentinels are pruned as well.

    Args:
        df: Spark DataFrame to clean.
        nom_table: Base table name; the result is saved as ``f"{nom_table}_clean"``.
        valeurs_aberrantes: Values treated as missing. The default is the list
            this function has always used.

    NOTE(review): sentinels are compared against every column; on non-string
    columns Spark coerces types for ``isin`` — confirm "-1" is really a
    missing-value marker for every measured parameter.
    NOTE(review): non-numeric text in a column that is not in ``exclusions``
    does not survive the float cast (it becomes null with ANSI mode off) —
    add such columns to ``exclusions`` if they must be kept.
    """
    sentinelles = list(valeurs_aberrantes)

    # 1. Sentinels -> null, in a single projection rather than one withColumn per column.
    df = df.select([
        F.when(F.col(c).isin(sentinelles), None).otherwise(F.col(c)).alias(c)
        for c in df.columns
    ])

    # 2. Rows that are entirely null.
    df = df.dropna(how="all")

    # 3. Columns with no non-null value. F.count ignores nulls, so one aggregation
    #    gives every column's non-null count in a single Spark job.
    if df.columns:
        non_nuls = df.agg(*[F.count(F.col(c)) for c in df.columns]).first()
        df = df.select([c for c, n in zip(df.columns, non_nuls) if n > 0])

    # Identifier / descriptive columns that must not be cast to float.
    exclusions = {
        "SITE_ID", "DATE_SMP", "PROGRAM_ID", "PARAMETER",
        "METHOD", "METHOD_DESCRIPTION", "LATDD", "LONDD", "SITE_ELEV",
        "SAMPLE_TYPE", "SAMPLE_LOCATION", "WATERBODY_TYPE",
    }
    types_colonnes = dict(df.dtypes)

    def _en_float(c):
        # Strings such as "12,5" would otherwise cast to null.
        valeur = F.col(c)
        if types_colonnes[c] == "string":
            valeur = F.regexp_replace(valeur, ",", ".")
        return valeur.cast("float").alias(c)

    # 4. Cast the measurement columns (column order is preserved).
    df = df.select([F.col(c) if c in exclusions else _en_float(c) for c in df.columns])

    # 5. Sampling date.
    if "DATE_SMP" in df.columns:
        df = df.withColumn("DATE_SMP", F.to_timestamp("DATE_SMP"))

    # 6. Exact duplicates, then persist.
    df = df.dropDuplicates()
    df.write.mode("overwrite").saveAsTable(f"{nom_table}_clean")


StatementMeta(, 7fedccda-c039-4569-a0c0-535bfc019d03, 13, Finished, Available, Finished)

In [12]:
# Clean each silver table: (source table, target base name); the result of each
# call is written to "<target base name>_clean".
# The site dimension is read from silver_dim_site_clean — the copy deduplicated on
# SITE_ID in the previous cell — instead of the raw silver_dim_site, whose duplicate
# SITE_ID rows would multiply fact rows in the gold join on SITE_ID. The target
# name is unchanged, so the output table is still silver_dim_site_clean_clean.
TABLES_A_NETTOYER = [
    ("silver_ltm_data_scd1", "silver_ltm_data_scd1"),
    ("silver_ltm_data_scd2", "silver_ltm_data_scd2"),
    ("silver_dim_method_scd1", "silver_dim_method_scd1"),
    ("silver_dim_method_scd2", "silver_dim_method_scd2"),
    ("silver_dim_site_clean", "silver_dim_site_clean"),
]

for table_source, table_cible in TABLES_A_NETTOYER:
    nettoyer_table(spark.table(table_source), table_cible)


StatementMeta(, 7fedccda-c039-4569-a0c0-535bfc019d03, 14, Finished, Available, Finished)

In [13]:
from pyspark.sql.functions import regexp_replace, col

# Gold fact table: unpivot the per-parameter measurement columns of the cleaned
# LTM data into (PARAMETER, VALUE) rows, then add the sampling method and the
# site coordinates.

# Suffixes stripped from parameter names. The same pattern must be applied to
# both sides of the (PROGRAM_ID, PARAMETER) join, hence a single constant.
SUFFIXES_PARAMETRE = "_UEQ_L|_MG_L|_UG_L|_UM_CM|_DEG_C|_APP|_TD|_STVL"
# Columns kept as-is by the unpivot; every other LTM column is a measurement.
COLONNES_CLES = ["SITE_ID", "PROGRAM_ID", "DATE_SMP", "SAMPLE_TYPE", "SAMPLE_LOCATION", "WATERBODY_TYPE"]

df_fait = spark.table("silver_ltm_data_scd1_clean")
df_site = spark.table("silver_dim_site_clean_clean")
df_method = spark.table("silver_dim_method_scd1_clean").withColumn(
    "PARAMETER",
    regexp_replace(col("PARAMETER").cast("string"), SUFFIXES_PARAMETRE, "")
)

param_cols = [c for c in df_fait.columns if c not in COLONNES_CLES]
if not param_cols:
    # stack(0, ) is not valid SQL; fail with a readable message instead.
    raise ValueError("silver_ltm_data_scd1_clean has no measurement column to unpivot")


def _identifiant_sql(nom):
    """Backtick-quote a column name for a Spark SQL expression."""
    return "`" + nom.replace("`", "``") + "`"


def _litteral_sql(texte):
    """Quote a Python string as a Spark SQL string literal."""
    return "'" + texte.replace("\\", "\\\\").replace("'", "\\'") + "'"


# stack() needs every value column to share one type; this relies on
# nettoyer_table having cast all non-key columns to float.
paires = ", ".join(f"{_litteral_sql(c)}, {_identifiant_sql(c)}" for c in param_cols)
df_fait_long = df_fait.selectExpr(
    *COLONNES_CLES,
    f"stack({len(param_cols)}, {paires}) as (PARAMETER, VALUE)"
)

df_fait_long = (
    df_fait_long
    .withColumn("PARAMETER", regexp_replace(col("PARAMETER").cast("string"), SUFFIXES_PARAMETRE, ""))
    .withColumn("year", F.year("DATE_SMP"))
    .withColumn("month", F.month("DATE_SMP"))
    .withColumn("day", F.dayofmonth("DATE_SMP"))
    # Normalise the join key on both sides of the site join.
    .withColumn("SITE_ID", F.trim(col("SITE_ID").cast("string")))
)
df_site = df_site.withColumn("SITE_ID", F.trim(col("SITE_ID").cast("string")))

df_joint = df_fait_long.join(df_method, on=["PROGRAM_ID", "PARAMETER"], how="left")
# PROGRAM_ID already comes from the fact side; drop the site copy to avoid ambiguity.
df_joint = df_joint.join(df_site.drop("PROGRAM_ID"), on="SITE_ID", how="left")

df_gold = df_joint.select(
    "SITE_ID", "LATDD", "LONDD", "SITE_ELEV",
    "DATE_SMP", "year", "month", "day",
    "PROGRAM_ID", "PARAMETER", "VALUE",
    "METHOD", "METHOD_DESCRIPTION",
    "SAMPLE_TYPE", "SAMPLE_LOCATION", "WATERBODY_TYPE"
)
df_gold = df_gold.dropDuplicates()
df_gold.write.format("delta") \
    .option("overwriteSchema", "true") \
    .mode("overwrite") \
    .saveAsTable("gold_water_quality")


StatementMeta(, 7fedccda-c039-4569-a0c0-535bfc019d03, 15, Finished, Available, Finished)

In [1]:
# Inspect the gold rows whose VALUE is null.
requete_valeurs_nulles = "SELECT * FROM LAKEHOUSE.gold_water_quality where value is NULL  "
df = spark.sql(requete_valeurs_nulles)
display(df)

StatementMeta(, a1acf737-3681-49e4-a5b5-e42f20bcc3a5, 3, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 80cf586e-a705-4546-8607-7131d5beba44)