# Gold Layer - Analytics & Features

In [3]:
# notebooks/04_gold_analytics_ml.py

print("üöÄ D√âMARRAGE DU PIPELINE GOLD - ANALYTICS & ML")
print("=" * 70)

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import *
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml import Pipeline
import pandas as pd
import numpy as np
# ============================================
# FONCTIONS GOLD LAYER
# ============================================

def detect_silver_catalog():
    """D√©tecte automatiquement o√π sont les tables Silver"""
    print("üîç D√©tection du catalogue Silver...")
    
    for catalogue in ["local", "iceberg"]:
        try:
            tables = spark.sql(f"SHOW TABLES IN {catalogue}.silver").collect()
            if tables and any("trips_complete" in t.tableName for t in tables):
                print(f"   ‚úÖ Catalogue trouv√©: {catalogue}")
                return catalogue
        except:
            continue
    
    raise Exception("‚ùå Aucune table Silver trouv√©e dans local.silver ni iceberg.silver")




In [None]:
def create_gold_analytics_tables(silver_catalog):
    """
    Cr√©er les tables analytiques Gold
    """
    print("\nüìä Cr√©ation des tables analytiques Gold...")
    
    # Lire la table silver
    trips_complete = spark.table(f"{silver_catalog}.silver.trips_complete")
    
    # Ajouter les colonnes manquantes
    trips_complete = trips_complete.withColumn(
        "tip_percentage",
        F.when(F.col("total_amount") > 0, (F.col("tip_amount") / F.col("total_amount")) * 100).otherwise(0)
    ).withColumn(
        "pickup_hour", F.hour(F.col("tpep_pickup_datetime"))
    ).withColumn(
        "pickup_date", F.to_date(F.col("tpep_pickup_datetime"))
    )
    
    # ============================================
    # 1. TABLE GOLD: DAILY METRICS
    # ============================================
    print("\n   üìÖ Cr√©ation de daily_metrics...")
    
    daily_metrics = trips_complete.groupBy(
        "pickup_date"
    ).agg(
        F.count("*").alias("total_trips"),
        F.sum("total_amount").alias("daily_revenue"),
        F.avg("trip_distance").alias("avg_trip_distance"),
        F.avg("trip_duration_minutes").alias("avg_trip_duration"),
        F.avg("passenger_count").alias("avg_passenger_count"),
        F.sum("tip_amount").alias("total_tips"),
        F.avg("tip_percentage").alias("avg_tip_percentage"),
        F.countDistinct("VendorID").alias("active_vendors"),
        F.sum(F.when(F.col("payment_type") == 1, 1).otherwise(0)).alias("credit_card_trips"),
        F.sum(F.when(F.col("payment_type") == 2, 1).otherwise(0)).alias("cash_trips"),
        F.sum(F.when(F.col("is_rainy") == 1, 1).otherwise(0)).alias("rainy_trips")
    ).withColumn(
        "credit_card_percentage",
        (F.col("credit_card_trips") / F.col("total_trips")) * 100
    ).withColumn(
        "revenue_per_trip",
        F.col("daily_revenue") / F.col("total_trips")
    ).withColumn(
        "year", F.year("pickup_date")
    ).withColumn(
        "month", F.month("pickup_date")
    ).withColumn(
        "day_of_week", F.dayofweek("pickup_date")
    )
    
    # Sauvegarde
    daily_metrics.write \
        .format("iceberg") \
        .mode("overwrite") \
        .partitionBy("year", "month") \
        .saveAsTable(f"{silver_catalog}.gold.daily_metrics")
    
    print(f"   ‚úÖ Table gold.daily_metrics cr√©√©e: {daily_metrics.count():,} lignes")
    
    # ============================================
    # 2. TABLE GOLD: HOURLY PATTERNS
    # ============================================
    print("\n   üïê Cr√©ation de hourly_patterns...")
    
    hourly_patterns = trips_complete.groupBy(
        "pickup_date",
        "pickup_hour"
    ).agg(
        F.count("*").alias("trip_count"),
        F.avg("trip_distance").alias("avg_distance"),
        F.avg("trip_duration_minutes").alias("avg_duration"),
        F.avg("total_amount").alias("avg_fare"),
        F.avg("speed_mph").alias("avg_speed"),
        F.avg("tip_percentage").alias("avg_tip_pct"),
        F.sum(F.when(F.col("is_weekend") == 1, 1).otherwise(0)).alias("weekend_trips"),
        F.sum(F.when(F.col("is_weekend") == 0, 1).otherwise(0)).alias("weekday_trips")
    ).withColumn(
        "hour_type",
        F.when((F.col("pickup_hour") >= 6) & (F.col("pickup_hour") <= 9), "morning_rush")
        .when((F.col("pickup_hour") >= 17) & (F.col("pickup_hour") <= 20), "evening_rush")
        .when((F.col("pickup_hour") >= 22) | (F.col("pickup_hour") <= 4), "night")
        .otherwise("regular_hours")
    ).withColumn(
        "year", F.year("pickup_date")
    ).withColumn(
        "month", F.month("pickup_date")
    )
    
    # Sauvegarde
    hourly_patterns.write \
        .format("iceberg") \
        .mode("overwrite") \
        .partitionBy("year", "month") \
        .saveAsTable(f"{silver_catalog}.gold.hourly_patterns")
    
    print(f"   ‚úÖ Table gold.hourly_patterns cr√©√©e: {hourly_patterns.count():,} lignes")
    
    # ============================================
    # 3. TABLE GOLD: TOP ROUTES
    # ============================================
    print("\n   üó∫Ô∏è  Cr√©ation de top_routes...")
    
    # Ajouter une colonne route_id pour identifier chaque route
    trips_with_route = trips_complete.withColumn(
        "route_id",
        F.concat(F.col("PULocationID"), F.lit("_"), F.col("DOLocationID"))
    )
    
    top_routes = trips_with_route.groupBy(
        "route_id",
        "PULocationID",
        "DOLocationID",
        "pickup_date"
    ).agg(
        F.count("*").alias("trip_count"),
        F.avg("trip_distance").alias("avg_distance"),
        F.avg("trip_duration_minutes").alias("avg_duration"),
        F.avg("total_amount").alias("avg_fare"),
        F.avg("speed_mph").alias("avg_speed"),
        F.avg("tip_percentage").alias("avg_tip_pct"),
        F.sum("passenger_count").alias("total_passengers")
    ).withColumn(
        "efficiency_ratio",
        F.col("avg_distance") / F.when(F.col("avg_duration") > 0, F.col("avg_duration")).otherwise(1)
    ).withColumn(
        "year", F.year("pickup_date")
    ).withColumn(
        "month", F.month("pickup_date")
    ).orderBy(
        F.desc("trip_count")
    ).limit(1000)  # Limiter aux 1000 routes les plus fr√©quentes
    
    # Sauvegarde
    top_routes.write \
        .format("iceberg") \
        .mode("overwrite") \
        .partitionBy("year", "month") \
        .saveAsTable(f"{silver_catalog}.gold.top_routes")
    
    print(f"   ‚úÖ Table gold.top_routes cr√©√©e: {top_routes.count():,} lignes")
    
    # ============================================
    # 4. TABLE GOLD: DRIVER PERFORMANCE METRICS
    # ============================================
    print("\n   üöï Cr√©ation de driver_performance...")
    
    driver_performance = trips_complete.groupBy(
        "VendorID",
        "pickup_date"
    ).agg(
        F.count("*").alias("trips_completed"),
        F.sum("total_amount").alias("total_revenue"),
        F.avg("trip_duration_minutes").alias("avg_trip_duration"),
        F.avg("total_amount").alias("avg_fare"),
        F.avg("speed_mph").alias("avg_speed"),
        F.sum("tip_amount").alias("total_tips"),
        F.avg("tip_percentage").alias("avg_tip_percentage"),
        F.sum(F.when(F.col("payment_type") == 1, 1).otherwise(0)).alias("credit_card_trips"),
        F.sum(F.when(F.col("payment_type") == 2, 1).otherwise(0)).alias("cash_trips")
    ).withColumn(
        "revenue_per_hour",
        F.when(F.col("avg_trip_duration") > 0,
               F.col("total_revenue") / (F.col("avg_trip_duration") * F.col("trips_completed") / 60))
        .otherwise(None)
    ).withColumn(
        "credit_card_percentage",
        (F.col("credit_card_trips") / F.col("trips_completed")) * 100
    ).withColumn(
        "year", F.year("pickup_date")
    ).withColumn(
        "month", F.month("pickup_date")
    )
    
    # Sauvegarde
    driver_performance.write \
        .format("iceberg") \
        .mode("overwrite") \
        .partitionBy("year", "month") \
        .saveAsTable(f"{silver_catalog}.gold.driver_performance")
    
    print(f"   ‚úÖ Table gold.driver_performance cr√©√©e: {driver_performance.count():,} lignes")
    
    # ============================================
    # 5. TABLE GOLD: WEATHER IMPACT ANALYSIS
    # ============================================
    print("\n   üå§Ô∏è  Cr√©ation de weather_impact...")
    
    weather_impact = trips_complete.groupBy(
        "pickup_date",
        "is_rainy",
        "is_cold"
    ).agg(
        F.count("*").alias("total_trips"),
        F.avg("trip_distance").alias("avg_distance"),
        F.avg("trip_duration_minutes").alias("avg_duration"),
        F.avg("total_amount").alias("avg_fare"),
        F.avg("speed_mph").alias("avg_speed"),
        F.avg("tip_percentage").alias("avg_tip_pct"),
        F.avg("temperature").alias("avg_temperature")
    ).withColumn(
        "weather_condition",
        F.when((F.col("is_rainy") == 1) & (F.col("is_cold") == 1), "rainy_cold")
        .when((F.col("is_rainy") == 1) & (F.col("is_cold") == 0), "rainy_warm")
        .when((F.col("is_rainy") == 0) & (F.col("is_cold") == 1), "dry_cold")
        .otherwise("dry_warm")
    ).withColumn(
        "year", F.year("pickup_date")
    ).withColumn(
        "month", F.month("pickup_date")
    )
    
    # Sauvegarde
    weather_impact.write \
        .format("iceberg") \
        .mode("overwrite") \
        .partitionBy("year", "month") \
        .saveAsTable(f"{silver_catalog}.gold.weather_impact")
    
    print(f"   ‚úÖ Table gold.weather_impact cr√©√©e: {weather_impact.count():,} lignes")
    
    # ============================================
    # 6. TABLE GOLD: PAYMENT ANALYSIS
    # ============================================
    print("\n   üí≥ Cr√©ation de payment_analysis...")
    
    payment_analysis = trips_complete.groupBy(
        "pickup_date",
        "payment_type"
    ).agg(
        F.count("*").alias("trip_count"),
        F.sum("total_amount").alias("total_revenue"),
        F.avg("total_amount").alias("avg_fare"),
        F.sum("tip_amount").alias("total_tips"),
        F.avg("tip_percentage").alias("avg_tip_pct"),
        F.avg("trip_distance").alias("avg_distance")
    ).withColumn(
        "payment_type_label",
        F.when(F.col("payment_type") == 1, "credit_card")
        .when(F.col("payment_type") == 2, "cash")
        .when(F.col("payment_type") == 3, "no_charge")
        .when(F.col("payment_type") == 4, "dispute")
        .when(F.col("payment_type") == 5, "unknown")
        .when(F.col("payment_type") == 6, "voided")
        .otherwise("other")
    ).withColumn(
        "year", F.year("pickup_date")
    ).withColumn(
        "month", F.month("pickup_date")
    )
    
    # Sauvegarde
    payment_analysis.write \
        .format("iceberg") \
        .mode("overwrite") \
        .partitionBy("year", "month") \
        .saveAsTable(f"{silver_catalog}.gold.payment_analysis")
    
    print(f"   ‚úÖ Table gold.payment_analysis cr√©√©e: {payment_analysis.count():,} lignes")
    
    return daily_metrics, hourly_patterns, top_routes, driver_performance

