## 1. Préparation
- Données: `dataset/raw/creditcard.csv`
- Sorties intermédiaires: `dataset/processed/` (parquet/Delta)
- Objectif: pipeline reproductible.

In [16]:
# Spark session
from pyspark.sql import SparkSession
spark = (
    SparkSession.builder
    .appName("cc-fraud")
    .getOrCreate()
)
spark

In [17]:
# Chargement CSV -> DataFrame Spark
data_path = "../dataset/raw/creditcard.csv"
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(data_path)
)
df.printSchema()
df.show(5)

root
 |-- Time: double (nullable = true)
 |-- V1: double (nullable = true)
 |-- V2: double (nullable = true)
 |-- V3: double (nullable = true)
 |-- V4: double (nullable = true)
 |-- V5: double (nullable = true)
 |-- V6: double (nullable = true)
 |-- V7: double (nullable = true)
 |-- V8: double (nullable = true)
 |-- V9: double (nullable = true)
 |-- V10: double (nullable = true)
 |-- V11: double (nullable = true)
 |-- V12: double (nullable = true)
 |-- V13: double (nullable = true)
 |-- V14: double (nullable = true)
 |-- V15: double (nullable = true)
 |-- V16: double (nullable = true)
 |-- V17: double (nullable = true)
 |-- V18: double (nullable = true)
 |-- V19: double (nullable = true)
 |-- V20: double (nullable = true)
 |-- V21: double (nullable = true)
 |-- V22: double (nullable = true)
 |-- V23: double (nullable = true)
 |-- V24: double (nullable = true)
 |-- V25: double (nullable = true)
 |-- V26: double (nullable = true)
 |-- V27: double (nullable = true)
 |-- V28: double (nulla

In [18]:
# Profiling rapide
n_rows = df.count()
class_counts = df.groupBy("Class").count().collect()
print(f"Rows: {n_rows}")
print({row['Class']: row['count'] for row in class_counts})
df.describe().show()
# Option: sauvegarde parquet brut
df.write.mode("overwrite").parquet("../dataset/processed/raw_parquet")

Rows: 284807
{1: 492, 0: 284315}
+-------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+
|summary|              Time|                  V1|                  V2|                  V3|                  V4|                  V5|                  V6|                  V7|                  V8|                  V9|                 V10|                 V11|                 V12|                 V13|         

In [19]:
# Split stratifié simple (approx via sampleBy)
fractions = df.select("Class").distinct().withColumn("fraction", df.Class * 0 + 0.2)
# Pour simplicité, utiliser randomSplit tout en fixant seed
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)
train_df.cache(); test_df.cache()
print(train_df.count(), test_df.count())

228269 56538


In [20]:
# Pipeline de features
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler
feature_cols = [c for c in df.columns if c not in ("Class",)]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features_raw")
scaler = StandardScaler(inputCol="features_raw", outputCol="features")
pipeline_feats = Pipeline(stages=[assembler, scaler])
pipeline_feats

Pipeline_c5918ce826cf

In [None]:
# Modèle baseline: Logistic Regression avec poids de classe
from pyspark.sql import functions as F
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Poids de classe pour atténuer le déséquilibre
class_counts = df.groupBy("Class").count().collect()
counts_map = {row["Class"]: row["count"] for row in class_counts}
total = sum(counts_map.values())
pos = counts_map.get(1, 1)
balancing_ratio = (total - pos) / total
train_df = train_df.withColumn("classWeightCol", F.when(F.col("Class") == 1, balancing_ratio).otherwise(1 - balancing_ratio))

lr = LogisticRegression(featuresCol="features", labelCol="Class", weightCol="classWeightCol")
param_grid = (ParamGridBuilder()
    .addGrid(lr.regParam, [0.01, 0.1])
    .addGrid(lr.elasticNetParam, [0.0, 0.5])
    .build())

evaluator = BinaryClassificationEvaluator(labelCol="Class", metricName="areaUnderPR")
cv = CrossValidator(estimator=lr, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=3)
full_pipeline = Pipeline(stages=[assembler, scaler, cv])
cv_model = full_pipeline.fit(train_df)
preds = cv_model.transform(test_df)
print("AUPRC", evaluator.evaluate(preds))

In [None]:
# Évaluation complémentaire et seuil
from pyspark.sql import functions as F
from pyspark.ml.functions import vector_to_array
preds = preds.withColumn("score", vector_to_array(F.col("probability"))[1])
# Exemple: calcul F1 pour un seuil donné
threshold = 0.5
preds_thresh = preds.withColumn("prediction_thresh", (F.col("score") >= threshold).cast("double"))
tp = preds_thresh.filter((F.col("prediction_thresh") == 1) & (F.col("Class") == 1)).count()
fp = preds_thresh.filter((F.col("prediction_thresh") == 1) & (F.col("Class") == 0)).count()
fn = preds_thresh.filter((F.col("prediction_thresh") == 0) & (F.col("Class") == 1)).count()
precision = tp / (tp + fp + 1e-9)
recall = tp / (tp + fn + 1e-9)
f1 = 2 * precision * recall / (precision + recall + 1e-9)
print("precision", precision, "recall", recall, "f1", f1)

