In [5]:
import os
import glob
import dask.dataframe as dd
import pandas as pd
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import KMeans
from dask.distributed import Client
from dask.utils import is_dataframe_like  # Importation corrigée

# Configuration
# Directory scanned for the cleaned "*.parquet" event files.
DATA_PATH = "../data/cleaned"
# Destination of the per-user feature table.
OUTPUT_PATH = "../output/user_features_enhanced.parquet"
# Main categories (first segment of category_code) kept for the analysis.
SELECTED_CATEGORIES = {
    "computers",
    "electronics",
    "kids",
    "sport",
}
# Size hint handed to dd.read_parquet; tune it to the available memory.
CHUNK_SIZE = "100MB"

def load_parquet_files():
    """Lazily load the cleaned event files and keep only the relevant rows.

    Reads every ``*.parquet`` file in ``DATA_PATH`` (only the four columns used
    downstream), keeps ``view`` and ``purchase`` events, derives
    ``category_main`` (the part of ``category_code`` before the first ``.``)
    and keeps only rows whose main category is in ``SELECTED_CATEGORIES``.

    Returns:
        dask.dataframe.DataFrame: the filtered, still-lazy event frame.

    Raises:
        FileNotFoundError: if ``DATA_PATH`` contains no ``.parquet`` file.
    """
    print(" Chargement des fichiers Parquet...")
    # Sorted so partition order does not depend on the filesystem listing.
    files = sorted(glob.glob(os.path.join(DATA_PATH, "*.parquet")))
    if not files:
        raise FileNotFoundError(f"Aucun fichier .parquet trouvé dans {DATA_PATH}")
    cols = ["user_id", "event_type", "event_time", "category_code"]
    df = dd.read_parquet(
        files,
        columns=cols,
        engine="pyarrow",
        # Push the event-type filter into the Parquet reader so discarded rows
        # are not materialised in worker memory.
        filters=[("event_type", "in", ["view", "purchase"])],
        # `blocksize` (not `chunksize`, deprecated/removed in recent Dask) is
        # the read_parquet partition-size hint; it lets Dask split large files
        # by row group instead of loading a whole file per task.
        blocksize=CHUNK_SIZE,
    )
    # Kept alongside `filters` in case the engine only prunes whole row groups.
    df = df[df["event_type"].isin(["view", "purchase"])]
    # n=1: only the first segment is needed, so stop after the first split.
    df["category_main"] = df["category_code"].str.split(".", n=1).str[0]
    df = df[df["category_main"].isin(SELECTED_CATEGORIES)]
    print("✅ Fichiers Parquet chargés et filtrés !")
    return df

