# Feature Engineering for Fraud Detection

This notebook demonstrates the value of feature engineering with Snowflake's Feature Store, Model Registry, and Experiment Tracking.

**Demo Structure:**
1. **Baseline Model** - XGBoost with basic feature preprocessing
2. **Feature Store Model** - XGBoost with Customer & Terminal entities and feature views
3. **Model Comparison** - Compare models and set best as default in registry

In [59]:
import pandas as pd
import numpy as np
from datetime import datetime
from snowflake.snowpark.context import get_active_session

session = get_active_session()

## Setup: Experiment Tracking & Model Registry

Initialize experiment tracking at the start to capture all model training runs.

In [60]:
from snowflake.ml.experiment import ExperimentTracking
from snowflake.ml.registry import Registry
from snowflake.ml.feature_store import FeatureStore, Entity, FeatureView, CreationMode
from snowflake.ml.model import task
from snowflake.ml.model.model_signature import infer_signature
from snowflake.ml.experiment.callback.xgboost import SnowflakeXgboostCallback
from xgboost import XGBClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score

DATABASE = "ML_DEMO"
SCHEMA = "FRAUD_DETECTION"

session.sql(f"CREATE DATABASE IF NOT EXISTS {DATABASE}").collect()
session.sql(f"CREATE SCHEMA IF NOT EXISTS {DATABASE}.{SCHEMA}").collect()
session.sql(f"USE DATABASE {DATABASE}").collect()
session.sql(f"USE SCHEMA {SCHEMA}").collect()

exp = ExperimentTracking(session=session)
exp.delete_experiment("FRAUD_DETECTION_FEATURE_ENGINEERING")
exp.set_experiment("FRAUD_DETECTION_FEATURE_ENGINEERING")

reg = Registry(session=session, database_name=DATABASE, schema_name=SCHEMA)

print("✓ Experiment tracking initialized")
print("✓ Model registry connected")

In [61]:
df = pd.read_csv('data/transactions.csv')
print(f"Dataset shape: {df.shape}")
df.head()

In [62]:
# Create table from pandas dataframe and then load as Snowpark DataFrame
session.write_pandas(df, 'TRANSACTIONS_RAW', auto_create_table=True, overwrite=True)
df = session.table('TRANSACTIONS_RAW')

print(f"Table TRANSACTIONS_RAW created with {df.count()} records")
df.schema

## Feature Engineering

Creating time-based and rolling window features for fraud detection:

In [63]:
from snowflake.snowpark import Window
from snowflake.snowpark.functions import hour, dayofweek, when, col, count, sum, make_interval, to_timestamp

df = df.with_column('TX_DATETIME', to_timestamp('TX_DATETIME'))

df = df.with_columns(
    ['HOUR', 'DAY_OF_WEEK', 'IS_WEEKEND', 'IS_NIGHT_12AM_7AM'],
    [
        hour('TX_DATETIME'),
        dayofweek('TX_DATETIME'),
        when(dayofweek('TX_DATETIME') >= 6, True).otherwise(False),
        when(hour('TX_DATETIME').between(0, 6), True).otherwise(False)
    ]
)

## View Feature Summary

In [64]:
row_count = df.count()
col_count = len(df.columns)
print(f"\nFinal dataset shape: ({row_count}, {col_count})")
print(f"\nFeatures created:")
feature_cols = ['IS_WEEKEND', 'IS_NIGHT_12AM_7AM', 'DAY_OF_WEEK', 'HOUR']
for c in feature_cols:
    print(f"  - {c}")

print("\nSample of features:")
df.select(['TX_DATETIME', 'CUSTOMER_ID', 'TERMINAL_ID', 'TX_AMOUNT'] + feature_cols).limit(10).show()

In [65]:
print("\nFeature statistics:")
df.select(feature_cols).describe().show()

---
# Step 1: Baseline Model - Basic Feature Preprocessing

Build an XGBoost classifier with only basic features (no feature store).

In [66]:
df_pandas = df.to_pandas()

basic_features = ['TX_AMOUNT', 'HOUR', 'DAY_OF_WEEK', 'IS_WEEKEND', 'IS_NIGHT_12AM_7AM']
X_basic = df_pandas[basic_features].astype(float)
y = df_pandas['TX_FRAUD'].astype(int)

X_train_basic, X_test_basic, y_train, y_test = train_test_split(
    X_basic, y, test_size=0.2, random_state=42, stratify=y
)

