# Credit Card Fraud Detection - Enhanced Modeling

## HỌ VÀ TÊN: Cao Tấn Hoàng Huy
## MSSV: 23127051

In [None]:
# Import the required libraries
import numpy as np
import matplotlib.pyplot as plt

# Setup: seed NumPy's global RNG so later np.random draws are repeatable
np.random.seed(42)

## Load preprocessed data

In [None]:
# Load the arrays written by the preprocessing notebook and summarise them.
print("Loading preprocessed data...")

try:
    # Feature matrices (scaled and polynomial variants) and label vectors
    X_train_scaled = np.load('X_train_scaled.npy')
    X_test_scaled = np.load('X_test_scaled.npy')
    X_train_poly = np.load('X_train_poly.npy')
    X_test_poly = np.load('X_test_poly.npy')
    y_train = np.load('y_train.npy')
    y_test = np.load('y_test.npy')
    # NOTE(review): allow_pickle=True lets np.load unpickle arbitrary objects;
    # only load feature_names.npy from a trusted source.
    feature_names = np.load('feature_names.npy', allow_pickle=True).tolist()
    scaler_mean = np.load('scaler_mean.npy')
    scaler_std = np.load('scaler_std.npy')

    print(f"Data loaded successfully from preprocessing files!")

except FileNotFoundError as e:
    # Tell the user which notebook produces the files, then re-raise so the
    # rest of the notebook does not run with undefined variables.
    print(f"Preprocessing files not found. Please run 02_preprocessing.ipynb first.")
    print(f"Error: {e}")
    raise

print(f"\nDataset overview:")
print(f"  X_train_scaled: {X_train_scaled.shape}")
print(f"  X_test_scaled:  {X_test_scaled.shape}")
print(f"  X_train_poly:   {X_train_poly.shape}")
print(f"  X_test_poly:    {X_test_poly.shape}")
print(f"  y_train:        {y_train.shape}")
print(f"  y_test:         {y_test.shape}")

# Class distribution
unique_train, counts_train = np.unique(y_train, return_counts=True)
print(f"\nClass distribution in training:")
for cls, count in zip(unique_train, counts_train):
    pct = (count / len(y_train)) * 100
    label = "Normal" if cls == 0 else "Fraud"
    print(f"  {label}: {count:>6,} ({pct:>5.2f}%)")

# The ratio is only defined for a two-class label vector. Formatting a None
# ratio with ':.1f' raises TypeError, so the fallback prints a plain message.
if len(counts_train) == 2:
    imbalance_ratio = counts_train[0] / counts_train[1]
    print(f"  Imbalance ratio: {imbalance_ratio:.1f}:1")
else:
    imbalance_ratio = None
    print(f"  Imbalance ratio: n/a (found {len(counts_train)} classes, expected 2)")

# ENHANCED MODELING WITH PERFORMANCE FIXES

## 1. Enhanced Logistic Regression with Class Weights

