# Fraud Detection System (Logistic Regression from Scratch)
**Tugas Besar 2 IF3070 â€“ Dasar Inteligensi Artifisial**

**Team:** AbyuDAIya-Ganbatte

This notebook implements a complete Fraud Detection pipeline using **only NumPy and Pandas**. It includes:
1.  **Logistic Regression** with Adam Optimizer, Focal Loss, and Elastic Net.
2.  **Preprocessing** from scratch (Scaling, Imputation, Encoding).
3.  **Feature Engineering** specifically designed for fraud detection.
4.  **Evaluation** using manual implementations of ROC-AUC, F1, and more.

In [1]:
import numpy as np
import pandas as pd
import json
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.impute import KNNImputer # Only used for complex imputation

# Set random seed for reproducibility
np.random.seed(42)

## 1. Evaluation Metrics
Implementation of metrics from scratch (Accuracy, Precision, Recall, F1, ROC-AUC).

In [2]:
def accuracy_score(y_true, y_pred):
    y_true = np.array(y_true).flatten()
    y_pred = np.array(y_pred).flatten()
    return np.mean(y_true == y_pred)

def precision_score(y_true, y_pred):
    y_true = np.array(y_true).flatten()
    y_pred = np.array(y_pred).flatten()
    true_positives = np.sum((y_pred == 1) & (y_true == 1))
    false_positives = np.sum((y_pred == 1) & (y_true == 0))
    if true_positives + false_positives == 0: return 0.0
    return true_positives / (true_positives + false_positives)

def recall_score(y_true, y_pred):
    y_true = np.array(y_true).flatten()
    y_pred = np.array(y_pred).flatten()
    true_positives = np.sum((y_pred == 1) & (y_true == 1))
    false_negatives = np.sum((y_pred == 0) & (y_true == 1))
    if true_positives + false_negatives == 0: return 0.0
    return true_positives / (true_positives + false_negatives)

def f1_score(y_true, y_pred):
    prec = precision_score(y_true, y_pred)
    rec = recall_score(y_true, y_pred)
    if prec + rec == 0: return 0.0
    return 2 * (prec * rec) / (prec + rec)

def roc_auc_score(y_true, y_scores):
    y_true = np.array(y_true).flatten()
    y_scores = np.array(y_scores).flatten()
    
    thresholds = np.unique(y_scores)
    thresholds = np.concatenate([[thresholds.max() + 1], thresholds, [thresholds.min() - 1]])
    thresholds = np.sort(thresholds)[::-1]
    
    tpr_list = []
    fpr_list = []
    n_pos = np.sum(y_true == 1)
    n_neg = np.sum(y_true == 0)
    
    if n_pos == 0 or n_neg == 0: return 0.5
    
    for thresh in thresholds:
        y_pred = (y_scores >= thresh).astype(int)
        tp = np.sum((y_pred == 1) & (y_true == 1))
        fp = np.sum((y_pred == 1) & (y_true == 0))
        tpr_list.append(tp / n_pos)
        fpr_list.append(fp / n_neg)
    
    # Trapezoidal rule
    tpr_array = np.array(tpr_list)
    fpr_array = np.array(fpr_list)
    sorted_indices = np.argsort(fpr_array)
    return np.trapz(tpr_array[sorted_indices], fpr_array[sorted_indices])

def confusion_matrix(y_true, y_pred):
    y_true = np.array(y_true).flatten()
    y_pred = np.array(y_pred).flatten()
    tn = np.sum((y_pred == 0) & (y_true == 0))
    fp = np.sum((y_pred == 1) & (y_true == 0))
    fn = np.sum((y_pred == 0) & (y_true == 1))
    tp = np.sum((y_pred == 1) & (y_true == 1))
    return np.array([[tn, fp], [fn, tp]])

## 2. Logistic Regression Model
Includes Adam Optimizer, Momentum, and Focal Loss.

