In [1]:
import os

try:
    from google.colab import drive
    drive.mount('/content/drive')
    os.chdir("/content/drive/MyDrive/Credit Card Fraud Analysis")
    print("Working directory set to:", os.getcwd())
except ModuleNotFoundError:
    print("Google Colab-specific setup skipped.")

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Working directory set to: /content/drive/MyDrive/Credit Card Fraud Analysis


In [2]:
!pip install -r requirements.txt



In [3]:
import tensorflow as tf

gpu_devices = tf.config.list_physical_devices('GPU')
print(f"Number of available GPUs: {len(gpu_devices)}")

Number of available GPUs: 0


In [4]:
import os
import pretty_errors
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.feature import VectorAssembler, OneHotEncoder, StringIndexer
from pyspark.ml.classification import GBTClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from sklearn.metrics import classification_report
from src.processing import LogTransformer, PairwiseTransformer

# Setting and data processing

In [5]:
spark = SparkSession \
    .builder \
    .appName("credit_card_fraud_analysis") \
    .config("spark.driver.memory", "32g") \
    .config("spark.executor.memory", "32g") \
    .getOrCreate()

In [6]:
data = spark.read.csv("inputs/processed_data", header=True, inferSchema=True)
data.show(5)

+--------------------+-------------+------+------+----------------+-----+-------+---------+--------+--------------------+------------------+-----------+--------+-------+---+-----------+----------+-----------------+------------------+-----------------+-----------+
|            merchant|     category|   amt|gender|            city|state|    lat|     long|city_pop|                 job|         merch_lat| merch_long|is_fraud|portion|age|trans_month|trans_hour|trans_day_of_week|trans_day_of_month|trans_day_of_year|distance_km|
+--------------------+-------------+------+------+----------------+-----+-------+---------+--------+--------------------+------------------+-----------+--------+-------+---+-----------+----------+-----------------+------------------+-----------------+-----------+
|fraud_Raynor, Rei...|gas_transport| 61.15|     M|   West Hartford|   VT|43.7185| -72.4439|     140|Development worke...|         44.208355| -72.133812|       0|  train| 35|          8|        10|            

In [7]:
# Split data into training and testing sets
train_data = data.where(col('portion') == 'train')
test_data = data.where(col('portion') == 'test')

In [8]:
del data

In [9]:
from src.processing import calculate_distribution

train_sample = train_data.sampleBy('is_fraud', fractions={0: 0.10, 1: 1.0}, seed=123)
print('trainset proportions: ')
calculate_distribution(train_sample, 'is_fraud')

print('testset proportions: ')
calculate_distribution(test_data, 'is_fraud')

trainset proportions: 
+--------+------+-------------------+
|is_fraud| count|         proportion|
+--------+------+-------------------+
|       1|  7506|0.05508101444170482|
|       0|128766| 0.9449189855582951|
+--------+------+-------------------+

testset proportions: 
+--------+------+--------------------+
|is_fraud| count|          proportion|
+--------+------+--------------------+
|       1|  2145|0.003859864427885...|
|       0|553574|  0.9961401355721147|
+--------+------+--------------------+



In [10]:
cols_to_drop = ['portion', 'lat', 'long', 'merch_lat', 'merch_long', 'distance_km', 'state', 'city']
train_sample = train_sample.drop(*cols_to_drop)

### Process categorical columns

In [11]:
from pyspark.ml.feature import VectorAssembler, OneHotEncoder, StringIndexer, StandardScaler

categorical_columns = [col for col, dtype in train_sample.dtypes if dtype == 'string']
categorical_columns

['merchant', 'category', 'gender', 'job']

In [12]:
categorical_index_cols = [f"{col}_index" for col in categorical_columns]
categorical_ohe_cols = [f"{col}_ohe" for col in categorical_columns]

string_indexer = StringIndexer(
    inputCols=categorical_columns,
    outputCols=categorical_index_cols,
    handleInvalid='keep'
)

one_hot_encoder = OneHotEncoder(
    inputCols=categorical_index_cols,
    outputCols=categorical_ohe_cols,
    #handleInvalid='keep'
)


### Process numerical columns

In [13]:
numerical_columns = [col for col in train_sample.columns if col not in categorical_columns and col not in ['is_fraud']]
numerical_columns

['amt',
 'city_pop',
 'age',
 'trans_month',
 'trans_hour',
 'trans_day_of_week',
 'trans_day_of_month',
 'trans_day_of_year']

In [14]:
log_transformed_cols = [f"ln_{col}" for col in numerical_columns]

log_transformer = LogTransformer(
    input_cols=numerical_columns,
    output_cols=log_transformed_cols
)

pairwise_transformer = PairwiseTransformer(inputCols=numerical_columns)

In [15]:
assembler_input_cols = (
    numerical_columns +
    log_transformer.get_output_cols() +
    pairwise_transformer.getOutputCols() +
    categorical_ohe_cols
)

assembler = VectorAssembler(
    inputCols=assembler_input_cols,
    outputCol="final_features",
    handleInvalid='skip'
)