In [None]:
class LogisticRegressionWithWeights:
    """
    Binary logistic regression trained with full-batch gradient descent.

    Supports per-class sample weighting (to counter class imbalance) and an
    L2 penalty on the weights (to limit overfitting).

    Parameters
    ----------
    learning_rate : float
        Step size of each gradient-descent update.
    max_iterations : int
        Upper bound on the number of full-batch updates.
    tolerance : float
        Training stops once the absolute change in cost between two
        consecutive iterations falls below this value.
    regularization_strength : float
        Coefficient (lambda) of the L2 penalty ``lambda * sum(w ** 2)``.
    class_weights : dict or None
        Maps a class label to the weight applied to its samples. ``None``
        weights every sample 1.0; labels absent from the dict also get 1.0.
    verbose : bool
        Print progress information while training.
    """

    def __init__(self, learning_rate=0.01, max_iterations=1000,
                 tolerance=1e-6, regularization_strength=0.1,
                 class_weights=None, verbose=False):
        self.learning_rate = learning_rate
        self.max_iterations = max_iterations
        self.tolerance = tolerance
        self.regularization_strength = regularization_strength  # lambda for L2
        self.class_weights = class_weights
        self.verbose = verbose
        self.weights = None
        self.bias = None
        self.cost_history = []
        self.fitted = False

    def _sigmoid(self, z):
        """Return the logistic function of z, element-wise."""
        # Clip so that np.exp never receives an argument beyond +/-709,
        # the approximate limit before float64 overflow.
        z = np.clip(z, -709, 709)
        return 1 / (1 + np.exp(-z))

    def _compute_sample_weights(self, y):
        """Return one weight per sample, looked up from ``class_weights``."""
        # Start from 1.0 so that a class missing from the dict keeps a neutral
        # weight instead of being silently dropped from loss and gradient.
        sample_weights = np.ones(len(y))
        if self.class_weights is None:
            return sample_weights

        for class_label, weight in self.class_weights.items():
            sample_weights[y == class_label] = weight

        return sample_weights

    def _weighted_cost(self, y, y_pred, sample_weights):
        """Return weighted log-loss plus L2 penalty for given probabilities."""
        # Keep probabilities strictly inside (0, 1) so log() stays finite
        epsilon = 1e-15
        y_pred = np.clip(y_pred, epsilon, 1 - epsilon)

        log_loss = -(y * np.log(y_pred) + (1 - y) * np.log(1 - y_pred))
        weighted_loss = np.mean(sample_weights * log_loss)

        l2_penalty = self.regularization_strength * np.sum(self.weights ** 2)

        return weighted_loss + l2_penalty

    def _compute_cost(self, X, y, sample_weights):
        """Return the weighted logistic loss with L2 regularization on X, y."""
        z = np.dot(X, self.weights) + self.bias
        return self._weighted_cost(y, self._sigmoid(z), sample_weights)

    def fit(self, X, y):
        """
        Train the model with gradient descent and return ``self``.

        X is a 2-D array (samples, features); y holds one 0/1 label per
        sample. Raises ValueError when the shapes are inconsistent or X has
        no rows.
        """
        X = np.asarray(X, dtype=np.float64)
        # Flatten y so a column vector cannot broadcast against predictions
        y = np.asarray(y, dtype=np.float64).ravel()

        if X.ndim != 2:
            raise ValueError(f"X must be 2-D (samples, features); got shape {X.shape}")

        m, n = X.shape

        if m == 0:
            raise ValueError("X must contain at least one sample")
        if y.shape[0] != m:
            raise ValueError(f"X has {m} samples but y has {y.shape[0]} labels")

        # Initialize parameters with small random weights and a zero bias
        self.weights = np.random.normal(0, 0.01, n)
        self.bias = 0.0
        self.cost_history = []

        sample_weights = self._compute_sample_weights(y)

        if self.verbose:
            print(f"Training enhanced logistic regression...")
            print(f"  Features: {n}")
            print(f"  Samples: {m}")
            print(f"  Regularization: {self.regularization_strength}")
            if self.class_weights:
                print(f"  Class weights: {self.class_weights}")

        prev_cost = float('inf')

        for iteration in range(self.max_iterations):
            # Forward pass; the same probabilities feed cost and gradient
            z = np.dot(X, self.weights) + self.bias
            y_pred = self._sigmoid(z)

            cost = self._weighted_cost(y, y_pred, sample_weights)
            self.cost_history.append(cost)

            # Per-sample error scaled by its class weight
            weighted_error = sample_weights * (y_pred - y)

            # Gradient of the weighted loss plus the L2 term (2 * lambda * w)
            dw = (1/m) * np.dot(X.T, weighted_error) + 2 * self.regularization_strength * self.weights
            db = (1/m) * np.sum(weighted_error)

            self.weights -= self.learning_rate * dw
            self.bias -= self.learning_rate * db

            # Stop when the cost has effectively stopped changing
            if abs(prev_cost - cost) < self.tolerance:
                if self.verbose:
                    print(f"  Converged at iteration {iteration + 1}")
                break

            prev_cost = cost

            if self.verbose and (iteration + 1) % 100 == 0:
                print(f"  Iteration {iteration + 1:>4}: Cost = {cost:.6f}")

        self.fitted = True

        if self.verbose:
            print(f"  Training completed!")
            # cost_history is empty when max_iterations is 0
            if self.cost_history:
                print(f"  Final cost: {self.cost_history[-1]:.6f}")

        return self

    def predict_proba(self, X):
        """
        Return an (n_samples, 2) array of [P(class 0), P(class 1)].

        Raises ValueError if the model has not been fitted.
        """
        if not self.fitted:
            raise ValueError("Model chưa được train. Hãy gọi fit() trước.")

        X = np.array(X, dtype=np.float64)
        z = np.dot(X, self.weights) + self.bias
        probabilities = self._sigmoid(z)

        return np.column_stack([1 - probabilities, probabilities])

    def predict(self, X, threshold=0.5):
        """Return 0/1 labels: 1 where P(class 1) >= threshold."""
        probabilities = self.predict_proba(X)
        return (probabilities[:, 1] >= threshold).astype(int)

    def get_decision_scores(self, X):
        """
        Return the raw logits ``X @ weights + bias``.

        Raises ValueError if the model has not been fitted.
        """
        if not self.fitted:
            raise ValueError("Model chưa được train.")

        X = np.array(X, dtype=np.float64)
        return np.dot(X, self.weights) + self.bias


