# Script de prétraitement des données e-commerce pour l'analyse comportementale et le système de recommandation Ce script structure le traitement en phases distinctes


## Importation des bibliothèques et configuration initiale

## importations et la configuration initiale

In [32]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, count, when, month, dayofweek, hour, minute, second, dayofmonth, countDistinct,
    desc, sum, avg, min, max, datediff, lit, to_date, date_format, 
    collect_list, struct,unix_timestamp
)
from pyspark.sql.types import (
    StructType, StructField, StringType, DoubleType, TimestampType, IntegerType
)
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
from datetime import datetime
import logging

# Configuration du logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler("preprocessing.log"),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

# Paramètres globaux
input_file = "../data/2019-Oct_reduit.csv"
# input_file = "../data/2019-Oct.csv"
output_directory = "./data/processed"

# Création du répertoire de sortie s'il n'existe pas
os.makedirs(output_directory, exist_ok=True)

# Définition du schéma pour les données
schema = StructType([
    StructField("event_time", TimestampType(), True),
    StructField("event_type", StringType(), True),
    StructField("product_id", StringType(), True),
    StructField("category_id", StringType(), True),
    StructField("category_code", StringType(), True),
    StructField("brand", StringType(), True),
    StructField("price", DoubleType(), True),
    StructField("user_id", StringType(), True),
    StructField("user_session", StringType(), True)
])

def create_spark_session():
    """
    Crée et retourne une session Spark avec la configuration appropriée
    
    Returns:
        SparkSession: Session Spark configurée
    """
    return SparkSession.builder \
        .appName("E-commerce Data Analysis") \
        .config("spark.driver.memory", "4g") \
        .config("spark.executor.memory", "4g") \
        .config("spark.sql.session.timeZone", "UTC") \
        .master("local[*]") \
        .getOrCreate()


## Chargement et exploration des données e-commerce

In [33]:
def load_data(spark_session=None, input_path=input_file, schema=schema):
    """
    Charge les données depuis le fichier CSV avec le schéma défini
    
    Args:
        spark_session: Session Spark à utiliser (crée une nouvelle si None)
        input_path: Chemin du fichier à charger
        schema: Schéma à appliquer
    
    Returns:
        DataFrame Spark contenant les données chargées
    """
    if spark_session is None:
        spark_session = create_spark_session()
    
    logger.info(f"Chargement des données depuis {input_path}")
    try:
        df = spark_session.read.csv(
            input_path,
            header=True,
            schema=schema
        )
        logger.info(f"Données chargées : {df.count()} lignes")
        
        # Afficher un aperçu des données
        logger.info("Aperçu des données:")
        df.show(5)
        
        return df
    except Exception as e:
        logger.error(f"Erreur lors du chargement des données : {str(e)}")
        return None

def explore_data(df):
    """
    Exploration initiale des données
    
    Args:
        df: DataFrame Spark à explorer
    """
    if df is None:
        logger.error("Les données n'ont pas été chargées")
        return
    
    logger.info("Exploration initiale des données")
    logger.info(f"Nombre de lignes: {df.count()}")
    logger.info(f"Nombre de colonnes: {len(df.columns)}")
    
    # Afficher le schéma
    logger.info("Schéma du DataFrame:")
    df.printSchema()
    
    # Calculer les statistiques descriptives 
    logger.info("Statistiques descriptives:")
    df.describe().show()
    
    # Calculer le nombre de valeurs manquantes par colonne
    logger.info("Valeurs manquantes par colonne:")
    for column in df.columns:
        missing_count = df.filter(col(column).isNull()).count()
        missing_percentage = (missing_count / df.count()) * 100
        logger.info(f"{column}: {missing_count} ({missing_percentage:.2f}%)")
    
    # Distribution des types d'événements
    logger.info("Distribution des types d'événements:")
    df.groupBy("event_type").count().orderBy(desc("count")).show()
    
    # Distribution des catégories principales
    logger.info("Top 10 des catégories:")
    df.groupBy("category_code").count().orderBy(desc("count")).limit(10).show()
    
    # Distribution des marques principales
    logger.info("Top 10 des marques:")
    df.groupBy("brand").count().orderBy(desc("count")).limit(10).show()
    
    return df

# Exécuter le chargement et l'exploration
# if __name__ == "__main__":
#     spark = create_spark_session()
#     df = load_data(spark)
#     explore_data(df)

## Prétraitement des données e-commerce

In [34]:
def preprocess_data(df):
    """
    Prétraitement des données : extraction des caractéristiques temporelles,
    traitement des valeurs manquantes, etc.
    
    Args:
        df: DataFrame Spark à prétraiter
    
    Returns:
        DataFrame Spark prétraité
    """
    if df is None:
        logger.error("Les données n'ont pas été chargées")
        return None
    
    logger.info("Prétraitement des données")
    
    # 1. Extraction des caractéristiques temporelles
    logger.info("Extraction des caractéristiques temporelles")
    df_temp = df.withColumn("hour", hour("event_time")) \
        .withColumn("minute", minute("event_time")) \
        .withColumn("second", second("event_time")) \
        .withColumn("day", dayofmonth("event_time")) \
        .withColumn("month", month("event_time")) \
        .withColumn("dayofweek", dayofweek("event_time")) \
        .withColumn("date", date_format("event_time", "yyyy-MM-dd")) \
        .withColumn("hour_bucket", date_format("event_time", "yyyy-MM-dd HH:00:00"))
    
    # 2. Traitement des valeurs manquantes
    logger.info("Traitement des valeurs manquantes")
    df_temp = df_temp.withColumn(
        "category_code",
        when(col("category_code").isNull() | (col("category_code") == "NaN"),
            "unknown").otherwise(col("category_code"))
    )
    
    df_temp = df_temp.withColumn(
        "brand",
        when(col("brand").isNull(), "unknown").otherwise(col("brand"))
    )
    
    # 3. Nettoyage des prix (valeurs négatives ou nulles)
    logger.info("Nettoyage des prix")
    df_temp = df_temp.withColumn(
        "price",
        when(col("price").isNull() | (col("price") <= 0), None).otherwise(col("price"))
    )
    
    # Afficher un aperçu des données prétraitées
    logger.info("Aperçu des données prétraitées:")
    df_temp.show(5)
    
    # Afficher un résumé après prétraitement
    logger.info("Statistiques après prétraitement:")
    df_temp.describe().show()
    
    return df_temp

