# Fraud Detection avec Feature Store sur OpenShift AI

Ce notebook illustre l'utilisation de **Feature Store** (Feast) sur **Red Hat OpenShift AI 3.2** pour un cas d'usage de **détection de fraude bancaire**.

## Workflow
1. Générer des données de transactions et profils clients
2. Définir les features (entities, feature views, on-demand features)
3. Enregistrer les features dans le Feature Store (`feast apply`)
4. Matérialiser les features dans le online store (PostgreSQL)
5. Récupérer les features historiques pour entraîner un modèle
6. Prédiction en temps réel via le online store

## 1. Installation des dépendances

In [None]:
!pip install feast[postgres] scikit-learn pandas pyarrow

## 2. Configuration du Feature Store

Récupération de la config client depuis le ConfigMap déployé par l'opérateur Feast.

In [None]:
import os
import yaml

FEATURE_REPO_PATH = "../feature_repo"

# Configuration pour se connecter au Feature Store déployé sur OpenShift.
# En workbench RHOAI, cette config est fournie par le ConfigMap
# 'feast-fraud-features-client'.
# Ici on utilise une config locale pour la démo qui pointe directement
# vers les backends (PostgreSQL online store, DuckDB offline, S3 registry).
#
# Les credentials sont lus depuis des variables d'environnement.
# Sur un workbench RHOAI, configurez-les via les env vars du notebook
# ou via un Secret monté dans le pod.

feature_store_config = {
    "project": "fraud_detection",
    "provider": "local",
    "offline_store": {
        "type": "duckdb",
    },
    "online_store": {
        "type": "postgres",
        "host": os.getenv("POSTGRES_HOST", "postgres.fraud-detection-ml.svc.cluster.local"),
        "port": int(os.getenv("POSTGRES_PORT", "5432")),
        "database": os.getenv("POSTGRES_DB", "feast_db"),
        "user": os.getenv("POSTGRES_USER", "feast_user"),
        "password": os.getenv("POSTGRES_PASSWORD"),
    },
    "registry": {
        "path": os.getenv("FEAST_REGISTRY_PATH", "s3://feast-registry/registry.db"),
        "registry_type": "file",
    },
    "entity_key_serialization_version": 3,
}

if not feature_store_config["online_store"]["password"]:
    raise ValueError(
        "POSTGRES_PASSWORD non défini. "
        "Configurez la variable d'environnement avant d'exécuter ce notebook."
    )

# Écrire le fichier feature_store.yaml
config_path = os.path.join(FEATURE_REPO_PATH, "feature_store.yaml")
with open(config_path, "w") as f:
    yaml.dump(feature_store_config, f, default_flow_style=False)

print(f"Configuration écrite dans {config_path}")
# Afficher la config sans le mot de passe
safe_config = feature_store_config.copy()
safe_config["online_store"] = {
    **safe_config["online_store"],
    "password": "****"
}
print(yaml.dump(safe_config, default_flow_style=False))

## 3. Génération des données de démo

On simule des données de profils clients et de statistiques de transactions.

In [None]:
import numpy as np
import pandas as pd
from datetime import datetime, timedelta

np.random.seed(42)
N_CUSTOMERS = 500

now = pd.Timestamp.now()

# --- Profils clients ---
customer_ids = [f"C{str(i).zfill(5)}" for i in range(N_CUSTOMERS)]
customer_profiles = pd.DataFrame({
    "customer_id": customer_ids,
    "age": np.random.randint(18, 75, N_CUSTOMERS),
    "country": np.random.choice(["FR", "US", "UK", "DE", "ES"], N_CUSTOMERS),
    "account_age_days": np.random.randint(30, 3650, N_CUSTOMERS),
    "credit_limit": np.round(np.random.uniform(1000, 50000, N_CUSTOMERS), 2),
    "num_cards": np.random.randint(1, 5, N_CUSTOMERS),
    "event_timestamp": [now - pd.Timedelta(hours=np.random.randint(1, 48)) for _ in range(N_CUSTOMERS)],
    "created_timestamp": [now] * N_CUSTOMERS,
})

# --- Statistiques de transactions ---
transaction_stats = pd.DataFrame({
    "customer_id": customer_ids,
    "avg_transaction_amount_30d": np.round(np.random.uniform(20, 500, N_CUSTOMERS), 2),
    "num_transactions_7d": np.random.randint(0, 50, N_CUSTOMERS),
    "num_transactions_1d": np.random.randint(0, 15, N_CUSTOMERS),
    "max_transaction_amount_7d": np.round(np.random.uniform(50, 5000, N_CUSTOMERS), 2),
    "num_foreign_transactions_30d": np.random.randint(0, 10, N_CUSTOMERS),
    "num_declined_transactions_7d": np.random.randint(0, 5, N_CUSTOMERS),
    "event_timestamp": [now - pd.Timedelta(hours=np.random.randint(1, 24)) for _ in range(N_CUSTOMERS)],
    "created_timestamp": [now] * N_CUSTOMERS,
})

