In [3]:
# load and split data

import pandas as pd
from sklearn.model_selection import train_test_split

df = pd.read_csv("../data/raw/fraud_transactions.csv")
X = df.drop('fraud_flag', axis=1)
y = df['fraud_flag']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)

In [4]:
from sklearn.pipeline import Pipeline
from sklearn.base import BaseEstimator, TransformerMixin
import numpy as np
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.impute import SimpleImputer

# currency conversion
class CurrencyConverter(BaseEstimator, TransformerMixin):
    def __init__(self, rates=None, base_currency="INR"):
        if rates is None:
            rates = {"INR": 1.0, "USD": 83.0, "EUR": 90.0}
        self.rates = rates
        self.base_currency = base_currency

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        X = X.copy()
        X["amount_converted"] = X.apply(
            lambda row: row["amount"] * self.rates.get(row["currency"], 1.0), axis=1
        )
        return X
    
    def get_feature_names_out(self, input_features=None):
        """Add 'amount_converted' to the feature names"""
        if input_features is None:
            input_features = []
        # Return original features + new feature
        output_features = list(input_features) + ['amount_converted']
        return np.asarray(output_features, dtype=object)


# typo fixing   
class TypoFixer(BaseEstimator, TransformerMixin):
    def __init__(self, column='merchant_category', typo_map=None):
        if typo_map is None:
            typo_map = {'Groceires': 'Groceries'}
        self.column = column
        self.typo_map = typo_map
    
    def fit(self, X, y=None):
        return self
    
    def transform(self, X):
        X = X.copy()
        if self.column in X.columns:
            X[self.column] = X[self.column].replace(self.typo_map)
        return X
    
    def get_feature_names_out(self, input_features=None):
        """Feature names unchanged - just fixing typos"""
        if input_features is None:
            return np.array([self.column], dtype=object)
        return np.asarray(input_features, dtype=object)
    

# outlier clipping
class OutlierClipper(BaseEstimator, TransformerMixin):
    def __init__(self, features=None, lower_quantile=0.01, upper_quantile=0.99):
        self.features = features
        self.lower_quantile = lower_quantile
        self.upper_quantile = upper_quantile
        self.bounds = {}

    def fit(self, X, y=None):
        for col in self.features:
            q_low = X[col].quantile(self.lower_quantile)
            q_high = X[col].quantile(self.upper_quantile)
            self.bounds[col] = (q_low, q_high)
        return self

    def transform(self, X):
        X = X.copy()
        for col, (low, high) in self.bounds.items():
            X[col] = X[col].clip(lower=low, upper=high)
        return X
    
    def get_feature_names_out(self, input_features=None):
        """Feature names unchanged - just clipping values"""
        if input_features is None:
            return np.asarray(self.features, dtype=object)
        return np.asarray(input_features, dtype=object)
    

# Define Feature Groups
num_features = [
    "amount_converted", "velocity", "ip_risk_score", "customer_age",
    "account_tenure", "geo_distance", "merchant_risk_score", "failed_login_attempts"
]

cat_features = [
    "currency", "merchant_category", "transaction_type", "channel", "location"
]

bin_features = ["card_present", "is_international"]


# Pipelines for Each Feature Type

# Numerical pipeline
num_pipeline = Pipeline([
    ("outlier_clipper", OutlierClipper(features=num_features)),
    ("imputer", SimpleImputer(strategy="median")),
    ("scaler", StandardScaler())
])

# Categorical pipeline
cat_pipeline = Pipeline([
    ("typo_fixer", TypoFixer()),
    ("imputer", SimpleImputer(strategy="constant", fill_value="Unknown")),
    ("encoder", OneHotEncoder(handle_unknown="ignore"))
])

cat_pipeline_catboost = Pipeline([
    ("typo fixer", TypoFixer()),
    ("imputer", SimpleImputer(strategy="constant", fill_value="Unknown"))
])

# Binary pipeline
bin_pipeline = Pipeline([
    ("imputer", SimpleImputer(strategy="most_frequent"))
])


# Full Preprocessing Pipeline
preprocessor = Pipeline([
    ("currency_converter", CurrencyConverter()),
    ("transformer", ColumnTransformer([
        ("num", num_pipeline, num_features),
        ("cat", cat_pipeline, cat_features),
        ("bin", bin_pipeline, bin_features)
    ]))
])

preprocessor_catboost = Pipeline([
    ("currency_converter", CurrencyConverter()),
    ("transformer", ColumnTransformer([
        ("num", num_pipeline, num_features),
        ("cat", cat_pipeline_catboost, cat_features),
        ("bin", bin_pipeline, bin_features)
    ]))
])

In [None]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report, confusion_matrix, f1_score
from sklearn.pipeline import Pipeline
from sklearn.model_selection import RandomizedSearchCV, ParameterSampler, StratifiedKFold

