In [None]:
# Import python packages
import os

import pandas as pd
import streamlit as st
from snowflake.snowpark import Session

# Connection settings.
# Credentials are read from the environment so that no secret is stored in the
# notebook; set SNOWFLAKE_USER and SNOWFLAKE_PASSWORD before running this cell
# (a missing variable fails fast with a KeyError).
# "account" must be the bare account identifier (<organization>-<account>),
# WITHOUT the ".snowflakecomputing.com" suffix: the connector appends the
# domain itself when it builds the host name.
connection_parameters = {
    "account": os.environ.get("SNOWFLAKE_ACCOUNT", "RSYIXFD-HT53341"),
    "user": os.environ["SNOWFLAKE_USER"],
    "password": os.environ["SNOWFLAKE_PASSWORD"],
    "warehouse": "COMPUTE_WH",
    "database": "RAW_DATA",
    "schema": "AMAZING_DATA",
}

# Open the Snowpark session used by every following cell
session = Session.builder.configs(connection_parameters).create()

In [None]:
# Handle on the raw client events table (fully qualified name)
source_table = "RAW_DATA.AMAZING_DATA.CLIENT_EVENTS"
df = session.table(source_table)

In [None]:
# Preview the first 10 rows of the raw events
df.show(n=10)

In [None]:
from snowflake.snowpark import functions as F
from snowflake.snowpark.types import IntegerType
import numpy as np

# 1. Parse the raw event time into a timestamp column
event_ts = F.to_timestamp(F.col("EVENT_TIME"))
df = df.with_column("timestamp", event_ts)

# 2. Derive the hour of day and the day of week from that timestamp
df = df.with_column("hour", F.hour("timestamp"))
df = df.with_column("day", F.dayofweek("timestamp"))

# 3. Bucket the hour into a part of the day (expressed as a SQL CASE)
hour_of_day = F.col("hour")
is_morning = (hour_of_day >= 6) & (hour_of_day < 12)
is_afternoon = (hour_of_day >= 12) & (hour_of_day < 18)
is_evening = (hour_of_day >= 18) & (hour_of_day < 24)
part_of_day = (
    F.when(is_morning, F.lit("morning"))
     .when(is_afternoon, F.lit("afternoon"))
     .when(is_evening, F.lit("evening"))
     .otherwise(F.lit("night"))  # everything before 06:00
)
df = df.with_column("time_period", part_of_day)

# 4. One 0/1 indicator column per event type
for event_kind in ("view", "cart", "purchase"):
    df = df.with_column(
        f"EVENT_TYPE_{event_kind}",
        F.when(F.col("EVENT_TYPE") == event_kind, 1).otherwise(0),
    )

# 5. Core per-user aggregates
agg_behaviour = df.group_by("USER_ID").agg(
    F.sum("EVENT_TYPE_view").alias("total_views"),
    F.sum("EVENT_TYPE_cart").alias("total_cart"),
    F.sum("EVENT_TYPE_purchase").alias("total_purchase"),
    F.count_distinct("CATEGORY_ID").alias("unique_categories"),
    F.max("timestamp").alias("last_activity"),
    F.count("*").alias("total_events"),  # added here to avoid recomputing it
)

# 6. Total amount spent, computed on purchase events only
purchase_events = df.filter(F.col("EVENT_TYPE_purchase") == 1)
total_spent = purchase_events.group_by("USER_ID").agg(
    F.round(F.sum("PRICE"), 2).alias("total_spent")
)

# Users that never purchased have no row in total_spent: the left join leaves
# them with NULL, which is replaced by 0.
agg_behaviour = agg_behaviour.join(total_spent, on="USER_ID", how="left")
agg_behaviour = agg_behaviour.with_column(
    "total_spent", F.coalesce(F.col("total_spent"), F.lit(0))
)


# 7. Derived KPIs
def _ratio_or_zero(numerator, denominator, digits):
    """Return numerator / denominator rounded to `digits`, or 0.0 when the denominator is 0."""
    return F.when(F.col(denominator) == 0, F.lit(0.0)).otherwise(
        F.round(F.col(numerator) / F.col(denominator), digits)
    )


agg_behaviour = agg_behaviour.with_column(
    "avg_basket", _ratio_or_zero("total_spent", "total_purchase", 2)
)
agg_behaviour = agg_behaviour.with_column(
    "conversion_rate", _ratio_or_zero("total_purchase", "total_views", 4)
)

# 8. Share of each user's events per part of the day
# Counts and the per-user total come from a single aggregation.
day_periods = ("morning", "afternoon", "evening", "night")

period_counts = [
    F.sum(F.when(F.col("time_period") == period_name, 1).otherwise(0)).alias(f"{period_name}_count")
    for period_name in day_periods
]