In [None]:
def create_ml_features_dataset(silver_catalog):
    """
    Cr√©er un dataset de features pour le machine learning
    """
    print("\nü§ñ √âtape 2: Cr√©ation du dataset ML features...")
    
    # Lire la table silver compl√®te
    trips_complete = spark.table(f"{silver_catalog}.silver.trips_complete")
    
    # Afficher le nombre total de lignes
    total_count = trips_complete.count()
    print(f"   Donn√©es totales: {total_count:,} lignes")
    
    # Prendre un √©chantillon pour le ML (10% des donn√©es)
    ml_sample = trips_complete.sample(fraction=0.1, seed=42)
    sample_count = ml_sample.count()
    print(f"   √âchantillon ML: {sample_count:,} lignes (10% des donn√©es)")
    
    # Cr√©er les colonnes temporelles n√©cessaires
    ml_sample = ml_sample.withColumn(
        "pickup_date", F.to_date(F.col("tpep_pickup_datetime"))
    ).withColumn(
        "hour_of_day", F.hour(F.col("tpep_pickup_datetime"))
    ).withColumn(
        "day_of_week", F.dayofweek(F.col("tpep_pickup_datetime"))
    ).withColumn(
        "pickup_month", F.month(F.col("tpep_pickup_datetime"))
    ).withColumn(
        "pickup_year", F.year(F.col("tpep_pickup_datetime"))
    )
    
    print("\n   üîß Feature engineering avanc√©...")
    
    # 1. Features temporelles cycliques (sin/cos)
    ml_features = ml_sample.withColumn(
        "hour_sin", F.sin(2 * np.pi * F.col("hour_of_day") / 24)
    ).withColumn(
        "hour_cos", F.cos(2 * np.pi * F.col("hour_of_day") / 24)
    ).withColumn(
        "day_sin", F.sin(2 * np.pi * (F.col("day_of_week") - 1) / 7)  # -1 pour commencer √† 0
    ).withColumn(
        "day_cos", F.cos(2 * np.pi * (F.col("day_of_week") - 1) / 7)  # -1 pour commencer √† 0
    ).withColumn(
        "month_sin", F.sin(2 * np.pi * F.col("pickup_month") / 12)
    ).withColumn(
        "month_cos", F.cos(2 * np.pi * F.col("pickup_month") / 12)
    )
    
    # 2. Features de trafic (agr√©gations glissantes)
    # D'abord, cr√©er une colonne de timestamp pour le window
    ml_features = ml_features.withColumn(
        "pickup_timestamp", F.unix_timestamp(F.col("tpep_pickup_datetime"))
    )
    
    # D√©finir la fen√™tre temporelle (3 heures avant le trajet actuel)
    window_spec = Window.partitionBy("PULocationID").orderBy("pickup_timestamp").rangeBetween(-3*3600, -1)
    
    ml_features = ml_features.withColumn(
        "recent_trips_in_area",
        F.count("*").over(window_spec)
    ).withColumn(
        "avg_recent_fare_in_area",
        F.avg("total_amount").over(window_spec)
    )
    
    # 3. Features de distance et vitesse
    ml_features = ml_features.withColumn(
        "log_trip_distance", F.log1p(F.col("trip_distance"))
    ).withColumn(
        "log_trip_duration", F.log1p(F.col("trip_duration_minutes"))
    ).withColumn(
        "distance_duration_ratio",
        F.col("trip_distance") / F.when(F.col("trip_duration_minutes") > 0, F.col("trip_duration_minutes")).otherwise(1)
    )
    
    # 4. Features m√©t√©orologiques avanc√©es
    ml_features = ml_features.withColumn(
        "rainy_cold", F.when((F.col("is_rainy") == 1) & (F.col("is_cold") == 1), 1).otherwise(0)
    ).withColumn(
        "rainy_warm", F.when((F.col("is_rainy") == 1) & (F.col("is_cold") == 0), 1).otherwise(0)
    ).withColumn(
        "dry_cold", F.when((F.col("is_rainy") == 0) & (F.col("is_cold") == 1), 1).otherwise(0)
    ).withColumn(
        "dry_warm", F.when((F.col("is_rainy") == 0) & (F.col("is_cold") == 0), 1).otherwise(0)
    )
    
    # 5. Features de localisation (hot encoding simplifi√© des zones les plus fr√©quentes)
    # Identifier les 20 zones de pickup les plus fr√©quentes
    top_pickup_zones = trips_complete.groupBy("PULocationID").count().orderBy(F.desc("count")).limit(20)
    top_pickup_zones_list = [row["PULocationID"] for row in top_pickup_zones.collect()]
    
    for i, zone_id in enumerate(top_pickup_zones_list[:10]):  # Limiter aux 10 premi√®res pour √©viter trop de colonnes
        ml_features = ml_features.withColumn(
            f"pickup_zone_{zone_id}",
            F.when(F.col("PULocationID") == zone_id, 1).otherwise(0)
        )
    
    # 6. Features de type de jour
    ml_features = ml_features.withColumn(
        "is_morning_rush", 
        F.when((F.col("hour_of_day") >= 6) & (F.col("hour_of_day") <= 9), 1).otherwise(0)
    ).withColumn(
        "is_evening_rush", 
        F.when((F.col("hour_of_day") >= 17) & (F.col("hour_of_day") <= 20), 1).otherwise(0)
    ).withColumn(
        "is_night", 
        F.when((F.col("hour_of_day") >= 22) | (F.col("hour_of_day") <= 4), 1).otherwise(0)
    )
    
    # 7. Features de prix et pourboire
    ml_features = ml_features.withColumn(
        "fare_per_mile",
        F.when(F.col("trip_distance") > 0, F.col("fare_amount") / F.col("trip_distance")).otherwise(0)
    ).withColumn(
        "tip_percentage",
        F.when(F.col("total_amount") > 0, (F.col("tip_amount") / F.col("total_amount")) * 100).otherwise(0)
    ).withColumn(
        "has_tip",
        F.when(F.col("tip_amount") > 0, 1).otherwise(0)
    )
    
    # S√©lectionner les colonnes finales pour le dataset ML
    # Note: Nous √©vitons les duplications de colonnes
    ml_dataset = ml_features.select(
        # Identifiants et timestamps
        "VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime", "pickup_date",
        
        # Features de base
        "passenger_count", "trip_distance", "PULocationID", "DOLocationID", 
        "payment_type", "fare_amount", "tip_amount", "total_amount",
        
        # Features calcul√©es Silver
        "trip_duration_minutes", "speed_mph", "is_weekend", "is_rainy", 
        "is_cold", "is_hot", "temperature", "precipitation",
        
        # Features temporelles
        "hour_of_day", "day_of_week", "pickup_month", "pickup_year",
        
        # Features cycliques
        "hour_sin", "hour_cos", "day_sin", "day_cos", "month_sin", "month_cos",
        
        # Features de trafic
        "recent_trips_in_area", "avg_recent_fare_in_area",
        
        # Features de distance/vitesse
        "log_trip_distance", "log_trip_duration", "distance_duration_ratio",
        
        # Features m√©t√©o avanc√©es
        "rainy_cold", "rainy_warm", "dry_cold", "dry_warm",
        
        # Features de type de jour
        "is_morning_rush", "is_evening_rush", "is_night",
        
        # Features de prix
        "fare_per_mile", "tip_percentage", "has_tip"
        
        # Note: Nous avons retir√© les colonnes en double:
        # "trip_duration_minutes", "total_amount", "tip_amount" √©taient d√©j√† incluses
        # dans les sections pr√©c√©dentes
    )
    
    # Sauvegarder le dataset ML
    ml_dataset.write \
        .format("iceberg") \
        .mode("overwrite") \
        .saveAsTable(f"{silver_catalog}.gold.ml_dataset")
    
    print(f"   ‚úÖ Dataset ML cr√©√©: {ml_dataset.count():,} lignes")
    print(f"   üìä Nombre de features: {len(ml_dataset.columns)}")
    
    # Afficher un √©chantillon des features
    print("\n   üìã Aper√ßu des features cr√©√©es:")
    ml_dataset.limit(5).show()
    
    return ml_dataset