In [3]:
class LogisticRegression:
    def __init__(self, learning_rate=0.01, n_iterations=1000, optimizer="batch", 
                 batch_size=None, regularization=0.0, l1_ratio=0.0, class_weight=None,
                 lr_schedule="constant", lr_decay=0.1, lr_decay_steps=100,
                 momentum=0.0, nesterov=False,
                 beta1=0.9, beta2=0.999, epsilon=1e-8,
                 use_focal_loss=False, focal_gamma=2.0,
                 early_stopping=True, patience=10, tol=1e-5, verbose=True):
        self.learning_rate = learning_rate
        self.initial_lr = learning_rate
        self.n_iterations = n_iterations
        self.optimizer = optimizer.lower()
        self.regularization = regularization
        self.l1_ratio = l1_ratio
        self.class_weight = class_weight
        self.lr_schedule = lr_schedule
        self.lr_decay = lr_decay
        self.lr_decay_steps = lr_decay_steps
        self.momentum = momentum
        self.nesterov = nesterov
        self.beta1 = beta1
        self.beta2 = beta2
        self.epsilon = epsilon
        self.use_focal_loss = use_focal_loss
        self.focal_gamma = focal_gamma
        self.early_stopping = early_stopping
        self.patience = patience
        self.tol = tol
        self.verbose = verbose
        
        if batch_size is None:
            self.batch_size = 32 if self.optimizer == "mini-batch" else None
        else:
            self.batch_size = batch_size
        
        self.weights = None
        self.bias = None
        self.velocity_w = None
        self.velocity_b = None
        self.m_w = None
        self.v_w = None
        self.m_b = None
        self.v_b = None
        self.t = 0
        self.class_weights_ = None
        self.loss_history = []
        self.weight_history = []
        self.lr_history = []
    
    def _get_learning_rate(self, iteration):
        if self.lr_schedule == "constant": return self.initial_lr
        elif self.lr_schedule == "step":
            return self.initial_lr * (self.lr_decay ** (iteration // self.lr_decay_steps))
        elif self.lr_schedule == "exponential":
            return self.initial_lr * (self.lr_decay ** (iteration / self.n_iterations))
        elif self.lr_schedule == "cosine":
            return self.initial_lr * (1 + np.cos(np.pi * iteration / self.n_iterations)) / 2
        return self.initial_lr
    
    def sigmoid(self, z):
        z = np.clip(z, -500, 500)
        positive_mask = z >= 0
        negative_mask = ~positive_mask
        result = np.zeros_like(z, dtype=float)
        result[positive_mask] = 1 / (1 + np.exp(-z[positive_mask]))
        exp_z = np.exp(z[negative_mask])
        result[negative_mask] = exp_z / (1 + exp_z)
        return result
    
    def compute_loss(self, y_true, y_pred, sample_weights=None):
        epsilon = 1e-15
        y_pred = np.clip(y_pred, epsilon, 1 - epsilon)
        
        if self.use_focal_loss:
            p_t = np.where(y_true == 1, y_pred, 1 - y_pred)
            focal_weight = (1 - p_t) ** self.focal_gamma
            loss_per_sample = -focal_weight * np.log(p_t)
        else:
            loss_per_sample = -(y_true * np.log(y_pred) + (1 - y_true) * np.log(1 - y_pred))
        
        if sample_weights is not None:
            loss = np.sum(sample_weights * loss_per_sample) / np.sum(sample_weights)
        else:
            loss = np.mean(loss_per_sample)
            
        if self.regularization > 0 and self.weights is not None:
            l1_term = self.l1_ratio * np.sum(np.abs(self.weights))
            l2_term = (1 - self.l1_ratio) * 0.5 * np.sum(self.weights ** 2)
            loss += self.regularization * (l1_term + l2_term)
        return loss
    
    def _compute_gradients(self, X, y, y_pred, sample_weights=None):
        n_samples = len(y)
        if self.use_focal_loss:
            epsilon = 1e-15
            y_pred_clipped = np.clip(y_pred, epsilon, 1 - epsilon)
            p_t = np.where(y == 1, y_pred_clipped, 1 - y_pred_clipped)
            focal_weight = (1 - p_t) ** self.focal_gamma
            grad_p = np.where(
                y == 1,
                -focal_weight * (self.focal_gamma * (1 - y_pred_clipped) * np.log(y_pred_clipped + epsilon) + 1) / (y_pred_clipped + epsilon),
                focal_weight * (self.focal_gamma * y_pred_clipped * np.log(1 - y_pred_clipped + epsilon) + 1) / (1 - y_pred_clipped + epsilon)
            )
            error = grad_p * y_pred_clipped * (1 - y_pred_clipped)
        else:
            error = y_pred - y
        
        if sample_weights is not None:
            weighted_error = sample_weights * error
            dw = np.dot(X.T, weighted_error) / np.sum(sample_weights)
            db = np.sum(weighted_error) / np.sum(sample_weights)
        else:
            dw = (1 / n_samples) * np.dot(X.T, error)
            db = (1 / n_samples) * np.sum(error)
            
        if self.regularization > 0:
            l2_grad = (1 - self.l1_ratio) * self.weights
            l1_grad = self.l1_ratio * np.sign(self.weights)
            dw += self.regularization * (l1_grad + l2_grad)
        return dw, db
    
    def fit(self, X, y):
        if isinstance(X, pd.DataFrame): X = X.values
        if isinstance(y, (pd.Series, pd.DataFrame)): y = y.values.flatten()
        
        n_samples, n_features = X.shape
        np.random.seed(42)
        self.weights = np.random.randn(n_features) * np.sqrt(2.0 / n_features)
        self.bias = 0.0
        
        # Adam Init
        self.m_w = np.zeros(n_features)
        self.v_w = np.zeros(n_features)
        self.m_b = 0.0
        self.v_b = 0.0
        self.t = 0
        
        if self.class_weight == "balanced":
            class_counts = np.bincount(y.astype(int))
            self.class_weights_ = n_samples / (2 * class_counts)
            sample_weights = np.where(y == 1, self.class_weights_[1], self.class_weights_[0])
        else:
            sample_weights = None
        
        if self.optimizer == "adam":
            self._fit_adam(X, y, sample_weights)
        else:
            raise NotImplementedError("Only Adam implemented in this notebook view for brevity")
        return self

    def _fit_adam(self, X, y, sample_weights=None):
        n_samples = len(y)
        batch_size = min(self.batch_size if self.batch_size else 32, n_samples)
        best_loss = float('inf')
        patience_counter = 0
        best_weights = self.weights.copy()
        
        for iteration in range(self.n_iterations):
            lr = self._get_learning_rate(iteration)
            indices = np.random.permutation(n_samples)
            X_shuffled, y_shuffled = X[indices], y[indices]
            sw_shuffled = sample_weights[indices] if sample_weights is not None else None
            
            for start_idx in range(0, n_samples, batch_size):
                end_idx = min(start_idx + batch_size, n_samples)
                X_batch = X_shuffled[start_idx:end_idx]
                y_batch = y_shuffled[start_idx:end_idx]
                sw_batch = sw_shuffled[start_idx:end_idx] if sw_shuffled is not None else None
                
                z = np.dot(X_batch, self.weights) + self.bias
                y_pred = self.sigmoid(z)
                dw, db = self._compute_gradients(X_batch, y_batch, y_pred, sw_batch)
                
                self.t += 1
                self.m_w = self.beta1 * self.m_w + (1 - self.beta1) * dw
                self.m_b = self.beta1 * self.m_b + (1 - self.beta1) * db
                self.v_w = self.beta2 * self.v_w + (1 - self.beta2) * (dw ** 2)
                self.v_b = self.beta2 * self.v_b + (1 - self.beta2) * (db ** 2)
                
                m_w_corr = self.m_w / (1 - self.beta1 ** self.t)
                m_b_corr = self.m_b / (1 - self.beta1 ** self.t)
                v_w_corr = self.v_w / (1 - self.beta2 ** self.t)
                v_b_corr = self.v_b / (1 - self.beta2 ** self.t)
                
                self.weights -= lr * m_w_corr / (np.sqrt(v_w_corr) + self.epsilon)
                self.bias -= lr * m_b_corr / (np.sqrt(v_b_corr) + self.epsilon)
            
            z_full = np.dot(X, self.weights) + self.bias
            loss = self.compute_loss(y, self.sigmoid(z_full), sample_weights)
            self.loss_history.append(loss)
            
            if self.early_stopping:
                if loss < best_loss - self.tol:
                    best_loss = loss
                    best_weights = self.weights.copy()
                    patience_counter = 0
                else:
                    patience_counter += 1
                    if patience_counter >= self.patience:
                        if self.verbose: print(f"Early stopping at {iteration}, loss: {loss:.6f}")
                        self.weights = best_weights
                        break
                        
            if self.verbose and iteration % 500 == 0:
                print(f"Iteration {iteration}: loss = {loss:.6f}")

    def predict_proba(self, X):
        if isinstance(X, pd.DataFrame): X = X.values
        z = np.dot(X, self.weights) + self.bias
        return self.sigmoid(z)
    
    def predict(self, X, threshold=0.5):
        return (self.predict_proba(X) >= threshold).astype(int)
    
    def save_model(self, filename):
        model_data = {
            'weights': self.weights.tolist(), 'bias': float(self.bias), 
            'learning_rate': self.learning_rate, 'n_iterations': self.n_iterations
        }
        with open(filename, 'w') as f: json.dump(model_data, f)

## 3. Preprocessing Utils & Feature Engineering
Splitters, Imputers, Scalers, and Feature Generators.

In [4]:
def train_test_split(X, y, test_size=0.2, random_state=None, stratify=None):
    if isinstance(X, pd.DataFrame): X = X.values
    if isinstance(y, (pd.Series, pd.DataFrame)): y = y.values.flatten()
    n_samples = len(X)
    if random_state: np.random.seed(random_state)
    
    if stratify is not None:
        if isinstance(stratify, (pd.Series, pd.DataFrame)): stratify = stratify.values.flatten()
        train_idx, test_idx = [], []
        for cls in np.unique(stratify):
            cls_idx = np.where(stratify == cls)[0]
            np.random.shuffle(cls_idx)
            n_test_cls = int(len(cls_idx) * test_size)
            test_idx.extend(cls_idx[:n_test_cls])
            train_idx.extend(cls_idx[n_test_cls:])
        train_idx, test_idx = np.array(train_idx), np.array(test_idx)
        np.random.shuffle(train_idx)
        np.random.shuffle(test_idx)
    else:
        indices = np.random.permutation(n_samples)
        n_test = int(n_samples * test_size)
        test_idx, train_idx = indices[:n_test], indices[n_test:]
    return X[train_idx], X[test_idx], y[train_idx], y[test_idx]

class StandardScaler:
    def __init__(self):
        self.means = None
        self.stds = None
    def fit_transform(self, X):
        self.means = np.mean(X, axis=0)
        self.stds = np.std(X, axis=0)
        self.stds[self.stds == 0] = 1.0
        return (X - self.means) / self.stds
    def transform(self, X):
        return (X - self.means) / self.stds

def clip_outliers(X, lower=1, upper=99):
    X_c = X.copy()
    bounds = {'lower': [], 'upper': []}
    for i in range(X.shape[1]):
        l, u = np.percentile(X[:, i], lower), np.percentile(X[:, i], upper)
        bounds['lower'].append(l); bounds['upper'].append(u)
        X_c[:, i] = np.clip(X[:, i], l, u)
    return X_c, bounds

def apply_clip(X, bounds):
    X_c = X.copy()
    for i in range(X.shape[1]):
        X_c[:, i] = np.clip(X[:, i], bounds['lower'][i], bounds['upper'][i])
    return X_c

def handle_missing(df, numerical, categorical, n_neighbors=5):
    df_filled = df.copy()
    vals = {}
    # KNN Imputation
    num_cols = [c for c in numerical if c in df_filled.columns]
    if num_cols and df_filled[num_cols].isnull().any().any():
        imputer = KNNImputer(n_neighbors=n_neighbors)
        df_filled[num_cols] = imputer.fit_transform(df_filled[num_cols])
        vals['knn'] = imputer
        vals['num_cols'] = num_cols
    
    # Mode Imputation
    for col in categorical:
        if col in df_filled.columns:
            mode = df_filled[col].mode()[0]
            df_filled[col] = df_filled[col].fillna(mode)
            vals[f'mode_{col}'] = mode
    return df_filled, vals

def apply_imputation(df, vals):
    df_f = df.copy()
    if 'knn' in vals:
        cols = [c for c in vals['num_cols'] if c in df_f.columns]
        df_f[cols] = vals['knn'].transform(df_f[cols])
    for k, v in vals.items():
        if k.startswith('mode_') and k[5:] in df_f.columns:
            df_f[k[5:]] = df_f[k[5:]].fillna(v)
    return df_f

def one_hot_encode(df, cols):
    df_enc = df.copy()
    info = {}
    for col in cols:
        if col in df_enc.columns:
            cats = sorted([c for c in df_enc[col].unique() if pd.notna(c)])
            info[col] = cats
            for c in cats:
                df_enc[f"{col}_{c}"] = (df_enc[col] == c).astype(int)
            df_enc.drop(columns=[col], inplace=True)
    return df_enc, info

def apply_one_hot(df, info):
    df_enc = df.copy()
    for col, cats in info.items():
        if col in df_enc.columns:
            for c in cats:
                df_enc[f"{col}_{c}"] = (df_enc[col] == c).astype(int)
            df_enc.drop(columns=[col], inplace=True)
    return df_enc

### Feature Engineering
Generating ratios, logs, and interaction features.

In [5]:
def engineer_features(df):
    df = df.copy()
    # Ratios
    if 'transaction_amount' in df.columns and 'avg_transaction_amount' in df.columns:
        df['amount_vs_avg'] = df['transaction_amount'] / (df['avg_transaction_amount'] + 1e-5)
    if 'transactions_last_1h' in df.columns and 'transactions_last_24h' in df.columns:
        df['hourly_conc'] = df['transactions_last_1h'] / (df['transactions_last_24h'] + 1)
        
    # Logs
    for col in ['transaction_amount', 'distance_from_home']:
        if col in df.columns and df[col].min() >= 0:
            df[f'{col}_log'] = np.log1p(df[col])
            
    # Interactions
    if 'ip_risk_score' in df.columns and 'device_trust_score' in df.columns:
        df['risk_interaction'] = df['ip_risk_score'] * (1 - df['device_trust_score'] / 100)
    if 'failed_login_attempts' in df.columns and 'transaction_amount' in df.columns:
        df['failed_login_impact'] = df['failed_login_attempts'] * df['transaction_amount']
        
    return df

## 4. Main Pipeline
Data loading, preprocessing execution, training, and evaluation.

In [6]:
def preprocess_pipeline(train_df, test_df):
    # Drop IDs
    ids = test_df['ID'].values
    drop_cols = ['ID', 'transaction_id', 'user_id']
    y_train = train_df['is_fraud'].values
    
    train_X = train_df.drop(columns=['is_fraud'] + drop_cols, errors='ignore')
    test_X = test_df.drop(columns=drop_cols, errors='ignore')
    
    num_cols = train_X.select_dtypes(include=[np.number]).columns.tolist()
    cat_cols = train_X.select_dtypes(include=['object', 'category']).columns.tolist()
    
    # 1. Impute
    print("Imputing...")
    train_filled, vals = handle_missing(train_X, num_cols, cat_cols, n_neighbors=5)
    test_filled = apply_imputation(test_X, vals)
    
    # 2. Engineer
    print("Engineering Features...")
    train_eng = engineer_features(train_filled)
    test_eng = engineer_features(test_filled)
    
    # 3. Encode
    print("Encoding...")
    train_enc, enc_info = one_hot_encode(train_eng, cat_cols)
    test_enc = apply_one_hot(test_eng, enc_info)
    
    # Align columns
    cols = sorted(list(set(train_enc.columns) | set(test_enc.columns)))
    for c in cols:
        if c not in train_enc.columns: train_enc[c] = 0
        if c not in test_enc.columns: test_enc[c] = 0
    train_enc = train_enc[cols]
    test_enc = test_enc[cols]
    
    # 4. Clip & Scale
    print("Scaling...")
    X_train_np, bounds = clip_outliers(train_enc.values)
    X_test_np = apply_clip(test_enc.values, bounds)
    
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train_np)
    X_test_scaled = scaler.transform(X_test_np)
    
    return X_train_scaled, y_train, X_test_scaled, ids

In [7]:
# === EXECUTION ===

# 1. Load Data
try:
    train_df = pd.read_csv("train.csv")
    test_df = pd.read_csv("test.csv")
    print("Data Loaded.")
    
    # 2. Run Preprocessing
    X_train, y_train, X_test, test_ids = preprocess_pipeline(train_df, test_df)
    print(f"Train Shape: {X_train.shape}, Test Shape: {X_test.shape}")
    
    # 3. Split Validation
    X_tr, X_val, y_tr, y_val = train_test_split(X_train, y_train, stratify=y_train)
    
    # 4. Train Model
    print("Training Model...")
    model = LogisticRegression(
        learning_rate=0.0015, n_iterations=3000, optimizer="adam", 
        batch_size=256, regularization=0.0006, l1_ratio=0.5,
        class_weight="balanced"
    )
    model.fit(X_tr, y_tr)
    
    # 5. Evaluate
    y_prob_val = model.predict_proba(X_val)
    val_auc = roc_auc_score(y_val, y_prob_val)
    print(f"Validation ROC AUC: {val_auc:.4f}")
    
    # 6. Submission
    final_probs = model.predict_proba(X_test)
    submission = pd.DataFrame({'ID': test_ids, 'is_fraud': final_probs})
    submission.to_csv("submission.csv", index=False)
    print("Submission saved to submission.csv")
    
except FileNotFoundError:
    print("Error: train.csv or test.csv not found in current directory.")

Data Loaded.
Imputing...
Engineering Features...
Encoding...
Scaling...
Train Shape: (100000, 60), Test Shape: (100000, 60)
Training Model...
Iteration 0: loss = 0.680445
Early stopping at 34, loss: 0.674819
Validation ROC AUC: 0.6048
Submission saved to submission.csv


  return np.trapz(tpr_array[sorted_indices], fpr_array[sorted_indices])
