In [None]:
# Medallion-style ingestion helpers for Microsoft Fabric (PySpark):
# table creation, raw -> bronze -> silver -> gold loads, and monitoring loggers.


In [1]:
import getpass
import uuid
from datetime import datetime

import yaml
from notebookutils import mssparkutils
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import (
    DateType,
    IntegerType,
    LongType,
    ShortType,
    StringType,
    StructField,
    StructType,
    TimestampType,
)

StatementMeta(, 2c9ac686-41f1-42e9-b58e-2473124c1b23, 3, Finished, Available, Finished)

In [1]:
def create_table(config, layer, lakehouse, workspace="test"):
    """Create empty Delta tables for every configured table of one layer.

    For each entry of ``config["tables"]`` whose ``"layer"`` equals ``layer``, a
    Spark schema is built from the entry's ``"schema"`` mapping, and an empty
    DataFrame with that schema is written (mode ``overwrite``) to
    ``<lakehouse>.Lakehouse/Tables/<name>``. For the ``"bronze"`` layer, a
    companion ``<name>_reject`` table with the same schema is also created.

    Args:
        config: Parsed YAML configuration containing a ``"tables"`` list. Each
            entry needs ``"name"``, ``"layer"`` and ``"schema"``, where
            ``"schema"`` maps column name -> ``{"type": <type expression such as
            "StringType()">, "nullable": <flag>}``.
        layer: Layer to materialize (e.g. ``"bronze"``).
        lakehouse: Lakehouse name used in the OneLake path.
        workspace: OneLake workspace name. Defaults to ``"test"``, the value that
            was previously hard-coded.

    Note:
        Any existing table at a destination path is replaced by an empty table.
    """
    base_path = f"abfss://{workspace}@onelake.dfs.fabric.microsoft.com/{lakehouse}.Lakehouse/Tables"

    for table in config["tables"]:
        if table["layer"] != layer:
            continue

        schema = StructType([
            StructField(col_name, _parse_spark_type(col_def["type"]), bool(col_def["nullable"]))
            for col_name, col_def in table["schema"].items()
        ])
        empty_df = spark.createDataFrame([], schema)

        empty_df.write.format("delta").mode("overwrite").save(f"{base_path}/{table['name']}")

        # Bronze tables also get a "<name>_reject" table with the same schema
        # (presumably the target of rejected rows in load_data - confirm).
        if layer == "bronze":
            empty_df.write.format("delta").mode("overwrite").save(f"{base_path}/{table['name']}_reject")


def _parse_spark_type(type_expr):
    """Evaluate a config type expression such as ``"StringType()"`` into a Spark DataType.

    Only the public names of ``pyspark.sql.types`` are in scope, and builtins are
    disabled. The original code used a bare ``eval`` with access to all notebook
    globals.

    SECURITY NOTE: this is still ``eval``. The restricted namespace narrows, but
    does not remove, the code-execution risk, so only use it with trusted
    configuration files.
    """
    from pyspark.sql import types as spark_types

    allowed_names = {name: getattr(spark_types, name) for name in spark_types.__all__}
    return eval(type_expr, {"__builtins__": {}}, allowed_names)


StatementMeta(, b4fb66bd-95f8-4474-b049-b920b8f03895, 3, Finished, Available, Finished)