# Sauvegarder en Parquet
data_dir = os.path.join(FEATURE_REPO_PATH, "data")
os.makedirs(data_dir, exist_ok=True)

customer_profiles.to_parquet(os.path.join(data_dir, "customer_profiles.parquet"))
transaction_stats.to_parquet(os.path.join(data_dir, "transaction_stats.parquet"))

print(f"{N_CUSTOMERS} profils clients générés")
print(f"{N_CUSTOMERS} statistiques de transactions générées")
customer_profiles.head()

In [None]:
transaction_stats.head()

## 4. Définition des Features

Les features sont définies dans `feature_repo/features.py`. Voici un aperçu :

In [None]:
# Afficher les définitions de features
with open(os.path.join(FEATURE_REPO_PATH, "features.py"), "r") as f:
    print(f.read())

## 5. Enregistrement des features (`feast apply`)

On enregistre les entités, feature views et on-demand features dans le registry.

In [None]:
from feast import FeatureStore

# Initialiser le Feature Store
store = FeatureStore(repo_path=FEATURE_REPO_PATH)

print("Projet Feast :", store.project)
print("Applying feature definitions...")
store.apply(
    [
        store.repo.feature_views[0].entities[0] if store.repo.feature_views else None,
    ]
)

In [None]:
# Appliquer toutes les définitions via la CLI feast
!cd {FEATURE_REPO_PATH} && feast apply

In [None]:
# Vérifier les objets enregistrés
store = FeatureStore(repo_path=FEATURE_REPO_PATH)

print("=== Entités ===")
for entity in store.list_entities():
    print(f"  - {entity.name}")

print("\n=== Feature Views ===")
for fv in store.list_feature_views():
    print(f"  - {fv.name} ({len(fv.features)} features, TTL={fv.ttl})")
    for feature in fv.features:
        print(f"      {feature.name}: {feature.dtype}")

print("\n=== On-Demand Feature Views ===")
for odfv in store.list_on_demand_feature_views():
    print(f"  - {odfv.name}")
    for feature in odfv.features:
        print(f"      {feature.name}: {feature.dtype}")

## 6. Matérialisation dans le Online Store (PostgreSQL)

On charge les features dans PostgreSQL pour le serving temps réel.

In [None]:
from datetime import datetime, timedelta

# Matérialiser les features des 2 derniers jours dans le online store
store.materialize_incremental(end_date=datetime.utcnow())
print("Matérialisation terminée dans PostgreSQL")

## 7. Récupération des features historiques (Training)

On récupère les features depuis le **offline store** (DuckDB) pour entraîner un modèle de détection de fraude.

In [None]:
# Simuler un jeu de données de transactions labellisées
N_TRANSACTIONS = 2000

entity_df = pd.DataFrame({
    "customer_id": np.random.choice(customer_ids, N_TRANSACTIONS),
    "event_timestamp": [now - pd.Timedelta(hours=np.random.randint(1, 24)) for _ in range(N_TRANSACTIONS)],
    "transaction_amount": np.round(np.random.exponential(200, N_TRANSACTIONS), 2),
    "is_foreign_transaction": np.random.choice([0, 1], N_TRANSACTIONS, p=[0.85, 0.15]),
})

# Liste des features à récupérer
feature_refs = [
    "customer_profile:age",
    "customer_profile:account_age_days",
    "customer_profile:credit_limit",
    "customer_profile:num_cards",
    "transaction_stats:avg_transaction_amount_30d",
    "transaction_stats:num_transactions_7d",
    "transaction_stats:num_transactions_1d",
    "transaction_stats:max_transaction_amount_7d",
    "transaction_stats:num_foreign_transactions_30d",
    "transaction_stats:num_declined_transactions_7d",
    "fraud_risk_features:amount_ratio_to_avg",
    "fraud_risk_features:amount_ratio_to_max",
    "fraud_risk_features:risk_score",
]

# Récupérer les features historiques depuis l'offline store
print("Récupération des features historiques...")
training_df = store.get_historical_features(
    entity_df=entity_df,
    features=feature_refs,
).to_df()

print(f"Dataset d'entraînement : {training_df.shape[0]} lignes, {training_df.shape[1]} colonnes")
training_df.head(10)

## 8. Entraînement du modèle de détection de fraude

In [None]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix

# Générer des labels de fraude simulés
# Les transactions avec un risk_score élevé ont plus de chances d'être frauduleuses
training_df = training_df.dropna()
fraud_probability = 1 / (1 + np.exp(-(training_df["risk_score"] - 1.5) * 3))
training_df["is_fraud"] = (np.random.random(len(training_df)) < fraud_probability).astype(int)