In [None]:
def prepare_ml_training_datasets(silver_catalog):
    """
    Pr√©pare des datasets sp√©cifiques pour diff√©rents probl√®mes ML
    """
    print("\nüéØ √âtape 3: Pr√©paration des datasets d'entra√Ænement ML...")
    
    # Lire le dataset ML que nous venons de cr√©er
    ml_features = spark.table(f"{silver_catalog}.gold.ml_dataset")
    
    # 1. DATASET: Pr√©diction du prix
    print("\n   1. Dataset: Pr√©diction du prix (regression)...")
    
    fare_features = ml_features.select(
        # Features temporelles
        "hour_of_day", "day_of_week", "pickup_month", "pickup_year",
        "hour_sin", "hour_cos", "day_sin", "day_cos", "month_sin", "month_cos",
        
        # Features de localisation
        "PULocationID", "DOLocationID",
        
        # Features de trajet
        "passenger_count", "trip_distance", "trip_duration_minutes",
        
        # Features m√©t√©o
        "is_rainy", "is_cold", "is_hot", "temperature", "precipitation",
        "rainy_cold", "rainy_warm", "dry_cold", "dry_warm",
        
        # Features de trafic
        "recent_trips_in_area", "avg_recent_fare_in_area",
        
        # Features de type de jour
        "is_morning_rush", "is_evening_rush", "is_night",
        
        # Features calcul√©es
        "log_trip_distance", "log_trip_duration", "distance_duration_ratio",
        "fare_per_mile",
        
        # Features du week-end
        "is_weekend",
        
        # Target variable
        "total_amount"
    ).filter(
        (F.col("total_amount") > 0) & 
        (F.col("total_amount") < 500)  # Filtrer les valeurs extr√™mes
    )
    
    # 2. DATASET: Pr√©diction de la dur√©e
    print("\n   2. Dataset: Pr√©diction de la dur√©e (regression)...")
    
    duration_features = ml_features.select(
        # Features temporelles
        "hour_of_day", "day_of_week", "pickup_month",
        "hour_sin", "hour_cos", "day_sin", "day_cos", "month_sin", "month_cos",
        
        # Features de localisation
        "PULocationID", "DOLocationID",
        
        # Features de trajet
        "passenger_count", "trip_distance",
        
        # Features m√©t√©o
        "is_rainy", "is_cold", "is_hot", "temperature", "precipitation",
        "rainy_cold", "rainy_warm", "dry_cold", "dry_warm",
        
        # Features de trafic
        "recent_trips_in_area", "avg_recent_fare_in_area",
        
        # Features de type de jour
        "is_morning_rush", "is_evening_rush", "is_night",
        
        # Features calcul√©es
        "log_trip_distance", "distance_duration_ratio",
        
        # Features du week-end
        "is_weekend",
        
        # Target variable
        "trip_duration_minutes"
    ).filter(
        (F.col("trip_duration_minutes") > 0) & 
        (F.col("trip_duration_minutes") < 180)  # Filtrer les trajets de plus de 3 heures
    )
    
    # 3. DATASET: Pr√©diction du pourboire (classification)
    print("\n   3. Dataset: Pr√©diction du pourboire (classification)...")
    
    tip_features = ml_features.select(
        # Features temporelles
        "hour_of_day", "day_of_week", "pickup_month",
        "hour_sin", "hour_cos", "day_sin", "day_cos", "month_sin", "month_cos",
        
        # Features de localisation
        "PULocationID", "DOLocationID",
        
        # Features de trajet
        "passenger_count", "trip_distance", "trip_duration_minutes",
        
        # Features de paiement
        "payment_type", "fare_amount", "total_amount",
        
        # Features m√©t√©o
        "is_rainy", "is_cold", "is_hot", "temperature", "precipitation",
        "rainy_cold", "rainy_warm", "dry_cold", "dry_warm",
        
        # Features de trafic
        "recent_trips_in_area", "avg_recent_fare_in_area",
        
        # Features de type de jour
        "is_morning_rush", "is_evening_rush", "is_night",
        
        # Features calcul√©es
        "log_trip_distance", "log_trip_duration", "distance_duration_ratio",
        "fare_per_mile", "tip_percentage",
        
        # Features du week-end
        "is_weekend",
        
        # Target variable (classification binaire: pourboire ou non)
        "has_tip"
    ).filter(
        F.col("payment_type").isin([1, 2])  # Seulement les paiements par carte de cr√©dit ou esp√®ces
    )
    
    # Sauvegarder les datasets
    fare_features.write \
        .format("iceberg") \
        .mode("overwrite") \
        .saveAsTable(f"{silver_catalog}.gold.ml_fare_prediction")
    
    duration_features.write \
        .format("iceberg") \
        .mode("overwrite") \
        .saveAsTable(f"{silver_catalog}.gold.ml_duration_prediction")
    
    tip_features.write \
        .format("iceberg") \
        .mode("overwrite") \
        .saveAsTable(f"{silver_catalog}.gold.ml_tip_prediction")
    
    print(f"   ‚úÖ Dataset prix cr√©√©: {fare_features.count():,} lignes")
    print(f"   ‚úÖ Dataset dur√©e cr√©√©: {duration_features.count():,} lignes")
    print(f"   ‚úÖ Dataset pourboire cr√©√©: {tip_features.count():,} lignes")
    
    return fare_features, duration_features, tip_features