print("Enhanced LogisticRegressionWithWeights implementation completed!")

## 2. Feature Selection Implementation

In [None]:
def univariate_feature_selection(X, y, k=25):
    """
    Score every feature with a chi-square test and keep the k best.

    Each feature is cut into bins at its 25th/50th/75th percentiles, a
    bin-by-class contingency table is counted, and the table is scored with
    ``compute_chi2_statistic``. Features whose percentile edges collapse to
    fewer than two distinct values score 0.

    Returns ``(selected_indices, scores)``: the indices of the k
    highest-scoring features in ascending order, and the score of every
    feature.
    """
    data = np.array(X, dtype=np.float64)
    labels = np.array(y, dtype=np.float64)

    _, n_features = data.shape

    # Shift every column so its minimum sits just above zero
    shifted = data - np.min(data, axis=0) + 1e-8

    # Loop invariants: percentile cut points and one boolean mask per class
    cut_points = [0, 25, 50, 75, 100]
    class_masks = [labels == class_val for class_val in np.unique(labels)]

    scores = np.zeros(n_features)

    for col in range(n_features):
        column = shifted[:, col]

        # Quantile edges with duplicates removed
        edges = np.unique(np.percentile(column, cut_points))
        if len(edges) < 2:
            # Nothing to bin on; the score stays at 0
            continue

        # Only interior edges are passed, so ids run from 0 upward
        bin_ids = np.digitize(column, edges[1:-1])
        bin_masks = [bin_ids == bin_val for bin_val in np.unique(bin_ids)]

        # Rows are occupied bins, columns are classes
        table = np.array(
            [[np.sum(in_bin & in_class) for in_class in class_masks]
             for in_bin in bin_masks],
            dtype=np.float64,
        )

        scores[col] = compute_chi2_statistic(table)

    # Take the k highest scores, then restore ascending column order
    best_first = np.argsort(scores)[::-1]
    selected_indices = np.sort(best_first[:k])

    return selected_indices, scores

def compute_chi2_statistic(contingency_table):
    """
    Return the chi-square statistic of a 2-D contingency table.

    A tiny constant (1e-10) is added to every cell first, so that no
    expected frequency is exactly zero.
    """
    observed = contingency_table + 1e-10

    # Expected cell count under independence: row total * column total / N
    per_row = np.sum(observed, axis=1, keepdims=True)
    per_col = np.sum(observed, axis=0, keepdims=True)
    expected = per_row * per_col / np.sum(observed)

    deviation = observed - expected
    return np.sum(deviation ** 2 / expected)


print("Feature selection implementation completed!")

## 3. Apply Feature Selection

In [None]:
# Rank the polynomial features and keep the 25 highest-scoring columns.
print("Applying feature selection...")

selected_features, feature_scores = univariate_feature_selection(
    X_train_poly, y_train, k=25
)

# Use the same column subset for train and test so the matrices stay aligned
X_train_selected, X_test_selected = (
    matrix[:, selected_features] for matrix in (X_train_poly, X_test_poly)
)

n_original = X_train_poly.shape[1]
n_selected = X_train_selected.shape[1]

print("\nFeature selection completed!")
print(f"  Original features: {n_original}")
print(f"  Selected features: {n_selected}")
reduction_pct = (n_original - n_selected) / n_original * 100
print(f"  Reduction: {reduction_pct:.1f}%")

print("\nTop 10 selected features (by score):")
top_10_indices = np.argsort(feature_scores)[::-1][:10]
kept_columns = set(selected_features.tolist())
for rank, feature_idx in enumerate(top_10_indices, start=1):
    # Only report features that made it into the selected subset
    if feature_idx not in kept_columns:
        continue
    print(f"  {rank:2d}. Feature {feature_idx:3d}: Score = {feature_scores[feature_idx]:.2f}")