print(f"Distribution des fraudes :")
print(training_df["is_fraud"].value_counts())
print(f"Taux de fraude : {training_df['is_fraud'].mean():.2%}")

# Préparer les features pour le modèle
model_features = [
    "age", "account_age_days", "credit_limit", "num_cards",
    "avg_transaction_amount_30d", "num_transactions_7d", "num_transactions_1d",
    "max_transaction_amount_7d", "num_foreign_transactions_30d",
    "num_declined_transactions_7d",
    "transaction_amount", "is_foreign_transaction",
    "amount_ratio_to_avg", "amount_ratio_to_max", "risk_score",
]

X = training_df[model_features]
y = training_df["is_fraud"]

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Entraîner un Random Forest
clf = RandomForestClassifier(n_estimators=100, random_state=42)
clf.fit(X_train, y_train)

# Évaluer
y_pred = clf.predict(X_test)
print("\n=== Rapport de classification ===")
print(classification_report(y_test, y_pred, target_names=["Légitime", "Fraude"]))

print("=== Matrice de confusion ===")
print(confusion_matrix(y_test, y_pred))

In [None]:
# Importance des features
import matplotlib.pyplot as plt

importances = pd.Series(clf.feature_importances_, index=model_features).sort_values(ascending=True)

fig, ax = plt.subplots(figsize=(10, 6))
importances.plot(kind="barh", ax=ax)
ax.set_title("Importance des features pour la détection de fraude")
ax.set_xlabel("Importance")
plt.tight_layout()
plt.show()

## 9. Prédiction en temps réel via le Online Store

Simulation d'une transaction entrante : on récupère les features du client depuis le **online store** (PostgreSQL) en temps réel, puis on applique le modèle.

In [None]:
# Simuler une transaction suspecte
test_customer = "C00042"
transaction = {
    "transaction_amount": 4500.00,  # montant élevé
    "is_foreign_transaction": 1,     # transaction à l'étranger
}

print(f"Transaction entrante pour le client {test_customer}:")
print(f"  Montant : {transaction['transaction_amount']} EUR")
print(f"  Transaction étrangère : {'Oui' if transaction['is_foreign_transaction'] else 'Non'}")

# Récupérer les features en temps réel depuis PostgreSQL
online_features = store.get_online_features(
    entity_rows=[
        {
            "customer_id": test_customer,
            **transaction,
        }
    ],
    features=feature_refs,
).to_dict()

print("\nFeatures récupérées depuis le online store (PostgreSQL) :")
for key, values in online_features.items():
    if key != "customer_id":
        print(f"  {key}: {values[0]}")

In [None]:
# Construire le vecteur de features pour la prédiction
feature_vector = {}
feature_vector.update(online_features)
feature_vector.update({k: [v] for k, v in transaction.items()})

predict_df = pd.DataFrame(feature_vector)

# Ne garder que les features du modèle (dans le bon ordre)
available_features = [f for f in model_features if f in predict_df.columns]
predict_input = predict_df[available_features]

# Prédiction
prediction = clf.predict(predict_input)[0]
probability = clf.predict_proba(predict_input)[0]

print("\n" + "=" * 50)
if prediction == 1:
    print(f"ALERTE FRAUDE - Probabilité : {probability[1]:.1%}")
    print("Action : Transaction bloquée pour vérification")
else:
    print(f"Transaction légitime - Probabilité fraude : {probability[1]:.1%}")
    print("Action : Transaction approuvée")
print("=" * 50)

## 10. Résumé de l'architecture

```
                    OpenShift AI 3.2
    ┌─────────────────────────────────────────┐
    │                                         │
    │   ┌─────────────┐   ┌───────────────┐  │
    │   │  Notebook    │   │  Feature Store │  │
    │   │  (Workbench) │──▶│  (Feast)       │  │
    │   └─────────────┘   └───────┬───────┘  │
    │                             │           │
    │              ┌──────────────┼────────┐  │
    │              ▼              ▼        ▼  │
    │   ┌──────────────┐ ┌──────────┐ ┌────┐ │
    │   │ Offline Store │ │  Online  │ │ S3 │ │
    │   │   (DuckDB)   │ │  Store   │ │Reg.│ │
    │   │              │ │(Postgres)│ │    │ │
    │   │  Training    │ │ Serving  │ │Meta│ │
    │   └──────────────┘ └──────────┘ └────┘ │
    │                                         │
    └─────────────────────────────────────────┘
```

### Composants

| Composant | Technologie | Usage |
|-----------|-------------|-------|
| Offline Store | DuckDB | Features historiques pour le training |
| Online Store | PostgreSQL | Features temps réel pour l'inférence |
| Registry | S3 (MinIO) | Métadonnées des features |
| Feature Server | Feast (RHOAI) | API gRPC/REST pour servir les features |
| Notebook | Workbench RHOAI | Développement et expérimentation |