def compute_user_behavior(cleaned_df):
    """
    Calcul des métriques de comportement utilisateur pour la segmentation et l'analyse
    
    Args:
        cleaned_df: DataFrame Spark nettoyé
    
    Returns:
        DataFrame Spark des comportements utilisateur
    """
    if cleaned_df is None:
        logger.error("Les données n'ont pas été prétraitées")
        return None
    
    logger.info("Calcul des métriques de comportement utilisateur")
    
    # Agrégation des comportements par utilisateur avec métriques enrichies pour la segmentation RFM
    user_behavior_df = cleaned_df.groupBy("user_id").agg(
        count("*").alias("nb_events"),
        sum(when(col("event_type") == "view", 1).otherwise(0)).alias("nb_views"),
        sum(when(col("event_type") == "cart", 1).otherwise(0)).alias("nb_carts"),
        sum(when(col("event_type") == "purchase", 1).otherwise(0)).alias("nb_purchases"),
        sum(when(col("event_type") == "remove_from_cart", 1).otherwise(0)).alias("nb_removes"),
        avg("price").alias("avg_price_viewed"),
        avg(when(col("event_type") == "purchase", col("price")).otherwise(None)).alias("avg_price_purchased"),
        countDistinct("user_session").alias("nb_sessions"),
        min("event_time").alias("first_seen"),
        max("event_time").alias("last_seen"),
        # Métriques pour la segmentation RFM
        datediff(lit("2019-10-31"), max("event_time")).alias("recency"),
        countDistinct(when(col("event_type") == "purchase", col("date")).otherwise(None)).alias("frequency"),
        sum(when(col("event_type") == "purchase", col("price")).otherwise(0)).alias("monetary"),
        # Liste des catégories consultées
        collect_list(when(col("event_type") == "view", col("category_code")).otherwise(None)).alias("viewed_categories"),
        # Liste des marques consultées
        collect_list(when(col("event_type") == "view", col("brand")).otherwise(None)).alias("viewed_brands")
    )
    
    # Calcul des métriques dérivées
    user_behavior_df = user_behavior_df.withColumn(
        "conversion_rate",
        when(col("nb_views") > 0, col("nb_purchases") / col("nb_views")).otherwise(0)
    )
    
    user_behavior_df = user_behavior_df.withColumn(
        "cart_abandonment",
        when(col("nb_carts") > 0, (col("nb_carts") - col("nb_purchases")) / col("nb_carts")).otherwise(0)
    )
    
    user_behavior_df = user_behavior_df.withColumn(
        "engagement_days",
        datediff(col("last_seen"), col("first_seen")) + 1
    )
    
    logger.info("Métriques de comportement utilisateur calculées")
    logger.info("Aperçu des comportements utilisateur:")
    user_behavior_df.show(5)
    
    return user_behavior_df

def prepare_recommendation_data(cleaned_df):
    """
    Prépare les données pour le système de recommandation
    
    Args:
        cleaned_df: DataFrame Spark nettoyé
    
    Returns:
        Tuple (recommandation_df, produit_df): DataFrames pour les systèmes de recommandation
    """
    if cleaned_df is None:
        logger.error("Les données n'ont pas été prétraitées")
        return None, None
    
    logger.info("Préparation des données pour le système de recommandation")
    
    # DataFrame pour le filtrage collaboratif (ALS)
    recommandation_df = cleaned_df.filter(col("event_type").isin(["view", "cart", "purchase"])).select(
        "user_id",
        "product_id",
        "event_type",
        "price",
        "event_time",
        # Score implicite basé sur l'interaction
        when(col("event_type") == "view", 1)
        .when(col("event_type") == "cart", 5)
        .when(col("event_type") == "purchase", 10)
        .otherwise(0).alias("interaction_score")
    )
    
    # DataFrame des produits pour les recommandations basées sur le contenu
    produit_df = cleaned_df.select(
        "product_id", 
        "category_id", 
        "category_code", 
        "brand", 
        "price"
    ).distinct()
    
    logger.info("Aperçu des données de recommandation:")
    recommandation_df.show(5)
    logger.info("Aperçu des données produits:")
    produit_df.show(5)
    
    return recommandation_df, produit_df

def prepare_time_series_data(cleaned_df):
    """
    Prépare les données temporelles pour les analyses et simulations
    
    Args:
        cleaned_df: DataFrame Spark nettoyé
    
    Returns:
        DataFrame agrégé par heures pour analyses temporelles
    """
    if cleaned_df is None:
        logger.error("Les données n'ont pas été prétraitées")
        return None
    
    logger.info("Préparation des données pour analyses temporelles")
    
    # Agrégation horaire des événements
    hourly_events = cleaned_df.groupBy("hour_bucket").agg(
        count("*").alias("total_events"),
        countDistinct("user_id").alias("unique_users"),
        sum(when(col("event_type") == "view", 1).otherwise(0)).alias("views"),
        sum(when(col("event_type") == "cart", 1).otherwise(0)).alias("carts"),
        sum(when(col("event_type") == "purchase", 1).otherwise(0)).alias("purchases"),
        sum(when(col("event_type") == "remove_from_cart", 1).otherwise(0)).alias("removes"),
        avg("price").alias("avg_price")
    ).orderBy("hour_bucket")
    
    logger.info("Aperçu des données temporelles:")
    hourly_events.show(5)
    
    return hourly_events

# Exécuter le prétraitement
# if __name__ == "__main__":
#     spark = create_spark_session()
#     df = load_data(spark)
#     cleaned_df = preprocess_data(df)
#     user_behavior_df = compute_user_behavior(cleaned_df)


## Analyse et visualisation des données e-commerce