## 4. Model Training with Enhanced Configuration

In [None]:
# Fit the weighted, L2-regularised logistic regression on the selected features.
print("Training enhanced logistic regression model...")

# Class frequencies of the training labels. The weights below are set by
# hand rather than derived from these counts.
unique_classes, class_counts = np.unique(y_train, return_counts=True)
total_samples = len(y_train)

# Hand-tuned weights: label 0 is the normal class; label 1 (fraud) is
# weighted higher to favour recall.
class_weights = {0: 1.0, 1: 20.0}

print(f"\nUsing enhanced class weights: {class_weights}")

# Hyper-parameters collected in one place, then unpacked into the model
model_config = dict(
    learning_rate=0.01,
    max_iterations=1000,
    tolerance=1e-6,
    regularization_strength=0.1,  # strong regularization
    class_weights=class_weights,
    verbose=True,
)
enhanced_model = LogisticRegressionWithWeights(**model_config)

# Train on the reduced feature matrix
enhanced_model.fit(X_train_selected, y_train)

print("\nEnhanced model training completed!")

## 5. Evaluation Metrics Implementation

In [None]:
def compute_confusion_matrix(y_true, y_pred):
    """
    Return the binary confusion matrix ``[[tn, fp], [fn, tp]]``.

    Label 1 is the positive class and label 0 the negative class.
    """
    actual = np.array(y_true)
    predicted = np.array(y_pred)

    # One boolean mask per (source, label) pair
    actual_pos, actual_neg = actual == 1, actual == 0
    pred_pos, pred_neg = predicted == 1, predicted == 0

    return np.array([
        [np.sum(actual_neg & pred_neg), np.sum(actual_neg & pred_pos)],
        [np.sum(actual_pos & pred_neg), np.sum(actual_pos & pred_pos)],
    ])

def _safe_ratio(numerator, denominator):
    """Return numerator / denominator, or 0 when the denominator is not positive."""
    return numerator / denominator if denominator > 0 else 0


def compute_metrics(y_true, y_pred):
    """
    Compute binary classification metrics from true and predicted labels.

    Returns a dict with the confusion matrix (``[[tn, fp], [fn, tp]]``),
    accuracy, precision, recall, specificity, F1 score, false positive and
    false negative rates, and the raw tp/tn/fp/fn counts. Any ratio whose
    denominator is zero is reported as 0; this includes accuracy on empty
    input.
    """
    cm = compute_confusion_matrix(y_true, y_pred)
    tn, fp, fn, tp = cm.ravel()

    # Basic metrics (accuracy is guarded like the others so that empty input
    # yields 0 instead of NaN plus a RuntimeWarning)
    accuracy = _safe_ratio(tp + tn, tp + tn + fp + fn)
    precision = _safe_ratio(tp, tp + fp)
    recall = _safe_ratio(tp, tp + fn)
    specificity = _safe_ratio(tn, tn + fp)

    # F1 is the harmonic mean of precision and recall
    f1 = _safe_ratio(2 * (precision * recall), precision + recall)

    # Error rates relevant to fraud detection
    false_positive_rate = _safe_ratio(fp, tn + fp)
    false_negative_rate = _safe_ratio(fn, tp + fn)

    return {
        'confusion_matrix': cm,
        'accuracy': accuracy,
        'precision': precision,
        'recall': recall,
        'specificity': specificity,
        'f1_score': f1,
        'false_positive_rate': false_positive_rate,
        'false_negative_rate': false_negative_rate,
        'tp': tp, 'tn': tn, 'fp': fp, 'fn': fn
    }