time_ratios = df.group_by("USER_ID").agg(
    *period_counts,
    F.count("*").alias("total_user_events"),
)

# Turn each count into a ratio of the user's total events
for period_name in day_periods:
    time_ratios = time_ratios.with_column(
        period_name,
        F.round(F.col(f"{period_name}_count") / F.col("total_user_events"), 4),
    )

# Keep only the key and the four ratios
time_ratios = time_ratios.select("USER_ID", *day_periods)

# 9. Typical hour of activity per user (circular mean)
# Hours are mapped to angles on a 24-hour circle, the cosine and sine are
# averaged per user, and the angle of the mean vector is converted back to an
# hour. This avoids the arithmetic-mean artefact where 23h and 1h average to 12h.
two_pi = F.lit(2 * np.pi)
hour_angle = two_pi * F.col("hour") / F.lit(24)

hour_stats = (
    df.group_by("USER_ID")
      .agg(
          F.avg(F.cos(hour_angle)).alias("hour_cos"),
          F.avg(F.sin(hour_angle)).alias("hour_sin"),
      )
      .with_column(
          "peak_hour_rad",
          F.atan2(F.col("hour_sin"), F.col("hour_cos"))
      )
      .with_column(
          "peak_hour_raw",
          F.round(
              # atan2 returns an angle in (-pi, pi]; shift negatives by a full turn
              F.when(
                  F.col("peak_hour_rad") < 0,
                  (F.col("peak_hour_rad") + two_pi) * F.lit(24) / two_pi
              ).otherwise(
                  F.col("peak_hour_rad") * F.lit(24) / two_pi
              ),
              2
          )
      )
      .with_column(
          "peak_hour",
          # A tiny negative angle (e.g. activity symmetric around midnight, where
          # the sine average is a rounding-error below zero) maps to ~23.9999 and
          # rounds to 24.0; wrap it back to 0.0 so the value stays in [0, 24).
          F.when(F.col("peak_hour_raw") >= 24, F.lit(0.0))
           .otherwise(F.col("peak_hour_raw"))
      )
      .select("USER_ID", "peak_hour")
)

# 10. Recency in days
# The reference date is the most recent timestamp present in the data
# (fetched eagerly), not the wall-clock time.
latest_event_row = df.agg(F.max("timestamp").alias("max_ts")).collect()[0]
now = latest_event_row["MAX_TS"]

days_since_last_activity = F.datediff("day", F.col("last_activity"), F.lit(now))
agg_behaviour = agg_behaviour.with_column("recency_days", days_since_last_activity)

# 11. Final merge: attach the time-of-day ratios and the peak hour to the
# per-user aggregates, then drop helper columns that are not features.
df_user_features = (
    agg_behaviour.join(time_ratios, on="USER_ID", how="left")
                 .join(hour_stats, on="USER_ID", how="left")
                 .drop("last_activity", "total_events")
)

# 12. Replace remaining NULLs (if any) by 0
numeric_columns = ["total_views", "total_cart", "total_purchase", "unique_categories",
                  "total_spent", "avg_basket", "conversion_rate", "recency_days",
                  "morning", "afternoon", "evening", "night", "peak_hour"]

# Snowpark reports unquoted column names in upper case ("TOTAL_VIEWS"), so the
# membership test must be case-insensitive; comparing the lower-case names
# directly never matches and silently skips the whole NULL replacement.
available_columns = {name.upper() for name in df_user_features.columns}

for col in numeric_columns:
    if col.upper() in available_columns:
        df_user_features = df_user_features.with_column(
            col,
            # Integer 0 as the fallback so that integer count columns are not
            # widened to FLOAT by the COALESCE.
            F.coalesce(F.col(col), F.lit(0))
        )

df_user_features.show(10)

In [None]:
from snowflake.snowpark import functions as F

TOP_N = 20

# 1) Identify the TOP_N most purchased categories.
# NULL category ids are excluded BEFORE ranking: filtering them after the LIMIT
# would return fewer than TOP_N categories whenever NULL ranks among the top.
# The secondary sort key makes the selection reproducible when several
# categories are tied on purchase count at the cut-off.
top_category_rows = (
    df.filter((F.col("EVENT_TYPE_purchase") == 1) & F.col("CATEGORY_ID").is_not_null())
      .group_by("CATEGORY_ID")
      .agg(F.count("*").alias("purchase_count"))
      .sort(F.col("purchase_count").desc(), F.col("CATEGORY_ID").asc())
      .limit(TOP_N)
      .collect()
)
top_categories_list = [row["CATEGORY_ID"] for row in top_category_rows]

# 2) Per-user share of purchases in each top category, computed in one aggregation
def _purchase_share(condition, alias):
    """Fraction of the grouped rows matching `condition`, rounded to 4 decimals and named `alias`."""
    matching = F.sum(F.when(condition, 1).otherwise(0))
    return F.round(matching / F.count("*"), 4).alias(alias)