In [1]:
def load_data(config, lakehouse, file_format, notebook_name, sources_to_process, workspace="test"):
    """Ingest the raw files of each selected source folder into bronze Delta tables.

    Which tables are processed: every table in ``config["tables"]`` that has a
    ``"bronze"`` section whose ``source_path`` is in ``sources_to_process``.

    For each such table, every file listed under
    ``<lakehouse>.Lakehouse/<source_path>`` is:

    * read;
    * split into valid rows and rejected rows. A row is rejected when it has a
      null in any column whose silver config has ``is_nullable == 0``;
    * written: valid rows to ``<lakehouse>.<name>``, rejected rows to
      ``<lakehouse>.<name>_reject``.

    Logging: one ``flow`` record per processed file and one ``job_execution``
    record per table.

    Write mode: the first write to a table during a run overwrites it, and later
    files of the same folder are appended. The table therefore holds every file
    of the folder; previously each file overwrote the previous one.

    Args:
        config: Parsed YAML configuration containing a ``"tables"`` list.
        lakehouse: Bronze lakehouse name. Used in the OneLake path and as the
            table namespace.
        file_format: ``"csv"`` or ``"parquet"``.
        notebook_name: Recorded in the job log.
        sources_to_process: Source paths to ingest; tables whose source is not
            listed are skipped.
        workspace: OneLake workspace holding the raw files. Defaults to the
            previously hard-coded ``"test"``.

    Errors are not raised. Any exception stops processing of the current table,
    is printed, and is recorded as a ``FAILED`` job.
    """
    for table in config["tables"]:
        if "bronze" not in table:
            continue

        table_name = table["name"]
        source_path = table["bronze"]["source_path"]

        if source_path not in sources_to_process:
            continue

        print(f"\n🚀 Traitement du dossier source : {source_path} pour la table {table_name}")

        file_path_base = f"abfss://{workspace}@onelake.dfs.fabric.microsoft.com/{lakehouse}.Lakehouse/{source_path}"

        table_bronze = f"{lakehouse}.{table_name}"
        table_reject = f"{lakehouse}.{table_name}_reject"

        job_start = datetime.now()
        job_id = int(uuid.uuid4().int % (10**10))
        valid_files = 0
        invalid_files = 0
        total_files = 0
        job_status = "INIT"
        job_message = ""
        # Tables already written during this run: first write overwrites, later ones append.
        written_tables = set()

        try:
            # Non-nullable silver columns are the mandatory keys for row validation.
            # They do not depend on the file, so compute them once per table.
            primary_keys = [
                column["source_name"]
                for column in table["silver"]["columns"]
                if column["is_nullable"] == 0
            ]

            files_list = mssparkutils.fs.ls(file_path_base) or []
            total_files = len(files_list)

            for file_info in files_list:
                file_name = file_info.name
                file_path = f"{file_path_base}/{file_name}"
                flow_id = int(uuid.uuid4().int % (10**10))
                flow_start = datetime.now()

                print(f"📂 Lecture du fichier : {file_name}")
                df = _read_source_file(file_path, file_format)

                total_rows = df.count()
                print(f"✅ {total_rows} lignes chargées depuis {file_name}")

                df_valid, df_reject = _split_valid_rejected(df, primary_keys)
                df_valid_count = df_valid.count() if primary_keys else total_rows
                # The rejected set is the exact complement of the valid set, so its
                # size follows without another Spark job.
                df_reject_count = total_rows - df_valid_count

                if df_valid_count > 0:
                    _write_delta_table(df_valid, table_bronze, written_tables)
                    valid_files += 1
                    print(f"💾 Données valides écrites dans {table_bronze}")

                if df_reject_count > 0:
                    _write_delta_table(df_reject, table_reject, written_tables)
                    invalid_files += 1
                    print(f"⚠️ Données rejetées écrites dans {table_reject}")

                job_status = "SUCCESS"
                job_message = "Fichier traité avec succès"

                flow_data = {
                    "job_id": job_id,
                    "flow_id": flow_id,
                    "file_path": file_path,
                    "flow_start": flow_start,
                    "flow_end": datetime.now(),
                    "flow_status": job_status,
                    "flow_message": job_message,
                    "accepted_rows": df_valid_count,
                    "warning_rows": 0,
                    "rejected_rows": df_reject_count,
                    "total_rows": df_valid_count + df_reject_count,
                    "year": int(job_start.year),
                    "month": int(job_start.month),
                    "day": int(job_start.day)
                }

                log_flow(flow_data)
                print(f"📝 Log flow ajouté pour {file_name} - Flow ID : {flow_id}")

            if not files_list:
                # An empty folder is not an error; without this the job log kept status "INIT".
                job_status = "SUCCESS"
                job_message = "Aucun fichier à traiter"

        except Exception as e:
            job_status = "FAILED"
            job_message = str(e)
            print(f"❌ Erreur lors du traitement de {table_name} : {job_message}")

        # Final log record for this table's ingestion job.
        log_data = {
            "job_id": job_id,
            "orchestration_id": 0,
            "job_name": "load_data",
            "job_type": "ingestion",
            "notebook_name": notebook_name,
            "source_name": source_path,
            "object_name": "",
            "job_start": job_start,
            "job_end": datetime.now(),
            "valid_files": valid_files,
            "invalid_files": invalid_files,
            "total_files": total_files,
            "source_layer": "raw",
            "target_layer": "bronze",
            "target_table_name": table_name,
            "job_status": job_status,
            "job_message": job_message,
            "year": int(job_start.year),
            "month": int(job_start.month),
            "day": int(job_start.day)
        }

        log_job_execution(log_data)
        print(f"📌 Log job ajouté pour {table_name} - Statut : {job_status}")