# Pipeline training

In [16]:
from pyspark.ml.classification import GBTClassifier, RandomForestClassifier
from xgboost.spark import SparkXGBClassifier


gbt_classifier = GBTClassifier(featuresCol='final_features',
                               labelCol='is_fraud',
                               seed=0)

xgboost =  SparkXGBClassifier(features_col='final_features',
                              label_col='is_fraud',
                              num_workers=1 if gpu_devices else 2,
                              device='cuda' if gpu_devices else 'cpu',
                              tree_method='gpu_hist',
                              verbosity=0,
                              seed=0)

models = {'XGBoost': xgboost,
          'GBTClassifier': gbt_classifier,
          }

In [None]:
from pyspark.ml import Pipeline
from tqdm import tqdm

trained_models = dict()

for model_name, model in tqdm(models.items(), desc='Training models'):
    tqdm.write(f'\nTraining model: {model_name} \n')

    pl = Pipeline(stages=[
        string_indexer,
        one_hot_encoder,
        log_transformer,
        pairwise_transformer,
        assembler,
        model])

    trained_models[model_name] = pl.fit(train_sample)

In [18]:
from src.metrics import generate_model_results

models_results = generate_model_results(trained_models, test_data)

Generating models results:   0%|          | 0/2 [00:00<?, ?it/s]


Area under ROC curve for XGBoost: 0.9965


Classification report for XGBoost: 
              precision    recall  f1-score   support

           0       1.00      1.00      1.00    535609
           1       0.62      0.85      0.72      1957

    accuracy                           1.00    537566
   macro avg       0.81      0.93      0.86    537566
weighted avg       1.00      1.00      1.00    537566



Generating models results:  50%|█████     | 1/2 [01:41<01:41, 101.86s/it]


Area under ROC curve for GBTClassifier: 0.9841


Classification report for GBTClassifier: 
              precision    recall  f1-score   support

           0       1.00      1.00      1.00    535609
           1       0.58      0.78      0.67      1957

    accuracy                           1.00    537566
   macro avg       0.79      0.89      0.83    537566
weighted avg       1.00      1.00      1.00    537566



Generating models results: 100%|██████████| 2/2 [02:21<00:00, 70.56s/it]


In [17]:
final_model = models['GBTClassifier']

# Optimizing

In [18]:
import numpy as np
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol='is_fraud', metricName='f1')

param_dict = {
    'maxDepth': list(range(4, 16)),       # Controls tree depth (4 to 15).
    'maxBins': [16, 32, 64, 128],        # Number of bins for continuous features.
    'stepSize': [0.01, 0.05, 0.1, 0.2],  # Learning rate for gradient boosting.
    'maxIter': [10, 50, 100],            # Number of boosting iterations (trees).
    'subsamplingRate': [0.5, 0.7, 1.0],  # Fraction of data used for each tree.
    'minInstancesPerNode': [1, 5, 10],   # Minimum data points per tree node.
    'minInfoGain': [0.0, 0.01, 0.1]      # Minimum gain required for a split.
}

print(param_dict)


pipeline = Pipeline(stages=[
                    string_indexer,
                    one_hot_encoder,
                    log_transformer,
                    pairwise_transformer,
                    assembler,
                    final_model
    ])

{'maxDepth': [4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15], 'maxBins': [16, 32, 64, 128], 'stepSize': [0.01, 0.05, 0.1, 0.2], 'maxIter': [10, 50, 100], 'subsamplingRate': [0.5, 0.7, 1.0], 'minInstancesPerNode': [1, 5, 10], 'minInfoGain': [0.0, 0.01, 0.1]}


In [34]:
import time
import random
import logging
import matplotlib.pyplot as plt
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from tqdm import tqdm

logger = logging.getLogger()