In [35]:
def generate_analytics(cleaned_df):
    """
    Génère des statistiques agrégées pour l'analyse
    
    Args:
        cleaned_df: DataFrame Spark nettoyé    
    Returns:
        Tuples de données pour la visualisation
    """
    if cleaned_df is None:
        logger.error("Les données n'ont pas été prétraitées")
        return None
    
    logger.info("Génération des statistiques pour analyse")
    
    # 1. Distribution des types d'événements
    event_counts = cleaned_df.groupBy("event_type") \
        .count() \
        .orderBy(desc("count"))
    
    # Conversion en liste Python pour la visualisation
    event_counts_list = [(row['event_type'], row['count']) 
                         for row in event_counts.collect()]
    
    # 2. Distribution horaire des événements
    hourly_events = cleaned_df.groupBy("hour") \
        .count() \
        .orderBy("hour")
    
    hourly_events_list = [(row['hour'], row['count']) 
                          for row in hourly_events.collect()]
    
    # 3. Top catégories
    top_categories = cleaned_df.groupBy("category_code") \
        .count() \
        .orderBy(desc("count")) \
        .limit(10)
    
    top_categories_list = [(row['category_code'], row['count']) 
                           for row in top_categories.collect()]
    
    # 4. Top marques
    top_brands = cleaned_df.groupBy("brand") \
        .count() \
        .orderBy(desc("count")) \
        .limit(10)
    
    top_brands_list = [(row['brand'], row['count']) 
                       for row in top_brands.collect()]
    
    # 5. Statistiques de prix
    price_stats = cleaned_df.select("price").summary(
        "count", "mean", "stddev", "min", "25%", "50%", "75%", "max"
    )
    
    price_stats_list = [(row['summary'], row['price']) 
                         for row in price_stats.collect()]
    
    logger.info("Statistiques d'analyse générées")
    
    return (event_counts_list, hourly_events_list, 
            top_categories_list, top_brands_list, price_stats_list)

def visualize_data(analytics_data, output_dir=output_directory, show_plots=True):
    """
    Crée des visualisations à partir des données agrégées
    
    Args:
        analytics_data: Tuple de données d'analyse
            (event_counts, hourly_events, top_categories, top_brands, price_stats)
        output_dir: Répertoire de sortie pour les visualisations
    """
    # Déballage des données d'analyse
    event_counts, hourly_events, top_categories, top_brands, price_stats = analytics_data
    
    # Création du répertoire de visualisation
    vis_dir = os.path.join(output_dir, "visualizations")
    os.makedirs(vis_dir, exist_ok=True)
    
    # Configuration du style des graphiques
    sns.set(style="whitegrid")
    plt.rcParams['figure.figsize'] = (12, 6)
    
    # 1. Distribution des types d'événements
    plt.figure()
    ax = sns.barplot(x=[x[0] for x in event_counts], y=[x[1] for x in event_counts], palette="viridis")
    plt.title('Distribution des Types d\'Événements', fontsize=15)
    plt.ylabel('Nombre d\'événements', fontsize=12)
    plt.xlabel('Type d\'événement', fontsize=12)
    plt.xticks(rotation=45)
    
    # Ajouter les valeurs sur les barres
    for i, p in enumerate(ax.patches):
        ax.annotate(f'{int(p.get_height()):,}', 
                   (p.get_x() + p.get_width() / 2., p.get_height()),
                   ha = 'center', va = 'bottom', fontsize=10)
    
    plt.tight_layout()
    plt.savefig(os.path.join(vis_dir, "event_types_distribution.png"), dpi=300)
    if show_plots:
        plt.show()
    plt.close()
    logger.info(f"Graphique de distribution des types d'événements sauvegardé")
    
    # 2. Distribution horaire des événements
    plt.figure()
    ax = sns.lineplot(x=[x[0] for x in hourly_events], y=[x[1] for x in hourly_events], marker='o', linewidth=2.5)
    plt.title('Distribution Horaire des Événements', fontsize=15)
    plt.xlabel('Heure de la journée', fontsize=12)
    plt.ylabel('Nombre d\'événements', fontsize=12)
    
    # Formater l'axe des x pour afficher toutes les heures
    plt.xticks(range(0, 24))
    
    # Ajouter une grille
    plt.grid(True, linestyle='--', alpha=0.7)
    
    plt.tight_layout()
    plt.savefig(os.path.join(vis_dir, "hourly_distribution.png"), dpi=300)
    if show_plots:
        plt.show()
    plt.close()
    logger.info("Graphique de distribution horaire sauvegardé")
    
    # 3. Top 10 des catégories
    plt.figure()
    category_names = [x[0] if len(str(x[0])) < 20 else str(x[0])[:17]+'...' for x in top_categories]
    ax = sns.barplot(x=[x[1] for x in top_categories], y=category_names, palette="viridis")
    plt.title('Top 10 des Catégories les Plus Consultées', fontsize=15)
    plt.xlabel('Nombre d\'événements', fontsize=12)
    
    # Ajouter les valeurs sur les barres
    for i, p in enumerate(ax.patches):
        ax.annotate(f'{int(p.get_width()):,}', 
                   (p.get_width(), p.get_y() + p.get_height() / 2.),
                   ha = 'left', va = 'center', fontsize=10, fontweight='bold')
    
    plt.tight_layout()
    plt.savefig(os.path.join(vis_dir, "top_categories.png"), dpi=300)
    if show_plots:
        plt.show()
    plt.close()
    logger.info("Graphique des top catégories sauvegardé")
    
    # 4. Top 10 des marques
    plt.figure()
    ax = sns.barplot(x=[x[1] for x in top_brands], y=[x[0] for x in top_brands], palette="viridis")
    plt.title('Top 10 des Marques les Plus Consultées', fontsize=15)
    plt.xlabel('Nombre d\'événements', fontsize=12)
    
    # Ajouter les valeurs sur les barres
    for i, p in enumerate(ax.patches):
        ax.annotate(f'{int(p.get_width()):,}', 
                   (p.get_width(), p.get_y() + p.get_height() / 2.),
                   ha = 'left', va = 'center', fontsize=10, fontweight='bold')
    
    plt.tight_layout()
    plt.savefig(os.path.join(vis_dir, "top_brands.png"), dpi=300)
    if show_plots:
        plt.show()
    plt.close()
    logger.info("Graphique des top marques sauvegardé")
    
    # 5. Distribution des prix (boîte à moustaches)
    plt.figure()    
    logger.info(f"Toutes les visualisations ont été sauvegardées dans {vis_dir}")
    
    return vis_dir