def print_detailed_metrics(metrics, title="Model Performance"):
    """
    Print a formatted report for a metrics dict.

    The report has four sections: the confusion matrix, the headline scores,
    the raw tp/tn/fp/fn counts with a short explanation, and the error rates.
    ``metrics`` must provide the keys read below; ``title`` is printed first
    and underlined with '='.
    """
    cm = metrics['confusion_matrix']

    # Build the whole report first, then emit it with a single print call
    report = [
        f"\n{title}",
        "=" * len(title),
        # Confusion matrix
        f"\nConfusion Matrix:",
        f"                  Predicted",
        f"                Normal  Fraud",
        f"Actual Normal   {cm[0,0]:>6}  {cm[0,1]:>5}",
        f"       Fraud    {cm[1,0]:>6}  {cm[1,1]:>5}",
        # Headline scores
        f"\nPerformance Metrics:",
        f"  Accuracy:      {metrics['accuracy']:.4f}",
        f"  Precision:     {metrics['precision']:.4f}",
        f"  Recall:        {metrics['recall']:.4f}",
        f"  F1-Score:      {metrics['f1_score']:.4f}",
        f"  Specificity:   {metrics['specificity']:.4f}",
        # Raw counts in fraud-detection terms
        f"\nFraud Detection Context:",
        f"  True Positives:  {metrics['tp']:>4} (Fraud correctly identified)",
        f"  True Negatives:  {metrics['tn']:>4} (Normal correctly identified)",
        f"  False Positives: {metrics['fp']:>4} (Normal wrongly flagged as fraud)",
        f"  False Negatives: {metrics['fn']:>4} (Fraud missed)",
        # Error rates
        f"\nError Rates:",
        f"  False Positive Rate: {metrics['false_positive_rate']:.4f}",
        f"  False Negative Rate: {metrics['false_negative_rate']:.4f}",
    ]
    print("\n".join(report))


print("Evaluation metrics implementation completed!")

## 6. Threshold Optimization

In [None]:
def optimize_threshold(model, X, y, thresholds=None):
    """
    Sweep decision thresholds and return the one with the highest F1 score.

    ``model`` must expose ``predict_proba``; column 1 of its output is used
    as the positive-class probability. When ``thresholds`` is None the sweep
    uses ``np.arange(0.1, 0.9, 0.05)``.

    Returns ``(best_result, results)``. Each result is a dict with the keys
    'threshold', 'f1_score', 'precision', 'recall' and 'accuracy'.
    """
    candidates = np.arange(0.1, 0.9, 0.05) if thresholds is None else thresholds

    # Score once; every threshold re-uses the same probabilities
    positive_scores = model.predict_proba(X)[:, 1]
    reported = ('f1_score', 'precision', 'recall', 'accuracy')

    def evaluate(cutoff):
        """Return the result row for a single threshold."""
        scored = compute_metrics(y, (positive_scores >= cutoff).astype(int))
        return {'threshold': cutoff, **{key: scored[key] for key in reported}}

    results = [evaluate(cutoff) for cutoff in candidates]

    # argmax returns the first maximum, so ties go to the earliest threshold
    winner = np.argmax([row['f1_score'] for row in results])

    return results[winner], results

# Pick the decision threshold that maximises F1, then plot the sweep.
# NOTE(review): the threshold is tuned on the same training data the model
# was fitted on; consider a held-out validation split.
print("Optimizing decision threshold...")

best_threshold_result, all_threshold_results = optimize_threshold(
    enhanced_model, X_train_selected, y_train
)
optimal_threshold = best_threshold_result['threshold']

print("\nThreshold optimization completed!")
print(f"  Optimal threshold: {optimal_threshold:.3f}")
print(f"  F1-Score at optimal threshold: {best_threshold_result['f1_score']:.4f}")
print(f"  Precision: {best_threshold_result['precision']:.4f}")
print(f"  Recall: {best_threshold_result['recall']:.4f}")

# Collect one series per metric across the threshold sweep
thresholds_array, f1_scores, precisions, recalls = (
    [row[key] for row in all_threshold_results]
    for key in ('threshold', 'f1_score', 'precision', 'recall')
)

plt.figure(figsize=(12, 8))
for series, series_label, series_marker in (
    (f1_scores, 'F1-Score', 'o'),
    (precisions, 'Precision', 's'),
    (recalls, 'Recall', '^'),
):
    plt.plot(thresholds_array, series, label=series_label, marker=series_marker)

# Vertical marker at the chosen threshold
plt.axvline(
    x=optimal_threshold,
    color='red',
    linestyle='--',
    label=f'Optimal Threshold ({optimal_threshold:.3f})',
)

plt.xlabel('Threshold', fontweight='bold')
plt.ylabel('Score', fontweight='bold')
plt.title('Threshold Optimization for Enhanced Model', fontweight='bold', pad=15)
plt.legend()
plt.grid(alpha=0.3)
plt.tight_layout()
plt.show()

## 7. Final Model Evaluation

In [None]:
# Score the tuned model on both splits using the optimised threshold.
print("Evaluating enhanced model performance...")