def generate_user_features_enhanced():
    """Build per-user behavioural features, save them to Parquet and return them.

    The result is indexed by ``user_id`` and contains:

    * ``total_events``, ``total_views``, ``total_purchases``;
    * ``unique_categories``: number of distinct ``category_main`` values;
    * ``last_event_time``: timestamp of the user's most recent event;
    * integer-labelled columns (``1``..``12``): purchases per calendar month
      (only months present in the data appear);
    * ``<category>``: purchases per main category, and
      ``views_<category>``: views per main category;
    * ``conversion_rate``: ``total_purchases / total_views`` (0 when the user
      has no view).

    Count columns are 0 for users with no matching event. The frame is also
    written to ``OUTPUT_PATH`` (with column names converted to strings).

    Returns:
        pandas.DataFrame: one row per user.
    """
    import dask  # only `dask.dataframe` is bound at module level (as `dd`)

    print("✨ Génération des features utilisateur améliorées...")
    df = load_parquet_files()

    # Datetime handling
    print("⏳ Vérification et conversion du type datetime...")
    # Test the column dtype itself. is_dataframe_like() is always False for a
    # Series, so the previous check reconverted unconditionally. The *_any_
    # variant also accepts timezone-aware timestamps.
    if not pd.api.types.is_datetime64_any_dtype(df["event_time"].dtype):
        df["event_time"] = dd.to_datetime(df["event_time"], errors="coerce")
    df = df.dropna(subset=["event_time"])
    df["month"] = df["event_time"].dt.month
    print("✅ Type datetime vérifié et converti !")

    views = df[df["event_type"] == "view"]
    purchases = df[df["event_type"] == "purchase"]

    # All aggregates are computed in a single dask.compute() call, so the
    # Parquet files are read and filtered once. Separate .compute() calls
    # would re-run the whole pipeline for each metric.
    print(" Calcul des métriques de base...")
    (
        total_events,
        total_views,
        total_purchases,
        unique_categories,
        last_event_time,
        monthly_counts,
        category_view_counts,
        category_purchase_counts,
    ) = dask.compute(
        df.groupby("user_id").size(),
        views.groupby("user_id").size(),
        purchases.groupby("user_id").size(),
        df.groupby("user_id")["category_main"].nunique(),
        df.groupby("user_id")["event_time"].max(),
        purchases.groupby(["user_id", "month"]).size(),
        views.groupby(["user_id", "category_main"]).size(),
        purchases.groupby(["user_id", "category_main"]).size(),
    )
    print("✅ Métriques de base calculées !")

    # The pivots run on the small, already-computed pandas results because a
    # Dask Series has no .unstack().
    print("⏱️ Calcul des métriques temporelles...")
    monthly_purchases = monthly_counts.unstack(fill_value=0)
    # Plain Python ints, so month columns are recognisable by their label type.
    monthly_purchases.columns = [int(month) for month in monthly_purchases.columns]
    time_features = pd.concat(
        [last_event_time.rename("last_event_time"), monthly_purchases],
        axis=1,
        sort=False,
    )
    print("✅ Métriques temporelles calculées !")

    print(" Calcul des préférences par catégorie...")
    # Views and purchases share the same category labels, so the views get a
    # prefix to keep the columns distinct. Purchase columns keep the bare
    # category name, which is what the per-category sales analysis selects.
    category_views = category_view_counts.unstack(fill_value=0).add_prefix("views_")
    category_purchases = category_purchase_counts.unstack(fill_value=0)
    category_features = pd.concat(
        [category_views, category_purchases], axis=1, sort=False
    )
    print("✅ Préférences par catégorie calculées !")

    print(" Fusion de toutes les features...")
    # Explicit names are needed: unnamed size() Series would become positional
    # columns (0, 1, 2, ...), so "total_purchases"/"total_views" would not exist.
    base_features = pd.concat(
        [
            total_events.rename("total_events"),
            total_views.rename("total_views"),
            total_purchases.rename("total_purchases"),
            unique_categories.rename("unique_categories"),
        ],
        axis=1,
        sort=False,
    )
    user_features = pd.concat(
        [base_features, time_features, category_features], axis=1, sort=False
    )
    # Users missing from a partial aggregate (e.g. never purchased) get 0.
    # Only numeric columns are filled, so last_event_time keeps its dtype.
    numeric_cols = user_features.select_dtypes(include="number").columns
    user_features[numeric_cols] = user_features[numeric_cols].fillna(0)
    # Zero views -> NaN denominator -> rate 0. A plain division would give inf
    # for users who purchased without a recorded view.
    views_denominator = user_features["total_views"].where(
        user_features["total_views"] > 0
    )
    user_features["conversion_rate"] = (
        user_features["total_purchases"] / views_denominator
    ).fillna(0)
    print("✅ Features fusionnées et taux de conversion calculé !")

    # Save to Parquet
    print(f" Sauvegarde des features dans {OUTPUT_PATH}...")
    os.makedirs(os.path.dirname(OUTPUT_PATH), exist_ok=True)
    # The file gets explicit string column names, so its schema does not
    # depend on how the writer handles the int month labels. The returned
    # frame keeps the int labels.
    user_features.rename(columns=str).to_parquet(OUTPUT_PATH, engine="pyarrow")
    print("✅ Features utilisateur améliorées sauvegardées !")
    return user_features