In [36]:
def analyze_purchase_funnel(cleaned_df):
    """
    Analyse le funnel de conversion des utilisateurs (parcours d'achat)
    
    Args:
        cleaned_df: DataFrame Spark nettoyé
    
    Returns:
        DataFrame contenant les métriques du funnel de conversion
    """
    if cleaned_df is None:
        logger.error("Les données n'ont pas été prétraitées")
        return None
    
    logger.info("Analyse du funnel de conversion")
    
    # Comptage des événements par type pour le funnel global
    funnel_global = cleaned_df.groupBy("event_type") \
        .count() \
        .orderBy(desc("count"))
    
    # Analyse du funnel par utilisateur (combien atteignent chaque étape)
    user_funnel = cleaned_df.groupBy("user_id").agg(
        sum(when(col("event_type") == "view", 1).otherwise(0)).alias("views"),
        sum(when(col("event_type") == "cart", 1).otherwise(0)).alias("cart_additions"),
        sum(when(col("event_type") == "remove_from_cart", 1).otherwise(0)).alias("cart_removals"),
        sum(when(col("event_type") == "purchase", 1).otherwise(0)).alias("purchases")
    )
    
    # Calcul des métriques du funnel
    funnel_metrics = user_funnel.agg(
        count("user_id").alias("total_users"),
        sum(when(col("views") > 0, 1).otherwise(0)).alias("users_with_views"),
        sum(when(col("cart_additions") > 0, 1).otherwise(0)).alias("users_with_cart"),
        sum(when(col("purchases") > 0, 1).otherwise(0)).alias("users_with_purchase")
    )
    
    # Ajout des taux de conversion entre étapes
    funnel_metrics = funnel_metrics.withColumn(
        "view_to_cart_rate", 
        col("users_with_cart") / col("users_with_views") * 100
    ).withColumn(
        "cart_to_purchase_rate", 
        col("users_with_purchase") / col("users_with_cart") * 100
    ).withColumn(
        "view_to_purchase_rate", 
        col("users_with_purchase") / col("users_with_views") * 100
    )
    
    # Analyse du funnel par catégorie
    funnel_by_category = cleaned_df.filter(col("category_code") != "unknown") \
        .groupBy("category_code").agg(
            sum(when(col("event_type") == "view", 1).otherwise(0)).alias("views"),
            sum(when(col("event_type") == "cart", 1).otherwise(0)).alias("cart_additions"),
            sum(when(col("event_type") == "purchase", 1).otherwise(0)).alias("purchases")
        ).withColumn(
            "view_to_purchase_rate",
            when(col("views") > 0, col("purchases") / col("views") * 100).otherwise(0)
        ).orderBy(desc("view_to_purchase_rate"))
    
    logger.info("Funnel de conversion analysé avec succès")
    logger.info("Aperçu des métriques du funnel:")
    funnel_metrics.show()
    
    logger.info("Top catégories par taux de conversion:")
    funnel_by_category.filter(col("views") > 100).select(
        "category_code", "views", "purchases", "view_to_purchase_rate"
    ).orderBy(desc("view_to_purchase_rate")).limit(10).show()
    
    return funnel_metrics, funnel_by_category

def analyze_session_patterns(cleaned_df):
    """
    Analyse les patterns de session des utilisateurs
    
    Args:
        cleaned_df: DataFrame Spark nettoyé
        
    Returns:
        DataFrame contenant les métriques de session
    """
    if cleaned_df is None:
        logger.error("Les données n'ont pas été prétraitées")
        return None
    
    logger.info("Analyse des patterns de session utilisateurs")
    
    # Statistiques par session
    session_stats = cleaned_df.groupBy("user_id", "user_session").agg(
        count("*").alias("session_events"),
        min("event_time").alias("session_start"),
        max("event_time").alias("session_end"),
        sum(when(col("event_type") == "view", 1).otherwise(0)).alias("session_views"),
        sum(when(col("event_type") == "cart", 1).otherwise(0)).alias("session_carts"),
        sum(when(col("event_type") == "purchase", 1).otherwise(0)).alias("session_purchases"),
        countDistinct("product_id").alias("unique_products_viewed"),
        countDistinct("category_code").alias("unique_categories_viewed")
    )
    
    # Calculer la durée des sessions en minutes
    session_stats = session_stats.withColumn(
        "session_duration_minutes", 
        (unix_timestamp("session_end") - unix_timestamp("session_start")) / 60
    )
    
    # Calculer si la session a abouti à une conversion
    session_stats = session_stats.withColumn(
        "converted", 
        when(col("session_purchases") > 0, 1).otherwise(0)
    )
    
    # Métrique d'engagement (score simple)
    session_stats = session_stats.withColumn(
        "engagement_score",
        col("session_views") + col("session_carts") * 2 + col("session_purchases") * 5
    )
    
    # Statistiques agrégées globales
    session_metrics = session_stats.agg(
        count("*").alias("total_sessions"),
        avg("session_events").alias("avg_events_per_session"),
        avg("session_duration_minutes").alias("avg_session_duration_minutes"),
        avg("unique_products_viewed").alias("avg_products_per_session"),
        avg("unique_categories_viewed").alias("avg_categories_per_session"),
        sum("converted") / count("*") * 100 .alias("session_conversion_rate"),
        avg("engagement_score").alias("avg_engagement_score")
    )
    
    # Distribution des durées de session
    session_durations = session_stats.select(
        when(col("session_duration_minutes") < 1, "< 1 min")
        .when(col("session_duration_minutes") < 5, "1-5 mins")
        .when(col("session_duration_minutes") < 15, "5-15 mins")
        .when(col("session_duration_minutes") < 30, "15-30 mins")
        .when(col("session_duration_minutes") < 60, "30-60 mins")
        .otherwise("> 60 mins").alias("duration_bucket")
    ).groupBy("duration_bucket").count().orderBy("duration_bucket")
    
    logger.info("Patterns de session analysés avec succès")
    logger.info("Métriques des sessions:")
    session_metrics.show()
    
    logger.info("Distribution des durées de session:")
    session_durations.show()
    
    return session_metrics, session_durations, session_stats