# Training split
y_train_pred = enhanced_model.predict(X_train_selected, threshold=optimal_threshold)
train_metrics = compute_metrics(y_train, y_train_pred)

# Test split
y_test_pred = enhanced_model.predict(X_test_selected, threshold=optimal_threshold)
test_metrics = compute_metrics(y_test, y_test_pred)

# Detailed report for each split
for split_metrics, heading in (
    (train_metrics, "ENHANCED MODEL - TRAINING SET PERFORMANCE"),
    (test_metrics, "ENHANCED MODEL - TEST SET PERFORMANCE"),
):
    print_detailed_metrics(split_metrics, heading)

# Summary banner
banner = "=" * 80
print("\n" + banner)
print("ENHANCED MODEL SUMMARY")
print(banner)

print("\nKey Improvements Applied:")
print("  ✓ Used full training data (no undersampling)")
print(f"  ✓ Feature selection: {X_train_poly.shape[1]} → {X_train_selected.shape[1]} features")
print(f"  ✓ Strong L2 regularization (λ = {enhanced_model.regularization_strength})")
print(f"  ✓ Enhanced class weights: {enhanced_model.class_weights}")
print(f"  ✓ Optimized threshold: {optimal_threshold:.3f}")

print("\nFinal Test Results:")
print(f"  Accuracy:  {test_metrics['accuracy']:.4f}")
print(f"  Precision: {test_metrics['precision']:.4f}")
print(f"  Recall:    {test_metrics['recall']:.4f}")
print(f"  F1-Score:  {test_metrics['f1_score']:.4f}")

# Map the test F1 score onto a coarse label; the first floor it exceeds wins
print("\nPerformance Analysis:")
performance_level = "NEEDS IMPROVEMENT"
for f1_floor, level_name in ((0.80, "EXCELLENT"), (0.70, "GOOD"), (0.60, "FAIR")):
    if test_metrics['f1_score'] > f1_floor:
        performance_level = level_name
        break

print(f"  Overall Performance: {performance_level}")
print(f"  Fraud Detection Rate: {test_metrics['recall']:.1%}")
print(f"  False Alarm Rate: {test_metrics['false_positive_rate']:.3%}")

# Fraud counts on the test split
total_test_transactions = len(y_test)
actual_frauds = int(np.sum(y_test))
detected_frauds, missed_frauds, false_alarms = (
    test_metrics[count_key] for count_key in ('tp', 'fn', 'fp')
)

print("\nFraud Detection Impact:")
print(f"  Total test transactions: {total_test_transactions:,}")
print(f"  Actual frauds: {actual_frauds}")
print(f"  Frauds detected: {detected_frauds} ({detected_frauds/actual_frauds:.1%})")
print(f"  Frauds missed: {missed_frauds} ({missed_frauds/actual_frauds:.1%})")
print(f"  False alarms: {false_alarms}")

print("\nModel ready for deployment!")

## Save final model and results

In [None]:
# Persist the trained model, its preprocessing choices and the test outputs.
print("\nSaving final model components...")

# Learned parameters plus what is needed to reproduce predictions:
# the selected feature columns and the decision threshold
np.save('enhanced_model_weights.npy', enhanced_model.weights)
np.save('enhanced_model_bias.npy', enhanced_model.bias)
np.save('selected_features.npy', selected_features)
np.save('optimal_threshold.npy', optimal_threshold)

# Test-set outputs: hard labels and both class-probability columns
np.save('test_predictions.npy', y_test_pred)
np.save('test_probabilities.npy', enhanced_model.predict_proba(X_test_selected))

print("\nFiles saved:")
for saved_line in (
    "  - enhanced_model_weights.npy",
    "  - enhanced_model_bias.npy",
    "  - selected_features.npy",
    "  - optimal_threshold.npy",
    "  - test_predictions.npy",
    "  - test_probabilities.npy",
):
    print(saved_line)

# Closing banner and headline test metrics
closing_rule = "=" * 80
print("\n" + closing_rule)
print("ENHANCED FRAUD DETECTION MODEL COMPLETED SUCCESSFULLY!")
print(closing_rule)
print("\nKey achievements:")
print(f"  ✓ Improved F1-Score: {test_metrics['f1_score']:.4f}")
print(f"  ✓ High Recall: {test_metrics['recall']:.4f} (fraud detection rate)")
print("  ✓ Balanced Performance with enhanced features")
print("  ✓ Production-ready model saved")