parameters = {
    'n_estimators': [100, 200, 300],
    'max_depth': [None, 10, 20, 30],
    'min_samples_split': [2, 5, 10],
    'min_samples_leaf': [1, 2, 4],
    'bootstrap': [True, False]
}

rf = RandomForestClassifier(random_state=42, n_jobs=-1, oob_score=True)
rf_random = RandomizedSearchCV(
    estimator=rf,
    param_distributions=parameters,
    n_iter=10,
    cv=3,
    verbose=2,
    random_state=42,
    n_jobs=-1,
    scoring='f1'
)
# Full pipeline with model
model_pipeline = Pipeline([
    ("preprocessor", preprocessor),
    ("classifier", rf_random)
])

# Train the model
model_pipeline.fit(X_train, y_train)
# Evaluate the model
y_pred = model_pipeline.predict(X_test)
y_prob = model_pipeline.predict_proba(X_test)[:, 1]

print(classification_report(y_test, y_pred))
print("""\nConfusion matrix: [[TN FP]
                   [FN TP]]""")
print(confusion_matrix(y_test, y_pred))
print("\nOOB Score:", model_pipeline.named_steps['classifier'].best_estimator_.oob_score_)

Fitting 3 folds for each of 10 candidates, totalling 30 fits


24 fits failed out of a total of 30.
The score on these train-test partitions for these parameters will be set to nan.
If these failures are not expected, you can try to debug them by setting error_score='raise'.