def visualize_funnel_data(funnel_metrics, funnel_by_category, vis_dir):
    """
    Création de visualisations pour le funnel de conversion
    
    Args:
        funnel_metrics: DataFrame des métriques du funnel
        funnel_by_category: DataFrame du funnel par catégorie
        vis_dir: Répertoire de sortie pour les visualisations
    """
    # Graphique du funnel de conversion
    plt.figure(figsize=(12, 8))
    funnel_data = funnel_metrics.collect()[0]
    
    funnel_stages = ['Users with Views', 'Users with Cart', 'Users with Purchase']
    funnel_values = [funnel_data['users_with_views'], funnel_data['users_with_cart'], funnel_data['users_with_purchase']]
    
    colors = ["#2ca02c", "#ff7f0e", "#1f77b4"]
    
    # Créer le graphique en entonnoir
    plt.bar(funnel_stages, funnel_values, width=0.5, color=colors, edgecolor='black')
    
    # Ajouter les valeurs et taux de conversion sur les barres
    for i, (stage, value) in enumerate(zip(funnel_stages, funnel_values)):
        plt.text(i, value + (max(funnel_values) * 0.01), f"{int(value):,}", 
                 ha='center', va='bottom', fontsize=12, fontweight='bold')
        
        if i > 0:
            conversion_rate = funnel_values[i] / funnel_values[i-1] * 100
            plt.text(i - 0.5, (funnel_values[i-1] + funnel_values[i]) / 2, 
                     f"{conversion_rate:.1f}%", ha='center', va='center',
                     fontsize=12, fontweight='bold', bbox=dict(facecolor='white', alpha=0.7))
    
    plt.title('Funnel de Conversion', fontsize=16)
    plt.ylabel('Nombre d\'utilisateurs', fontsize=14)
    plt.grid(axis='y', linestyle='--', alpha=0.7)
    plt.tight_layout()
    plt.savefig(os.path.join(vis_dir, "conversion_funnel.png"), dpi=300)
    plt.close()
    logger.info("Graphique du funnel de conversion sauvegardé")
    
    # Top catégories par taux de conversion
    top_categories = funnel_by_category.filter(col("views") > 100) \
        .select("category_code", "view_to_purchase_rate") \
        .orderBy(desc("view_to_purchase_rate")) \
        .limit(10).collect()
    
    plt.figure(figsize=(12, 6))
    category_names = [row['category_code'] if len(str(row['category_code'])) < 20 
                      else str(row['category_code'])[:17]+'...' for row in top_categories]
    conversion_rates = [row['view_to_purchase_rate'] for row in top_categories]
    
    ax = sns.barplot(x=conversion_rates, y=category_names, palette="viridis")
    plt.title('Top 10 des Catégories par Taux de Conversion (Vue → Achat)', fontsize=15)
    plt.xlabel('Taux de Conversion (%)', fontsize=12)
    
    # Ajouter les valeurs sur les barres
    for i, p in enumerate(ax.patches):
        ax.annotate(f'{p.get_width():.2f}%', 
                   (p.get_width(), p.get_y() + p.get_height() / 2.),
                   ha='left', va='center', fontsize=10, fontweight='bold')
    
    plt.tight_layout()
    plt.savefig(os.path.join(vis_dir, "top_categories_conversion.png"), dpi=300)
    plt.close()
    logger.info("Graphique des taux de conversion par catégorie sauvegardé")

def visualize_session_data(session_metrics, session_durations, session_stats, vis_dir):
    """
    Création de visualisations pour les patterns de session
    
    Args:
        session_metrics: DataFrame des métriques de session agrégées
        session_durations: DataFrame de distribution des durées de session
        session_stats: DataFrame des statistiques détaillées par session
        vis_dir: Répertoire de sortie pour les visualisations
    """
    # Distribution des durées de session
    durations_data = session_durations.orderBy("duration_bucket").collect()
    
    # Définir l'ordre correct des buckets de durée
    duration_order = ["< 1 min", "1-5 mins", "5-15 mins", "15-30 mins", "30-60 mins", "> 60 mins"]
    
    # Préparer les données pour le graphique
    durations_dict = {row['duration_bucket']: row['count'] for row in durations_data}
    ordered_durations = [durations_dict.get(bucket, 0) for bucket in duration_order]
    
    plt.figure(figsize=(12, 6))
    ax = sns.barplot(x=duration_order, y=ordered_durations, palette="viridis")
    plt.title('Distribution des Durées de Session', fontsize=15)
    plt.xlabel('Durée de Session', fontsize=12)
    plt.ylabel('Nombre de Sessions', fontsize=12)
    
    # Ajouter les valeurs sur les barres
    for i, p in enumerate(ax.patches):
        ax.annotate(f'{int(p.get_height()):,}', 
                   (p.get_x() + p.get_width() / 2., p.get_height()),
                   ha='center', va='bottom', fontsize=10)
    
    plt.tight_layout()
    plt.savefig(os.path.join(vis_dir, "session_duration_distribution.png"), dpi=300)
    plt.close()
    logger.info("Graphique de distribution des durées de session sauvegardé")
    
    # Relation entre engagement et conversion
    engagement_data = session_stats.select(
        "engagement_score", "converted", "session_duration_minutes"
    ).filter(col("session_duration_minutes") < 60).collect()  # Filtrer les sessions extrêmement longues
    
    engagement_scores = [row['engagement_score'] for row in engagement_data]
    conversion_status = [row['converted'] for row in engagement_data]
    durations = [row['session_duration_minutes'] for row in engagement_data]
    
    plt.figure(figsize=(12, 6))
    plt.scatter(engagement_scores, durations, c=conversion_status, cmap='viridis', 
                alpha=0.5, edgecolors='none')
    
    plt.colorbar(label='Conversion (1=Oui, 0=Non)')
    plt.title('Relation entre Engagement, Durée de Session et Conversion', fontsize=15)
    plt.xlabel('Score d\'Engagement', fontsize=12)
    plt.ylabel('Durée de Session (minutes)', fontsize=12)
    plt.grid(True, linestyle='--', alpha=0.7)
    plt.tight_layout()
    plt.savefig(os.path.join(vis_dir, "engagement_vs_conversion.png"), dpi=300)
    plt.close()
    logger.info("Graphique de relation engagement-conversion sauvegardé")