class GeneticAlgorithm:
    """
    A class implementing a Genetic Algorithm for hyperparameter optimization of PySpark machine learning models.

    Attributes:
        estimator (object): The PySpark MLlib estimator (e.g., Regression, Classifier) to optimize.
        parameters_ranges (dict): A dictionary containing parameter names as keys and their possible values as lists.
        train_data (DataFrame): Training data as a PySpark DataFrame.
        evaluator (object): PySpark evaluator for model evaluation (e.g., MulticlassClassificationEvaluator).
        size_of_population (int): The number of chromosomes (individuals) in the population.
        fitness_limit (float): The target fitness value to stop the algorithm.
        time_limit (float): The maximum runtime for the algorithm in minutes.
        probability (float): Probability of mutation for each gene.
        mutation_number (int): Number of mutations per chromosome.
    """

    def __init__(self, pipeline, evaluator, parameters_ranges, train_data, size_of_population,
                 score_limit, time_limit, probability=0.1, mutation_number=1, spark=None):
        """
        Initializes the GeneticAlgorithm class with the given parameters.
        """
        self.pipeline = pipeline.copy()
        self.parameters_ranges = parameters_ranges
        self.train_data = train_data
        self.evaluator = evaluator
        self.size_of_population = size_of_population
        self.probability = probability
        self.mutation_number = mutation_number
        self.fitness_limit = score_limit
        self.time_limit = time_limit
        self.scores = []  # Tracks the best score per generation
    "---------------------------------------------------------------------------utils--------------------------------------------------------------------------------"
    def get_scores(self):
        return self.scores

    def generate_chromosome(self):
        return {key: random.choice(values) for key, values in self.parameters_ranges.items()}

    def generate_population(self):
        return [self.generate_chromosome() for _ in range(self.size_of_population)]

    "---------------------------------------------------------------------------genetic algo functions---------------------------------------------------------------"
    def mean_score_population(self, population):
        return np.mean([self.fitness_function(chromosome) for chromosome in population])

    def sort_population(self, population):
        return sorted(population, key = lambda chromosome: self.fitness_function(chromosome), reverse = True)

    def selection_pair(self, population):
        sorted_population = self.sort_population(population)
        return sorted_population[:2]

    def uniform_crossover(self, parent1, parent2):
        child1, child2 = parent1.copy(), parent2.copy()
        for param in self.parameters_ranges.keys():
            if random.random() < 0.5:
                child1[param], child2[param] = child2[param], child1[param]
        return child1, child2

    def mutation(self, chromosome):
        mutant = chromosome.copy()
        for _ in range(self.mutation_number):
            if random.random() < self.probability:
                param = random.choice(list(self.parameters_ranges.keys()))
                mutant[param] = random.choice(self.parameters_ranges[param])
        return mutant

    def fitness_function(self, chromosome):
        estimator = self.pipeline.getStages()[-1]
        estimator.setParams(**chromosome)

        new_stages = self.pipeline.getStages()[:-1] + [estimator]
        pipeline = Pipeline(stages=new_stages)

        paramGrid = ParamGridBuilder().build()
        crossval = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid, evaluator=self.evaluator, numFolds=5)
        cvModel = crossval.fit(self.train_data)
        f1_score = self.evaluator.evaluate(cvModel.bestModel.transform(self.train_data))
        return f1_score


    def run(self):
        """
        Runs the genetic algorithm to optimize the hyperparameters of the model.
        """
        print('Starting genetic algorithm')
        population = self.generate_population()

        print('First Population generated')
        population = self.sort_population(population)

        print('Starting hyper-parameters tuning!')
        time_limit_seconds = self.time_limit * 60

        start_time = time.time()
        while (self.fitness_function(population[0]) < self.score_limit):

            parent1, parent2 = self.selection_pair(population)
            offspring1, offspring2 = self.uniform_crossover(parent1, parent2)
            population = [offspring1, offspring2, parent1, parent2]

            offspring1 = self.mutation(offspring1)
            offspring2 = self.mutation(offspring2)
            population.extend([offspring1, offspring2])

            population = self.sort_population(population)
            best_score = self.fitness_function(population[0])
            self.scores.append(best_score)

            if len(self.scores) > 1 and best_score > self.scores[-2]:
              logger.info(f'Score improved from {self.scores[-2]} to {best_score}')

            if (time.time() - start_time > time_limit_seconds):
              logger.info('Time limit reached')
              break

        population = self.sort_population(population)
        return population

    "---------------------------------------------------------------------------plot functions--------------------------------------------------------------------------"
    def plot_generations_scores(self):
        plt.figure(figsize=(15, 10))
        plt.plot(self.scores, label='Max score per generation', color='seagreen')
        plt.xlabel('Best score per generation')
        plt.ylabel('5-Fold CV Score')
        plt.legend()
        plt.show()

In [36]:
genetic_algorithm = GeneticAlgorithm(pipeline=pipeline,
                                     evaluator=evaluator,
                                     train_data=train_sample,
                                     parameters_ranges=param_dict,
                                     size_of_population=6,
                                     time_limit=2,
                                     score_limit=0.8)

ConnectionRefusedError: [Errno 111] Connection refused

In [None]:
population = genetic_algorithm.run()

In [31]:
def fitness_function(chromosome, pipeline): # Add spark as argument
  estimator = pipeline.getStages()[-1]
  estimator.setParams(**chromosome)

  new_stages = pipeline.getStages()[:-1] + [estimator]
  pipeline = Pipeline(stages=new_stages)

  paramGrid = ParamGridBuilder().build()
  crossval = CrossValidator(estimator=pipeline, evaluator=evaluator, numFolds=2)
  cvModel = crossval.fit(train_data) # train_data is assumed to be accessible
  f1_score = evaluator.evaluate(cvModel.bestModel.transform(train_data))
  return f1_score

In [32]:
pipeline = Pipeline(stages=[
                    string_indexer,
                    one_hot_encoder,
                    log_transformer,
                    pairwise_transformer,
                    assembler,
                    final_model
    ])

In [33]:
fitness_function(chromosome = {
          'maxDepth': 1},
           pipeline = pipeline,)

KeyError: Param(parent='CrossValidator_5a5170ee14bd', name='estimatorParamMaps', doc='estimator param maps')