print(f"Training set: {X_train_basic.shape[0]} samples")
print(f"Test set: {X_test_basic.shape[0]} samples")
print(f"Features: {basic_features}")

In [67]:
model_baseline = XGBClassifier(
    n_estimators=100,       # fixed, predictable
    max_depth=5,
    learning_rate=0.1,
    random_state=42,
    eval_metric="auc",
    tree_method="hist",
    n_jobs=-1
)

RUN_NAME = "baseline_basic_features_demo_1"

with exp.start_run(RUN_NAME):
    # Log minimal, clear params
    exp.log_params({
        "model_type": "XGBClassifier",
        "feature_set": "basic",
        "n_features": len(basic_features),
        "n_train_rows": len(X_train_basic),
        "n_test_rows": len(X_test_basic),
        "n_estimators": 100,
        "max_depth": 5,
        "learning_rate": 0.1,
        "eval_metric": "auc"
    })

    model_baseline.fit(
        X_train_basic,
        y_train,
        eval_set=[(X_test_basic, y_test)],
        verbose=False
    )

    y_pred_baseline = model_baseline.predict(X_test_basic)
    y_proba_baseline = model_baseline.predict_proba(X_test_basic)[:, 1]

    metrics_baseline = {
        "accuracy": float(accuracy_score(y_test, y_pred_baseline)),
        "precision": float(precision_score(y_test, y_pred_baseline, zero_division=0)),
        "recall": float(recall_score(y_test, y_pred_baseline, zero_division=0)),
        "f1_score": float(f1_score(y_test, y_pred_baseline, zero_division=0)),
        "roc_auc": float(roc_auc_score(y_test, y_proba_baseline))
    }
    exp.log_metrics(metrics_baseline)

    mv_baseline = reg.log_model(
        model_baseline,
        model_name="FRAUD_DETECTION_MODEL",
        version_name="v1_demo_basic",
        sample_input_data=X_train_basic.iloc[: min(50, len(X_train_basic))],
        task=task.Task.TABULAR_BINARY_CLASSIFICATION,
        metrics=metrics_baseline,
        comment="Demo baseline model with basic features"
    )

print("\n=== Baseline Demo Model Results ===")
for metric, value in metrics_baseline.items():
    print(f"{metric}: {value:.4f}")

---
# Step 2: Feature Store Model - Customer & Terminal Entities

Create entities and feature views for Customer and Terminal, then build model with enriched features.

In [68]:
from snowflake.ml.feature_store import FeatureStore, Entity, FeatureView, CreationMode

fs = FeatureStore(
    session=session,
    database=DATABASE,
    name=SCHEMA,
    default_warehouse="COMPUTE_WH",
    creation_mode=CreationMode.CREATE_IF_NOT_EXIST
)

print("✓ Feature Store initialized")

In [69]:
customer_entity = Entity(
    name="CUSTOMER",
    join_keys=["CUSTOMER_ID"],
    desc="Customer entity for fraud detection features"
)

terminal_entity = Entity(
    name="TERMINAL",
    join_keys=["TERMINAL_ID"],
    desc="Terminal/ATM entity for fraud detection features"
)

try:
    fs.register_entity(customer_entity)
    print("✓ Customer entity registered")
except:
    print("Customer entity already exists")

try:
    fs.register_entity(terminal_entity)
    print("✓ Terminal entity registered")
except:
    print("Terminal entity already exists")

fs.list_entities().show()

In [70]:
# Add customer rolling window features using Snowpark DataFrame API
# This demonstrates the DataFrame approach (vs SQL shown in terminal features)
print("Adding customer rolling window features using Snowpark DataFrame API...")

df = df.with_columns(
    ['CUSTOMER_TX_COUNT_1H', 'CUSTOMER_TX_COUNT_1W', 'CUSTOMER_AMOUNT_1H', 'CUSTOMER_AMOUNT_1W'], 
    [
        count('TRANSACTION_ID').over(Window.partition_by('CUSTOMER_ID').order_by('TX_DATETIME').range_between(-make_interval(hours=1), Window.currentRow)),
        count('TRANSACTION_ID').over(Window.partition_by('CUSTOMER_ID').order_by('TX_DATETIME').range_between(-make_interval(weeks=1), Window.currentRow)),
        sum('TX_AMOUNT').over(Window.partition_by('CUSTOMER_ID').order_by('TX_DATETIME').range_between(-make_interval(hours=1), Window.currentRow)),
        sum('TX_AMOUNT').over(Window.partition_by('CUSTOMER_ID').order_by('TX_DATETIME').range_between(-make_interval(weeks=1), Window.currentRow))
    ]
)

