In [None]:
pip install -r requirements.txt

In [None]:
import duckdb
import pandas as pd
import matplotlib.pyplot as plt
from tqdm import tqdm
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import KMeans
from sklearn.decomposition import PCA

# --- Notebook configuration -------------------------------------------------
# Path of the DuckDB database file opened below.
DB_NAME = "amazing.duckdb"
# Table the raw events are read from.
TABLE_EVENTS = "all_events"
# Table the clustered user features are written to (dropped and recreated on each run).
TABLE_SEGMENTS = "user_segments"
# Number of clusters requested from KMeans.
N_CLUSTERS = 5
# Passed as `frac=` to DataFrame.sample, so this is a fraction: 0.005 keeps 0.5% of the active users.
SAMPLE_USER_PERCENT = 0.005
# Number of sampled user ids handled by each feature-extraction query.
BATCH_SIZE = 1000 

# Single connection shared by every cell of the notebook; closed in the last cell.
con = duckdb.connect(DB_NAME)

In [None]:
# Build per-user behavioural features for a random sample of active users,
# cluster them with KMeans and persist the labelled features in DuckDB.
# Names consumed by later cells: `user_features` (with a "segment" column) and `X_scaled`.

# A user needs at least this many events to be considered active.
MIN_EVENTS = 10
# Columns kept in the saved table but never fed to the scaler / KMeans:
# an identifier carries no behavioural signal and a raw timestamp is not a numeric feature.
NON_FEATURE_COLUMNS = ["user_id", "last_event_time"]

# Load the users that have at least MIN_EVENTS events
print("Chargement d'un échantillon d'utilisateurs actifs...")

# ORDER BY makes the row order deterministic; without it the GROUP BY output order
# may vary between runs and the seeded sample below would not be reproducible.
user_ids_df = con.execute(f"""
    SELECT user_id
    FROM {TABLE_EVENTS}
    WHERE user_id IS NOT NULL
    GROUP BY user_id
    HAVING COUNT(*) >= {MIN_EVENTS}
    ORDER BY user_id
""").fetch_df()

sampled_user_ids = user_ids_df.sample(frac=SAMPLE_USER_PERCENT, random_state=42)['user_id'].tolist()

print(f"Nombre d'utilisateurs actifs échantillonnés : {len(sampled_user_ids)}")

if not sampled_user_ids:
    # Fail early with an actionable message instead of crashing later in pd.concat.
    raise ValueError(
        "Aucun utilisateur échantillonné : augmentez SAMPLE_USER_PERCENT "
        f"ou vérifiez le contenu de la table {TABLE_EVENTS}."
    )

# Build the user features batch by batch
print("Création des features utilisateurs par batch...")

# The query is loop-invariant: the ids of the current batch are exposed through the
# registered view `batch_user_ids` rather than being spliced into the SQL text, which
# keeps the original id type and avoids any quoting problem.
# NOTE(review): days_since_last_event is relative to CURRENT_TIMESTAMP, so its value
# changes with the day the notebook is run; also confirm that DATE_PART('day', <interval>)
# yields the total number of days for the intervals found in this dataset.
features_query = f"""
WITH
    base_events AS (
        SELECT
            user_id,
            event_type,
            event_time,
            price,
            LEAD(event_time) OVER (PARTITION BY user_id ORDER BY event_time) AS next_event_time
        FROM {TABLE_EVENTS}
        WHERE user_id IN (SELECT user_id FROM batch_user_ids)
    ),
    features AS (
        SELECT
            user_id,
            COUNT(*) AS total_events,
            SUM(CASE WHEN event_type = 'view' THEN 1 ELSE 0 END) AS total_views,
            SUM(CASE WHEN event_type = 'purchase' THEN 1 ELSE 0 END) AS total_purchases,
            AVG(EXTRACT(EPOCH FROM (next_event_time - event_time))) AS avg_time_between_events,
            SUM(CASE WHEN event_type = 'purchase' THEN price ELSE 0 END) AS total_spent,
            COALESCE(AVG(CASE WHEN event_type = 'purchase' THEN price ELSE NULL END), 0) AS avg_basket,
            MAX(event_time) AS last_event_time
        FROM base_events
        GROUP BY user_id
    )
SELECT
    *,
    CASE WHEN total_views > 0 THEN total_purchases * 1.0 / total_views ELSE 0 END AS conversion_rate,
    CASE WHEN (total_views + total_purchases) > 0 THEN total_purchases * 1.0 / (total_views + total_purchases) ELSE 0 END AS purchase_ratio,
    DATE_PART('day', CURRENT_TIMESTAMP - last_event_time) AS days_since_last_event
FROM features
"""

user_features_list = []

