In [21]:
import pyspark
from pyspark.sql import SparkSession
import math
import time

# Create (or reuse) the notebook's SparkSession with a 4g driver memory setting
spark = (
    SparkSession.builder
    .appName("CreditCardFraud_LowLevel")
    .config("spark.driver.memory", "4g")
    .getOrCreate()
)

# The RDD-level entry point used by every later cell
sc = spark.sparkContext

## Data Loading
We load the credit card fraud dataset into an RDD using low-level Spark operations:
- Read the CSV file directly with `textFile`.
- Filter out the header row to process only data rows.
- Print the total number of records for verification.

In [22]:
# Location of the Kaggle "creditcardfraud" dataset under the /kaggle/input mount
credit_card_path = "/kaggle/input/creditcardfraud/creditcard.csv"

# Read raw text lines; the first line is the CSV header, and every line equal to it is dropped
lines_rdd = sc.textFile(credit_card_path)
header = lines_rdd.first()
data_rdd_raw = lines_rdd.filter(lambda row: row != header)

# Quick sanity check on how many data rows were read
raw_record_count = data_rdd_raw.count()
print(f"Total number of records: {raw_record_count}")

Total number of records: 284807


## Data Parsing and Initial Analysis
We parse the raw text data into features and labels, and analyze the dataset:
- Convert each line into a tuple of (features, label).
- Cache the parsed RDD for efficiency.
- Analyze class distribution to understand the dataset's imbalance (fraud vs. non-fraud).
- Print the total valid records, number of features, class distribution, and a sample record.

In [23]:
# Parse each line into features and label
def parse_line(line, expected_columns=None):
    """Parse one CSV data line into ``(features, label)``.

    Every comma-separated field has surrounding whitespace and double quotes
    removed and is converted to ``float``. All fields except the last become the
    feature list; the last field is the label (the ``Class`` column).

    Args:
        line: one raw text line from the CSV (header already removed).
        expected_columns: optional field count; rows with a different number of
            fields are rejected so they cannot reach later per-index feature code.

    Returns:
        ``(features, label)`` on success, or ``None`` when a field is not numeric
        (e.g. a stray header or an empty line) or the field count does not match
        ``expected_columns``.
    """
    fields = line.split(",")
    if expected_columns is not None and len(fields) != expected_columns:
        return None
    try:
        parts = [float(field.strip().replace('"', '')) for field in fields]
    except ValueError:
        # Only float() conversion can fail here; other errors are real bugs and should surface.
        return None
    return (parts[:-1], parts[-1])

parsed_data_rdd = data_rdd_raw.map(parse_line).filter(lambda x: x is not None).cache()

# Fetch the first record once and reuse it for both the feature count and the sample printout
sample_record = parsed_data_rdd.first()
num_features = len(sample_record[0])
total_count = parsed_data_rdd.count()

print(f"Total valid records: {total_count}")
print(f"Number of features: {num_features}")

# Check class distribution (important for imbalanced dataset)
class_counts = parsed_data_rdd.map(lambda x: (x[1], 1)).reduceByKey(lambda a, b: a + b).collectAsMap()
# Later cells treat the last column as a binary 0.0/1.0 label; fail fast if that assumption breaks
assert set(class_counts) <= {0.0, 1.0}, f"Unexpected label values: {sorted(class_counts)}"
print(f"Class distribution: {class_counts}")
print(f"Percentage of fraudulent transactions: {class_counts.get(1.0, 0)/total_count*100:.4f}%")

# Show a sample record
print("\nSample record (features, label):")
print(sample_record)

Total valid records: 284807
Number of features: 30
Class distribution: {0.0: 284315, 1.0: 492}
Percentage of fraudulent transactions: 0.1727%

Sample record (features, label):
([0.0, -1.3598071336738, -0.0727811733098497, 2.53634673796914, 1.37815522427443, -0.338320769942518, 0.462387777762292, 0.239598554061257, 0.0986979012610507, 0.363786969611213, 0.0907941719789316, -0.551599533260813, -0.617800855762348, -0.991389847235408, -0.311169353699879, 1.46817697209427, -0.470400525259478, 0.207971241929242, 0.0257905801985591, 0.403992960255733, 0.251412098239705, -0.018306777944153, 0.277837575558899, -0.110473910188767, 0.0669280749146731, 0.128539358273528, -0.189114843888824, 0.133558376740387, -0.0210530534538215, 149.62], 0.0)


## Feature Scaling
We apply min-max normalization to scale features to the [0,1] range for faster convergence:
- Compute min and max for each feature using RDD operations.
- Broadcast min/max values to all worker nodes for efficiency.
- Normalize features and cache the scaled RDD.
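- Print a sample scaled record and unpersist the unscaled data.
- Caveat: the min/max statistics are computed over the full dataset before the train/test split, so test rows influence the scaling. For a strictly leakage-free evaluation, compute them on the training split only.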

In [24]:
# Collect min and max for each feature using mapPartitions for better performance
def compute_min_max(iterator):
    """Summarize one partition as a single ``(mins, maxes)`` pair.

    Both lists have ``num_features`` entries. An empty partition yields +inf
    mins and -inf maxes, which are neutral when partitions are combined with
    element-wise min/max.
    """
    lows = [float('inf')] * num_features
    highs = [float('-inf')] * num_features
    for record in iterator:
        row = record[0]
        for idx in range(num_features):
            lows[idx] = min(lows[idx], row[idx])
            highs[idx] = max(highs[idx], row[idx])
    yield (lows, highs)

# Combine the per-partition (mins, maxes) pairs element-wise into dataset-wide extremes
min_max_stats = parsed_data_rdd.mapPartitions(compute_min_max).reduce(
    lambda a, b: (
        [min(a[0][k], b[0][k]) for k in range(num_features)],
        [max(a[1][k], b[1][k]) for k in range(num_features)],
    )
)
min_features, max_features = min_max_stats

# Per-feature span; constant columns get a tiny floor so the scaling division never hits zero
epsilon = 1e-8
feature_ranges = [max(max_features[k] - min_features[k], epsilon) for k in range(num_features)]

# Make the statistics available to executors as broadcast variables
bc_min_features, bc_max_features, bc_feature_ranges = (
    sc.broadcast(min_features),
    sc.broadcast(max_features),
    sc.broadcast(feature_ranges),
)

# Scale the features using min-max normalization
def scale_record(record):
    """Min-max scale one ``(features, label)`` record; the label passes through unchanged.

    Feature ``i`` becomes ``(x_i - min_i) / range_i`` using the broadcast
    per-feature minimum and epsilon-floored range.
    """
    features, label = record
    mins = bc_min_features.value
    spans = bc_feature_ranges.value
    return ([(x - mins[i]) / spans[i] for i, x in enumerate(features)], label)

scaled_data_rdd = parsed_data_rdd.map(scale_record).cache()
# Materialize every partition of the scaled RDD while its parent is still cached.
# first() is take(1), which may evaluate only the first partition; unpersisting the
# parent right after it would force the remaining partitions to re-read and re-parse
# the CSV when they are first needed.
scaled_data_rdd.count()

print("Sample Scaled Record (Features, Label):")
print(scaled_data_rdd.first())

# Unpersist the unscaled data (trailing ';' keeps the returned RDD repr out of the cell output)
parsed_data_rdd.unpersist();

Sample Scaled Record (Features, Label):
([0.0, 0.9351923374337303, 0.7664904186403037, 0.8813649032863348, 0.31302265906669463, 0.7634387348529242, 0.2676686424971201, 0.26681517599177856, 0.7864441979341067, 0.4753117341039581, 0.5106004821833838, 0.25248431906394647, 0.6809076254567205, 0.3715906024604766, 0.6355905300192973, 0.4460836956482719, 0.4343923913601106, 0.7371725526870235, 0.6550658609829579, 0.5948632283047696, 0.5829422304973765, 0.5611843885604425, 0.5229921162596571, 0.6637929753279846, 0.3912526763768729, 0.5851217945036548, 0.39455679156287454, 0.4189761351972912, 0.3126966335786978, 0.0058237930868049554], 0.0)


PythonRDD[4] at RDD at PythonRDD.scala:53

## Train-Test Split and Class Weighting
We prepare the data for training by:
- Splitting the scaled data into training (80%) and test (20%) sets with a fixed seed for reproducibility.
- Caching the split RDDs for efficiency.
- Calculating class weights to handle imbalance (higher weight for the minority class, fraud).
- Broadcasting class weights to all worker nodes.
- Printing the sizes of the split sets and class weights.

In [25]:
# 80/20 train/test split; the fixed seed keeps the split reproducible across runs
train_rdd, test_rdd = scaled_data_rdd.randomSplit([0.8, 0.2], seed=42)
for split_rdd in (train_rdd, test_rdd):
    split_rdd.cache()

train_count, test_count = train_rdd.count(), test_rdd.count()
print(f"Training set size: {train_count}")
print(f"Test set size: {test_count}")

# Both splits are cached and counted, so the parent RDD can be released
scaled_data_rdd.unpersist()

# Per-class counts on the training split drive the weighting below
train_class_counts = (
    train_rdd
    .map(lambda rec: (rec[1], 1))
    .reduceByKey(lambda left, right: left + right)
    .collectAsMap()
)
count_class_0 = train_class_counts.get(0.0, 0)
count_class_1 = train_class_counts.get(1.0, 0)
total = count_class_0 + count_class_1

if count_class_0 and count_class_1:
    # Inverse-frequency ("balanced") weights: total / (n_classes * class_count)
    class_weights = {
        label: total / (2.0 * n)
        for label, n in ((0.0, count_class_0), (1.0, count_class_1))
    }
else:
    print("Warning: One class is missing in the training set. Using equal weights.")
    class_weights = {0.0: 1.0, 1.0: 1.0}

print(f"Class distribution in training set - 0: {count_class_0}, 1: {count_class_1}")
print(f"Class weights - 0: {class_weights[0.0]:.4f}, 1: {class_weights[1.0]:.4f}")

# Ship the weights to executors for use inside compute_gradient_and_loss
bc_class_weights = sc.broadcast(class_weights)

Training set size: 228163
Test set size: 56644
Class distribution in training set - 0: 227769, 1: 394
Class weights - 0: 0.5009, 1: 289.5470


## Logistic Regression Helper Functions
We define core functions for logistic regression:
- `sigmoid`: Computes the sigmoid function with overflow protection.
- `dot_product`: Calculates the dot product of two vectors.
- `predict`: Predicts probabilities using weights and sigmoid.
- `compute_gradient_and_loss`: Computes the gradient and loss for a single record, including L2 regularization and class weighting.

Together, these functions provide the per-record computation that the gradient descent loop below aggregates across the training data.

In [26]:
def sigmoid(z):
    """Logistic function ``1 / (1 + e^-z)``.

    Inputs above 500 return exactly 1.0 and inputs below -500 return exactly
    0.0, keeping ``math.exp`` far from its overflow limit.
    """
    if z > 500:
        return 1.0
    if z < -500:
        return 0.0
    return 1.0 / (1.0 + math.exp(-z))

def dot_product(vec1, vec2):
    """Return the sum of element-wise products of two sequences.

    Pairing stops at the shorter sequence; two empty inputs give 0.
    """
    products = [left * right for left, right in zip(vec1, vec2)]
    return sum(products)

def predict(features, weights):
    """Return the logistic-model probability for one feature list.

    ``weights[0]`` is the bias: it is paired with a constant 1.0 prepended to
    ``features`` before the dot product is passed through ``sigmoid``.
    """
    return sigmoid(dot_product([1.0] + features, weights))

def compute_gradient_and_loss(record, weights, reg_param):
    """Per-record gradient and loss for class-weighted, L2-regularized logistic regression.

    Args:
        record: ``(features, label)`` pair; ``features`` excludes the bias input.
        weights: current model weights, bias first.
        reg_param: L2 strength, applied to every weight except the bias.

    Returns:
        ``(gradient, loss)``. The gradient has one entry per element of
        ``[1.0] + features``. The loss is the class-weighted log loss plus
        ``0.5 * reg_param * sum(w**2)`` over the non-bias weights.
    """
    features, label = record
    augmented = [1.0] + features  # constant 1.0 input multiplies the bias weight

    prob = predict(features, weights)
    weight_for_class = bc_class_weights.value.get(label, 1.0)
    residual = prob - label

    # d(loss)/d(w_j) = class_weight * (p - y) * x_j
    gradient = [weight_for_class * residual * x for x in augmented]

    # L2 penalty gradient on every weight except the bias at index 0
    for j, w in enumerate(weights[1:], start=1):
        gradient[j] += reg_param * w

    # Small offset keeps log() finite when the prediction saturates at 0 or 1
    eps = 1e-9
    if label == 1.0:
        data_loss = -weight_for_class * math.log(prob + eps)
    else:
        data_loss = -weight_for_class * math.log(1 - prob + eps)

    penalty = 0.0
    for w in weights[1:]:
        penalty += 0.5 * reg_param * (w ** 2)

    return (gradient, data_loss + penalty)

## Gradient Descent Implementation
We implement gradient descent to train the logistic regression model:
- Initialize weights (including bias) to zeros.
- Set parameters: learning rate, max iterations, L2 regularization, and convergence tolerance.
- Iterate over the training data, computing gradients and loss per partition.
- Update weights, check for convergence, and print progress every 5 iterations.
- Print the final weights after training.

In [27]:
# Initialize weights (including bias term)
num_features_with_bias = num_features + 1  # Add 1 for bias term
initial_weights = [0.0] * num_features_with_bias

# Gradient Descent Parameters
learning_rate = 0.1
max_iterations = 50
reg_param = 0.01  # L2 regularization strength
tolerance = 1e-4  # Stop once |avg_loss - previous avg_loss| falls below this


def make_partition_aggregator(weights_snapshot):
    """Build the ``mapPartitions`` function for one gradient-descent step.

    The returned function sums, over one partition, the per-record gradients and
    losses from ``compute_gradient_and_loss`` evaluated at ``weights_snapshot``,
    and yields a single ``(gradient_sum, loss_sum, record_count)`` triple.
    The weights are passed in explicitly rather than read from the global
    ``current_weights``, which is mutated after every step.
    """
    def process_partition(iterator):
        local_gradients = [0.0] * num_features_with_bias
        local_loss = 0.0
        local_count = 0
        for record in iterator:
            grad, loss = compute_gradient_and_loss(record, weights_snapshot, reg_param)
            # Accumulate in place instead of rebuilding the list for every record
            for i in range(num_features_with_bias):
                local_gradients[i] += grad[i]
            local_loss += loss
            local_count += 1
        yield (local_gradients, local_loss, local_count)

    return process_partition


def merge_partition_results(x, y):
    """Element-wise sum of two ``(gradient_sum, loss_sum, record_count)`` triples."""
    return (
        [x[0][i] + y[0][i] for i in range(num_features_with_bias)],
        x[1] + y[1],
        x[2] + y[2],
    )


# Training loop
current_weights = initial_weights.copy()
previous_loss = float('inf')
iteration_stats = []  # (iteration, avg_loss, loss_change, seconds) for each step

print(f"\nStarting Low-Level Logistic Regression Training...")
print(f"Parameters: Learning Rate={learning_rate}, L2 Reg={reg_param}, Max Iterations={max_iterations}")

for iteration in range(max_iterations):
    start_time = time.time()

    # Freeze this step's weights so the Spark job evaluates one fixed vector
    weights_snapshot = list(current_weights)
    total_gradients, total_loss, count = (
        train_rdd
        .mapPartitions(make_partition_aggregator(weights_snapshot))
        .reduce(merge_partition_results)
    )
    if count == 0:
        raise ValueError("Training set is empty; cannot compute an average gradient.")

    # Average the gradients and loss over all training records
    avg_gradients = [g / count for g in total_gradients]
    avg_loss = total_loss / count  # loss at weights_snapshot, i.e. before this step's update

    # Update weights using gradient descent
    for i in range(num_features_with_bias):
        current_weights[i] -= learning_rate * avg_gradients[i]

    # Compute time taken
    elapsed_time = time.time() - start_time

    # Check for convergence
    loss_change = abs(avg_loss - previous_loss)
    iteration_stats.append((iteration+1, avg_loss, loss_change, elapsed_time))

    # Print progress on the first, every fifth, and the last iteration
    if (iteration+1) % 5 == 0 or iteration == 0 or iteration == max_iterations-1:
        print(f"Iteration {iteration+1}/{max_iterations} | Loss: {avg_loss:.6f} | Change: {loss_change:.6f} | Time: {elapsed_time:.2f}s")

    if loss_change < tolerance:
        print(f"\nConverged at iteration {iteration+1}. Loss change below tolerance ({tolerance}).")
        break

    previous_loss = avg_loss

print("\n--- Training Complete ---")
print(f"Final Weights: {current_weights}")


Starting Low-Level Logistic Regression Training...
Parameters: Learning Rate=0.1, L2 Reg=0.01, Max Iterations=50
Iteration 1/50 | Loss: 0.693147 | Change: inf | Time: 2.59s
Iteration 5/50 | Loss: 0.685516 | Change: 0.001805 | Time: 2.97s
Iteration 10/50 | Loss: 0.676894 | Change: 0.001684 | Time: 2.63s
Iteration 15/50 | Loss: 0.668715 | Change: 0.001605 | Time: 2.71s
Iteration 20/50 | Loss: 0.660914 | Change: 0.001531 | Time: 2.70s
Iteration 25/50 | Loss: 0.653470 | Change: 0.001461 | Time: 2.63s
Iteration 30/50 | Loss: 0.646365 | Change: 0.001395 | Time: 2.69s
Iteration 35/50 | Loss: 0.639582 | Change: 0.001332 | Time: 2.64s
Iteration 40/50 | Loss: 0.633104 | Change: 0.001272 | Time: 2.64s
Iteration 45/50 | Loss: 0.626917 | Change: 0.001215 | Time: 2.70s
Iteration 50/50 | Loss: 0.621005 | Change: 0.001161 | Time: 2.67s

--- Training Complete ---
Final Weights: [0.06734645501449467, -0.05563508104503643, -0.0307177798823093, 0.09482797381106739, -0.08483287211062569, 0.249190080901662

## Model Evaluation
We evaluate the model on the test set using a threshold of 0.5:
- Compute the confusion matrix (TP, FP, TN, FN).
- Calculate accuracy, precision, recall, and F1-score.
- Print the results, focusing on recall (sensitivity) as it's critical for fraud detection.

In [28]:
def evaluate_model(test_data, weights, threshold=0.5):
    """Evaluate the model on test data"""
    # Function to predict and compare with actual label
    def predict_and_evaluate(record):
        features, actual_label = record
        prob = predict(features, weights)
        predicted_label = 1.0 if prob >= threshold else 0.0
        return (predicted_label, actual_label, prob)
    
    # Get predictions for all test records
    predictions = test_data.map(predict_and_evaluate)
    predictions.cache()
    
    # Calculate confusion matrix counts
    tp = predictions.filter(lambda x: x[0] == 1.0 and x[1] == 1.0).count()
    fp = predictions.filter(lambda x: x[0] == 1.0 and x[1] == 0.0).count()
    tn = predictions.filter(lambda x: x[0] == 0.0 and x[1] == 0.0).count()
    fn = predictions.filter(lambda x: x[0] == 0.0 and x[1] == 1.0).count()
    
    # Calculate metrics
    total = tp + tn + fp + fn
    accuracy = (tp + tn) / total if total > 0 else 0
    precision = tp / (tp + fp) if (tp + fp) > 0 else 0
    recall = tp / (tp + fn) if (tp + fn) > 0 else 0
    f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
    
    # Unpersist predictions RDD
    predictions.unpersist()
    
    return {
        "accuracy": accuracy,
        "precision": precision,
        "recall": recall,
        "f1_score": f1,
        "confusion_matrix": {"tp": tp, "fp": fp, "tn": tn, "fn": fn}
    }

# Score the trained weights on the held-out split at the conventional 0.5 cut-off
print("\n--- Evaluating Model on Test Set ---")
evaluation = evaluate_model(test_rdd, current_weights, threshold=0.5)
cm = evaluation["confusion_matrix"]

print("Results with threshold=0.5:")
print(f"Accuracy:  {evaluation['accuracy']:.4f}")
print(f"Precision: {evaluation['precision']:.4f}")
print(f"Recall:    {evaluation['recall']:.4f} (Sensitivity)")
print(f"F1-Score:  {evaluation['f1_score']:.4f}")
print("Confusion Matrix:")
for caption, key in (
    ("True Positives:  ", "tp"),
    ("False Positives: ", "fp"),
    ("True Negatives:  ", "tn"),
    ("False Negatives: ", "fn"),
):
    print(f"  {caption}{cm[key]}")


--- Evaluating Model on Test Set ---
Results with threshold=0.5:
Accuracy:  0.9992
Precision: 0.8400
Recall:    0.6429 (Sensitivity)
F1-Score:  0.7283
Confusion Matrix:
  True Positives:  63
  False Positives: 12
  True Negatives:  56534
  False Negatives: 35


## Threshold Tuning and AUC Calculation
We explore model performance further:
- Test different decision thresholds (0.1 to 0.9) to balance precision and recall, as the default 0.5 may not be optimal for imbalanced data.
- Calculate the Area Under the ROC Curve (AUC), a threshold-independent metric commonly reported for imbalanced datasets, by ranking test predictions by score and integrating the resulting ROC curve with the trapezoidal rule.
- Print the results for each threshold and the AUC value.

In [29]:
# Sweep several cut-offs to see how precision and recall trade off away from 0.5
print("\n--- Evaluating Different Thresholds ---")
thresholds = [0.1, 0.3, 0.5, 0.7, 0.9]
for threshold in thresholds:
    sweep_metrics = evaluate_model(test_rdd, current_weights, threshold)
    precision_t, recall_t, f1_t = (sweep_metrics[k] for k in ("precision", "recall", "f1_score"))
    print(f"Threshold={threshold:.1f} | Precision: {precision_t:.4f} | Recall: {recall_t:.4f} | F1: {f1_t:.4f}")

# Calculate AUC
def calculate_auc_approximation(test_data, weights):
    """Calculate an approximation of the AUC using discrete thresholds"""
    # Function to get prediction probabilities and actual labels
    def get_prediction_score(record):
        features, actual_label = record
        prob = predict(features, weights)
        return (prob, actual_label)
    
    # Get prediction scores and sort them
    pred_scores = test_data.map(get_prediction_score).collect()
    pred_scores.sort(key=lambda x: x[0], reverse=True)
    
    # Initialize counters
    num_positive = sum(1 for _, label in pred_scores if label == 1.0)
    num_negative = len(pred_scores) - num_positive
    
    if num_positive == 0 or num_negative == 0:
        print("Warning: Only one class present in test set. AUC calculation not possible.")
        return 0.0
    
    # Initialize the area
    auc = 0.0
    tp = 0
    fp = 0
    prev_fp = 0
    prev_tp = 0
    
    # Process each prediction
    for prob, label in pred_scores:
        if label == 1.0:
            tp += 1
        else:
            fp += 1
            
        # Add trapezoid area under the curve
        if fp > prev_fp:
            auc += (tp + prev_tp) * (fp - prev_fp) / (2.0 * num_positive * num_negative)
            prev_fp = fp
            prev_tp = tp
    
    return auc

# Threshold-independent summary: area under the ROC curve for the test split
print("\n--- AUC Approximation ---")
auc = calculate_auc_approximation(test_rdd, current_weights)
auc_report = f"Area Under ROC Curve (AUC): {auc:.4f}"
print(auc_report)


--- Evaluating Different Thresholds ---
Threshold=0.1 | Precision: 0.0017 | Recall: 1.0000 | F1: 0.0035
Threshold=0.3 | Precision: 0.0017 | Recall: 1.0000 | F1: 0.0035
Threshold=0.5 | Precision: 0.8400 | Recall: 0.6429 | F1: 0.7283
Threshold=0.7 | Precision: 0.0000 | Recall: 0.0000 | F1: 0.0000
Threshold=0.9 | Precision: 0.0000 | Recall: 0.0000 | F1: 0.0000

--- AUC Approximation ---
Area Under ROC Curve (AUC): 0.9617


## Cleanup and Shutdown
We clean up resources to free memory and stop the Spark session:
- Unpersist cached RDDs (`train_rdd`, `test_rdd`).
- Unpersist broadcast variables.
- Stop the Spark session to ensure proper shutdown.

In [30]:
# Clean up: release cached RDDs and broadcast variables, in the same order they are listed
for spark_handle in (
    train_rdd,
    test_rdd,
    bc_min_features,
    bc_max_features,
    bc_feature_ranges,
    bc_class_weights,
):
    spark_handle.unpersist()

# Stop Spark session
print("--- Stopping Spark Session ---")
spark.stop()

--- Stopping Spark Session ---