In [71]:
from snowflake.snowpark.functions import avg, stddev, count_distinct

# Customer features using Snowpark DataFrame API
customer_window_1h = Window.partition_by('CUSTOMER_ID').order_by('TX_DATETIME').range_between(-make_interval(hours=1), Window.currentRow)
customer_window_24h = Window.partition_by('CUSTOMER_ID').order_by('TX_DATETIME').range_between(-make_interval(hours=24), Window.currentRow)
customer_window_7d = Window.partition_by('CUSTOMER_ID').order_by('TX_DATETIME').range_between(-make_interval(days=7), Window.currentRow)

customer_features_df = df.select(
    col('CUSTOMER_ID'),
    col('TX_DATETIME'),
    count('TRANSACTION_ID').over(customer_window_1h).alias('CUST_TX_COUNT_1H'),
    count('TRANSACTION_ID').over(customer_window_24h).alias('CUST_TX_COUNT_24H'),
    count('TRANSACTION_ID').over(customer_window_7d).alias('CUST_TX_COUNT_7D'),
    sum('TX_AMOUNT').over(customer_window_1h).alias('CUST_AMOUNT_1H'),
    sum('TX_AMOUNT').over(customer_window_24h).alias('CUST_AMOUNT_24H'),
    avg('TX_AMOUNT').over(customer_window_7d).alias('CUST_AVG_AMOUNT_7D'),
    stddev('TX_AMOUNT').over(customer_window_7d).alias('CUST_STD_AMOUNT_7D')
)

print("Customer features DataFrame created with Snowpark API")
customer_features_df.limit(3).show()

customer_fv = FeatureView(
    name="CUSTOMER_FRAUD_FEATURES",
    entities=[customer_entity],
    feature_df=customer_features_df,
    timestamp_col="TX_DATETIME",
    refresh_freq="1 hour",
    desc="Customer behavioral features for fraud detection"
)

customer_fv = customer_fv.attach_feature_desc({
    "CUST_TX_COUNT_1H": "Number of transactions by customer in last hour",
    "CUST_TX_COUNT_24H": "Number of transactions by customer in last 24 hours",
    "CUST_TX_COUNT_7D": "Number of transactions by customer in last 7 days",
    "CUST_AMOUNT_1H": "Total amount spent by customer in last hour",
    "CUST_AMOUNT_24H": "Total amount spent by customer in last 24 hours",
    "CUST_AVG_AMOUNT_7D": "Average transaction amount for customer over 7 days",
    "CUST_STD_AMOUNT_7D": "Std deviation of transaction amounts for customer over 7 days"
})

try:
    registered_customer_fv = fs.register_feature_view(
        feature_view=customer_fv,
        version="v1",
        block=True
    )
    print("✓ Customer feature view registered")
except Exception as e:
    registered_customer_fv = fs.get_feature_view("CUSTOMER_FRAUD_FEATURES", "v1")
    print(f"Customer feature view already exists, retrieved existing")

In [75]:
# Terminal features using Snowpark DataFrame API
terminal_window_1h = Window.partition_by('TERMINAL_ID').order_by('TX_DATETIME').range_between(-make_interval(hours=1), Window.currentRow)
terminal_window_24h = Window.partition_by('TERMINAL_ID').order_by('TX_DATETIME').range_between(-make_interval(hours=24), Window.currentRow)
terminal_window_7d = Window.partition_by('TERMINAL_ID').order_by('TX_DATETIME').range_between(-make_interval(days=7), Window.currentRow)

terminal_features_df = df.select(
    col('TERMINAL_ID'),
    col('TX_DATETIME'),
    count('TRANSACTION_ID').over(terminal_window_1h).alias('TERM_TX_COUNT_1H'),
    count('TRANSACTION_ID').over(terminal_window_24h).alias('TERM_TX_COUNT_24H'),
    sum('TX_AMOUNT').over(terminal_window_24h).alias('TERM_AMOUNT_24H'),
    avg('TX_AMOUNT').over(terminal_window_7d).alias('TERM_AVG_AMOUNT_7D'),
    sum(when(col('TX_FRAUD') == 1, 1).otherwise(0)).over(terminal_window_7d).alias('TERM_FRAUD_COUNT_7D')
)

print("Terminal features DataFrame created with Snowpark API")
terminal_features_df.limit(3).show()