for start in tqdm(range(0, len(sampled_user_ids), BATCH_SIZE), desc="Avancement user features", ncols=100):
    batch_ids = sampled_user_ids[start:start + BATCH_SIZE]
    # Re-registering under the same name replaces the view of the previous batch.
    con.register("batch_user_ids", pd.DataFrame({"user_id": batch_ids}))

    batch_features = con.execute(features_query).fetch_df()

    # total_events is COUNT(*) per user, so this is the same "active user" filter as the
    # sampling query, without running a second aggregation over the events table.
    batch_features = batch_features[batch_features["total_events"] >= MIN_EVENTS]

    user_features_list.append(batch_features)

con.unregister("batch_user_ids")

# Merge all batches
user_features = pd.concat(user_features_list, ignore_index=True)

if len(user_features) < N_CLUSTERS:
    raise ValueError(
        f"Seulement {len(user_features)} utilisateurs pour {N_CLUSTERS} clusters : "
        "augmentez SAMPLE_USER_PERCENT ou réduisez N_CLUSTERS."
    )

# NaN report
print("Vérification des NaN")
nan_summary = user_features.isna().sum()
print("Résumé des NaN par colonne :")
print(nan_summary[nan_summary > 0])

users_with_nan = user_features[user_features.isna().any(axis=1)]
print(f"Nombre d'utilisateurs avec des NaN : {len(users_with_nan)}")
print("Exemples d'utilisateurs avec NaN :")
print(users_with_nan.head(10))

# Standardisation
print("Standardisation des features...")
feature_matrix = user_features.drop(columns=NON_FEATURE_COLUMNS).astype("float64")
if feature_matrix.isna().any().any():
    # KMeans rejects NaN. Impute with the column median, falling back to 0 for a column
    # that is entirely missing. Rows are kept so X_scaled stays aligned with user_features.
    print("Imputation des NaN restants (médiane de la colonne, 0 à défaut)...")
    feature_matrix = feature_matrix.fillna(feature_matrix.median()).fillna(0.0)

scaler = StandardScaler()
X_scaled = scaler.fit_transform(feature_matrix)

# Clustering
print("Clustering avec KMeans...")
# n_init is set explicitly because its default differs between scikit-learn versions.
kmeans = KMeans(n_clusters=N_CLUSTERS, n_init=10, random_state=42)
clusters = kmeans.fit_predict(X_scaled)

user_features["segment"] = clusters

# Save the results in DuckDB (user_id and last_event_time are kept in the saved table)
print(f"Sauvegarde dans {TABLE_SEGMENTS}...")
con.execute(f"DROP TABLE IF EXISTS {TABLE_SEGMENTS}")
con.register("temp_user_features", user_features)
try:
    con.execute(f"CREATE TABLE {TABLE_SEGMENTS} AS SELECT * FROM temp_user_features")
finally:
    # Release the temporary view whether or not the table creation succeeded.
    con.unregister("temp_user_features")


print("Clustering sauvegarder avec succès !")

In [None]:


# --- Cluster visualisation ---
# Project the standardised feature matrix onto its first two principal components
# and colour every user by the segment stored in user_features["segment"].
print("Visualisation du clustering...")
# random_state pins the randomized SVD solver that PCA may select on large inputs,
# so the projection (and therefore the figure) is the same on every run.
pca = PCA(n_components=2, random_state=42)
X_pca = pca.fit_transform(X_scaled)

plt.figure(figsize=(10, 8))
plt.scatter(X_pca[:, 0], X_pca[:, 1], c=user_features["segment"], cmap='tab10', s=10)
plt.title("Visualisation des clusters utilisateurs (PCA)")
plt.xlabel("PCA 1")
plt.ylabel("PCA 2")
plt.grid(True)
plt.show()


In [None]:
# List every table of the database, then show the schema of the two tables this
# notebook works with and a small sample of the saved segments.
# Table names come from the configuration constants so this cell keeps working
# when TABLE_EVENTS / TABLE_SEGMENTS are changed.
print("Tables disponibles dans la base :")
tables = con.execute("SHOW TABLES").fetch_df()
print(tables)

print(f"Structure de la table {TABLE_EVENTS} :")
schema_all_events = con.execute(f"DESCRIBE TABLE {TABLE_EVENTS}").fetch_df()
print(schema_all_events)

print(f"Structure de la table {TABLE_SEGMENTS} :")
schema_user_segments = con.execute(f"DESCRIBE TABLE {TABLE_SEGMENTS}").fetch_df()
print(schema_user_segments)

# First rows of the segments table (variable name kept for backward compatibility).
df_purchase = con.execute(f"SELECT * FROM {TABLE_SEGMENTS} LIMIT 10").fetch_df()
print(df_purchase)

In [None]:
# Close the DuckDB connection opened in the configuration cell.
# Any cell run after this one that uses `con` needs the connection to be reopened first.
con.close()