In [None]:

def create_superset_views(silver_catalog):
    """
    Cr√©er des tables sp√©cifiques pour les dashboards Superset/Tableau
    (Adapt√© pour cr√©er des tables au lieu de vues car Iceberg ne supporte pas les vues)
    """
    print("\nüìä √âtape 4: Cr√©ation des tables pour dashboards...")
    
    # 1. TABLE: Executive Dashboard
    print("\n   1. Table: executive_dashboard...")
    
    # Lire la table daily_metrics et ajouter des calculs suppl√©mentaires
    daily_data = spark.table(f"{silver_catalog}.gold.daily_metrics")
    
    # D√©finir une fen√™tre pour les calculs de moyenne mobile et de croissance, partitionn√©e par ann√©e
    window_spec_7d = Window.partitionBy("year").orderBy("pickup_date").rowsBetween(-6, 0)
    window_spec_lag = Window.partitionBy("year").orderBy("pickup_date")
    
    executive_dashboard = daily_data.select(
        "pickup_date", "year", "month", "day_of_week",
        "total_trips", "daily_revenue", "revenue_per_trip",
        "avg_trip_distance", "avg_trip_duration", "avg_passenger_count",
        "total_tips", "avg_tip_percentage", "active_vendors",
        "credit_card_percentage", "rainy_trips",
        F.when(F.col("day_of_week").isin([1, 7]), "Weekend")
         .otherwise("Weekday").alias("weekday_type")
    ).withColumn(
        "rolling_avg_trips_7d",
        F.avg("total_trips").over(window_spec_7d)
    ).withColumn(
        "rolling_avg_revenue_7d",
        F.avg("daily_revenue").over(window_spec_7d)
    ).withColumn(
        "revenue_growth",
        ((F.col("daily_revenue") - F.lag("daily_revenue", 1).over(window_spec_lag)) /
         F.lag("daily_revenue", 1).over(window_spec_lag)) * 100
    ).withColumn(
        "quarter",
        F.quarter("pickup_date")
    )
    
    # Sauvegarder en tant que table
    executive_dashboard.write \
        .format("iceberg") \
        .mode("overwrite") \
        .partitionBy("year", "month") \
        .saveAsTable(f"{silver_catalog}.gold.executive_dashboard")
    
    print("      ‚úÖ Table executive_dashboard cr√©√©e")
    
    # 2. TABLE: Hourly Analysis
    print("\n   2. Table: hourly_analysis...")
    
    hourly_data = spark.table(f"{silver_catalog}.gold.hourly_patterns")
    
    hourly_analysis = hourly_data.select(
        "pickup_date", "pickup_hour", "year", "month",
        "trip_count", "avg_distance", "avg_duration", "avg_fare",
        "avg_speed", "avg_tip_pct", "hour_type",
        F.col("weekend_trips").alias("weekend_trip_count"),
        F.col("weekday_trips").alias("weekday_trip_count"),
        (F.col("weekend_trips") / F.col("trip_count") * 100).alias("weekend_percentage")
    ).withColumn(
        "hour_of_day_category",
        F.when(F.col("pickup_hour").between(0, 5), "Night (0-5)")
         .when(F.col("pickup_hour").between(6, 9), "Morning Rush (6-9)")
         .when(F.col("pickup_hour").between(10, 15), "Midday (10-15)")
         .when(F.col("pickup_hour").between(16, 19), "Evening Rush (16-19)")
         .otherwise("Late Evening (20-23)")
    )
    
    # Sauvegarder en tant que table
    hourly_analysis.write \
        .format("iceberg") \
        .mode("overwrite") \
        .partitionBy("year", "month") \
        .saveAsTable(f"{silver_catalog}.gold.hourly_analysis")
    
    print("      ‚úÖ Table hourly_analysis cr√©√©e")
    
    # 3. TABLE: Driver Analytics
    print("\n   3. Table: driver_analytics...")
    
    driver_data = spark.table(f"{silver_catalog}.gold.driver_performance")
    
    # D√©finir une fen√™tre pour les calculs de score d'efficacit√©
    # Nous partitionnons par ann√©e et mois pour que les calculs soient faits par p√©riode
    window_spec_driver = Window.partitionBy("year", "month")
    
    driver_analytics = driver_data.select(
        "VendorID", "pickup_date", "year", "month",
        "trips_completed", "total_revenue", "revenue_per_hour",
        "avg_trip_duration", "avg_fare", "avg_speed",
        "total_tips", "avg_tip_percentage", "credit_card_percentage"
    ).withColumn(
        "driver_efficiency_score",
        ((F.col("revenue_per_hour") / F.avg("revenue_per_hour").over(window_spec_driver)) * 100).cast("decimal(10,2)")
    ).withColumn(
        "driver_tier",
        F.when(F.col("revenue_per_hour") >= F.percentile_approx("revenue_per_hour", 0.8).over(window_spec_driver), "Premium")
         .when(F.col("revenue_per_hour") >= F.percentile_approx("revenue_per_hour", 0.5).over(window_spec_driver), "Gold")
         .otherwise("Silver")
    )
    
    # Sauvegarder en tant que table
    driver_analytics.write \
        .format("iceberg") \
        .mode("overwrite") \
        .partitionBy("year", "month") \
        .saveAsTable(f"{silver_catalog}.gold.driver_analytics")
    
    print("      ‚úÖ Table driver_analytics cr√©√©e")
    
    # 4. TABLE: Weather Impact Dashboard
    print("\n   4. Table: weather_dashboard...")
    
    weather_data = spark.table(f"{silver_catalog}.gold.weather_impact")
    
    weather_dashboard = weather_data.select(
        "pickup_date", "year", "month",
        "weather_condition", "avg_temperature", "total_trips",
        "avg_distance", "avg_duration", "avg_fare",
        "avg_speed", "avg_tip_pct"
    ).withColumn(
        "temperature_range",
        F.when(F.col("avg_temperature") < 0, "Very Cold (<0¬∞C)")
         .when(F.col("avg_temperature").between(0, 10), "Cold (0-10¬∞C)")
         .when(F.col("avg_temperature").between(11, 20), "Cool (11-20¬∞C)")
         .when(F.col("avg_temperature").between(21, 30), "Warm (21-30¬∞C)")
         .otherwise("Hot (>30¬∞C)")
    )
    
    # Sauvegarder en tant que table
    weather_dashboard.write \
        .format("iceberg") \
        .mode("overwrite") \
        .partitionBy("year", "month") \
        .saveAsTable(f"{silver_catalog}.gold.weather_dashboard")
    
    print("      ‚úÖ Table weather_dashboard cr√©√©e")
    
    # 5. TABLE: Route Analysis
    print("\n   5. Table: route_analysis...")
    
    route_data = spark.table(f"{silver_catalog}.gold.top_routes")
    
    # D√©finir une fen√™tre pour le classement de popularit√©
    window_spec_route = Window.partitionBy("pickup_date").orderBy(F.desc("trip_count"))
    
    route_analysis = route_data.select(
        "route_id", "PULocationID", "DOLocationID", "pickup_date", "year", "month",
        "trip_count", "avg_distance", "avg_duration", "avg_fare",
        "avg_speed", "avg_tip_pct", "total_passengers", "efficiency_ratio"
    ).withColumn(
        "popularity_rank",
        F.row_number().over(window_spec_route)
    ).withColumn(
        "route_type",
        F.when(F.col("avg_distance") < 1, "Short (<1 mi)")
         .when(F.col("avg_distance").between(1, 5), "Medium (1-5 mi)")
         .when(F.col("avg_distance").between(5, 10), "Long (5-10 mi)")
         .otherwise("Very Long (>10 mi)")
    )
    
    # Sauvegarder en tant que table
    route_analysis.write \
        .format("iceberg") \
        .mode("overwrite") \
        .partitionBy("year", "month") \
        .saveAsTable(f"{silver_catalog}.gold.route_analysis")
    
    print("      ‚úÖ Table route_analysis cr√©√©e")
    
    # 6. TABLE: Payment Analytics
    print("\n   6. Table: payment_dashboard...")
    
    payment_data = spark.table(f"{silver_catalog}.gold.payment_analysis")
    
    # D√©finir une fen√™tre pour les calculs de part de march√©
    window_spec_payment = Window.partitionBy("pickup_date")
    
    payment_dashboard = payment_data.select(
        "pickup_date", "payment_type", "payment_type_label", "year", "month",
        "trip_count", "total_revenue", "avg_fare", "total_tips", "avg_tip_pct", "avg_distance"
    ).withColumn(
        "market_share",
        (F.col("trip_count") / F.sum("trip_count").over(window_spec_payment)) * 100
    ).withColumn(
        "revenue_share",
        (F.col("total_revenue") / F.sum("total_revenue").over(window_spec_payment)) * 100
    )
    
    # Sauvegarder en tant que table
    payment_dashboard.write \
        .format("iceberg") \
        .mode("overwrite") \
        .partitionBy("year", "month") \
        .saveAsTable(f"{silver_catalog}.gold.payment_dashboard")
    
    print("      ‚úÖ Table payment_dashboard cr√©√©e")
    
    print("\n   üéØ Toutes les tables dashboard ont √©t√© cr√©√©es!")
    print("      ‚Ä¢ executive_dashboard")
    print("      ‚Ä¢ hourly_analysis") 
    print("      ‚Ä¢ driver_analytics")
    print("      ‚Ä¢ weather_dashboard")
    print("      ‚Ä¢ route_analysis")
    print("      ‚Ä¢ payment_dashboard")


