In [0]:
# ------------------------------
# Notebook: 05_etl_pipeline_ultimate
# Purpose: Ultimate working ETL process - CAST ALL COLUMNS
# ------------------------------

from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import datetime

# Announce the pipeline and record the wall-clock time at which this run began.
print("=== ULTIMATE ETL PIPELINE ===", f"Execution started at: {datetime.now()}", sep="\n")

def simple_log(message, status="INFO"):
    """Print one log line of the form ``[HH:MM:SS] STATUS: message``.

    Args:
        message: Text to log; formatted with ``str``-style f-string formatting.
        status: Severity label placed before the message (default ``"INFO"``).
    """
    clock = datetime.now().strftime("%H:%M:%S")
    line = f"[{clock}] {status}: {message}"
    print(line)

simple_log("Starting ultimate ETL process with explicit casting")

# Warehouse layout. Single source of truth for the drop step, the final
# verification and the "x/N tables" summaries (previously two hand-kept lists
# plus a hard-coded 8).
dw_tables = {
    "Dimensions": ["dim_student", "dim_course", "dim_activity", "dim_assessment", "dim_time"],
    "Facts": ["fact_student_performance", "fact_student_engagement", "fact_assessment_scores"],
}
tables_to_clean = dw_tables["Dimensions"] + dw_tables["Facts"]


def _save_table(df, name, key_cols=None):
    """Overwrite Delta table ``name`` with ``df`` and log the saved row count.

    The count is read back from the saved table instead of calling
    ``df.count()``, which would re-run the DataFrame's whole lineage (source
    scans, ``distinct``, ``groupBy``, joins) a second time.

    Args:
        df: DataFrame to persist.
        name: Target table name.
        key_cols: Optional list of columns expected to uniquely identify a row.
            When given, a WARNING is logged if they do not, since a dimension
            with duplicate keys fans out every join made against it.
    """
    df.write.format("delta").mode("overwrite").saveAsTable(name)
    saved = spark.table(name)
    row_count = saved.count()
    simple_log(f"Created {name}: {row_count:,} records")
    if key_cols:
        key_count = saved.select(*key_cols).distinct().count()
        if key_count != row_count:
            simple_log(
                f"{name}: {key_count:,} distinct ({', '.join(key_cols)}) keys "
                f"for {row_count:,} rows; key is not unique",
                "WARNING",
            )