def cluster_users(user_features, n_clusters=5, random_state=42):
    """Assign each user to a K-Means cluster, stored in a ``cluster`` column.

    Only numeric feature columns are used. Non-numeric columns (such as a
    timestamp) cannot be standardised, and an existing ``cluster`` column is
    excluded so that re-running does not feed previous labels back in.
    Infinite values become 0 and missing values are filled with 0 before
    scaling.

    Args:
        user_features: pandas DataFrame with one row per user; modified in place.
        n_clusters: number of K-Means clusters (default 5, as before).
        random_state: seed passed to K-Means (default 42, as before).

    Returns:
        The same DataFrame with the ``cluster`` column added or overwritten.
    """
    print(" Clustering des utilisateurs...")
    feature_frame = user_features.drop(columns="cluster", errors="ignore")
    feature_frame = feature_frame.select_dtypes(include="number")
    # The frame is passed as a plain float array: scikit-learn >= 1.2 rejects
    # DataFrames whose column names mix int (month) and str labels, and
    # inf/NaN would make the scaler fail.
    values = (
        feature_frame.replace([float("inf"), float("-inf")], 0)
        .fillna(0)
        .to_numpy(dtype=float)
    )
    scaled_features = StandardScaler().fit_transform(values)
    kmeans = KMeans(n_clusters=n_clusters, random_state=random_state)
    user_features["cluster"] = kmeans.fit_predict(scaled_features)
    print("✅ Clustering terminé !")
    return user_features

def analyze_sales_by_cluster(user_features, categories=None):
    """Print and return per-cluster purchase totals by month and by category.

    Month columns are recognised by their integer labels, and category columns
    by their name being one of ``categories``.

    Args:
        user_features: DataFrame with a ``cluster`` column.
        categories: category column names to report; defaults to
            ``SELECTED_CATEGORIES``.

    Returns:
        tuple: ``(monthly_sales, category_sales)``, both indexed by cluster.
    """
    import numbers

    if categories is None:
        categories = SELECTED_CATEGORIES
    print(" Analyse des ventes par cluster...")
    # numbers.Integral also matches numpy integer labels, which a plain
    # `int` check misses. bool is excluded because it subclasses int.
    month_cols = [
        col
        for col in user_features.columns
        if isinstance(col, numbers.Integral) and not isinstance(col, bool)
    ]
    monthly_sales = user_features.groupby("cluster")[month_cols].sum()
    print(" Ventes mensuelles par cluster :")
    print(monthly_sales)
    category_cols = [
        col for col in user_features.columns
        if isinstance(col, str) and col in categories
    ]
    category_sales = user_features.groupby("cluster")[category_cols].sum()
    print(" Ventes par catégorie et par cluster :")
    print(category_sales)
    print("✅ Analyse des ventes terminée !")
    return monthly_sales, category_sales

if __name__ == "__main__":
    # The context manager closes the client (and the local cluster it started)
    # even when feature generation raises, e.g. with KilledWorker. Otherwise a
    # failed run in the same notebook session can leave the old cluster alive,
    # a likely cause of the "Perhaps you already have a cluster running?" notice.
    # NOTE(review): if workers keep dying from memory pressure, try fewer
    # workers or an explicit `memory_limit=`; confirm against the machine's RAM.
    with Client(n_workers=4, threads_per_worker=1) as client:
        user_features = generate_user_features_enhanced()
        if user_features is not None:
            user_features = cluster_users(user_features)
            analyze_sales_by_cluster(user_features)

Perhaps you already have a cluster running?
Hosting the HTTP server on port 61151 instead


✨ Génération des features utilisateur améliorées...
 Chargement des fichiers Parquet...
✅ Fichiers Parquet chargés et filtrés !
⏳ Vérification et conversion du type datetime...
✅ Type datetime vérifié et converti !
 Calcul des métriques de base...


2025-03-13 09:17:21,894 - distributed.scheduler - ERROR - Task ('read_parquet-fused-operation-e4aa2ff58ac818a5f27b942dc8f1ff57', 24) marked as failed because 4 workers died while trying to run it


KilledWorker: Attempted to run task ('read_parquet-fused-operation-e4aa2ff58ac818a5f27b942dc8f1ff57', 24) on 4 different workers, but all those workers died while running it. The last worker that attempt to run the task was tcp://127.0.0.1:61476. Inspecting worker logs is often a good next step to diagnose what went wrong. For more information see https://distributed.dask.org/en/stable/killed.html.