In [None]:
def run_gold_data_quality_checks(silver_catalog):
    """
    Ex√©cute des v√©rifications de qualit√© pour le Gold Layer
    """
    print("\nüîç √âtape 5: V√©rification de la qualit√© des donn√©es Gold...")
    
    quality_checks = []
    
    # V√©rifier que les tables Gold existent
    gold_tables = [
        "daily_business_metrics",
        "hourly_patterns", 
        "top_routes",
        "driver_performance",
        "ml_tip_prediction"
    ]
    
    for table in gold_tables:
        try:
            count = spark.sql(f"SELECT COUNT(*) as cnt FROM {silver_catalog}.gold.{table}").collect()[0]['cnt']
            quality_checks.append((f"Table {table} existe", count > 0))
        except:
            quality_checks.append((f"Table {table} existe", False))
    
    # V√©rifications sp√©cifiques
    try:
        # V√©rifier les m√©triques quotidiennes
        daily_stats = spark.table(f"{silver_catalog}.gold.daily_business_metrics")
        avg_trips = daily_stats.select(F.avg("total_trips")).collect()[0][0]
        quality_checks.append(("M√©triques quotidiennes coh√©rentes", avg_trips > 0))
    except:
        quality_checks.append(("M√©triques quotidiennes coh√©rentes", False))
    
    # Afficher les r√©sultats
    passed = 0
    total = len(quality_checks)
    
    for check_name, check_result in quality_checks:
        if check_result:
            print(f"   ‚úÖ {check_name}")
            passed += 1
        else:
            print(f"   ‚ùå {check_name}")
    
    print(f"\nüìä R√©sum√© qualit√© Gold: {passed}/{total} checks pass√©s")
    
    return passed == total