def _read_source_file(file_path, file_format):
    """Read one raw file as a Spark DataFrame; only "csv" and "parquet" are supported."""
    if file_format == "csv":
        return (
            spark.read.format("csv")
            .option("header", "true")
            .option("inferSchema", "true")
            .option("delimiter", ",")
            .load(file_path)
        )
    if file_format == "parquet":
        # Parquet carries its own schema; header/inferSchema are CSV-only options.
        return spark.read.format("parquet").load(file_path)
    raise ValueError(f"Format de fichier non supporté : {file_format}")


def _split_valid_rejected(df, primary_keys):
    """Split df into (valid, rejected) on nulls in primary_keys; (df, None) when there are no keys."""
    if not primary_keys:
        return df, None
    df_valid = df.dropna(subset=primary_keys)
    # exceptAll keeps duplicate rows. subtract() is EXCEPT DISTINCT and would collapse
    # identical rejected rows, so valid + rejected would not add up to the input.
    df_reject = df.exceptAll(df_valid)
    return df_valid, df_reject


def _write_delta_table(df, table_name, written_tables):
    """Save df as a Delta table: overwrite on the first write of the run, append afterwards."""
    write_mode = "append" if table_name in written_tables else "overwrite"
    df.write.format("delta").option("mergeSchema", "true").mode(write_mode).saveAsTable(table_name)
    written_tables.add(table_name)


StatementMeta(, e6ec0f7d-e199-4f43-b843-ef4e632d18ab, 3, Finished, Available, Finished)

In [1]:
def insert_bronze_to_silver(config, mode, notebook_name):
    """Load every bronze table that has a silver mapping into the silver lakehouse.

    Which tables are processed: every table in ``config["tables"]`` with both
    ``"bronze"`` and ``"silver"`` sections.

    For each such table:

    * the bronze Delta table is read from OneLake;
    * its columns are selected and renamed according to
      ``table["silver"]["columns"]`` (``source_name`` -> ``target_name``);
      unmapped columns are dropped;
    * the result is written to ``lakehouse_silver.<name>``:

      - ``mode == "SCD0"``: rows are appended;
      - ``mode == "SCD1"``: the silver table is overwritten.

    Any other mode is recorded as a ``FAILED`` job for each table. Previously it
    crashed with ``UnboundLocalError`` while the log record was being built.

    One ``job_execution`` record and one ``flow`` record are logged per table.
    The flow ``accepted_rows`` is the number of rows written (0 on failure).
    Exceptions are printed and logged, not raised.
    """
    for table in config["tables"]:
        if "silver" not in table or "bronze" not in table:
            continue

        table_name = table["name"]
        bronze_table = f"lakehouse_bronze.{table_name}"
        silver_table = f"lakehouse_silver.{table_name}"
        bronze_path = f"abfss://test@onelake.dfs.fabric.microsoft.com/lakehouse_bronze.Lakehouse/Tables/{table_name}"
        job_start = datetime.now()
        job_id = int(uuid.uuid4().int % (10**10))
        flow_id = int(uuid.uuid4().int % (10**10))

        df_valid_count = 0
        # This step does no row-level validation, so nothing is ever rejected here.
        df_reject_count = 0

        try:
            if mode not in ("SCD0", "SCD1"):
                raise ValueError(f"Mode non supporté : {mode}")

            df_bronze = spark.read.format("delta").load(bronze_path)

            column_mapping = {
                column["source_name"]: column["target_name"]
                for column in table["silver"]["columns"]
            }
            df_silver = df_bronze.select(
                [col(source).alias(target) for source, target in column_mapping.items()]
            )

            print(f" Colonnes transformées : {df_silver.columns}")

            row_count = df_silver.count()

            if mode == "SCD0":
                df_silver.write.format("delta").mode("append").option("mergeSchema", "true").saveAsTable(silver_table)
                job_status, job_message = "SUCCESS", "Ajout des nouvelles données"
                print(f" Mode SCD0 :{job_message}")
            else:
                df_silver.write.format("delta").mode("overwrite").option("mergeSchema", "true").saveAsTable(silver_table)
                job_status, job_message = "SUCCESS", "Écrasement des anciennes données"
                print(f"✅ Mode SCD1 : {job_message}")

            # Report rows as accepted only once the write has succeeded.
            df_valid_count = row_count

        except Exception as e:
            job_status, job_message = "FAILED", str(e)
            print(f"{job_message}")

        log_data = {
            "job_id": job_id,
            "orchestration_id": 0,
            "job_name": "insert_bronze_to_silver",
            "job_type": "ingestion",
            "notebook_name": notebook_name,
            "source_name": bronze_table,
            "object_name": "",
            "job_start": job_start,
            "job_end": datetime.now(),
            "valid_files": 0,
            "invalid_files": 0,
            "total_files": 0,
            "source_layer": "bronze",
            "target_layer": "silver",
            "target_table_name": table_name,
            "job_status": job_status,
            "job_message": job_message,
            "year": int(job_start.year),
            "month": int(job_start.month),
            "day": int(job_start.day)
        }

        log_job_execution(log_data)

        flow_data = {
            "job_id": job_id,
            "flow_id": flow_id,
            "file_path": bronze_path,
            "flow_start": job_start,
            "flow_end": datetime.now(),
            "flow_status": job_status,
            "flow_message": job_message,
            "accepted_rows": df_valid_count,
            "warning_rows": 0,
            "rejected_rows": df_reject_count,
            "total_rows": df_valid_count + df_reject_count,
            "year": int(job_start.year),
            "month": int(job_start.month),
            "day": int(job_start.day)
        }

        log_flow(flow_data)
        print(f"log flow ajouté avec succès pour {table_name}")