## Sauvegarde des données traitées avec Spark

In [37]:
def save_processed_data(cleaned_df, user_behavior_df=None, recommendation_df=None, 
                        product_df=None, time_series_df=None, output_dir=output_directory):
    """
    Sauvegarde des données prétraitées dans différents formats
    
    Args:
        cleaned_df: DataFrame Spark principal prétraité
        user_behavior_df: DataFrame des comportements utilisateur (optionnel)
        recommendation_df: DataFrame pour recommandations (optionnel)
        product_df: DataFrame produits (optionnel)
        time_series_df: DataFrame séries temporelles (optionnel)
        output_dir: Répertoire de sortie pour les données traitées
    
    Returns:
        Dictionnaire des chemins de fichiers sauvegardés
    """
    if cleaned_df is None:
        logger.error("Les données n'ont pas été prétraitées")
        return None
    
    # Convertir en chemins absolus avec slashes normaux
    output_dir = os.path.abspath(output_dir).replace("\\", "/")
    
    # Création des sous-répertoires
    parquet_dir = os.path.join(output_dir, "parquet").replace("\\", "/")
    os.makedirs(parquet_dir, exist_ok=True)
    
    timestamp_str = datetime.now().strftime("%Y%m%d_%H%M%S")
    
    # Chemins des fichiers avec slashes normaux
    output_paths = {
        "cleaned_data": f"{parquet_dir}/cleaned_data_{timestamp_str}.parquet",
        "user_behavior": f"{parquet_dir}/user_behavior_{timestamp_str}.parquet" if user_behavior_df is not None else None,
        "recommendation_data": f"{parquet_dir}/recommendation_data_{timestamp_str}.parquet" if recommendation_df is not None else None,
        "product_data": f"{parquet_dir}/product_data_{timestamp_str}.parquet" if product_df is not None else None,
        "time_series_data": f"{parquet_dir}/time_series_data_{timestamp_str}.parquet" if time_series_df is not None else None
    }

    # Sauvegarde des données nettoyées principales
    logger.info(f"Sauvegarde des données nettoyées: {output_paths['cleaned_data']}")
    try:
        cleaned_df.write.mode("overwrite").format("parquet").save(output_paths["cleaned_data"])
        logger.info("Données nettoyées sauvegardées")
    except Exception as e:
        logger.error(f"Erreur lors de la sauvegarde des données nettoyées: {str(e)}")
    
    # Sauvegarde des comportements utilisateur
    if user_behavior_df is not None:
        logger.info(f"Sauvegarde des comportements utilisateur: {output_paths['user_behavior']}")
        try:
            user_behavior_df.write.mode("overwrite").format("parquet").save(output_paths["user_behavior"])
            logger.info("Comportements utilisateur sauvegardés")
        except Exception as e:
            logger.error(f"Erreur lors de la sauvegarde des comportements utilisateur: {str(e)}")
    
    # Sauvegarde des données de recommandation
    if recommendation_df is not None:
        logger.info(f"Sauvegarde des données de recommandation: {output_paths['recommendation_data']}")
        try:
            recommendation_df.write.mode("overwrite").format("parquet").save(output_paths["recommendation_data"])
            logger.info("Données de recommandation sauvegardées")
        except Exception as e:
            logger.error(f"Erreur lors de la sauvegarde des données de recommandation: {str(e)}")
    
    # Sauvegarde des données produits
    if product_df is not None:
        logger.info(f"Sauvegarde des données produits: {output_paths['product_data']}")
        try:
            product_df.write.mode("overwrite").format("parquet").save(output_paths["product_data"])
            logger.info("Données produits sauvegardées")
        except Exception as e:
            logger.error(f"Erreur lors de la sauvegarde des données produits: {str(e)}")
    
    # Sauvegarde des données temporelles
    if time_series_df is not None:
        logger.info(f"Sauvegarde des données temporelles: {output_paths['time_series_data']}")
        try:
            time_series_df.write.mode("overwrite").format("parquet").save(output_paths["time_series_data"])
            logger.info("Données temporelles sauvegardées")
        except Exception as e:
            logger.error(f"Erreur lors de la sauvegarde des données temporelles: {str(e)}")
    
    logger.info(f"Toutes les données ont été sauvegardées dans {output_dir}")
    return output_paths

# Exécuter la sauvegarde des données
# save_processed_data(cleaned_df, user_behavior_df)


## pipeline d'execution

In [None]:
def main():
    """
    Fonction principale exécutant tout le pipeline de prétraitement
    """
    # Création de la session Spark
    spark = create_spark_session()
    
    # Chargement des données
    raw_df = load_data(spark)
    
    #visualisation des données
    explore_data(raw_df)

    # Nettoyage et prétraitement
    cleaned_df = preprocess_data(raw_df)
    
    # Calcul des métriques utilisateur pour segmentation
    user_behavior_df = compute_user_behavior(cleaned_df)
    
    # Préparation des données pour recommandation
    recommendation_df, product_df = prepare_recommendation_data(cleaned_df)
    
    # Préparation des données temporelles pour simulation
    time_series_df = prepare_time_series_data(cleaned_df)
    
    # Analyse du funnel de conversion
    # funnel_metrics, funnel_by_category = analyze_purchase_funnel(cleaned_df)

    # Analyse des patterns de session
    # session_metrics, session_durations, session_stats = analyze_session_patterns(cleaned_df)

    # # Modification de la partie visualize_data
    # analytics_data = generate_analytics(cleaned_df)
    # if analytics_data:
    #     vis_path = visualize_data(analytics_data, show_plots=True)
    #     # Ajout des nouvelles visualisations
    #     visualize_funnel_data(funnel_metrics, funnel_by_category, vis_path)
    #     visualize_session_data(session_metrics, session_durations, session_stats, vis_path)
    #     print(f"Visualisations générées dans : {vis_path}")
    # Sauvegarde de tous les DataFrames générés
    save_processed_data(
        cleaned_df=cleaned_df,
        user_behavior_df=user_behavior_df,
        recommendation_df=recommendation_df,
        product_df=product_df,
        time_series_df=time_series_df
    )
    
    logger.info("Pipeline de prétraitement terminé avec succès")
    
    # Arrêt de la session Spark
    spark.stop()