terminal_fv = FeatureView(
    name="TERMINAL_FRAUD_FEATURES",
    entities=[terminal_entity],
    feature_df=terminal_features_df,
    timestamp_col="TX_DATETIME",
    refresh_freq="1 hour",
    desc="Terminal behavioral features for fraud detection"
)

terminal_fv = terminal_fv.attach_feature_desc({
    "TERM_TX_COUNT_1H": "Number of transactions at terminal in last hour",
    "TERM_TX_COUNT_24H": "Number of transactions at terminal in last 24 hours",
    "TERM_AMOUNT_24H": "Total amount processed at terminal in last 24 hours",
    "TERM_AVG_AMOUNT_7D": "Average transaction amount at terminal over 7 days",
    "TERM_FRAUD_COUNT_7D": "Number of fraudulent transactions at terminal in last 7 days"
})

try:
    registered_terminal_fv = fs.register_feature_view(
        feature_view=terminal_fv,
        version="v1",
        block=True
    )
    print("\u2713 Terminal feature view registered")
except Exception as e:
    registered_terminal_fv = fs.get_feature_view("TERMINAL_FRAUD_FEATURES", "v1")
    print(f"Terminal feature view already exists, retrieved existing")

In [76]:
spine_df = session.table("TRANSACTIONS_SOURCE").select(
    "TRANSACTION_ID", "CUSTOMER_ID", "TERMINAL_ID", "TX_DATETIME", 
    "TX_AMOUNT", "TX_FRAUD", "HOUR", "DAY_OF_WEEK", "IS_WEEKEND", "IS_NIGHT_12AM_7AM"
)

training_dataset_v2 = fs.generate_dataset(
    name="FRAUD_TRAINING_V2",
    spine_df=spine_df,
    features=[registered_customer_fv, registered_terminal_fv],
    version="v1",
    spine_timestamp_col="TX_DATETIME",
    spine_label_cols=["TX_FRAUD"],
    desc="Training dataset with customer and terminal features"
)

print("✓ Training dataset generated with feature store features")
training_dataset_v2.read.to_snowpark_dataframe().limit(5).show()

In [78]:
df_v2 = training_dataset_v2.read.to_snowpark_dataframe().to_pandas()

features_v2 = [
    'TX_AMOUNT', 'HOUR', 'DAY_OF_WEEK', 'IS_WEEKEND', 'IS_NIGHT_12AM_7AM',
    'CUST_TX_COUNT_1H', 'CUST_TX_COUNT_24H', 'CUST_TX_COUNT_7D',
    'CUST_AMOUNT_1H', 'CUST_AMOUNT_24H', 'CUST_AVG_AMOUNT_7D', 'CUST_STD_AMOUNT_7D',
    'TERM_TX_COUNT_1H', 'TERM_TX_COUNT_24H', 'TERM_AMOUNT_24H',
    'TERM_AVG_AMOUNT_7D', 'TERM_FRAUD_COUNT_7D'
]

X_v2 = df_v2[features_v2].fillna(0).astype(float)
y_v2 = df_v2['TX_FRAUD'].astype(int)

X_train_v2, X_test_v2, y_train_v2, y_test_v2 = train_test_split(
    X_v2, y_v2, test_size=0.2, random_state=42, stratify=y_v2
)

print(f"Training set: {X_train_v2.shape[0]} samples")
print(f"Test set: {X_test_v2.shape[0]} samples")
print(f"Features ({len(features_v2)}): {features_v2}")

In [None]:
model_v2 = XGBClassifier(
    n_estimators=100,
    max_depth=5,
    learning_rate=0.1,
    random_state=42,
    eval_metric="auc",
    tree_method="hist",
    n_jobs=-1
)

RUN_NAME = "feature_store_customer_terminal"