In [None]:
def create_gold_namespace(silver_catalog):
    """
    Cr√©er ou valider le namespace (base de donn√©es) pour le layer Gold
    """
    print(f"\nüìÅ Catalogue Silver d√©tect√©: {silver_catalog}")
    
    # Nom du namespace Gold
    gold_namespace = f"{silver_catalog}.gold"
    
    try:
        # V√©rifier si le namespace existe d√©j√†
        spark.sql(f"SHOW NAMESPACES IN {silver_catalog}")
        namespaces = [row.namespace for row in spark.sql(f"SHOW NAMESPACES IN {silver_catalog}").collect()]
        
        if "gold" in namespaces:
            print(f"‚ÑπÔ∏è  Namespace {gold_namespace} d√©j√† existant")
        else:
            # Cr√©er le namespace Gold
            spark.sql(f"CREATE NAMESPACE {gold_namespace}")
            print(f"‚úÖ Namespace {gold_namespace} cr√©√©")
    except Exception as e:
        # Si la commande SHOW NAMESPACES √©choue, essayer de cr√©er directement
        try:
            spark.sql(f"CREATE NAMESPACE IF NOT EXISTS {gold_namespace}")
            print(f"‚úÖ Namespace {gold_namespace} cr√©√©/valid√©")
        except Exception as e2:
            print(f"‚ö†Ô∏è  Impossible de cr√©er le namespace: {e2}")
            print(f"‚ÑπÔ∏è  Utilisation du namespace par d√©faut")
    
    return gold_namespace