# Si exécuté comme script principal
if __name__ == "__main__":
    main()

2025-05-18 15:57:24,468 - INFO - Chargement des données depuis ../data/2019-Oct_reduit.csv
2025-05-18 15:57:24,843 - INFO - Données chargées : 1000000 lignes
2025-05-18 15:57:24,844 - INFO - Aperçu des données:
2025-05-18 15:57:24,931 - INFO - Exploration initiale des données


+-------------------+----------+----------+-------------------+--------------------+--------+-------+---------+--------------------+
|         event_time|event_type|product_id|        category_id|       category_code|   brand|  price|  user_id|        user_session|
+-------------------+----------+----------+-------------------+--------------------+--------+-------+---------+--------------------+
|2019-10-01 00:00:00|      view|  44600062|2103807459595387724|                NULL|shiseido|  35.79|541312140|72d76fde-8bb3-4e0...|
|2019-10-01 00:00:00|      view|   3900821|2053013552326770905|appliances.enviro...|    aqua|   33.2|554748717|9333dfbd-b87a-470...|
|2019-10-01 00:00:01|      view|  17200506|2053013559792632471|furniture.living_...|    NULL|  543.1|519107250|566511c2-e2e3-422...|
|2019-10-01 00:00:01|      view|   1307067|2053013558920217191|  computers.notebook|  lenovo| 251.74|550050854|7c90fc70-0e80-459...|
|2019-10-01 00:00:04|      view|   1004237|2053013555631882655|electr

2025-05-18 15:57:25,250 - INFO - Nombre de lignes: 1000000
2025-05-18 15:57:25,254 - INFO - Nombre de colonnes: 9
2025-05-18 15:57:25,255 - INFO - Schéma du DataFrame:
2025-05-18 15:57:25,256 - INFO - Statistiques descriptives:


root
 |-- event_time: timestamp (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- category_id: string (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: double (nullable = true)
 |-- user_id: string (nullable = true)
 |-- user_session: string (nullable = true)



2025-05-18 15:57:30,136 - INFO - Valeurs manquantes par colonne:


+-------+----------+--------------------+--------------------+-------------------+--------+------------------+--------------------+--------------------+
|summary|event_type|          product_id|         category_id|      category_code|   brand|             price|             user_id|        user_session|
+-------+----------+--------------------+--------------------+-------------------+--------+------------------+--------------------+--------------------+
|  count|   1000000|             1000000|             1000000|             681869|  852440|           1000000|             1000000|             1000000|
|   mean|      NULL|   1.0347992085779E7|2.056347726779049...|               NULL|Infinity|295.98247050999464|   5.3127631576035E8|                NULL|
| stddev|      NULL|1.1238269023409404E7|1.579787912319246...|               NULL|     NaN| 368.2165155182077|1.6673329959351553E7|                NULL|
|    min|      cart|             1001588| 2053013552226107603|    accessories.bag|

2025-05-18 15:57:31,699 - INFO - event_time: 0 (0.00%)
2025-05-18 15:57:32,523 - INFO - event_type: 0 (0.00%)
2025-05-18 15:57:33,271 - INFO - product_id: 0 (0.00%)
2025-05-18 15:57:33,913 - INFO - category_id: 0 (0.00%)
2025-05-18 15:57:34,541 - INFO - category_code: 318131 (31.81%)
2025-05-18 15:57:35,108 - INFO - brand: 147560 (14.76%)
2025-05-18 15:57:35,869 - INFO - price: 0 (0.00%)
2025-05-18 15:57:36,422 - INFO - user_id: 0 (0.00%)
2025-05-18 15:57:37,079 - INFO - user_session: 0 (0.00%)
2025-05-18 15:57:37,080 - INFO - Distribution des types d'événements:
2025-05-18 15:57:37,999 - INFO - Top 10 des catégories:


+----------+------+
|event_type| count|
+----------+------+
|      view|968513|
|  purchase| 16848|
|      cart| 14639|
+----------+------+



2025-05-18 15:57:39,220 - INFO - Top 10 des marques:


+--------------------+------+
|       category_code| count|
+--------------------+------+
|                NULL|318131|
|electronics.smart...|274622|
|  electronics.clocks| 35394|
|  computers.notebook| 28913|
|electronics.audio...| 26397|
|electronics.video.tv| 22019|
|appliances.kitche...| 17964|
|appliances.enviro...| 17285|
|       apparel.shoes| 16116|
|appliances.kitche...| 16098|
+--------------------+------+



2025-05-18 15:57:40,998 - INFO - Prétraitement des données
2025-05-18 15:57:40,999 - INFO - Extraction des caractéristiques temporelles
2025-05-18 15:57:41,074 - INFO - Traitement des valeurs manquantes
2025-05-18 15:57:41,095 - INFO - Nettoyage des prix
2025-05-18 15:57:41,107 - INFO - Aperçu des données prétraitées:


+-------+------+
|  brand| count|
+-------+------+
|   NULL|147560|
|samsung|120715|
|  apple|103067|
| xiaomi| 69246|
| huawei| 28013|
|lucente| 16948|
|  bosch| 12402|
|     lg| 10921|
|   acer|  9802|
|   sony|  9561|
+-------+------+



2025-05-18 15:57:41,266 - INFO - Statistiques après prétraitement:


+-------------------+----------+----------+-------------------+--------------------+--------+-------+---------+--------------------+----+------+------+---+-----+---------+----------+-------------------+
|         event_time|event_type|product_id|        category_id|       category_code|   brand|  price|  user_id|        user_session|hour|minute|second|day|month|dayofweek|      date|        hour_bucket|
+-------------------+----------+----------+-------------------+--------------------+--------+-------+---------+--------------------+----+------+------+---+-----+---------+----------+-------------------+
|2019-10-01 00:00:00|      view|  44600062|2103807459595387724|             unknown|shiseido|  35.79|541312140|72d76fde-8bb3-4e0...|   0|     0|     0|  1|   10|        3|2019-10-01|2019-10-01 00:00:00|
|2019-10-01 00:00:00|      view|   3900821|2053013552326770905|appliances.enviro...|    aqua|   33.2|554748717|9333dfbd-b87a-470...|   0|     0|     0|  1|   10|        3|2019-10-01|2019-1

2025-05-18 15:57:51,966 - INFO - Calcul des métriques de comportement utilisateur
2025-05-18 15:57:52,098 - INFO - Métriques de comportement utilisateur calculées
2025-05-18 15:57:52,099 - INFO - Aperçu des comportements utilisateur:


+-------+----------+--------------------+--------------------+---------------+--------+-----------------+--------------------+--------------------+-----------------+------------------+------------------+-------+-------+--------------------+----------+-------------------+
|summary|event_type|          product_id|         category_id|  category_code|   brand|            price|             user_id|        user_session|             hour|            minute|            second|    day|  month|           dayofweek|      date|        hour_bucket|
+-------+----------+--------------------+--------------------+---------------+--------+-----------------+--------------------+--------------------+-----------------+------------------+------------------+-------+-------+--------------------+----------+-------------------+
|  count|   1000000|             1000000|             1000000|        1000000| 1000000|           998393|             1000000|             1000000|          1000000|           1000000|

2025-05-18 15:58:01,572 - INFO - Préparation des données pour le système de recommandation
2025-05-18 15:58:01,618 - INFO - Aperçu des données de recommandation:
2025-05-18 15:58:01,715 - INFO - Aperçu des données produits:


+---------+---------+--------+--------+------------+----------+----------------+-------------------+-----------+-------------------+-------------------+-------+---------+--------+--------------------+--------------------+---------------+----------------+---------------+
|  user_id|nb_events|nb_views|nb_carts|nb_purchases|nb_removes|avg_price_viewed|avg_price_purchased|nb_sessions|         first_seen|          last_seen|recency|frequency|monetary|   viewed_categories|       viewed_brands|conversion_rate|cart_abandonment|engagement_days|
+---------+---------+--------+--------+------------+----------+----------------+-------------------+-----------+-------------------+-------------------+-------+---------+--------+--------------------+--------------------+---------------+----------------+---------------+
|318145786|        2|       2|       0|           0|         0|          35.075|               NULL|          1|2019-10-01 10:32:50|2019-10-01 10:33:18|     30|        0|     0.0|  [unkno

2025-05-18 15:58:03,959 - INFO - Préparation des données pour analyses temporelles
2025-05-18 15:58:04,035 - INFO - Aperçu des données temporelles:


+----------+-------------------+--------------------+-------+------+
|product_id|        category_id|       category_code|  brand| price|
+----------+-------------------+--------------------+-------+------+
|  26200210|2053013563693335403|             unknown|unknown| 98.84|
|  21402696|2053013561579406073|  electronics.clocks|unknown| 24.97|
|   3100871|2053013555262783879|appliances.kitche...|  vitek| 23.14|
|   1480608|2053013561092866779|   computers.desktop| pulser|380.94|
|  37800028|2078957461921858354|             unknown|carters| 31.02|
+----------+-------------------+--------------------+-------+------+
only showing top 5 rows



2025-05-18 15:58:07,050 - INFO - Sauvegarde des données nettoyées: c:/Users/Acer_M/Documents/DIC2/bigdata/projet/customer_recommendation_segmentation/data/processed/parquet/cleaned_data_20250518_155807.parquet


+-------------------+------------+------------+-----+-----+---------+-------+------------------+
|        hour_bucket|total_events|unique_users|views|carts|purchases|removes|         avg_price|
+-------------------+------------+------------+-----+-----+---------+-------+------------------+
|2019-10-01 00:00:00|        1083|         383| 1070|    3|       10|      0| 303.2282086795937|
|2019-10-01 01:00:00|         121|         102|  121|    0|        0|      0|327.07438016528926|
|2019-10-01 02:00:00|       22886|        5378|22326|  244|      316|      0| 289.5163129346923|
|2019-10-01 03:00:00|       49409|       10514|47951|  613|      845|      0|283.44011712259385|
|2019-10-01 04:00:00|       55290|       11933|53390|  879|     1021|      0| 288.2792449345431|
+-------------------+------------+------------+-----+-----+---------+-------+------------------+
only showing top 5 rows



2025-05-18 15:58:10,919 - INFO - Données nettoyées sauvegardées
2025-05-18 15:58:10,920 - INFO - Sauvegarde des comportements utilisateur: c:/Users/Acer_M/Documents/DIC2/bigdata/projet/customer_recommendation_segmentation/data/processed/parquet/user_behavior_20250518_155807.parquet
2025-05-18 15:58:19,563 - INFO - Comportements utilisateur sauvegardés
2025-05-18 15:58:19,565 - INFO - Sauvegarde des données de recommandation: c:/Users/Acer_M/Documents/DIC2/bigdata/projet/customer_recommendation_segmentation/data/processed/parquet/recommendation_data_20250518_155807.parquet
2025-05-18 15:58:21,273 - INFO - Données de recommandation sauvegardées
2025-05-18 15:58:21,275 - INFO - Sauvegarde des données produits: c:/Users/Acer_M/Documents/DIC2/bigdata/projet/customer_recommendation_segmentation/data/processed/parquet/product_data_20250518_155807.parquet
2025-05-18 15:58:23,077 - INFO - Données produits sauvegardées
2025-05-18 15:58:23,078 - INFO - Sauvegarde des données temporelles: c:/Users