Below are more details about the failures:
--------------------------------------------------------------------------------
24 fits failed with the following error:
Traceback (most recent call last):
  File "d:\Projects\Fraud Detection\.venv\Lib\site-packages\sklearn\model_selection\_validation.py", line 833, in _fit_and_score
    estimator.fit(X_train, y_train, **fit_params)
  File "d:\Projects\Fraud Detection\.venv\Lib\site-packages\sklearn\base.py", line 1336, in wrapper
    return fit_method(estimator, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "d:\Projects\Fraud Detection\.venv\Lib\site-packages\sklearn\ensemble\_forest.py", line 447, in fit
    raise ValueError("Out of bag estimation only available if bootstrap=True")
ValueError: Out of ba

              precision    recall  f1-score   support

           0       1.00      1.00      1.00     19798
           1       0.93      0.72      0.81       202

    accuracy                           1.00     20000
   macro avg       0.96      0.86      0.91     20000
weighted avg       1.00      1.00      1.00     20000

[[19787    11]
 [   56   146]]
OOB Score: 0.9972625


In [None]:
from catboost import CatBoostClassifier
from sklearn.model_selection import ParameterSampler, StratifiedKFold
from sklearn.metrics import f1_score
import numpy as np

param_grid_catboost = {
    "iterations": [200, 500],
    "depth": [4, 6, 8],
    "learning_rate": [0.01, 0.1, 0.2],
    "l2_leaf_reg": [1, 3, 5]
}

best_score = -np.inf
best_model = None

# transform training data
X_trf = preprocessor_catboost.fit_transform(X_train)
feature_names = preprocessor_catboost.named_steps["transformer"].get_feature_names_out()
X_train_transformed = pd.DataFrame(X_trf, columns=feature_names)

# identify categorical features for CatBoost
catboost_features = [col for col in X_train_transformed.columns.tolist() if col.startswith("cat_")]

# stratified k-fold cv
skf = StratifiedKFold(n_splits=3, shuffle=True, random_state=42)

for params in ParameterSampler(param_grid_catboost, n_iter=5, random_state=42):
    
    cv_scores = []

    for train_index, val_index in skf.split(X_train_transformed, y_train):
        X_train_cv, X_val_cv = X_train_transformed.iloc[train_index], X_train_transformed.iloc[val_index]
        y_train_cv, y_val_cv = y_train.iloc[train_index], y_train.iloc[val_index]

    model = CatBoostClassifier(
        **params,
        loss_function="Logloss",
        eval_metric="F1",
        class_weights=[1, 10],
        random_state=42,
        verbose=0,
        cat_features=[X_train_transformed.columns.get_loc(col) for col in catboost_features]
    )

    model.fit(X_train_cv, y_train_cv)
    y_pred = model.predict(X_val_cv)

    cv_scores.append(f1_score(y_val_cv, y_pred))

    if np.mean(cv_scores) > best_score:
        best_score = np.mean(cv_scores)
        best_model = model

print("Best CatBoost Model:")
print("Best CV F1:", best_score)
print("Best params:", best_model.get_params())


In [None]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import RandomizedSearchCV, StratifiedKFold
from sklearn.metrics import classification_report, confusion_matrix, f1_score
from sklearn.pipeline import Pipeline

# -----------------------------
# Parameter Space
# -----------------------------
param_dist = {
    'classifier__n_estimators': [100, 200, 300],
    'classifier__max_depth': [None, 10],
    'classifier__min_samples_split': [2, 5, 10],
    'classifier__min_samples_leaf': [1, 2, 4]
}

# -----------------------------
# Base Model
# -----------------------------
rf = RandomForestClassifier(
    class_weight='balanced',
    random_state=42,
    n_jobs=-1,
    oob_score=True
)

# -----------------------------
# Pipeline
# -----------------------------
pipeline = Pipeline([
    ('preprocessor', preprocessor),
    ('classifier', rf)
])

# -----------------------------
# Randomized Search
# -----------------------------
cv = StratifiedKFold(
    n_splits=5,
    shuffle=True,
    random_state=42
)

random_search = RandomizedSearchCV(
    estimator=pipeline,
    param_distributions=param_dist,
    n_iter=15,
    scoring='f1',
    cv=cv,
    n_jobs=-1,
    verbose=2,
    random_state=42,
    refit=True   # refit best model automatically
)

# -----------------------------
# Train
# -----------------------------
random_search.fit(X_train, y_train)

# -----------------------------
# Best Model & Parameters
# -----------------------------
best_model = random_search.best_estimator_

print("\nBest F1 Score (CV):", random_search.best_score_)
print("\nBest Parameters:")
for k, v in random_search.best_params_.items():
    print(f"{k}: {v}")

# -----------------------------
# Train and Test Evaluation
# -----------------------------

y_pred = best_model.predict(X_train)
print("\ntraining f1 score:", f1_score(y_train, y_pred))

y_pred = best_model.predict(X_test)
print("training f1 score:", f1_score(y_test, y_pred))
# print("\nClassification Report:")
# print(classification_report(y_test, y_pred))

print("\nConfusion Matrix [[TN FP]\n [FN TP]]:")
print(confusion_matrix(y_test, y_pred))

# -----------------------------
# OOB Score (Only if bootstrap=True)
# -----------------------------
rf_best = best_model.named_steps['classifier']

if rf_best.bootstrap:
    print("\nOOB Score:", rf_best.oob_score_)
else:
    print("\nOOB Score: Not available (bootstrap=False)")


Fitting 5 folds for each of 15 candidates, totalling 75 fits

Best F1 Score (CV): 0.8474077348318468

Best Parameters:
classifier__n_estimators: 200
classifier__min_samples_split: 5
classifier__min_samples_leaf: 4
classifier__max_depth: 10
training f1 score: 0.8512396694214877
training f1 score: 0.8133704735376045

Confusion Matrix [[TN FP]
 [FN TP]]:
[[19787    11]
 [   56   146]]

OOB Score: 0.99725


| Pattern           | Meaning              |
| ----------------- | -------------------- |
| Train ≫ CV ≈ Test | Overfitting          |
| Train ≈ CV ≈ Test | Healthy              |
| CV ≫ Test         | Data leakage / shift |


In [None]:
rf = RandomForestClassifier(n_estimators=200, min_samples_split=10, min_samples_leaf=4, max_depth=10,
                            random_state=42, class_weight='balanced', n_jobs=-1, oob_score=True)

rfp = Pipeline([
    ('preprocessor', preprocessor),
    ('model', rf)
])

rfp.fit(X_train, y_train)

y_pred = rfp.predict(X_train)
print("training f1 score:", f1_score(y_train, y_pred))

y_pred = rfp.predict(X_test)
print("testing f1 score:", f1_score(y_test, y_pred))

print("classification_report", classification_report(y_test,y_pred))

training f1 score: 0.8512396694214877
testing f1 score: 0.8133704735376045
classification_report               precision    recall  f1-score   support

           0       1.00      1.00      1.00     19798
           1       0.93      0.72      0.81       202

    accuracy                           1.00     20000
   macro avg       0.96      0.86      0.91     20000
weighted avg       1.00      1.00      1.00     20000



In [35]:
from sklearn.model_selection import train_test_split

X_tr, X_val, y_tr, y_val = train_test_split(
    X_train, y_train,
    test_size=0.2,
    stratify=y_train,
    random_state=42
)

In [36]:
best_model.fit(X_tr, y_tr)
y_val_probs = best_model.predict_proba(X_val)[:, 1]

In [37]:
from sklearn.metrics import precision_recall_curve
import numpy as np

precision, recall, thresholds = precision_recall_curve(y_val, y_val_probs)

f1_scores = 2 * (precision * recall) / (precision + recall + 1e-9)
best_idx = np.argmax(f1_scores)

best_threshold = thresholds[best_idx]
best_f1 = f1_scores[best_idx]

print("Best threshold:", best_threshold)
print("Validation F1:", best_f1)

Best threshold: 0.7614023318402273
Validation F1: 0.8591065287152962


In [38]:
y_test_probs = best_model.predict_proba(X_test)[:, 1]
y_test_pred = (y_test_probs >= best_threshold).astype(int)

from sklearn.metrics import f1_score
print("Test F1:", f1_score(y_test, y_test_pred))

Test F1: 0.8056338028169014