with exp.start_run(RUN_NAME):
    exp.log_params({
        "model_type": "XGBClassifier",
        "feature_set": "customer_terminal",
        "n_features": len(features_v2),
        "n_train_rows": len(X_train_v2),
        "n_test_rows": len(X_test_v2),
        "n_estimators": 100,
        "max_depth": 5,
        "learning_rate": 0.1,
        "eval_metric": "auc",
        "feature_views": "CUSTOMER_FRAUD_FEATURES, TERMINAL_FRAUD_FEATURES"
    })

    model_v2.fit(
        X_train_v2,
        y_train_v2,
        eval_set=[(X_test_v2, y_test_v2)],
        verbose=False
    )

    y_pred_v2 = model_v2.predict(X_test_v2)
    y_proba_v2 = model_v2.predict_proba(X_test_v2)[:, 1]

    metrics_v2 = {
        "accuracy": float(accuracy_score(y_test_v2, y_pred_v2)),
        "precision": float(precision_score(y_test_v2, y_pred_v2, zero_division=0)),
        "recall": float(recall_score(y_test_v2, y_pred_v2, zero_division=0)),
        "f1_score": float(f1_score(y_test_v2, y_pred_v2, zero_division=0)),
        "roc_auc": float(roc_auc_score(y_test_v2, y_proba_v2))
    }
    exp.log_metrics(metrics_v2)

    mv_v2 = reg.log_model(
        model_v2,
        model_name="FRAUD_DETECTION_MODEL",
        version_name="v2_feature_store",
        sample_input_data=X_train_v2.iloc[:min(50, len(X_train_v2))],
        task=task.Task.TABULAR_BINARY_CLASSIFICATION,
        metrics=metrics_v2,
        comment="Model with Customer and Terminal feature views from Feature Store"
    )

print("\n=== Feature Store Model Results ===")
for metric, value in metrics_v2.items():
    print(f"{metric}: {value:.4f}")

---
# Step 3: Model Comparison & Set Best as Default

Compare both models and set the best performing version as the default in the model registry.

In [None]:
comparison_data = {
    'Version': ['v1_demo_basic', 'v2_feature_store'],
    'Features': ['Basic (5)', 'Customer+Terminal (17)'],
    'Accuracy': [metrics_baseline['accuracy'], metrics_v2['accuracy']],
    'Precision': [metrics_baseline['precision'], metrics_v2['precision']],
    'Recall': [metrics_baseline['recall'], metrics_v2['recall']],
    'F1 Score': [metrics_baseline['f1_score'], metrics_v2['f1_score']],
    'ROC AUC': [metrics_baseline['roc_auc'], metrics_v2['roc_auc']]
}

comparison_df = pd.DataFrame(comparison_data)
print("\n" + "="*80)
print("MODEL COMPARISON - Feature Engineering Impact on Fraud Detection")
print("="*80)
print(comparison_df.to_string(index=False))
print("="*80)

In [None]:
print("\nIMPROVEMENT ANALYSIS:")
print("-" * 50)

for metric in ['Accuracy', 'Precision', 'Recall', 'F1 Score', 'ROC AUC']:
    baseline = comparison_df[comparison_df['Version'] == 'v1_demo_basic'][metric].values[0]
    best = comparison_df[metric].max()
    best_version = comparison_df[comparison_df[metric] == best]['Version'].values[0]
    if baseline > 0:
        improvement = ((best - baseline) / baseline) * 100
        print(f"{metric:12} | Baseline: {baseline:.4f} -> Best: {best:.4f} ({best_version}) | +{improvement:.2f}%")
    else:
        print(f"{metric:12} | Baseline: {baseline:.4f} -> Best: {best:.4f} ({best_version})")

In [None]:
all_metrics = {
    'v1_demo_basic': metrics_baseline,
    'v2_feature_store': metrics_v2
}

best_version = max(all_metrics.keys(), key=lambda v: all_metrics[v]['f1_score'])
best_f1 = all_metrics[best_version]['f1_score']

print(f"\nBest performing model: {best_version}")
print(f"   F1 Score: {best_f1:.4f}")

model = reg.get_model("FRAUD_DETECTION_MODEL")
model.default = best_version

print(f"\nModel registry default version set to: {best_version}")
print(f"\nModel versions in registry:")
model.show_versions()

In [None]:
print("""
================================================================================
                           DEMO SUMMARY
================================================================================

Experiment Tracking: Both model runs logged to 'FRAUD_DETECTION_FEATURE_ENGINEERING'
Feature Store: Created CUSTOMER and TERMINAL entities with feature views
Model Registry: 2 versions logged to 'FRAUD_DETECTION_MODEL'
Default Version: Set to best performing model based on F1 score

KEY TAKEAWAYS:
1. Feature engineering improves fraud detection performance
2. Customer behavioral features capture spending patterns and velocity
3. Terminal features identify high-risk locations and fraud patterns

NEXT STEPS:
- View experiment results in Snowsight: AI & ML > Experiments
- View model registry: AI & ML > Model Registry
- Enable ML Observability to monitor model drift in production

================================================================================
""")