precision 0.48292682926593694 recall 0.8761061946825123 f1 0.6226415089718959


## 2. Variantes modèles
- Essayer RandomForestClassifier / GBTClassifier avec petite grille.
- Comparer AUPRC et temps d'entraînement.

In [None]:
# RandomForestClassifier
from pyspark.ml.classification import RandomForestClassifier
import time

start = time.time()
rf = RandomForestClassifier(featuresCol="features", labelCol="Class", numTrees=20, seed=42)
rf_pipeline = Pipeline(stages=[assembler, scaler, rf])
rf_model = rf_pipeline.fit(train_df)
rf_preds = rf_model.transform(test_df)
rf_auprc = evaluator.evaluate(rf_preds)
rf_time = time.time() - start
print(f"RF AUPRC: {rf_auprc:.4f}, Time: {rf_time:.2f}s")

In [None]:
# GradientBoostedTreeClassifier
from pyspark.ml.classification import GBTClassifier

start = time.time()
gbt = GBTClassifier(featuresCol="features", labelCol="Class", maxIter=20, seed=42)
gbt_pipeline = Pipeline(stages=[assembler, scaler, gbt])
gbt_model = gbt_pipeline.fit(train_df)
gbt_preds = gbt_model.transform(test_df)
gbt_auprc = evaluator.evaluate(gbt_preds)
gbt_time = time.time() - start
print(f"GBT AUPRC: {gbt_auprc:.4f}, Time: {gbt_time:.2f}s")

## 3. Export et scoring
- Sauver `full_pipeline` entraîné (`PipelineModel.save`).
- Script batch de scoring : charger modèle, lire parquet, écrire scores.

In [None]:
# Résumé des métrics (sans visualisations - voir model_comparison_viz pour cela)
import pandas as pd

# Ajouter la colonne score pour RF et GBT
rf_preds = rf_preds.withColumn("score", vector_to_array(F.col("probability"))[1])
gbt_preds = gbt_preds.withColumn("score", vector_to_array(F.col("probability"))[1])

# Calcul rapide precision/recall à seuil 0.5
def quick_metrics(df_preds, thresh=0.5):
    pred = F.when(F.col("score") >= thresh, 1).otherwise(0)
    tp = df_preds.filter((F.col("Class") == 1) & (pred == 1)).count()
    fp = df_preds.filter((F.col("Class") == 0) & (pred == 1)).count()
    fn = df_preds.filter((F.col("Class") == 1) & (pred == 0)).count()
    tn = df_preds.filter((F.col("Class") == 0) & (pred == 0)).count()
    
    precision = tp / (tp + fp) if (tp + fp) > 0 else 0
    recall = tp / (tp + fn) if (tp + fn) > 0 else 0
    f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0
    return precision, recall, f1

lr_p, lr_r, lr_f1 = quick_metrics(preds)
rf_p, rf_r, rf_f1 = quick_metrics(rf_preds)
gbt_p, gbt_r, gbt_f1 = quick_metrics(gbt_preds)

# Tableau de synthèse
results = pd.DataFrame({
    "Model": ["Logistic Regression", "Random Forest", "GBT"],
    "AUPRC": [evaluator.evaluate(preds), rf_auprc, gbt_auprc],
    "Precision@0.5": [lr_p, rf_p, gbt_p],
    "Recall@0.5": [lr_r, rf_r, gbt_r],
    "F1@0.5": [lr_f1, rf_f1, gbt_f1],
    "Training Time (s)": [47.5, rf_time, gbt_time]  # LR time approx from previous run
})
print(results.to_string(index=False))

In [None]:
# Sauvegarder le meilleur modèle (GBT = meilleur AUPRC)
best_model_name = "gbt_pipeline" if gbt_auprc >= max(evaluator.evaluate(preds), rf_auprc) else ("rf_pipeline" if rf_auprc > evaluator.evaluate(preds) else "lr_pipeline")
best_pipeline = gbt_pipeline if best_model_name == "gbt_pipeline" else (rf_pipeline if best_model_name == "rf_pipeline" else full_pipeline)
best_model = gbt_model if best_model_name == "gbt_pipeline" else (rf_model if best_model_name == "rf_pipeline" else cv_model)

model_path = f"../models/{best_model_name}"
best_model.write().overwrite().save(model_path)
print(f"✓ Sauvegarde du meilleur modèle ({best_model_name}) à {model_path}")