def _category_alias(category_id):
    """Feature column name for a category id ('-', ' ' and '.' become '_')."""
    return f"cat_{str(category_id).replace('-', '_').replace(' ', '_').replace('.', '_')}"


# One ratio per top category
top_category_shares = [
    _purchase_share(F.col("CATEGORY_ID") == cat, _category_alias(cat))
    for cat in top_categories_list
]

# Ratio of purchases whose category is not one of the top categories
others_share = _purchase_share(
    ~F.col("CATEGORY_ID").isin(top_categories_list), "cat_others"
)

purchases_by_user = df.filter(F.col("EVENT_TYPE_purchase") == 1).group_by("USER_ID")
category_features = purchases_by_user.agg([*top_category_shares, others_share])

# 3) Column names of the category features
# Every generated name already carries the "cat_" prefix, so none starts with a
# digit and no renaming is required. The former rename loop tested
# startswith("cat_") against the names reported by Snowpark, which are upper
# case ("CAT_..."), so it never matched and never renamed anything; it is
# replaced by the list it effectively produced.
final_columns = list(category_features.columns)

# 4) Final join; users without any purchase get NULL category ratios
df_final = df_user_features.join(category_features, on="USER_ID", how="left")

# Take the category columns straight from category_features instead of
# pattern-matching on a lower-case prefix that upper-case names never satisfy.
category_cols = [name for name in category_features.columns if name != "USER_ID"]

# Replace the NULLs by 0 for users without purchases
for col_name in category_cols:
    df_final = df_final.with_column(
        col_name,
        F.round(F.coalesce(F.col(col_name), F.lit(0.0)), 4)
    )

# Final preview
df_final.show(10)

In [None]:
# from snowflake.snowpark import functions as F

# # 1) Identifier les colonnes numériques à standardiser
# numeric_columns = ["TOTAL_VIEWS", "TOTAL_CART", "TOTAL_PURCHASE", "UNIQUE_CATEGORIES", 
#                   "TOTAL_SPENT", "AVG_BASKET", "RECENCY_DAYS", "PEAK_HOUR"]

# # 2) Calculer mean et std pour toutes les colonnes en une seule requête
# stats_expressions = []
# for col in numeric_columns:
#     stats_expressions.extend([
#         F.avg(col).alias(f"{col}_mean"),
#         F.stddev(col).alias(f"{col}_std")
#     ])

# # Une seule requête pour calculer toutes les stats
# stats_row = df_final.select(*stats_expressions).collect()[0]

# # 3) Créer le DataFrame standardisé en une seule fois
# standardized_expressions = [F.col("USER_ID")]  # Garder l'ID

# # Construire la liste des noms de colonnes standardisées pour la vérification
# scaled_column_names = []

# for col in numeric_columns:
#     mean_val = stats_row[f"{col}_MEAN"] or 0.0
#     std_val = stats_row[f"{col}_STD"] or 1.0
    
#     # Éviter division par zéro
#     if std_val == 0:
#         std_val = 1.0
    
#     # Expression de standardisation: (x - mean) / std
#     scaled_col_name = f"{col}_scaled"
#     scaled_column_names.append(scaled_col_name)
    
#     standardized_expr = F.round(
#         (F.col(col) - F.lit(mean_val)) / F.lit(std_val),
#         4
#     ).alias(scaled_col_name)
    
#     standardized_expressions.append(standardized_expr)

# # 4) Récupérer les autres colonnes non-numériques (si nécessaire)
# try:
#     all_columns = df_final.columns
#     other_columns = [col for col in all_columns 
#                     if col not in numeric_columns and col != "USER_ID"]
    
#     # Ajouter les colonnes non-numériques
#     for col in other_columns:
#         standardized_expressions.append(F.col(col))
        
# except Exception as e:
#     print(f"Note: Impossible de récupérer toutes les colonnes: {e}")
#     other_columns = []

# # 5) Créer le DataFrame standardisé
# df_standardized = df_final.select(*standardized_expressions)

# # Prendre les 3 premières colonnes standardisées
# sample_cols = scaled_column_names[:3]
# verification_expr = []

# for col in sample_cols:
#     verification_expr.extend([
#         F.round(F.avg(col), 4).alias(f"{col}_mean"),
#         F.round(F.stddev(col), 4).alias(f"{col}_std")
#     ])

# df_standardized.show(10)

In [None]:
# Persist the final feature set, replacing the target table if it already exists
target_table = "PROCESSED_DATA.AMAZING_DATA.CLIENT_EVENTS_CLEAN"
df_final.write.mode("overwrite").save_as_table(target_table)