try:
    # 1. Drop ALL existing warehouse tables so each one is recreated below
    #    with its explicitly cast schema.
    print("\n=== CLEANING ALL EXISTING TABLES ===")

    for table in tables_to_clean:
        try:
            spark.sql(f"DROP TABLE IF EXISTS {table}")
            simple_log(f"Dropped table: {table}")
        except Exception as e:
            # Best effort: keep going, but report why. (A bare ``except:``
            # also swallowed KeyboardInterrupt/SystemExit and hid the cause.)
            simple_log(f"Could not drop {table}: {e}", "WARNING")

    # 2. Load the source tables (casting happens per column below).
    print("\n=== LOADING SOURCE TABLES WITH EXPLICIT CASTING ===")

    student_info = spark.table("student_info")
    courses = spark.table("courses")
    student_vle = spark.table("student_vle")
    student_assessment = spark.table("student_assessment")
    assessments = spark.table("assessments")
    vle = spark.table("vle")

    simple_log("Loaded source tables")

    # 3. Dimension tables, every column explicitly cast.
    print("\n=== CREATING DIMENSION TABLES WITH EXPLICIT CASTING ===")

    simple_log("Creating dim_student with explicit casting...")
    # NOTE(review): student_info also carries code_module/code_presentation,
    # so distinct() over these attributes leaves one row per distinct
    # attribute combination, not necessarily one per id_student (e.g. if
    # num_of_prev_attempts differs between enrolments). _save_table warns.
    dim_student = student_info.select(
        col("id_student").cast("long").alias("id_student"),
        col("gender").cast("string").alias("gender"),
        col("region").cast("string").alias("region"),
        col("highest_education").cast("string").alias("highest_education"),
        col("imd_band").cast("string").alias("imd_band"),
        col("age_band").cast("string").alias("age_band"),
        col("disability").cast("string").alias("disability"),
        col("num_of_prev_attempts").cast("int").alias("num_of_prev_attempts")
    ).distinct()
    _save_table(dim_student, "dim_student", key_cols=["id_student"])

    simple_log("Creating dim_course with explicit casting...")
    dim_course = courses.select(
        col("code_module").cast("string").alias("code_module"),
        col("code_presentation").cast("string").alias("code_presentation"),
        col("module_presentation_length").cast("int").alias("length"),
        # Presentation code letter -> start month ("B" = February, "J" = October).
        when(col("code_presentation").contains("B"), "February")
          .when(col("code_presentation").contains("J"), "October")
          .otherwise("Unknown").cast("string").alias("start_month")
    ).distinct()
    _save_table(dim_course, "dim_course", key_cols=["code_module", "code_presentation"])

    simple_log("Creating dim_activity with explicit casting...")
    dim_activity = vle.select(
        col("id_site").cast("long").alias("id_site"),
        col("activity_type").cast("string").alias("activity_type"),
        when(col("activity_type").isin(["forum", "discussion", "glossary"]), "Social")
          .when(col("activity_type").isin(["resource", "file", "url", "page"]), "Content")
          .when(col("activity_type").isin(["quiz", "assignment", "subpage", "question"]), "Assessment")
          .when(col("activity_type").isin(["homepage", "folder"]), "Navigation")
          .otherwise("Other").cast("string").alias("activity_category")
    ).distinct()
    _save_table(dim_activity, "dim_activity", key_cols=["id_site"])

    simple_log("Creating dim_assessment with explicit casting...")
    dim_assessment = assessments.select(
        col("id_assessment").cast("long").alias("id_assessment"),
        col("code_module").cast("string").alias("code_module"),
        col("code_presentation").cast("string").alias("code_presentation"),
        col("assessment_type").cast("string").alias("assessment_type"),
        col("date").cast("int").alias("assessment_date"),
        col("weight").cast("double").alias("weight"),
        when(col("assessment_type") == "Exam", "Final")
          .when(col("assessment_type") == "TMA", "Tutor Marked")
          .when(col("assessment_type") == "CMA", "Computer Marked")
          .otherwise("Other").cast("string").alias("assessment_category")
    ).distinct()
    _save_table(dim_assessment, "dim_assessment", key_cols=["id_assessment"])

    # 4. Fact tables, every column explicitly cast.
    print("\n=== CREATING FACT TABLES WITH EXPLICIT CASTING ===")

    simple_log("Creating fact_student_performance with explicit casting...")
    fact_performance = student_info.select(
        col("id_student").cast("long").alias("id_student"),
        col("code_module").cast("string").alias("code_module"),
        col("code_presentation").cast("string").alias("code_presentation"),
        col("final_result").cast("string").alias("final_result"),
        when(col("final_result").isin(["Pass", "Distinction"]), 1).otherwise(0).cast("int").alias("is_passed"),
        when(col("final_result") == "Fail", 1).otherwise(0).cast("int").alias("is_failed"),
        when(col("final_result") == "Withdrawn", 1).otherwise(0).cast("int").alias("is_withdrawn"),
        col("num_of_prev_attempts").cast("int").alias("num_of_prev_attempts"),
        col("studied_credits").cast("int").alias("studied_credits")
    )
    _save_table(fact_performance, "fact_student_performance")

    simple_log("Creating fact_student_engagement with explicit casting...")

    engagement_base = student_vle.select(
        col("id_student").cast("long").alias("id_student"),
        col("code_module").cast("string").alias("code_module"),
        col("code_presentation").cast("string").alias("code_presentation"),
        col("sum_click").cast("long").alias("sum_click"),
        col("date").cast("int").alias("date"),
        col("id_site").cast("long").alias("id_site")
    )

    # ``sum`` here is pyspark.sql.functions.sum (star import), not the builtin.
    fact_engagement = engagement_base.groupBy("id_student", "code_module", "code_presentation").agg(
        sum("sum_click").cast("long").alias("total_clicks"),
        countDistinct("date").cast("long").alias("active_days"),
        countDistinct("id_site").cast("long").alias("resources_accessed")
    ).withColumn(
        "engagement_level",
        when(col("total_clicks") > 1000, "High")
          .when(col("total_clicks") > 500, "Medium")
          .otherwise("Low").cast("string")
    )
    _save_table(fact_engagement, "fact_student_engagement")

    simple_log("Creating fact_assessment_scores with explicit casting...")

    # Keep only scores that look numeric; the rlike guard avoids casting
    # arbitrary strings, and rows whose score ends up null are dropped.
    scores_clean = student_assessment.select(
        col("id_student").cast("long").alias("id_student"),
        col("id_assessment").cast("long").alias("id_assessment"),
        when(col("score").isNull(), lit(None))
          .when(col("score").cast("string").rlike("^[0-9.]+$"), col("score").cast("double"))
          .otherwise(lit(None)).cast("double").alias("score"),
        col("date_submitted").cast("int").alias("date_submitted")
    ).filter(col("score").isNotNull())

    # Join against the already-saved dimension rather than the dim_assessment
    # DataFrame, so its source scan + distinct are not recomputed.
    fact_assessment_scores = scores_clean.join(
        spark.table("dim_assessment").select("id_assessment", "assessment_type", "assessment_category"),
        "id_assessment", "left"
    )
    _save_table(fact_assessment_scores, "fact_assessment_scores")

    # 5. Time dimension: static lookup of course-relative day offsets.
    print("\n=== CREATING TIME DIMENSION ===")

    simple_log("Creating dim_time...")

    time_data = [
        (-60, "Early Registration", "Preparation", "Pre-Course"),
        (-30, "Registration Period", "Preparation", "Pre-Course"),
        (-15, "Late Registration", "Preparation", "Pre-Course"),
        (0, "Course Start", "Beginning", "Early Course"),
        (7, "Week 1", "Beginning", "Early Course"),
        (14, "Week 2", "Beginning", "Early Course"),
        (30, "Month 1", "Core Period", "Early Course"),
        (60, "Month 2", "Core Period", "Mid Course"),
        (90, "Month 3", "Core Period", "Mid Course"),
        (120, "Month 4", "Final Stage", "Late Course"),
        (150, "Month 5", "Final Stage", "Late Course"),
        (180, "Month 6", "Final Stage", "Late Course")
    ]

    dim_time = spark.createDataFrame(time_data, ["time_id", "academic_period", "learning_stage", "course_phase"])

    dim_time = dim_time.select(
        col("time_id").cast("int").alias("time_id"),
        col("academic_period").cast("string").alias("academic_period"),
        col("learning_stage").cast("string").alias("learning_stage"),
        col("course_phase").cast("string").alias("course_phase")
    )
    _save_table(dim_time, "dim_time")

    # 6. Final verification: re-read every warehouse table and summarize.
    print("\n=== FINAL DATA WAREHOUSE VERIFICATION ===")

    simple_log(" ULTIMATE DATA WAREHOUSE STATUS:")
    total_records = 0
    success_count = 0
    available_tables = []

    for category, tables in dw_tables.items():
        print(f"\n{category}:")
        for table in tables:
            try:
                # ``row_count`` (not ``count``) so the star-imported pyspark
                # ``count`` function is not shadowed for later notebook cells.
                row_count = spark.table(table).count()
            except Exception as e:
                print(f"   {table}: ERROR - {e}")
                continue
            total_records += row_count
            success_count += 1
            available_tables.append(table)
            print(f"   {table}: {row_count:,} records")

    expected_count = len(tables_to_clean)
    print("\n" + "=" * 70)
    if success_count == expected_count:
        print(" ULTIMATE ETL PROCESS COMPLETED SUCCESSFULLY! ")
    else:
        print(f"  ETL PROCESS PARTIALLY SUCCESSFUL: {success_count}/{expected_count} tables created")
    print("=" * 70)

    print("\n DATA WAREHOUSE CREATED:")
    print(f"   Tables: {success_count}/{expected_count} successfully created")
    print(f"   Total Records: {total_records:,}")

    print("\n READY FOR POWER BI DASHBOARDS!")
    print("\n Available tables:")
    for table in available_tables:
        print(f"   • {table}")

    print(f"\n⏱  ETL completed at: {datetime.now()}")