StatementMeta(, 761efd84-b69d-4c2b-beb5-8993450bbbce, 3, Finished, Available, Finished)

In [1]:
def insert_silver_to_gold():
    """Build the two gold tables from the silver lakehouse (both fully overwritten).

    * ``lakehouse_gold.gold_employee``: employees left-joined to their education
      label and to performance-review fields. Because the join to the review
      table is on ``employee_id``, an employee with several reviews appears once
      per review.
    * ``lakehouse_gold.gold_performance_rating``: performance reviews left-joined
      to the employee's job role and salary, and to the satisfaction-level and
      self-rating labels.
    """
    employee = spark.read.table("lakehouse_silver.employee")
    performance = spark.read.table("lakehouse_silver.performance_rating")
    satisfaction = spark.read.table("lakehouse_silver.satisfied_level")
    rating = spark.read.table("lakehouse_silver.rating_level")
    education = spark.read.table("lakehouse_silver.education_level")

    # gold_employee: employee -> education label -> review fields.
    education_match = employee["education"] == education["education_level_id"]
    employee_joined = (
        employee
        .join(education, education_match, "left")
        .join(performance, "employee_id", "left")
    )
    employee_columns = [
        employee["employee_id"],
        employee["first_name"],
        employee["last_name"],
        employee["gender"],
        employee["age"],
        employee["department"],
        employee["job_role"],
        education["education_level"],
        employee["salary"],
        employee["attrition"],
        performance["job_satisfaction"],
        performance["manager_rating"],
        performance["work_life_balance"],
    ]
    gold_employee = employee_joined.select(*employee_columns)

    gold_employee.write.format("delta").mode("overwrite").saveAsTable("lakehouse_gold.gold_employee")
    print(" Données insérées dans `gold_employee`.")

    # gold_performance_rating: review -> employee -> satisfaction label -> self-rating label.
    satisfaction_match = performance["job_satisfaction"] == satisfaction["satisfaction_id"]
    rating_match = performance["self_rating"] == rating["rating_id"]
    performance_joined = (
        performance
        .join(employee, "employee_id", "left")
        .join(satisfaction, satisfaction_match, "left")
        .join(rating, rating_match, "left")
    )
    performance_columns = [
        performance["performance_id"],
        performance["employee_id"],
        performance["review_date"],
        employee["job_role"],
        employee["salary"],
        performance["job_satisfaction"],
        satisfaction["satisfaction_level"],
        performance["self_rating"],
        rating["rating_level"],
        performance["manager_rating"],
        performance["training_opportunities_within_year"],
        performance["work_life_balance"],
    ]
    gold_performance = performance_joined.select(*performance_columns)

    gold_performance.write.format("delta").mode("overwrite").saveAsTable("lakehouse_gold.gold_performance_rating")
    print(" Données insérées dans `gold_performance_rating`.")


StatementMeta(, 216943ad-87f8-40b5-813e-7b50aa5a9892, 3, Finished, Available, Finished)