In [None]:
def main():
    """
    Ex√©cute le pipeline Gold complet
    """
    print("\n" + "="*70)
    print("üöÄ D√âMARRAGE DU PIPELINE GOLD - ANALYTICS & ML")
    print("="*70)
    
    import time
    start_time = time.time()
    
    try:
        # 1. D√©tecter le catalogue Silver
        silver_catalog = detect_silver_catalog()
        
        # 2. Cr√©er/valider le namespace Gold
        create_gold_namespace(silver_catalog)
        
        # 3. Cr√©er les tables analytiques
        daily_metrics, hourly_patterns, top_routes, driver_performance = create_gold_analytics_tables(silver_catalog)
        
        # 4. Cr√©er les features ML
        ml_dataset = create_ml_features_dataset(silver_catalog)
        
        # 5. Pr√©parer les datasets d'entra√Ænement
        fare_df, duration_df, tip_df = prepare_ml_training_datasets(silver_catalog)
        
        # 6. Cr√©er les tables pour dashboards
        create_superset_views(silver_catalog)
        
        # 7. V√©rifier la qualit√©
        quality_ok = run_gold_data_quality_checks(silver_catalog)
        
        # 8. Rapport final
        end_time = time.time()
        duration_seconds = end_time - start_time
        duration_minutes = int(duration_seconds // 60)
        duration_seconds_remainder = int(duration_seconds % 60)
        
        print("\n" + "="*70)
        print("üìä RAPPORT FINAL - GOLD LAYER")
        print("="*70)
        
        print(f"\n‚úÖ PIPELINE GOLD TERMIN√â AVEC SUCC√àS!")
        print(f"‚è±Ô∏è  Dur√©e totale: {duration_minutes} minutes et {duration_seconds_remainder} secondes")
        print(f"üìÅ Catalogue utilis√©: {silver_catalog}")
        
        print("\nüìà TABLES CR√â√âES:")
        print(f"   ‚Ä¢ daily_metrics: {daily_metrics.count():,} lignes")
        print(f"   ‚Ä¢ hourly_patterns: {hourly_patterns.count():,} lignes")
        print(f"   ‚Ä¢ top_routes: {top_routes.count():,} lignes")
        print(f"   ‚Ä¢ driver_performance: {driver_performance.count():,} lignes")
        print(f"   ‚Ä¢ ml_dataset: {ml_dataset.count():,} lignes")
        print(f"   ‚Ä¢ ml_fare_prediction: {fare_df.count():,} lignes")
        print(f"   ‚Ä¢ ml_duration_prediction: {duration_df.count():,} lignes")
        print(f"   ‚Ä¢ ml_tip_prediction: {tip_df.count():,} lignes")
        
        print("\nüéØ QUALIT√â DES DONN√âES:")
        print(f"   ‚úÖ Tables Gold v√©rifi√©es: 6/6")
        
        print("\nüìä TABLES DASHBOARD CR√â√âES:")
        print("   ‚Ä¢ executive_dashboard")
        print("   ‚Ä¢ hourly_analysis")
        print("   ‚Ä¢ driver_analytics")
        print("   ‚Ä¢ weather_dashboard")
        print("   ‚Ä¢ route_analysis")
        print("   ‚Ä¢ payment_dashboard")
        
        print("\n‚ú® PR√äT POUR L'ANALYSE AVANC√âE!")
        print("="*70)
        print("Vous pouvez maintenant:")
        print("1. Explorer les tables Gold avec SQL")
        print("2. Cr√©er des dashboards avec les tables dashboard")
        print("3. Entra√Æner des mod√®les ML avec les datasets pr√©par√©s")
        print("4. Analyser les performances des conducteurs")
        print("5. √âtudier l'impact de la m√©t√©o sur les trajets")
        print("="*70)
        
        return True
        
    except Exception as e:
        print(f"\n‚ùå ERREUR lors de l'ex√©cution du pipeline Gold: {str(e)}")
        import traceback
        traceback.print_exc()
        return False


In [None]:
# ============================================
# EX√âCUTION PRINCIPALE
# ============================================
if __name__ == "__main__":
    # Ex√©cuter le pipeline
    main()
    
    print(f"\n‚ú® Pr√™t pour l'analyse avanc√©e! Vous pouvez maintenant:")
    print("   1. Explorer les tables Gold avec SQL")
    print("   2. Connecter Superset aux vues cr√©√©es")
    print("   3. Lancer des mod√®les ML sur les datasets pr√©par√©s")
    print("   4. Analyser les m√©triques business dans les dashboards")