except Exception as e:
    print(f"\n ULTIMATE ETL PROCESS FAILED: {e}")
    simple_log(f"Critical error: {e}", "ERROR")
    # Re-raise so the notebook/job run is marked as failed. Every table is
    # dropped up front, so swallowing the error would report success while
    # downstream consumers read a partially rebuilt warehouse.
    raise
finally:
    print("\n=== ULTIMATE ETL EXECUTION FINISHED ===")


=== ULTIMATE ETL PIPELINE ===
Execution started at: 2025-11-09 21:39:19.320579
[21:39:19] INFO: Starting ultimate ETL process with explicit casting

=== CLEANING ALL EXISTING TABLES ===
[21:39:19] INFO: Dropped table: dim_student
[21:39:20] INFO: Dropped table: dim_course
[21:39:20] INFO: Dropped table: dim_activity
[21:39:20] INFO: Dropped table: dim_assessment
[21:39:20] INFO: Dropped table: dim_time
[21:39:21] INFO: Dropped table: fact_student_performance
[21:39:21] INFO: Dropped table: fact_student_engagement
[21:39:21] INFO: Dropped table: fact_assessment_scores

=== LOADING SOURCE TABLES WITH EXPLICIT CASTING ===
[21:39:21] INFO: Loaded source tables

=== CREATING DIMENSION TABLES WITH EXPLICIT CASTING ===
[21:39:21] INFO: Creating dim_student with explicit casting...
[21:39:24] INFO: Created dim_student: 30,279 records
[21:39:24] INFO: Creating dim_course with explicit casting...
[21:39:27] INFO: Created dim_course: 22 records
[21:39:27] INFO: Creating dim_activity with explicit