In [13]:
def log_job_execution(log_data):
    """Append one job-execution record to the monitoring ``job_execution`` Delta table.

    Args:
        log_data: Dict keyed by the column names listed below (ids, job
            metadata, start/end timestamps, file counters, layers, status,
            message and year/month/day values).

    Any error while building or writing the row is printed and swallowed, so a
    logging failure never interrupts the pipeline being logged.
    """
    workspace = mssparkutils.env.getWorkspaceName()
    target_path = f"abfss://{workspace}@onelake.dfs.fabric.microsoft.com/monitoring.Lakehouse/Tables/job_execution"

    job_columns = [
        ("job_id", LongType()),
        ("orchestration_id", LongType()),
        ("job_name", StringType()),
        ("job_type", StringType()),
        ("notebook_name", StringType()),
        ("source_name", StringType()),
        ("object_name", StringType()),
        ("job_start", TimestampType()),
        ("job_end", TimestampType()),
        ("valid_files", IntegerType()),
        ("invalid_files", IntegerType()),
        ("total_files", IntegerType()),
        ("source_layer", StringType()),
        ("target_layer", StringType()),
        ("target_table_name", StringType()),
        ("job_status", StringType()),
        ("job_message", StringType()),
        ("year", ShortType()),
        ("month", ShortType()),
        ("day", ShortType()),
    ]
    job_schema = StructType([StructField(name, data_type, True) for name, data_type in job_columns])

    try:
        record_df = spark.createDataFrame([log_data], schema=job_schema)
        record_df.write.format("delta").mode("append").save(target_path)
        print(f" Log ajouté avec succès pour {log_data['target_table_name']}")
    except Exception as exc:
        print(f" Erreur lors de l'insertion du log : {str(exc)}")


StatementMeta(, 2e4a4595-ad9e-4a9d-9da2-94a8741eac23, 15, Finished, Available, Finished)

In [1]:
def log_flow(log_flow_data):
    """Append one per-file (flow) record to the monitoring ``flow`` Delta table.

    Args:
        log_flow_data: Dict keyed by the column names listed below (job/flow
            ids, file path, start/end timestamps, status, message, row counters
            and year/month/day values).

    Any error while building or writing the row is printed and swallowed, so a
    logging failure never interrupts the pipeline being logged.
    """
    workspace = mssparkutils.env.getWorkspaceName()
    target_path = f"abfss://{workspace}@onelake.dfs.fabric.microsoft.com/monitoring.Lakehouse/Tables/flow"

    flow_columns = [
        ("job_id", LongType()),
        ("flow_id", LongType()),
        ("file_path", StringType()),
        ("flow_start", TimestampType()),
        ("flow_end", TimestampType()),
        ("flow_status", StringType()),
        ("flow_message", StringType()),
        ("accepted_rows", IntegerType()),
        ("warning_rows", IntegerType()),
        ("rejected_rows", IntegerType()),
        ("total_rows", IntegerType()),
        ("year", ShortType()),
        ("month", ShortType()),
        ("day", ShortType()),
    ]
    flow_schema = StructType([StructField(name, data_type, True) for name, data_type in flow_columns])

    try:
        record_df = spark.createDataFrame([log_flow_data], schema=flow_schema)
        record_df.write.format("delta").mode("append").save(target_path)
        print(f"✅ Log ajouté avec succès pour {log_flow_data['file_path']}")
    except Exception as exc:
        print(f"❌ Erreur lors de l'insertion du log dans `flow` : {str(exc)}")


StatementMeta(, 1b82d624-f09b-4e90-95bb-77c8d0121286, 3, Finished, Available, Finished)

In [1]:
def log_orchestration(log_data):
    """Append one orchestration record to the monitoring ``orchestration`` Delta table.

    Args:
        log_data: Dict keyed by the column names listed below (orchestration id,
            run id, product/domain/entity names, environment, workspace, user,
            start/end timestamps, status, message and year/month/day values).

    Any error while building or writing the row is printed and swallowed, so a
    logging failure never interrupts the pipeline being logged.
    """
    workspace = mssparkutils.env.getWorkspaceName()
    target_path = f"abfss://{workspace}@onelake.dfs.fabric.microsoft.com/monitoring.Lakehouse/Tables/orchestration"

    orchestration_columns = [
        ("orchestration_id", LongType()),
        # Column name spelled "suplementary" in the existing table; keep as-is.
        ("suplementary_run_id", StringType()),
        ("orchestration_name", StringType()),
        ("data_product_name", StringType()),
        ("domain_name", StringType()),
        ("entity_name", StringType()),
        ("environment", StringType()),
        ("workspace_name", StringType()),
        ("user_name", StringType()),
        ("orchestration_start", TimestampType()),
        ("orchestration_end", TimestampType()),
        ("orchestration_status", StringType()),
        ("orchestration_message", StringType()),
        ("year", ShortType()),
        ("month", ShortType()),
        ("day", ShortType()),
    ]
    orchestration_schema = StructType(
        [StructField(name, data_type, True) for name, data_type in orchestration_columns]
    )

    try:
        record_df = spark.createDataFrame([log_data], schema=orchestration_schema)
        record_df.write.format("delta").mode("append").save(target_path)
        print(f"📌 Log orchestration ajouté avec statut : {log_data['orchestration_status']}")
    except Exception as exc:
        print(f"❌ Erreur lors de l'insertion du log dans `orchestration` : {str(exc)}")

StatementMeta(, 91d2663e-08bb-4673-8e67-2a436926517d, 3, Finished, Available, Finished)

In [2]:
def extract_sources_from_yaml(
    config,
    yaml_key="tables_gold",
    environment="dev",
    data_product_name="load_employee_data",
    domain_name="HR",
    entity_name="employee",
    orchestration_name="extract_sources",
):
    """Collect the distinct ``source_path`` values declared under ``config[yaml_key]``.

    Lookup: each entry of ``config[yaml_key]`` may have a ``"sources"`` list.
    Every source dict in that list that has a ``"source_path"`` contributes its
    path.

    Order: paths are returned once each, in first-seen order. This is
    deterministic, unlike the former ``list(set(...))``.

    Logging: one orchestration record is always written through
    ``log_orchestration``.
    * ``SUCCESS`` is logged when the key is present and non-empty.
    * ``FAILED`` is logged when the key is absent or empty, or when the config
      cannot be traversed.

    Args:
        config: Parsed YAML configuration (dict-like).
        yaml_key: Top-level key listing the gold tables.
        environment, data_product_name, domain_name, entity_name,
        orchestration_name: Metadata written to the orchestration log.
            Defaults are the values that were previously hard-coded.

    Returns:
        List of unique source paths. Empty on failure.
    """
    orchestration_start = datetime.now()
    user_name = _current_user_name()
    workspace_name = mssparkutils.env.getWorkspaceName()
    orchestration_id = int(uuid.uuid4().int % (10**10))
    suplementary_run_id = str(uuid.uuid4())
    unique_sources = []

    try:
        if yaml_key in config and config[yaml_key]:
            collected = [
                source["source_path"]
                for table in config[yaml_key]
                if "sources" in table
                for source in table["sources"]
                if "source_path" in source
            ]
            # dict.fromkeys de-duplicates while keeping first-seen order.
            unique_sources = list(dict.fromkeys(collected))
            orchestration_status = "SUCCESS"
            orchestration_message = f"{len(unique_sources)} sources extraites avec succès"
        else:
            orchestration_status = "FAILED"
            orchestration_message = f"La clé '{yaml_key}' est absente ou vide dans le YAML"
    except Exception as e:
        unique_sources = []
        orchestration_status = "FAILED"
        orchestration_message = str(e)

    orchestration_end = datetime.now()

    log_data = {
        "orchestration_id": orchestration_id,
        "suplementary_run_id": suplementary_run_id,
        "orchestration_name": orchestration_name,
        "data_product_name": data_product_name,
        "domain_name": domain_name,
        "entity_name": entity_name,
        "environment": environment,
        "workspace_name": workspace_name,
        "user_name": user_name,
        "orchestration_start": orchestration_start,
        "orchestration_end": orchestration_end,
        "orchestration_status": orchestration_status,
        "orchestration_message": orchestration_message,
        "year": orchestration_start.year,
        "month": orchestration_start.month,
        "day": orchestration_start.day
    }

    log_orchestration(log_data)
    return unique_sources


def _current_user_name():
    """Best-effort OS user name for the orchestration log; "unknown" if it cannot be resolved."""
    try:
        return getpass.getuser()
    except Exception:
        # getpass.getuser() raises when no LOGNAME/USER/LNAME/USERNAME variable is set
        # and the uid has no password-database entry (common in containers).
        return "unknown"


StatementMeta(, 91d2663e-08bb-4673-8e67-2a436926517d, 4, Finished, Available, Finished)