# <center>Credit Card Fraud Analysis Using Spark MLlib</center>

## Introduction

Credit card fraud increased by 10 percent from 2020 to 2021, with estimated losses of over 30 billion dollars [[1]](https://www.statista.com/statistics/1394119/global-card-fraud-losses/). The convenience of debit and credit cards has pushed cash payments into second place in consumer preference. A 2021 study showed that 57% of payments in the UK were made by card, with 32% of payments being contactless [[2]](https://www.ukfinance.org.uk/system/files/2022-08/UKF%20Payment%20Markets%20Summary%202022.pdf). Credit card fraud occurs when unauthorised access is gained to an individual's card to make purchases or other transactions [[3]](https://en.wikipedia.org/wiki/Credit_card_fraud). In 2021 it was identified as the second most common form of identity theft, with 389,845 complaints [[4]](https://www.experian.com/blogs/ask-experian/identity-theft-statistics/).

Due to these alarming statistics and the magnitude of losses being incurred by credit card companies and merchants, there is an ever-increasing need to identify fraudulent transactions and stop them before they can go through. Historically, rule-based fraud detection has been used. This took a pre-programmed set of rules and used them to identify changes in behaviour that could be a sign of credit card fraud [[5]](https://fraud.net/d/rules-based-fraud-detection/). Due to the increasing volume and variety of transactions and the growing sophistication of fraudsters, these historic detection systems are no longer viable, and in recent years machine learning and AI have increasingly been used in the fight against credit card fraud.

Fraud detection presents itself as a binary classification problem. Machine learning is used in credit card fraud detection by taking transactional datapoints and training models to detect the patterns in the data that can be used to identify fraudulent transactions. Generally supervised methods are used that train models on labelled datasets. These models can then be used on new unseen data to predict whether the transaction is real or fraudulent.

For this coursework I will analyse a credit card fraud data set to find a model suitable for fraud detection. This is a relevant subject because of the harm and financial damage fraud causes, both on a personal level for victims and more widely for companies and financial markets. The topic is suitable for this module as credit card fraud is a big data problem. Training the models involves a huge amount of data that needs to be analysed using big data methodologies such as parallel processing frameworks and distributed computing clusters. Due to the complexity of the subject, these models need continuous tuning so the algorithms keep performing well on new data. Real-time detection is also a big data problem: both the model training and the real-time fraud prediction involve huge amounts of data and conform to the typical 5 V's of big data [[6]](https://www.techtarget.com/searchdatamanagement/definition/5-Vs-of-big-data).

### Data Set

The data set I have used for my analysis comes from Kaggle and is called 'Credit Card Transactions Fraud Detection Dataset' [[7]](https://www.kaggle.com/datasets/kartik2112/fraud-detection). It is a simulated data set of credit card transactions for 1000 customers, with transactions dated from January 2019 onwards (the test file includes transactions from 2020). On Kaggle the data set is split into a train and a test data set; however, for my analysis I have combined these whilst exploring and creating new features.

In total the combined data set has 1,852,394 records and is approximately 500 MB in size. Although this is not particularly big in the sense of big data, my analysis and transformations will all be done using Spark and big data methodologies that could be used on much larger data sets. From exploring the data in my EDA below, I found there are 10 numerical columns and 12 categorical. There are no nulls or duplicate records in the data set to deal with.

Predicting fraud presents a binary classification problem and, due to the nature of fraud, the data set is highly imbalanced. There are 9,651 fraudulent records and 1,842,743 records labelled as not fraud. This means only 0.52% of the data is in the fraud class. This brings a set of challenges to the modelling, as training a model on imbalanced records can cause it to be biased towards the majority class.

From my initial exploration the 'amt' column stands out as being one to explore - this is the transaction amount. The amounts are highly variable, as shown by the standard deviation being much higher than the mean. The majority of the transactions are low, with larger outliers causing the mean to be significantly higher than the median.

The summary statistics below show the median is lower than the mean because of high value outliers. 25% of the transactions are lower than or equal to 9.65, which shows a very large number of transactions are small, and 75% of the transactions are lower than or equal to 83.09. My takeaway from this is that the amount is highly variable and right skewed, with the majority of the transactions being relatively low but a few high value transactions pushing up the mean and standard deviation.

This is something I will be investigating later in conjunction with the target variable, as I would hypothesise the fraud transactions are likely to be smaller in value because high value transactions are more likely to be scrutinised.

Age is another variable I would like to explore further, as my initial research into the subject suggests certain age groups are more susceptible to fraud, with the 30-39 age group being the most likely [[8]](https://www.bankrate.com/finance/credit-cards/credit-card-fraud-statistics/#fraud).

Table showing summary statistics of the numerical variables in the data set

| summary | _c0       | amt      | zip      | lat      | long     | city_pop  | unix_time    | merch_lat | merch_long | is_fraud |
|---------|-----------|----------|----------|----------|----------|-----------|--------------|-----------|------------|----------|
| count   | 1852394.0 | 1852394.0| 1852394.0| 1852394.0| 1852394.0| 1852394.0 | 1852394.0    | 1852394.0 | 1852394.0  |1852394.0 |
| mean    | 537193.44 | 70.06    | 48813.26 | 38.54    | -90.23   | 88643.67  | 1.35867418E9 | 38.54     | -90.23     | 0.01     |
| stddev  | 366910.97 | 159.25   | 26881.85 | 5.07     | 13.75    | 301487.62 | 1.8195082E7  | 5.11      | 13.76      | 0.07     |
| min     | 0.0       | 1.0      | 1257.0   | 20.03    | -165.67  | 23.0      | 1.325376E9   | 19.03     | -166.67    | 0.0      |
| max     | 1296674.0 | 28948.9  | 99921.0  | 66.69    | -67.95   | 2906700.0 | 1.3885344E9  | 67.51     | -66.95     | 1.0      |


This shows 'amt' in more detail

| summary |               amt |
| ------- | ----------------- |
| count   |           1852394 |
| mean    | 70.06356747538618 |
| stddev  |  159.253974773983 |
| min     |               1.0 |
| 25%     |              9.65 |
| 50%     |             47.45 |
| 75%     |             83.09 |
| max     |           28948.9 |

As my analysis performs cross validation and gridsearch for hyperparameter tuning, the notebook takes a long time to run. I have therefore provided a very small sample set for marking purposes.

#### Dataset from Kaggle

https://www.kaggle.com/datasets/kartik2112/fraud-detection

#### Data sets stored in the following locations on HDFS

#### Full Data
hdfs:///user/rsmit001/CW2/fraudFiles/fraudTrain.csv

hdfs:///user/rsmit001/CW2/fraudFiles/fraudTest.csv

#### Sample Data
hdfs:///user/rsmit001/CW2/fraudFiles/SAMPLE_fraudTrain_v2/part-00000-ce7b1ce2-c467-4c10-9451-31ed680a62c5-c000.csv

hdfs:///user/rsmit001/CW2/fraudFiles/SAMPLE_fraudTest_v2/part-00000-9c8ab8f3-8fb6-4fac-be83-528e13a62647-c000.csv

### Hypotheses and Empirical Tests

As part of my analysis I will run a number of empirical tests of the following hypotheses:

1. Sampling methods

To train an effective model I will need to apply sampling methods to balance the data set. I propose to test models trained on both an oversampled and an undersampled data set. Oversampling keeps all of the majority class and replicates records from the minority class to create a balanced data set. This prevents data loss because all original records are kept; however, it does introduce bias as the minority records are repeated multiple times. Undersampling is the opposite: it deletes enough of the majority class to balance the data set, which causes a large amount of valuable data to be lost. I hypothesise that oversampling will produce better results as there will be notably more data to train the model with.
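
The two strategies can be sketched in plain Python. This is only an illustration on toy data, not the PySpark implementation used later in this notebook; the function names `random_oversample` and `random_undersample` and the toy records are hypothetical, and the sketch assumes the majority class is at least as large as the minority class.

```python
import random

def random_oversample(majority, minority, seed=42):
    """Keep every majority record and draw minority records with
    replacement until both classes are the same size."""
    rng = random.Random(seed)
    extra = [rng.choice(minority) for _ in range(len(majority) - len(minority))]
    return majority + minority + extra

def random_undersample(majority, minority, seed=42):
    """Keep every minority record and draw an equally sized random
    subset of the majority class without replacement."""
    rng = random.Random(seed)
    return rng.sample(majority, len(minority)) + minority

# Toy data (hypothetical): 1,000 non-fraud records (label 0), 5 fraud records (label 1)
not_fraud = [(i, 0) for i in range(1000)]
fraud = [(i, 1) for i in range(5)]

over = random_oversample(not_fraud, fraud)
under = random_undersample(not_fraud, fraud)

# total rows and fraud rows for each balanced set
print(len(over), sum(label for _, label in over))
print(len(under), sum(label for _, label in under))
```

The oversampled set is much larger than the undersampled one, but every extra fraud row is a copy of an existing record, which is the trade-off described above.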

2. Testing different models

I propose to test 3 different models to see which performs best:

* Logistic regression
* Naive Bayes
* Random Forests

I hypothesise the best results will be achieved via a Random Forest model. The random forest algorithm is an ensemble method that uses bootstrapping to create multiple decision trees from the data and then averages the results to produce powerful predictions [[9]](https://towardsdatascience.com/random-forest-classification-678e551462f5). A benchmarking study published in 2018 found that Random Forests performed better than logistic regression on 69% of the data sets tested [[10]](https://bmcbioinformatics.biomedcentral.com/articles/10.1186/s12859-018-2264-5).

3. Cross Validation and Gridsearch

I propose to use cross validation (CV) as part of the training pipeline. CV splits the data into k folds, training on k-1 folds and validating on the remaining one in turn, so you can effectively use all the training data to both train and test the models. This helps to prevent overfitting and means the algorithms should generalise to new data more effectively.
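
As a minimal sketch of the idea (not how Spark's `CrossValidator` assigns rows internally), the folds can be expressed as index ranges in plain Python. The helper name `k_fold_indices` is hypothetical, and the sketch uses contiguous folds for readability; in practice rows are normally shuffled or assigned to folds at random first.

```python
def k_fold_indices(n_rows, k):
    """Yield (train_idx, validation_idx) pairs for k contiguous folds."""
    # Spread any remainder over the first folds so sizes differ by at most 1
    fold_sizes = [n_rows // k + (1 if i < n_rows % k else 0) for i in range(k)]
    start = 0
    for size in fold_sizes:
        validation = list(range(start, start + size))
        train = list(range(0, start)) + list(range(start + size, n_rows))
        yield train, validation
        start += size

folds = list(k_fold_indices(n_rows=10, k=3))
for train, validation in folds:
    print(f"train={train} validation={validation}")
```

Every row appears in exactly one validation fold and in the training portion of the other folds.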

Alongside CV I will use gridsearch to find the optimal hyperparameters to be used to train the final model. Gridsearch takes a given grid of hyperparameters and iterates through them performing CV on each combination. It then evaluates each model and gives the best set of hyperparameters as its final best model output.
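
The search loop described above can be sketched in plain Python. The grid values, the parameter names `num_trees` and `max_depth`, and the stand-in scoring function `toy_cv_score` are all hypothetical; in a real run the score would come from training and evaluating a model on each set of folds.

```python
from itertools import product

def expand_grid(param_grid):
    """Return every combination of the supplied hyperparameter values."""
    names = list(param_grid)
    return [dict(zip(names, values)) for values in product(*param_grid.values())]

def grid_search(param_grid, cv_score):
    """Score every combination with `cv_score` and return the best one."""
    scored = [(cv_score(params), params) for params in expand_grid(param_grid)]
    best_score, best_params = max(scored, key=lambda pair: pair[0])
    return best_params, best_score

# Hypothetical grid: 3 x 2 = 6 combinations to evaluate
grid = {"num_trees": [20, 50, 100], "max_depth": [5, 10]}

def toy_cv_score(params):
    # Stand-in for "average validation metric across the k folds"
    return 1.0 - abs(params["num_trees"] - 50) / 100 - abs(params["max_depth"] - 10) / 100

best_params, best_score = grid_search(grid, toy_cv_score)
print(len(expand_grid(grid)), best_params, best_score)
```

The number of models trained grows as the product of the grid sizes multiplied by k, which is why the full notebook takes a long time to run.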

4. Evaluation

Once I have the best model, it will be used on the hold-out test set to predict the target class. I will evaluate its performance using a number of metrics. Because the data is imbalanced, certain metrics such as accuracy will be misleading, as you could predict all labels as not fraud and still get a 99.5% accuracy score. Evaluation metrics such as AUC-ROC and AUC-PR will give a better sense of model performance.
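
The accuracy figure quoted above can be checked with a few lines of arithmetic, using the class counts reported for the combined data set. The "model" here is a hypothetical baseline that labels every transaction as not fraud.

```python
# Class counts reported for the combined data set
n_fraud = 9_651
n_not_fraud = 1_842_743
n_total = n_fraud + n_not_fraud

# Baseline that predicts "not fraud" for every transaction
true_negatives = n_not_fraud
false_negatives = n_fraud
true_positives = 0

accuracy = (true_positives + true_negatives) / n_total
recall = true_positives / (true_positives + false_negatives)

print(f"accuracy = {accuracy:.4%}")
print(f"recall on the fraud class = {recall:.0%}")
```

The baseline scores very highly on accuracy while catching no fraud at all, which is why metrics that focus on the minority class are needed.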

### Planned Analysis

I plan to do all my analysis and coding using PySpark. The data will be stored in HDFS and processed using YARN. I will be using Spark MLlib for model algorithms, pipeline tools and evaluation. This is my proposed pipeline:

* Exploratory data analysis
* Data cleaning and feature engineering
* Drop unnecessary columns
* Under/oversample the data 
* Build an ML Pipeline
    * One hot encoding
    * Transforming the data into a single vector for input into the model
    * Scaling the data
    * CV/Gridsearch
* Evaluate the best models

## Project Implementation

In [1]:
# imports 
from pyspark.sql import SparkSession, Row # imports SparkSession
import pyspark.sql.functions as F # functions for working with df's
from pyspark.sql.types import StringType, IntegerType, DoubleType # to help inspect the schema
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import StandardScaler
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import LinearSVC
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [2]:
spark = SparkSession.builder \
        .appName('CCFraudAnalysis') \
        .master('yarn') \
        .config('spark.executor.memory', '4g') \
        .config('spark.executor.cores', '4') \
        .config('spark.cores.max', '4') \
        .config('spark.driver.memory','4g') \
        .getOrCreate()

In [3]:
# checks session is open
spark

### Note to marker:
Please change the code below to run on the sample sets included with my submission. It may still take a little while to run due to the gridsearch cv.

In [4]:
# load datasets
fraud_train = spark.read.csv('hdfs:///user/rsmit001/CW2/fraudFiles/fraudTrain.csv',header=True,inferSchema=True)
fraud_test = spark.read.csv('hdfs:///user/rsmit001/CW2/fraudFiles/fraudTest.csv',header=True,inferSchema=True)

The code below makes a sample of the full data sets to provide with my coursework submission so it can be run and tested by the markers. I've given a very small sample as the model training is time consuming due to gridsearch cross validation. This code is commented out as it does not need to run each time.

In [5]:
# # creating sample, commented out so it doesn't run each time the notebook is run
# # used sampleBy which returns a stratified sample to keep the ratio of fraud/not fraud
# # https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.DataFrame.sampleBy.html
# fractions = {0:0.001,1:0.001} # same sampling fraction for each class so the fraud/not fraud ratio is preserved
# stratified_fraud_train = fraud_train.sampleBy("is_fraud", fractions, seed=42)
# stratified_fraud_test = fraud_test.sampleBy("is_fraud", fractions, seed=42)

# # validation, checks the target class stays in the correct ratio
# print(stratified_fraud_train.filter(stratified_fraud_train.is_fraud==0).count()/stratified_fraud_train.count())
# print(stratified_fraud_test.filter(stratified_fraud_test.is_fraud==0).count()/stratified_fraud_test.count())

# # write to hdfs
# stratified_fraud_train.coalesce(1).write.csv('hdfs:///user/rsmit001/CW2/fraudFiles/'
#                                              'SAMPLE_fraudTrain_v2', header=True)
# stratified_fraud_test.coalesce(1).write.csv('hdfs:///user/rsmit001/CW2/fraudFiles/'
#                                             'SAMPLE_fraudTest_v2', header=True)

In [6]:
# False to run on full set, True to run on sample
SAMPLE = False
if SAMPLE:
    fraud_train = spark.read.csv('hdfs:///user/rsmit001/CW2/fraudFiles/SAMPLE_fraudTrain_v2/part-00000-ce7b1ce2'
                                 '-c467-4c10-9451-31ed680a62c5-c000.csv',header=True,inferSchema=True)
    fraud_test = spark.read.csv('hdfs:///user/rsmit001/CW2/fraudFiles/SAMPLE_fraudTest_v2/part-00000-9c8ab8f3-8fb6'
                                '-4fac-be83-528e13a62647-c000.csv',header=True,inferSchema=True)

### Basic data checks, exploration and statistics

I'm going to do a couple of basic checks on the train and test data sets to make sure they have the same structure:

In [7]:
# showing vertical as data looks very untidy horizontal
# show() prints directly and returns None, so it is not wrapped in print()
fraud_train.show(n=1,truncate=False,vertical=True) # prints 1 record from fraud train

-RECORD 0-------------------------------------------------
 _c0                   | 0                                
 trans_date_trans_time | 2019-01-01 00:00:18              
 cc_num                | 2703186189652095                 
 merchant              | fraud_Rippin, Kub and Mann       
 category              | misc_net                         
 amt                   | 4.97                             
 first                 | Jennifer                         
 last                  | Banks                            
 gender                | F                                
 street                | 561 Perry Cove                   
 city                  | Moravian Falls                   
 state                 | NC                               
 zip                   | 28654                            
 lat                   | 36.0788                          
 long                  | -81.1781                         
 city_pop              | 3495                           

In [8]:
fraud_test.show(n=1,truncate=False,vertical=True) # prints 1 record from fraud test

-RECORD 0-------------------------------------------------
 _c0                   | 0                                
 trans_date_trans_time | 2020-06-21 12:14:25              
 cc_num                | 2291163933867244                 
 merchant              | fraud_Kirlin and Sons            
 category              | personal_care                    
 amt                   | 2.86                             
 first                 | Jeff                             
 last                  | Elliott                          
 gender                | M                                
 street                | 351 Darlene Green                
 city                  | Columbia                         
 state                 | SC                               
 zip                   | 29209                            
 lat                   | 33.9659                          
 long                  | -80.9355                         
 city_pop              | 333497                         

In [9]:
# print the schemas to check they're the same
# printSchema() prints directly and returns None, so it is not wrapped in print()
fraud_train.printSchema()
fraud_test.printSchema()

# Compare the schemas
if fraud_train.schema == fraud_test.schema:
    print('\nBoth dataframes have the same schema.')
else:
    print('\nThe dataframes have different schemas.')

root
 |-- _c0: integer (nullable = true)
 |-- trans_date_trans_time: string (nullable = true)
 |-- cc_num: long (nullable = true)
 |-- merchant: string (nullable = true)
 |-- category: string (nullable = true)
 |-- amt: double (nullable = true)
 |-- first: string (nullable = true)
 |-- last: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- street: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zip: integer (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- city_pop: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- trans_num: string (nullable = true)
 |-- unix_time: integer (nullable = true)
 |-- merch_lat: double (nullable = true)
 |-- merch_long: double (nullable = true)
 |-- is_fraud: integer (nullable = true)

root
 |-- _c0: integer (nullable = true)
 |-- trans_date_trans_time: string (nullable = true)
 |-- cc_

In [10]:
# check how many records are in the datasets
print(f'The training set has {fraud_train.count()} records')
print(f'The test set has {fraud_test.count()} records')

The training set has 1296675 records
The test set has 555719 records


At this stage we could discard the test data set as we already have nearly 1.3 million records to work with. However, the test set has another 555k records to inform the models, so I believe the best route is to combine the two for data cleaning and resplit later. Keeping them split at this point isn't really helpful, as both data sets would need to be cleaned separately, which would add work.

In [11]:
# concatenate both data sets using union
concat_data = fraud_train.union(fraud_test)

In [12]:
# check the new data set
print(f'The concatenated set has {concat_data.count()} records')

The concatenated set has 1852394 records


In [13]:
# quick check of the schema again to make sure everything is as expected
# Compare the schemas
if concat_data.schema == fraud_train.schema == fraud_test.schema:
    print('\nThe dataframes have the same schema.')
else:
    print('\nThe dataframes have different schemas.')


The dataframes have the same schema.


The data has been combined successfully, ready for preprocessing. From looking at the schema, a couple of variables jump straight out for looking into later: 'trans_date_trans_time' and 'dob'. These are both strings, and I would expect a date-time type to be the most usable format for them. I will look into these further later on.

First I'm going to check for null values and duplicate rows.

In [14]:
# check for nulls (isNull() counts null values only; NaN values would need F.isnan)
# https://sparkbyexamples.com/pyspark/pyspark-find-count-of-null-none-nan-values/
concat_data.select([F.count(F.when(F.col(c).isNull() , c)).alias(c) for c in concat_data.columns]).show(vertical=True)

-RECORD 0--------------------
 _c0                   | 0   
 trans_date_trans_time | 0   
 cc_num                | 0   
 merchant              | 0   
 category              | 0   
 amt                   | 0   
 first                 | 0   
 last                  | 0   
 gender                | 0   
 street                | 0   
 city                  | 0   
 state                 | 0   
 zip                   | 0   
 lat                   | 0   
 long                  | 0   
 city_pop              | 0   
 job                   | 0   
 dob                   | 0   
 trans_num             | 0   
 unix_time             | 0   
 merch_lat             | 0   
 merch_long            | 0   
 is_fraud              | 0   



There are no null values to process

In [15]:
# check the data set for duplicate rows
distinct_count = concat_data.distinct().count()
total_count = concat_data.count()
duplicate_count = total_count - distinct_count # rows beyond the first copy of each distinct row
print(f'Number of distinct rows: {distinct_count}')
print(f'Total number of rows: {total_count}')
print(f'Number of duplicate rows: {duplicate_count}')

Number of distinct rows: 1852394
Total number of rows: 1852394
Number of duplicate rows: 0


There are no duplicate rows to process

Now to check the variables in more detail. I'll be looking at some basic statistics to see if there are any outliers in the numerical variables or dirty data in the categorical variables. I used geeksforgeeks [[11]](https://www.geeksforgeeks.org/selecting-only-numeric-or-string-columns-names-from-pyspark-dataframe/) for reference on using the .schema attribute to create lists of numerical and categorical variables.

In [16]:
# first to get a list of numeric and categorical variables
# https://www.geeksforgeeks.org/selecting-only-numeric-or-string-columns-names-from-pyspark-dataframe/

# assigning empty list
num_cols = []
cat_cols = []

schema = concat_data.schema # gets df schema

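# update lists based on the data types of the columns (from schema)
# note: cc_num is a LongType identifier, so it matches neither branch and is left out of both lists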
for var in schema.fields:
    if isinstance(var.dataType, (IntegerType, DoubleType)):
        num_cols.append(var.name)
    elif isinstance(var.dataType, StringType):
        cat_cols.append(var.name)

print(f'Numerical columns: {len(num_cols)}')
print(f'Categorical columns: {len(cat_cols)}')

Numerical columns: 10
Categorical columns: 12


In [17]:
print(f'Numerical columns:\n{num_cols}\n')
print(f'Categorical columns:\n{cat_cols}')

Numerical columns:
['_c0', 'amt', 'zip', 'lat', 'long', 'city_pop', 'unix_time', 'merch_lat', 'merch_long', 'is_fraud']

Categorical columns:
['trans_date_trans_time', 'merchant', 'category', 'first', 'last', 'gender', 'street', 'city', 'state', 'job', 'dob', 'trans_num']


In [18]:
# basic statistics of the numerical values

# Run describe to get summary statistics for numerical columns
desc_df = concat_data.describe(num_cols)

# Round each of the numerical statistics to 2 decimal places - this is so the summary df is displayed
# in a readable form
for col_name in num_cols:
    desc_df = desc_df.withColumn(col_name, F.round(F.col(col_name).cast('float'), 2))
    
desc_df.show()

+-------+---------+---------+---------+---------+---------+---------+------------+---------+----------+---------+
|summary|      _c0|      amt|      zip|      lat|     long| city_pop|   unix_time|merch_lat|merch_long| is_fraud|
+-------+---------+---------+---------+---------+---------+---------+------------+---------+----------+---------+
|  count|1852394.0|1852394.0|1852394.0|1852394.0|1852394.0|1852394.0|   1852394.0|1852394.0| 1852394.0|1852394.0|
|   mean|537193.44|    70.06| 48813.26|    38.54|   -90.23| 88643.67|1.35867418E9|    38.54|    -90.23|     0.01|
| stddev|366910.97|   159.25| 26881.85|     5.07|    13.75|301487.62| 1.8195082E7|     5.11|     13.76|     0.07|
|    min|      0.0|      1.0|   1257.0|    20.03|  -165.67|     23.0|  1.325376E9|    19.03|   -166.67|      0.0|
|    max|1296674.0|  28948.9|  99921.0|    66.69|   -67.95|2906700.0| 1.3885344E9|    67.51|    -66.95|      1.0|
+-------+---------+---------+---------+---------+---------+---------+------------+---------+----------+---------+

The main variable that stands out to me here is 'amt', which is the transaction amount. It has a mean of 70.06 but a minimum and maximum of 1 and 28,948.90 respectively. This shows there are big differences in the value of the transactions being made, and the standard deviation of 159.25 shows the data has high variability. To investigate this a bit further I'll run the df.summary() function, as this includes quartiles too.

In [19]:
concat_data.select('amt').summary("count", "mean", "stddev", "min", "25%", "50%", "75%", "max").show()

+-------+------------------+
|summary|               amt|
+-------+------------------+
|  count|           1852394|
|   mean| 70.06356747538561|
| stddev|159.25397477398326|
|    min|               1.0|
|    25%|              9.65|
|    50%|             47.45|
|    75%|             83.09|
|    max|           28948.9|
+-------+------------------+



The statistics show the median is lower than the mean due to the effect of high value outliers. 25% of the transactions are lower than or equal to 9.65, which means a large number of transactions are small in value, and 75% of the transactions are lower than or equal to 83.09. My takeaway from this is that the amount is highly variable and right skewed, with the majority of the transactions being relatively low but a few high value transactions pushing up the mean and standard deviation.

This is something I will be investigating later in conjunction with the target variable as I would hypothesise the fraud transactions are likely to be smaller in value as high value transactions are more likely to be scrutinised.

I'll have a quick check of the categorical variables and see how many unique values there are before delving into analysis of the target variable.

In [20]:
for col in cat_cols:
    print(f'number of unique values in variable "{col}": {concat_data.select(col).distinct().count()}')

number of unique values in variable "trans_date_trans_time": 1819551
number of unique values in variable "merchant": 693
number of unique values in variable "category": 14
number of unique values in variable "first": 355
number of unique values in variable "last": 486
number of unique values in variable "gender": 2
number of unique values in variable "street": 999
number of unique values in variable "city": 906
number of unique values in variable "state": 51
number of unique values in variable "job": 497
number of unique values in variable "dob": 984
number of unique values in variable "trans_num": 1852394


The categorical variables that stand out as being worth looking at further are:

* 'merchant' - are there particular merchants with lax security that fraudsters target?
* 'category' - are certain categories of transaction more likely to be fraudulent?
* 'gender' - is this a factor? Are people of one gender more likely to be targeted?
* 'city'/'state' - is location a factor?
* 'job' - can fraud be correlated with job role? Are certain people more likely to be targeted?
* 'dob' - I'll look at this further after creating an age variable

#### Inspecting the target class variable

In [21]:
# I've created a function to output this summary as I've repeated it a few times below
def fraud_not_fraud_summary(df):
    """
    takes a dataframe as input and calculates the number of class 1 (fraud), class 0 (not fraud) records
    and the percentage not fraud to fraud
    
    args:
        df (dataframe): the data frame
    
    returns:
        it prints the number of class 1 (fraud), class 0 (not fraud) records
        and the percentage not fraud to fraud
    
    """
    total_records = df.count()
    fraud_total = df.filter(df.is_fraud==1).count() # total records that are fraud
    not_fraud_total = df.filter(df.is_fraud==0).count() # total records that are NOT fraud

    print(f'Number of class 1 (fraudulent records): {fraud_total}')
    print(f'Number of class 0 (Non-fraudulent records): {not_fraud_total}')
    print(f'{float(not_fraud_total)/float(total_records)*100}% are in class 0')

In [22]:
fraud_not_fraud_summary(concat_data)

Number of class 1 (fraudulent records): 9651
Number of class 0 (Non-fraudulent records): 1842743
99.47899852839083% are in class 0


With 99.5% of the records being not fraud, the data set is heavily imbalanced. This will need to be dealt with later on or the model will be biased towards the majority class when training. I will need to apply under or oversampling to balance the classes out. I'll discuss this in more detail later.

I will now look at how the target classes are distributed across the other variables, to see if I can spot any initial trends that could highlight key variables for the model. I'll look at the categorical variables highlighted above first. My technique will be to normalise the data by creating a within-category fraud percentage (fraud transactions divided by total transactions for each value) and sorting on this. This accounts for values that have large numbers of transactions, which would otherwise dominate a ranking based on raw fraud counts:

In [23]:
# function to print the top n values of a column with the highest % of fraud
def percentage_fraud_cat(df,column,n):
    """
    takes a dataframe as input and, for a given column, displays the n values with the highest % of fraud
    
    args:
        df (dataframe): the dataframe
        column (str): name of the column to do calculations on
        n (int): number of within feature values to summarise on (e.g. top 10 values)
        
    
    returns:
        it displays the resulting dataframe sorted by 'within_category_fraud_%' and top values
    
    """
    # creates df grouped by categorical variable, totals per category then renames col
    all_df = df.groupBy(column).count().withColumnRenamed('count', 'total_count')

    # creates df similar to above but filtered to NON fraud cases
    non_fraud_df = df.filter(df.is_fraud == 0).groupBy(column).count()\
    .withColumnRenamed('count', 'non_fraud_count')

    # creates df similar to above but filtered to fraud cases
    fraud_df = df.filter(df.is_fraud == 1).groupBy(column).count()\
    .withColumnRenamed('count', 'fraud_count')

    # joins all df's based on categorical col
    joined_df = all_df.join(non_fraud_df, column, 'outer').join(fraud_df, column, 'outer').fillna(0)

    # calculate within cat fraud % and rounds
    result_df = joined_df.withColumn(
        'within_category_fraud_%',
        F.round(F.col('fraud_count') / F.col('total_count') * 100, 2))

    # sorts
    result_df = result_df.orderBy(F.desc('within_category_fraud_%'))

    # shows top n
    result_df.show(n)

I haven't applied this to all categorical columns, as first and last names, transaction number and transaction date will not give any meaningful information.

In [24]:
# applies the function on a subset of the categorical columns
cats = ['merchant','category','gender','city','state','job'] # list of cols to check
for i in cats:
    print(i.capitalize())
    percentage_fraud_cat(concat_data,i,10) # shows top 10

Merchant
+--------------------+-----------+---------------+-----------+-----------------------+
|            merchant|total_count|non_fraud_count|fraud_count|within_category_fraud_%|
+--------------------+-----------+---------------+-----------+-----------------------+
|   fraud_Kozey-Boehm|       2758|           2698|         60|                   2.18|
|fraud_Herman, Tre...|       1870|           1832|         38|                   2.03|
|    fraud_Terry-Huel|       2864|           2808|         56|                   1.96|
|fraud_Kerluke-Abs...|       2635|           2585|         50|                    1.9|
|fraud_Mosciski, Z...|       2821|           2768|         53|                   1.88|
|fraud_Schmeler, B...|       2788|           2736|         52|                   1.87|
|     fraud_Kuhic LLC|       2842|           2789|         53|                   1.86|
|      fraud_Jast Ltd|       2757|           2706|         51|                   1.85|
|fraud_Langworth, ...|       2817|

The standout feature from this analysis is transaction category: three categories show significantly higher percentages of fraud (shopping_net, misc_net, grocery_pos). This looks like a feature that would be beneficial for the model.

Gender shows a slightly higher chance of fraud for male cardholders than for female ones, but the difference is not drastic. Some of the other features, such as city and state, have values with a very high within-category fraud percentage, but this is because every transaction for that value is fraudulent. These are more likely to be chance occurrences than a significant signal.
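One way to guard against the chance occurrences described above is to ignore values that have too few transactions to be meaningful. The sketch below is plain Python rather than Spark; the function name `fraud_rate_by_value`, the `min_count` threshold and the toy rows are all assumptions made for illustration and are not part of the original analysis.

```python
from collections import Counter

def fraud_rate_by_value(rows, column, min_count=1):
    """Return {value: fraud %} for values seen at least min_count times.

    rows is a list of dicts, each with an 'is_fraud' key (0 or 1).
    """
    totals = Counter(row[column] for row in rows)
    frauds = Counter(row[column] for row in rows if row["is_fraud"] == 1)
    return {
        value: round(frauds[value] / total * 100, 2)
        for value, total in totals.items()
        if total >= min_count
    }

# hypothetical toy data: a tiny city where every transaction happens to be fraud
rows = (
    [{"city": "Smallville", "is_fraud": 1}] * 3
    + [{"city": "Metropolis", "is_fraud": 0}] * 97
    + [{"city": "Metropolis", "is_fraud": 1}] * 3
)

unfiltered = fraud_rate_by_value(rows, "city")               # Smallville shows 100% fraud
filtered = fraud_rate_by_value(rows, "city", min_count=50)   # Smallville is excluded
```

With a threshold in place, a value only appears in the ranking once it has enough transactions for its percentage to mean something.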

### Feature Engineering

#### Creating a distance feature

My thinking is that the distance between the credit card holder and the fraudulent transaction might give an indication of fraud. Would stolen details be more likely to be used close to the victim or further away? I've created a feature for the distance between the coordinates of the credit card holder and the merchant where the transaction took place. I've used these two websites for reference in creating this feature [[12]](https://en.wikipedia.org/wiki/Haversine_formula) [[13]](https://gist.github.com/pavlov99/bd265be244f8a84e291e96c5656ceb5c)

In [25]:
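# distance between two coordinates (haversine formula)
# https://en.wikipedia.org/wiki/Haversine_formula
# using code from here https://gist.github.com/pavlov99/bd265be244f8a84e291e96c5656ceb5c
# 12742 is the Earth's diameter in km (2 * 6371), so 'distance' is in kilometres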

concat_data = concat_data.withColumn("a", (
    F.pow(F.sin(F.radians(F.col("lat") - F.col("merch_lat")) / 2), 2) +
    F.cos(F.radians(F.col("merch_lat"))) * F.cos(F.radians(F.col("lat"))) *
    F.pow(F.sin(F.radians(F.col("long") - F.col("merch_long")) / 2), 2)
)).withColumn("distance", F.atan2(F.sqrt(F.col("a")), F.sqrt(-F.col("a") + 1)) * 12742)
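As a sanity check on the Spark expression above, the same haversine calculation can be written in plain Python and run on a single pair of coordinates. This is only a sketch: the function name `haversine_km` is my own, and the coordinates are those of the first record displayed later in this notebook, for which Spark reports a distance of about 78.6 km.

```python
import math

EARTH_DIAMETER_KM = 12742  # 2 * mean Earth radius (6371 km)

def haversine_km(lat1, lon1, lat2, lon2):
    """Great-circle distance in kilometres between two (lat, long) points given in degrees."""
    d_lat = math.radians(lat1 - lat2)
    d_lon = math.radians(lon1 - lon2)
    a = (
        math.sin(d_lat / 2) ** 2
        + math.cos(math.radians(lat1)) * math.cos(math.radians(lat2))
        * math.sin(d_lon / 2) ** 2
    )
    return EARTH_DIAMETER_KM * math.atan2(math.sqrt(a), math.sqrt(1 - a))

# cardholder (lat, long) and merchant (merch_lat, merch_long) of the first record
d = haversine_km(36.0788, -81.1781, 36.011293, -82.048315)
print(round(d, 2))
```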

In [26]:
# all records summary
concat_data.select('distance').summary("count", "mean", "stddev", "min", "25%", "50%", "75%", "max").show()

+-------+--------------------+
|summary|            distance|
+-------+--------------------+
|  count|             1852394|
|   mean|   76.11172606007756|
| stddev|  29.116970235829854|
|    min|0.022254515638296817|
|    25%|   55.31745761253762|
|    50%|    78.2134066416527|
|    75%|   98.50607848728853|
|    max|  152.11717310594932|
+-------+--------------------+



In [27]:
# fraud records summary
concat_data.filter(concat_data.is_fraud == 1).select('distance').\
summary("count", "mean", "stddev", "min", "25%", "50%", "75%", "max").show()

+-------+------------------+
|summary|          distance|
+-------+------------------+
|  count|              9651|
|   mean|  76.2562333827283|
| stddev|28.865544711903578|
|    min|0.7387691216521973|
|    25%|144.52241007387727|
|    50%| 78.10192247309428|
|    75%|144.52241007387727|
|    max|144.52241007387727|
+-------+------------------+



In [28]:
# not fraud records summary
concat_data.filter(concat_data.is_fraud == 0).select('distance').\
summary("count", "mean", "stddev", "min", "25%", "50%", "75%", "max").show()

+-------+--------------------+
|summary|            distance|
+-------+--------------------+
|  count|             1842743|
|   mean|   76.11096923171304|
| stddev|  29.118287189373707|
|    min|0.022254515638296817|
|    25%|   55.31745761253762|
|    50%|   78.21634859215874|
|    75%|   98.50607848728853|
|    max|  152.11717310594932|
+-------+--------------------+



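Because there is such a large imbalance between fraud and non-fraud records, the overall mean is almost identical to the non-fraud mean. The fraud mean (76.26) and median (78.10) are also very close to the non-fraud values (76.11 and 78.22), so on these summaries distance does not separate the two classes. The 25% and 75% values in the fraud summary both equal the maximum (144.52), which is inconsistent with the reported median, so I wouldn't rely on those quartiles.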

#### Transaction Time

Here I convert the transaction time from a string to a timestamp and extract day-of-week and hour features. I've used these sources for reference [[14]](https://sparkbyexamples.com/spark/pyspark-to_timestamp-convert-string-to-timestamp-type/) [[15]](https://sparkbyexamples.com/spark/spark-extract-hour-minute-and-second-from-timestamp/)

In [29]:
# timestamp string to TimestampType
# https://sparkbyexamples.com/spark/pyspark-to_timestamp-convert-string-to-timestamp-type/
# https://sparkbyexamples.com/spark/spark-extract-hour-minute-and-second-from-timestamp/
concat_data = concat_data.withColumn('trans_timestamp',F.to_timestamp('trans_date_trans_time'))

# extracting day and hour
concat_data = concat_data.withColumn('trans_day', F.date_format('trans_timestamp', 'E'))\
.withColumn("trans_hour", F.hour(F.col("trans_timestamp")))

Exploring day of the week:

In [30]:
concat_data.filter(concat_data.is_fraud == 1).groupBy('trans_day').count().orderBy(F.desc('count')).show()

+---------+-----+
|trans_day|count|
+---------+-----+
|      Sun| 1590|
|      Sat| 1493|
|      Mon| 1484|
|      Fri| 1376|
|      Thu| 1317|
|      Tue| 1266|
|      Wed| 1125|
+---------+-----+



In [31]:
# using function from above
percentage_fraud_cat(concat_data,'trans_day',7)

+---------+-----------+---------------+-----------+-----------------------+
|trans_day|total_count|non_fraud_count|fraud_count|within_category_fraud_%|
+---------+-----------+---------------+-----------+-----------------------+
|      Thu|     206741|         205424|       1317|                   0.64|
|      Fri|     215078|         213702|       1376|                   0.64|
|      Wed|     183913|         182788|       1125|                   0.61|
|      Sat|     263227|         261734|       1493|                   0.57|
|      Tue|     270340|         269074|       1266|                   0.47|
|      Sun|     343677|         342087|       1590|                   0.46|
|      Mon|     369418|         367934|       1484|                    0.4|
+---------+-----------+---------------+-----------+-----------------------+



From the above analysis, fraud is likeliest on Thursdays and Fridays and least likely on Mondays.

Exploring hour of day:

In [32]:
concat_data.filter(concat_data.is_fraud == 1).groupBy('trans_hour').count().orderBy(F.desc('count')).show(10)

+----------+-----+
|trans_hour|count|
+----------+-----+
|        22| 2481|
|        23| 2442|
|         1|  827|
|         0|  823|
|         3|  803|
|         2|  793|
|        18|  111|
|        19|  105|
|        21|  101|
|        15|  100|
+----------+-----+
only showing top 10 rows



In [33]:
percentage_fraud_cat(concat_data,'trans_hour',24)

+----------+-----------+---------------+-----------+-----------------------+
|trans_hour|total_count|non_fraud_count|fraud_count|within_category_fraud_%|
+----------+-----------+---------------+-----------+-----------------------+
|        22|      95370|          92889|       2481|                    2.6|
|        23|      95902|          93460|       2442|                   2.55|
|         0|      60655|          59832|        823|                   1.36|
|         1|      61330|          60503|        827|                   1.35|
|         3|      60968|          60165|        803|                   1.32|
|         2|      60796|          60003|        793|                    1.3|
|         5|      60088|          60008|         80|                   0.13|
|         7|      60301|          60229|         72|                   0.12|
|        18|      94052|          93941|        111|                   0.12|
|        14|      93089|          92989|        100|                   0.11|

This is pretty conclusive: fraud is much more likely to happen between 22:00 and 04:00. To capture this in my data I'm going to create a binary feature 'hour_binary', which is 1 if the transaction hour falls between 22:00 and 03:59 and 0 at any other time. I've used this source for reference here [[16]](https://stackoverflow.com/questions/39048229/spark-equivalent-of-if-then-else)

In [34]:
# https://stackoverflow.com/questions/39048229/spark-equivalent-of-if-then-else
# https://www.kaggle.com/datasets/kartik2112/fraud-detection/discussion/197336

concat_data = concat_data.withColumn('hour_binary', F.when(concat_data.trans_hour > 21,1)
                                                    .when(concat_data.trans_hour < 4,1)
                                                    .otherwise(0))
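Because the window wraps around midnight, it is worth checking exactly which hours the two conditions above flag. The plain-Python sketch below mirrors the same logic; the helper name `is_night_hour` is an assumption for illustration only.

```python
def is_night_hour(hour):
    """Return 1 for hours 22, 23, 0, 1, 2 and 3, otherwise 0 (same rule as hour_binary)."""
    return 1 if hour > 21 or hour < 4 else 0

# list every hour of the day that would be flagged
flagged = [h for h in range(24) if is_night_hour(h)]
print(flagged)
```

Hour 4 itself is not flagged, so the window covers transactions up to 03:59.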

#### Creating age from dob

In [35]:
concat_data = concat_data.withColumn('dob_timestamp',F.to_timestamp('dob')) # transform string to datetime

# calculates years between dob and transaction date to get age
concat_data = concat_data.withColumn('age', F.round(F.datediff(F.col('trans_timestamp'), F.col('dob_timestamp'))/365))
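Dividing the day difference by 365 and rounding gives an age to the nearest year rather than the number of completed years, so someone just over 18 and a half comes out as 19. The plain-Python sketch below illustrates the difference; the function names and the example dates are hypothetical, and Python's `round` is used here only as a stand-in for Spark's `F.round`.

```python
from datetime import date

def approx_age(dob, on_date):
    """Age in the style of the cell above: days / 365, rounded to the nearest year."""
    return round((on_date - dob).days / 365)

def exact_age(dob, on_date):
    """Completed years, i.e. the age a person would state on on_date."""
    had_birthday = (on_date.month, on_date.day) >= (dob.month, dob.day)
    return on_date.year - dob.year - (0 if had_birthday else 1)

dob, trans = date(2000, 6, 30), date(2019, 1, 1)  # hypothetical dates
print(approx_age(dob, trans), exact_age(dob, trans))
```

For modelling purposes the nearest-year version is probably fine, but it is worth knowing when comparing against published age bands.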

In [36]:
percentage_fraud_cat(concat_data,'age',10)

+----+-----------+---------------+-----------+-----------------------+
| age|total_count|non_fraud_count|fraud_count|within_category_fraud_%|
+----+-----------+---------------+-----------+-----------------------+
|96.0|        269|            262|          7|                    2.6|
|87.0|       4291|           4232|         59|                   1.37|
|18.0|       4492|           4434|         58|                   1.29|
|77.0|       6093|           6024|         69|                   1.13|
|78.0|      10046|           9934|        112|                   1.11|
|92.0|       6922|           6845|         77|                   1.11|
|63.0|      19159|          18966|        193|                   1.01|
|71.0|      13401|          13265|        136|                   1.01|
|86.0|       5587|           5537|         50|                   0.89|
|80.0|       9553|           9469|         84|                   0.88|
+----+-----------+---------------+-----------+-----------------------+
only showing top 10 rows

Although research highlighted above showed the 30-39 age group was most susceptible to fraud, the top 10 in this dataset are mostly over 70 years old.

### Dropping Columns

I've decided to drop the following columns as I don't feel they will add anything to the model:

In [37]:
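# dropping columns I don't need
# note the comma after '_c0': without it Python joins '_c0' and 'trans_date_trans_time' into one
# string and neither column is dropped (the recorded outputs below still show both for that reason)
cols_to_drop = ['_c0', 'trans_date_trans_time','merchant','first', 'last','street','city',\
                'state','zip','job','trans_num','dob','a','trans_timestamp','dob_timestamp']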

concat_data = concat_data.drop(*cols_to_drop)

In [38]:
concat_data.show(n=1,vertical=True)

-RECORD 0------------------------------------
 _c0                   | 0                   
 trans_date_trans_time | 2019-01-01 00:00:18 
 cc_num                | 2703186189652095    
 category              | misc_net            
 amt                   | 4.97                
 gender                | F                   
 lat                   | 36.0788             
 long                  | -81.1781            
 city_pop              | 3495                
 unix_time             | 1325376018          
 merch_lat             | 36.011293           
 merch_long            | -82.048315          
 is_fraud              | 0                   
 distance              | 78.59756848823127   
 trans_day             | Tue                 
 trans_hour            | 0                   
 hour_binary           | 1                   
 age                   | 31.0                
only showing top 1 row



In [39]:
concat_data.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- trans_date_trans_time: string (nullable = true)
 |-- cc_num: long (nullable = true)
 |-- category: string (nullable = true)
 |-- amt: double (nullable = true)
 |-- gender: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- city_pop: integer (nullable = true)
 |-- unix_time: integer (nullable = true)
 |-- merch_lat: double (nullable = true)
 |-- merch_long: double (nullable = true)
 |-- is_fraud: integer (nullable = true)
 |-- distance: double (nullable = true)
 |-- trans_day: string (nullable = true)
 |-- trans_hour: integer (nullable = true)
 |-- hour_binary: integer (nullable = false)
 |-- age: double (nullable = true)



### Correlation against the target variable

The code below calculates the correlation between the numerical predictor variables and the target variable. I've used this source for reference whilst calculating [[17]](https://www.projectpro.io/recipes/calculate-correlation-pyspark). This shows that 'amt' is the feature most strongly correlated with the target, with the hour_binary column second.

In [40]:
## correlation
## https://www.projectpro.io/recipes/calculate-correlation-pyspark

# assigning empty list
num_cols = []

schema = concat_data.schema # gets df schema

# update lists based on the data types of the columns (from schema)
for var in schema.fields:
    if isinstance(var.dataType, (IntegerType, DoubleType)):
        num_cols.append(var.name)

num_cols.remove('is_fraud')
print('Correlation between each numerical feature and the target "is_fraud" class\n')
for col in num_cols:
    corr = concat_data.stat.corr(col,'is_fraud')
    print(f'{col}: {round(corr,4)}')

Correlation between each numerical feature and the target "is_fraud" class

_c0: 0.0005
amt: 0.2093
lat: 0.0029
long: 0.001
city_pop: 0.0003
unix_time: -0.0133
merch_lat: 0.0028
merch_long: 0.001
distance: 0.0004
trans_hour: 0.0132
hour_binary: 0.1044
age: 0.0107
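The values above are Pearson correlation coefficients, which only measure linear association. That is why the raw 'trans_hour' scores far lower than 'hour_binary' even though both come from the same information. The sketch below shows the calculation in plain Python on made-up data; the function name `pearson` and the toy values are assumptions for illustration.

```python
import math

def pearson(xs, ys):
    """Pearson correlation coefficient between two equal-length sequences."""
    n = len(xs)
    mean_x, mean_y = sum(xs) / n, sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / math.sqrt(var_x * var_y)

# hypothetical toy data: larger amounts tend to be fraud
amounts = [5.0, 12.0, 8.0, 950.0, 20.0, 1100.0]
labels = [0, 0, 0, 1, 0, 1]
r = pearson(amounts, labels)
print(round(r, 4))
```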


### Splitting the data

Because the data is imbalanced, I can't do a straight random split to create the training and test sets. Instead I've used a stratified split [[18]](https://stackoverflow.com/questions/47637760/stratified-sampling-with-pyspark). With this you can set the fraction you want to sample from each target class. I've set this to 0.8 for both classes so we get a proportional split. This creates the training data set; I've then subtracted the training set from the main data frame to get the test data set. In the cell below that, I run the function I created earlier in the analysis to check that the split has been applied correctly and has kept the class ratio.

In [41]:
# train test split (stratified)
# https://stackoverflow.com/questions/47637760/stratified-sampling-with-pyspark
fractions = {0:0.8,1:0.8} # creates fraction of each to split by - I want even split
train_df = concat_data.sampleBy("is_fraud", fractions, seed=42)

test_df = concat_data.subtract(train_df) # subtracting a df is like subtracting sets
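To make the idea behind the stratified split concrete, here is a plain-Python sketch in which each row is kept for training with the probability given for its class. It is an illustration of the concept under my own assumptions (the function name `stratified_split` and the toy rows are hypothetical), not a reproduction of Spark's implementation. Because rows are drawn independently, the resulting split is close to 80/20 per class rather than exact.

```python
import random

def stratified_split(rows, label_key, fractions, seed=42):
    """Keep each row for training with the probability given for its class."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        if rng.random() < fractions[row[label_key]]:
            train.append(row)
        else:
            test.append(row)  # everything not sampled becomes the test set
    return train, test

# hypothetical toy data: 1% of rows are fraud
rows = [{"id": i, "is_fraud": 1 if i % 100 == 0 else 0} for i in range(10_000)]
train, test = stratified_split(rows, "is_fraud", {0: 0.8, 1: 0.8})

fraud_in_train = sum(row["is_fraud"] for row in train)
print(len(train), len(test), fraud_in_train)
```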

In [42]:
# checks the train/test split using function created above
print('Train Set:')
fraud_not_fraud_summary(train_df)
print('\nTest Set:')
fraud_not_fraud_summary(test_df)

Train Set:
Number of class 1 (fraudulant records): 7659
Number of class 0 (Non-fraudulant records): 1474546
99.48326985808306% are in class 0

Test Set:
Number of class 1 (fraudulant records): 1992
Number of class 0 (Non-fraudulant records): 368197
99.4618964907115% are in class 0


### Under/Over Sampling

Due to the imbalance of the dataset, the target classes in the training set need balancing out. To test the hypothesis that oversampling will produce better results than undersampling, I will apply both techniques to the train_df dataframe so that I can compare both methods. I've used code and ideas from this blog post to help with sampling [[19]](https://medium.com/@junwan01/oversampling-and-undersampling-with-pyspark-5dbc25cdf253)

First, find the ratio between the two classes:

In [43]:
fraud_df = train_df.filter(F.col('is_fraud') == 1) # filters to fraud
not_fraud_df = train_df.filter(F.col('is_fraud') == 0) # filters to not fraud
ratio = int(not_fraud_df.count()/fraud_df.count()) # gets ratio
print(f'The ratio is {ratio} to 1')

The ratio is 192 to 1


In [44]:
## creates oversampled data

a = range(ratio)
# this duplicates the minority rows
oversampled_data = fraud_df.withColumn('dummy', F.explode(F.array([F.lit(x) for x in a]))).drop('dummy') 
# combine both oversampled minority rows and previous majority rows 
oversampled_data = not_fraud_df.unionAll(oversampled_data)

In [45]:
# check the oversampled set
print('Oversampled Set:')
fraud_not_fraud_summary(oversampled_data)

Oversampled Set:
Number of class 1 (fraudulant records): 1470528
Number of class 0 (Non-fraudulant records): 1474546
50.068215603410984% are in class 0


In [46]:
## creates undersampled data set

undersampled_data = not_fraud_df.sample(False, 1/ratio) # creates subset of not fraud using the ratio from above
undersampled_data = undersampled_data.unionAll(fraud_df) #joins all the fraud records to the subset of not fraud records

In [47]:
# check the undersampled set
print('Undersampled Set:')
fraud_not_fraud_summary(undersampled_data)

Undersampled Set:
Number of class 1 (fraudulant records): 7659
Number of class 0 (Non-fraudulant records): 7850
50.615771487523375% are in class 0
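The sizes of the two balanced sets follow directly from the class counts and the ratio, which gives a quick arithmetic check on the outputs above. The sketch below uses the training-set counts printed earlier; the variable names are my own.

```python
n_not_fraud, n_fraud = 1_474_546, 7_659  # training-set counts printed above

ratio = int(n_not_fraud / n_fraud)            # whole-number majority:minority ratio
oversampled_fraud = n_fraud * ratio           # each fraud row repeated `ratio` times
expected_undersampled = n_not_fraud / ratio   # average size of a 1/ratio random sample

print(ratio, oversampled_fraud, round(expected_undersampled))
```

The oversampled fraud count matches the output exactly. The undersampled non-fraud count (7850) only lands near the expected value because `sample` draws rows at random, and no seed was set in that cell.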


## Building the pipeline

I will now put together the pipeline to apply the final transformations to the data before feeding it into the MLlib models. A pipeline chains together a number of transformers and estimators to produce a workflow of steps to apply to your data [[20]](https://spark.apache.org/docs/1.6.1/ml-guide.html)[[21]](https://www.analyticsvidhya.com/blog/2022/09/implementing-a-machine-learning-pipeline-using-pyspark-library/). The benefit of using this is that once it is set up it can be used repeatedly, and each time it will replicate the steps in the same order. The pipeline can be fitted on the training data to create a model and then applied to the test data set by calling .transform(), and the same steps, including the trained model, will be applied.

For my pipeline I will be applying these steps:

* One hot encoding [[22]](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.StringIndexer.html)[[23]](https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.ml.feature.OneHotEncoder.html)
* Transforming the data into a single vector for input into the model [[24]](https://spark.apache.org/docs/3.1.3/api/python/reference/api/pyspark.ml.feature.VectorAssembler.html)
* Scaling the data [[25]](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.StandardScaler.html)

One hot encoding deals with categorical columns and transforms them into numerically coded variables that can be input into and interpreted by the machine learning models. To do this I've used a combination of StringIndexer and OneHotEncoder. StringIndexer converts the category strings into numerical index labels, then OneHotEncoder converts these into binary vectors that the machine learning model can read.

The VectorAssembler merges multiple columns into a single vector 'features' column. Most machine learning models in MLlib require this to be done.

Finally, I've scaled the data, as some machine learning models are sensitive to the scale of their features. In my use case this matters for Logistic Regression but not for Random Forest.
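The two encoding steps described above can be illustrated in plain Python. This is a conceptual sketch only: the function names and the sample values are assumptions, and Spark's own StringIndexer and OneHotEncoder additionally return sparse vectors and have options (such as handleInvalid and dropLast) that are ignored here.

```python
from collections import Counter

def fit_string_index(values):
    """Map each label to an index, most frequent label first (ties broken alphabetically)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: i for i, label in enumerate(ordered)}

def one_hot(index, size):
    """Binary vector of length `size` with a single 1 at position `index`."""
    return [1 if i == index else 0 for i in range(size)]

genders = ["F", "M", "F", "F", "M"]  # hypothetical sample of the 'gender' column
index = fit_string_index(genders)    # step 1: strings -> numeric indices
encoded = [one_hot(index[g], len(index)) for g in genders]  # step 2: indices -> binary vectors
print(index, encoded[:2])
```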

In [48]:
# StringIndexer and Onehotencoding

# https://www.analyticsvidhya.com/blog/2022/09/implementing-a-machine-learning-pipeline-using-pyspark-library/
# https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.StringIndexer.html
# https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.ml.feature.OneHotEncoder.html

stages = []

# creates the indexers and encoders; handleInvalid='keep' is set because the sampled data
# didn't include all categories, which was causing issues
category_indexer = StringIndexer(inputCol='category',outputCol='category_indexed',handleInvalid='keep')
category_encoder = OneHotEncoder(inputCol='category_indexed',outputCol='category_encoded')

gender_indexer = StringIndexer(inputCol='gender',outputCol='gender_indexed',handleInvalid='keep')
gender_encoder = OneHotEncoder(inputCol='gender_indexed',outputCol='gender_encoded')

trans_day_indexer = StringIndexer(inputCol='trans_day',outputCol='trans_day_indexed',handleInvalid='keep')
trans_day_encoder = OneHotEncoder(inputCol='trans_day_indexed',outputCol='trans_day_encoded')

# adding them to lists to use in the pipeline
indexers = [category_indexer,gender_indexer,trans_day_indexer]
encoders = [category_encoder,gender_encoder,trans_day_encoder]

stages += indexers + encoders # adds indexers and encoders to stages for the pipeline

In [49]:
# vector assembler
input_cols = ['cc_num','amt','lat','long','city_pop','unix_time','merch_lat',\
              'merch_long','distance','hour_binary','age','category_encoded','gender_encoded','trans_day_encoded']
vector_assembler = VectorAssembler(inputCols=input_cols, outputCol="features")

stages += [vector_assembler]

In [50]:
# scales each feature to unit standard deviation (StandardScaler does not centre by default)
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")

stages += [scaler]

In [51]:
print(stages)

[StringIndexer_c3ea4f0fff60, StringIndexer_da738e90e5bd, StringIndexer_359e76561c51, OneHotEncoder_97e478afc66e, OneHotEncoder_5cc378249b99, OneHotEncoder_b558e4bc62c8, VectorAssembler_96088e5fba9c, StandardScaler_2affe1ca180e]


## Models and cross validation with gridsearch

I was hoping to train three machine learning models, but due to long running times I had to scale this back and decided not to use Support Vector Machines. Fraud detection is a classification problem and the models I will be using are Logistic Regression and a Random Forest classifier. I have decided to use cross validation [[26]](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.tuning.CrossValidator.html) in conjunction with ParamGridBuilder [[27]](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.tuning.ParamGridBuilder.html), which together act as a form of grid search to find the best hyperparameters for the model. Due to compute time constraints I also had to reduce the number of cross validation folds to 3 and tune fewer hyperparameters, as it was taking too long to run to be feasible for this project.

I have first trained the models on the oversampled data and then the undersampled data:

### Oversampled data set

#### Logistic Regression

In [52]:
# initialise model
lr = LogisticRegression(labelCol="is_fraud", featuresCol="scaled_features")

# creates a new stages variable appending the model
lr_stages = stages + [lr]  # adds the model to stages
# pipeline - add stages to pipeline
lr_pipeline = Pipeline(stages=lr_stages)

# maxIter originally included 10 as well
# create the parameter grid to search through
lr_param_grid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.0, 0.1, 1.0]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
    .addGrid(lr.maxIter, [50, 100]) \
    .build()

In [53]:
# cross validation
# cross validator does k fold cross validation and then fits the best model on the whole of the training data
lr_cross_val = CrossValidator(estimator=lr_pipeline,
                           estimatorParamMaps=lr_param_grid,
                           evaluator=BinaryClassificationEvaluator(labelCol="is_fraud"),
                           numFolds=3)

In [54]:
%%time
# Fit model to data
oversampled_lr_cv_model = lr_cross_val.fit(oversampled_data)

# Best model
oversampled_lr_best_model = oversampled_lr_cv_model.bestModel

CPU times: user 9.04 s, sys: 2.38 s, total: 11.4 s
Wall time: 41min 17s


#### Random Forest

In [55]:
# initialise model
rf = RandomForestClassifier(labelCol="is_fraud", featuresCol="scaled_features")

rf_stages = stages + [rf] # adds the model to stages
# pipeline - add stages to pipeline
rf_pipeline = Pipeline(stages=rf_stages)

# creates the parameter grid to search through
rf_param_grid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [50, 100, 200]) \
    .addGrid(rf.maxDepth, [5, 10, 20]) \
    .addGrid(rf.impurity, ["entropy", "gini"]) \
    .build()
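The long running times are easier to understand once the number of model fits is counted. The sketch below enumerates both parameter grids in plain Python and multiplies by the 3 folds mentioned above; the helper `count_fits` is an assumption for illustration, and it counts one extra fit for the final refit of the best model on the whole training set.

```python
from itertools import product

# the same values as the ParamGridBuilder grids in this notebook
lr_grid = {"regParam": [0.0, 0.1, 1.0], "elasticNetParam": [0.0, 0.5, 1.0], "maxIter": [50, 100]}
rf_grid = {"numTrees": [50, 100, 200], "maxDepth": [5, 10, 20], "impurity": ["entropy", "gini"]}
num_folds = 3

def count_fits(grid, folds):
    """Number of models trained during k-fold cross validation over a full grid."""
    combinations = len(list(product(*grid.values())))
    return combinations * folds + 1  # +1 for the final refit on all training data

lr_fits = count_fits(lr_grid, num_folds)
rf_fits = count_fits(rf_grid, num_folds)
print(lr_fits, rf_fits)
```

Each grid has 18 combinations, so every call to fit trains 55 pipelines, which on the oversampled set means 55 passes over roughly 2.9 million rows.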

In [56]:
# cross validation
# cross validator does k fold cross validation and then fits the best model on the whole of the training data
rf_cross_val = CrossValidator(estimator=rf_pipeline,
                           estimatorParamMaps=rf_param_grid,
                           evaluator=BinaryClassificationEvaluator(labelCol="is_fraud"),
                           numFolds=3)

In [57]:
%%time
# fits model to data
oversampled_rf_cv_model = rf_cross_val.fit(oversampled_data)

# gets best model
oversampled_rf_best_model = oversampled_rf_cv_model.bestModel

CPU times: user 9.98 s, sys: 2.99 s, total: 13 s
Wall time: 2h 47min 18s


### Undersampled data set

#### Logistic Regression

In [58]:
%%time
# Fit model to data
undersampled_lr_cv_model = lr_cross_val.fit(undersampled_data)

# Best model
undersampled_lr_best_model = undersampled_lr_cv_model.bestModel

CPU times: user 8.72 s, sys: 2.3 s, total: 11 s
Wall time: 12min 2s


#### Random Forest

In [59]:
%%time
# fits model to data
undersampled_rf_cv_model = rf_cross_val.fit(undersampled_data)

# gets best model
undersampled_rf_best_model = undersampled_rf_cv_model.bestModel

CPU times: user 9.08 s, sys: 2.53 s, total: 11.6 s
Wall time: 15min 10s


## Evaluating

To assess the performance of the models I've used multiple evaluation metrics to ascertain which works best on this dataset.

For the best overall performing model I've also extracted the hyperparameters [[28]](https://stackoverflow.com/questions/52498970/how-to-get-the-best-hyperparameter-value-after-crossvalidation-in-pyspark) used and the feature importance [[29]](https://saturncloud.io/blog/understanding-pyspark-random-forest-classifier-feature-importance-with-column-names/)[[30]](https://www.timlrx.com/blog/feature-selection-using-feature-importance-score-creating-a-pyspark-estimator)[[31]](https://medium.com/@derekfan/python-sorting-and-its-custom-key-5a22fccc04f8). These will be discussed further in my conclusion.

To evaluate the models I have used the hold-out test set that was not 'seen' in the training stage; it is transformed using the best model fitted in the cross validation stage. Transformations such as StandardScaler are based on the training set statistics, which should prevent data leakage.

In [60]:
# function to get evaluation metrics

def evaluation(predictions):
    """
    takes the dataframe that has been transformed using the best model after cross validation and returns
    various evaluation metrics
    
    args:
        predictions (dataframe): the transformed dataframe
        
    
    returns:
        AUC-ROC
        AUC-PR
        Accuracy
        Precision
        Recall
        F1 Score
    
    """
    # evaluators for auc-roc and auc-pr
    evaluator_roc = BinaryClassificationEvaluator(labelCol='is_fraud', metricName='areaUnderROC')
    evaluator_pr = BinaryClassificationEvaluator(labelCol='is_fraud', metricName='areaUnderPR')

    # uses predictions to calculate auc-roc and auc-pr
    auc_roc = evaluator_roc.evaluate(predictions)
    auc_pr = evaluator_pr.evaluate(predictions)

    print(f'AUC-ROC: {round(auc_roc,4)}')
    print(f'AUC-PR: {round(auc_pr,4)}')

    # evaluators for accuracy, precision, recall and F1
    # (precision, recall and F1 here are averages over both classes, weighted by class size)
    evaluator_accuracy = MulticlassClassificationEvaluator(labelCol='is_fraud', metricName='accuracy')
    evaluator_precision = MulticlassClassificationEvaluator(labelCol='is_fraud', metricName='weightedPrecision')
    evaluator_recall = MulticlassClassificationEvaluator(labelCol="is_fraud", metricName='weightedRecall')
    evaluator_f1 = MulticlassClassificationEvaluator(labelCol='is_fraud', metricName='f1')

    # Calculate the metric values
    accuracy = evaluator_accuracy.evaluate(predictions)
    precision = evaluator_precision.evaluate(predictions)
    recall = evaluator_recall.evaluate(predictions)
    f1 = evaluator_f1.evaluate(predictions)

    print(f'Accuracy: {round(accuracy,4)}')
    print(f'Precision: {round(precision,4)}')
    print(f'Recall: {round(recall,4)}')
    print(f'F1 Score: {round(f1,4)}')
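The weighted metrics used in this function average over both classes in proportion to their size, so on an imbalanced test set they are dominated by the non-fraud class, and weighted recall is always equal to accuracy. The plain-Python sketch below shows this with a hypothetical confusion matrix that has the same class sizes as the test set; the counts and the function name `binary_metrics` are made up for illustration and are not results from the models. I believe recent Spark versions also offer per-class metrics through MulticlassClassificationEvaluator (metric names such as 'precisionByLabel' together with a metricLabel parameter), but treat that as something to confirm in the documentation for your version.

```python
def binary_metrics(tp, fp, fn, tn):
    """Per-class and support-weighted precision/recall from a 2x2 confusion matrix."""
    total = tp + fp + fn + tn
    n_pos, n_neg = tp + fn, tn + fp  # actual fraud / actual non-fraud counts
    precision_pos, recall_pos = tp / (tp + fp), tp / n_pos
    precision_neg, recall_neg = tn / (tn + fn), tn / n_neg
    return {
        "accuracy": (tp + tn) / total,
        "fraud_precision": precision_pos,
        "fraud_recall": recall_pos,
        "weighted_precision": (n_pos * precision_pos + n_neg * precision_neg) / total,
        "weighted_recall": (n_pos * recall_pos + n_neg * recall_neg) / total,
    }

# hypothetical confusion matrix: 1992 fraud and 368197 non-fraud records
m = binary_metrics(tp=1_700, fp=50_000, fn=292, tn=318_197)
for name, value in m.items():
    print(f"{name}: {value:.4f}")
```

In this made-up example the weighted precision is above 0.99 while the precision on the fraud class alone is under 0.05, which is why AUC-PR is a useful companion to the weighted figures.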

### Oversampled Evaluation

#### Logistic Regression

In [61]:
# make predictions on the test set
oversampled_lr_predictions = oversampled_lr_best_model.transform(test_df)
print('Evaluation of Logistic Regression model trained with oversampled data:\n')
evaluation(oversampled_lr_predictions) # runs evaluation function

Evaluation of Logistic Regression model trained with oversampled data:

AUC-ROC: 0.9519
AUC-PR: 0.2167
Accuracy: 0.8634
Precision: 0.9942
Recall: 0.8634
F1 Score: 0.9217


#### Random Forest

In [62]:
%%time
# make predictions on the test set
oversampled_rf_predictions = oversampled_rf_best_model.transform(test_df)
print('Evaluation of Random Forest model trained with oversampled data:\n')
evaluation(oversampled_rf_predictions) # runs evaluation function

Evaluation of Random Forest model trained with oversampled data:

AUC-ROC: 0.9954
AUC-PR: 0.8188
Accuracy: 0.9928
Precision: 0.9963
Recall: 0.9928
F1 Score: 0.9941
CPU times: user 214 ms, sys: 111 ms, total: 325 ms
Wall time: 7min 32s


### Undersampled Evaluation

#### Logistic Regression

In [63]:
# make predictions on the test set
undersampled_lr_predictions = undersampled_lr_best_model.transform(test_df)
print('Evaluation of Logistic Regression model trained with undersampled data:\n')
evaluation(undersampled_lr_predictions) # runs evaluation function

Evaluation of Logistic Regression model trained with undersampled data:

AUC-ROC: 0.9517
AUC-PR: 0.2214
Accuracy: 0.8615
Precision: 0.9942
Recall: 0.8615
F1 Score: 0.9206


#### Random Forest

In [64]:
# make predictions on the test set
undersampled_rf_predictions = undersampled_rf_best_model.transform(test_df)
print('Evaluation of Random Forest model trained with undersampled data:\n')
evaluation(undersampled_rf_predictions) # runs evaluation function

Evaluation of Random Forest model trained with undersampled data:

AUC-ROC: 0.9949
AUC-PR: 0.7587
Accuracy: 0.9727
Precision: 0.9952
Recall: 0.9727
F1 Score: 0.9822


## Hyperparameters for the best model

In [69]:
# https://stackoverflow.com/questions/52498970/how-to-get-the-best-hyperparameter-value-after-crossvalidation-in-pyspark

# extract the model
best_rf_model = oversampled_rf_best_model.stages[-1]  # Assuming the last stage is the random forest model

# get hyperparameters of the best model
best_max_depth = best_rf_model._java_obj.getMaxDepth()
best_num_trees = best_rf_model._java_obj.getNumTrees()
best_impurity = best_rf_model._java_obj.getImpurity()

print(f'Best RF Parameters:\n')
print(f'Max Depth: {best_max_depth}')
print(f'Number of Trees: {best_num_trees}')
print(f'Impurity: {best_impurity}')

Best RF Parameters:

Max Depth: 20
Number of Trees: 200
Impurity: gini


## Feature Importance for the best model

In [70]:
%%time
# https://saturncloud.io/blog/understanding-pyspark-random-forest-classifier-feature-importance-with-column-names/
# https://www.timlrx.com/blog/feature-selection-using-feature-importance-score-creating-a-pyspark-estimator
#https://medium.com/@derekfan/python-sorting-and-its-custom-key-5a22fccc04f8

# Extract the Random Forest model from the pipeline
rf_model = oversampled_rf_best_model.stages[-1]

# create a feature importance instance
feature_importance = rf_model.featureImportances

# because of the onehotencoding I can't just use the column names from the training data set as these were 
# transformed into multiple new ones. I need to get the number of categories in each encoder and create feature
# names to map to each feature importance

num_category_categories = 14  # no. of categories
num_gender_categories = 2    # no. of gender categories
num_trans_day_categories = 7 # no. of day categories

# extends the feature names in relation to the number of categories in the one hot encoded features
# makes it category_1, category_2 etc
one_hot_feature_names = []
for feature in input_cols:
    if feature == 'category_encoded':
        one_hot_feature_names.extend([f'category_{i}' for i in range(num_category_categories)])
    elif feature == 'gender_encoded':
        one_hot_feature_names.extend([f'gender_{i}' for i in range(num_gender_categories)])
    elif feature == 'trans_day_encoded':
        one_hot_feature_names.extend([f'trans_day_{i}' for i in range(num_trans_day_categories)])
    else:
        one_hot_feature_names.append(feature)

# maps feature importances to new feature names
feature_importance_with_names = [(one_hot_feature_names[i], feature_importance[i]) \
                                 for i in range(len(feature_importance))]

#https://medium.com/@derekfan/python-sorting-and-its-custom-key-5a22fccc04f8
sorted_feature_importance = sorted(feature_importance_with_names, key=lambda x: x[1], reverse=True)

# prints top 15 items
sorted_feature_importance[:15]

CPU times: user 4.5 ms, sys: 50 µs, total: 4.55 ms
Wall time: 509 ms


[('amt', 0.5356207758104532),
 ('hour_binary', 0.22565363404917346),
 ('category_1', 0.021973933768052806),
 ('category_3', 0.019999205887936196),
 ('category_0', 0.019259556936518752),
 ('age', 0.01694829882922136),
 ('unix_time', 0.014409975948294242),
 ('city_pop', 0.011161945740649236),
 ('category_5', 0.010314941669383591),
 ('category_10', 0.010163776761393553),
 ('category_2', 0.010105323593246623),
 ('category_4', 0.010028605614858082),
 ('cc_num', 0.008000608942503297),
 ('distance', 0.007684038067793089),
 ('category_7', 0.00723697527722333)]

### Benchmark model using data that hasn't been sampled

Although some form of sampling is the suggested method for getting the best results from an imbalanced data set, I thought it would be sensible to benchmark this against a model trained on the unsampled data. As you can see below, it actually scored pretty well, but not as well as the best Random Forest model trained on sampled data.

In [67]:
%%time
# initialise model - uses default parameters
no_sampling_rf = RandomForestClassifier(labelCol="is_fraud",featuresCol="scaled_features")

no_sampling_rf_stages = stages + [no_sampling_rf] # adds the model to stages
# pipeline - add stages to pipeline
no_sampling_rf_pipeline = Pipeline(stages=no_sampling_rf_stages)

# fit unsampled train_df
no_sampling_rf_model = no_sampling_rf_pipeline.fit(train_df)

# test against test set
no_sampling_rf_predictions = no_sampling_rf_model.transform(test_df)
print('Evaluation of Random Forest model with data that was not sampled:\n')
evaluation(no_sampling_rf_predictions) # runs evaluation function

Evaluation of Random Forest model with data that was not sampled:

AUC-ROC: 0.9667
AUC-PR: 0.5863
Accuracy: 0.9946
Precision: 0.9893
Recall: 0.9946
F1 Score: 0.9919
CPU times: user 165 ms, sys: 41 ms, total: 206 ms
Wall time: 1min 25s


In [71]:
spark.stop() # this stops the spark session

## Summary and Conclusions

The results show the best model was the Random Forest classifier trained on the oversampled data set. I used the following metrics for evaluation, shown here with that model's scores:

* AUC-ROC: 0.9954
* AUC-PR: 0.8188
* Accuracy: 0.9928
* Precision: 0.9963
* Recall: 0.9928
* F1 Score: 0.9941

Accuracy is often the main benchmark used in analysing the performance of models. However, it can be misleading, especially for imbalanced data sets. In this data set the majority class (class 0) makes up approximately 99.5% of the data, so predicting every transaction as class 0 would give a model accuracy of 99.5%. Obviously this would not be a good model, as no fraud would be caught. Other metrics therefore give a better picture of the actual effectiveness of the model.
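The point about accuracy can be illustrated with a few lines of plain Python. This is a minimal sketch on a hypothetical test set of 100,000 transactions with the same 99.5% / 0.5% split (the counts are made up for illustration and are not the project data):

```python
# Hypothetical test set with the ~99.5% / 0.5% class split described above
n_total = 100_000
n_fraud = 500  # 0.5% positive (fraud) class
labels = [1] * n_fraud + [0] * (n_total - n_fraud)

# A "model" that predicts the majority class (0) for every transaction
predictions = [0] * n_total

correct = sum(1 for y, p in zip(labels, predictions) if y == p)
true_positives = sum(1 for y, p in zip(labels, predictions) if y == 1 and p == 1)

accuracy = correct / n_total
fraud_recall = true_positives / n_fraud

print(f"Accuracy: {accuracy:.3f}")          # 0.995
print(f"Fraud recall: {fraud_recall:.3f}")  # 0.000
```

The accuracy looks excellent even though not a single fraudulent transaction is caught.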

Precision, recall and F1 score are all >99%, which suggests the model performs well in these areas. Precision is the proportion of transactions predicted as positive that really were positive, and recall is the proportion of actual positives that were correctly predicted. High scores on both indicate low rates of false positives and false negatives. In fraud prediction you would generally be more concerned with minimising false negatives, i.e. fraudulent transactions that are not caught. The F1 score is the harmonic mean of precision and recall, so the score of my model suggests it is doing well on both [[32]](https://www.analyticsvidhya.com/blog/2021/07/metrics-to-evaluate-your-classification-model-to-take-the-right-decisions).
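One thing worth checking when reading these figures: in both sets of results reported in this notebook, recall is identical to accuracy (0.9928 and 0.9946). That is exactly what happens when recall is averaged over the classes weighted by class size, in which case the figure is dominated by the majority class. The evaluation function is not shown in this section, so whether it uses weighted metrics is an assumption. The sketch below uses hypothetical confusion matrix counts (not the project results) to show the effect:

```python
# Hypothetical confusion-matrix counts (NOT taken from the project results)
tp, fn = 300, 200     # actual fraud: caught / missed
tn, fp = 99_000, 500  # actual legitimate: passed / wrongly flagged

n_pos, n_neg = tp + fn, tn + fp
n_total = n_pos + n_neg

accuracy = (tp + tn) / n_total

# Recall for each class separately
recall_fraud = tp / n_pos
recall_legit = tn / n_neg

# Recall averaged over the two classes, weighted by class size
weighted_recall = (n_pos * recall_fraud + n_neg * recall_legit) / n_total

# Precision and F1 for the fraud class only
precision_fraud = tp / (tp + fp)
f1_fraud = 2 * precision_fraud * recall_fraud / (precision_fraud + recall_fraud)

print(f"Accuracy:        {accuracy:.4f}")
print(f"Weighted recall: {weighted_recall:.4f}")
print(f"Fraud recall:    {recall_fraud:.4f}")
print(f"Fraud precision: {precision_fraud:.4f}")
print(f"Fraud F1:        {f1_fraud:.4f}")
```

Weighted recall always equals accuracy, while the fraud-class figures can be much lower. Reporting precision and recall for class 1 on its own would show how many fraudulent transactions are actually being caught.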

These metrics can also be misleading when it comes to imbalanced data, so I've also used AUC-ROC and AUC-PR. AUC-ROC summarises the trade-off between the true positive rate and the false positive rate across classification thresholds; a value closer to 1 suggests the model distinguishes well between the positive and negative classes. AUC-PR is better suited to imbalanced data as it focuses on performance on the positive minority class. My model's score of 0.82 suggests there is room for improvement here and that the model fails to predict the minority (fraud) class in some instances [[33]](https://medium.com/alliedoffsets/boost-your-binary-classification-game-auc-roc-vs-auc-pr-which-one-should-you-use-28f6518d7bda).
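To make the difference between the two measures concrete, here is a minimal pure-Python sketch on hypothetical scores. It computes AUC-ROC as the probability that a randomly chosen fraud case is scored above a randomly chosen legitimate one, and summarises the precision-recall curve with average precision. Average precision is one common way of summarising that curve and is used here as an assumption; Spark's `areaUnderPR` metric may give slightly different numbers.

```python
def auc_roc(labels, scores):
    """Probability that a random positive is scored above a random negative."""
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    # Each positive/negative pair counts 1 for a win and 0.5 for a tie
    wins = sum(1.0 if p > n else 0.5 if p == n else 0.0 for p in pos for n in neg)
    return wins / (len(pos) * len(neg))


def average_precision(labels, scores):
    """Mean of the precision values at the rank of each actual positive."""
    ranked = sorted(zip(scores, labels), key=lambda pair: pair[0], reverse=True)
    hits, precisions = 0, []
    for rank, (_, y) in enumerate(ranked, start=1):
        if y == 1:
            hits += 1
            precisions.append(hits / rank)
    return sum(precisions) / len(precisions)


# Hypothetical model scores: 2 fraud cases among 10 transactions
labels = [1, 0, 0, 1, 0, 0, 0, 0, 0, 0]
scores = [0.9, 0.8, 0.7, 0.6, 0.5, 0.4, 0.3, 0.2, 0.1, 0.05]

print(f"AUC-ROC: {auc_roc(labels, scores):.3f}")                      # 0.875
print(f"Average precision: {average_precision(labels, scores):.3f}")  # 0.750
```

Two legitimate transactions ranked above the second fraud case barely dent AUC-ROC, because there are many negatives to compare against, but they halve the precision at that point. This is why the PR-based score is the harsher measure on imbalanced data.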

Random Forest outperformed Logistic Regression for this problem. This wasn't a surprise, as RF is a powerful method that often outperforms other models. The oversampled data set also proved to be the best for training the model, although not by much: the undersampled RF model was the next best, with an AUC-PR of 0.76. The benchmark model, where the training set was not sampled at all, had an AUC-PR of 0.59. This is interesting, as it outperforms both Logistic Regression models trained on the sampled data by a considerable amount; the AUC-PR for the best Logistic Regression model (undersampled) was 0.22.

Oversampling, although it performed best here, has a tendency to overfit because the minority class records are replicated, which may have contributed to the AUC-PR being lower than the other metrics. It would have been interesting to also try SMOTE (Synthetic Minority Oversampling Technique) to see if it would get better results. Rather than replicating minority class records, SMOTE creates new synthetic minority records by interpolating between existing minority records and their nearest minority neighbours, and it often gets good results.
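The core idea of SMOTE can be sketched in a few lines of plain Python. This is an illustrative sketch only, not a production implementation and not part of Spark MLlib: the function name `smote_sample`, its parameters and the example points are all hypothetical, and it only handles numeric features.

```python
import math
import random


def smote_sample(minority, k=2, n_new=4, seed=0):
    """Create n_new synthetic points, each by interpolating between a minority
    record and one of its k nearest minority neighbours."""
    rng = random.Random(seed)
    synthetic = []
    for _ in range(n_new):
        base = rng.choice(minority)
        # k nearest neighbours of the base point among the other minority records
        others = [p for p in minority if p is not base]
        neighbours = sorted(others, key=lambda p: math.dist(base, p))[:k]
        neighbour = rng.choice(neighbours)
        gap = rng.random()  # position along the line segment, in [0, 1)
        synthetic.append(tuple(b + gap * (n - b) for b, n in zip(base, neighbour)))
    return synthetic


# Hypothetical minority (fraud) records with two numeric, already-scaled features
fraud_points = [(1.0, 2.0), (1.5, 1.8), (1.2, 2.4), (3.0, 3.5)]
new_points = smote_sample(fraud_points)

for point in new_points:
    print(point)
```

Each synthetic record lies on the line between two real fraud records, so the new records are unique rather than exact copies, which is what reduces the overfitting risk compared with simple replication.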

Obtaining the feature importance results from the best Random Forest model gave good insight into which features contributed most to the model. The top features by far were the transaction amount (`amt`, 0.54) and the time features (`hour_binary` in particular, 0.23), with the category and age features following. These were also highlighted when I looked at the correlation between features and the target class before I put the pipeline together. It would be interesting to see whether reducing the number of features, and so the noise in the data, would get better results.
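Because the one-hot encoded features are split across several slots, it can help to add their importances back together per original column. The sketch below does this for the 15 values printed in the feature importance output; the helper `base_feature` is hypothetical, and since only the top 15 slots are included here the grouped totals are lower bounds.

```python
from collections import defaultdict

# Top 15 (feature, importance) pairs as printed in the feature importance output
top_importances = [
    ('amt', 0.5356207758104532),
    ('hour_binary', 0.22565363404917346),
    ('category_1', 0.021973933768052806),
    ('category_3', 0.019999205887936196),
    ('category_0', 0.019259556936518752),
    ('age', 0.01694829882922136),
    ('unix_time', 0.014409975948294242),
    ('city_pop', 0.011161945740649236),
    ('category_5', 0.010314941669383591),
    ('category_10', 0.010163776761393553),
    ('category_2', 0.010105323593246623),
    ('category_4', 0.010028605614858082),
    ('cc_num', 0.008000608942503297),
    ('distance', 0.007684038067793089),
    ('category_7', 0.00723697527722333),
]


def base_feature(name, prefixes=('category', 'gender', 'trans_day')):
    """Map a one-hot slot name such as 'category_3' back to 'category'."""
    for prefix in prefixes:
        if name.startswith(prefix + '_') and name[len(prefix) + 1:].isdigit():
            return prefix
    return name


# Sum the importances of all slots belonging to the same original column
grouped = defaultdict(float)
for name, score in top_importances:
    grouped[base_feature(name)] += score

ranked = sorted(grouped.items(), key=lambda item: item[1], reverse=True)
for name, score in ranked:
    print(f"{name}: {score:.4f}")
```

Grouped this way, the transaction category as a whole comes third behind `amt` and `hour_binary`, well ahead of `age`.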

#### Limitations and further work

One of the biggest obstacles with this project was the compute time. Because I was using grid search cross validation, each model was doing multiple fits, which made the run time really long. I initially wanted to test more than two models, but they took too long to run to be realistic. I was hoping to try Support Vector Machines as another model to validate, and also XGBoost, as that tends to get better results than Random Forests, but it seemed to require installation on the Lena cluster as it isn't included in Spark MLlib. The heavily reduced sample set I've provided with the submission helped with testing, but due to the nature of the dataset its results were very different to those from the full set, so it couldn't be used to get results. I would also have liked to increase the cross validation to at least 5 folds and tune more parameters within the grid search, but this is even more expensive in compute time as it increases the number of fits needed.

I also struggled with the speeds on the Lena cluster. It was often really slow or would crash completely, which caused some frustration when I was trying to run the project. It would be good to try this project using a cloud service to see if this reduced the run times.

If I were to do this again, it would be interesting to try reducing the features further to try to get better results, as the correlation and feature importance suggested this could help. Finally, I was hoping to use a confusion matrix as part of the evaluation. However, I ran out of time, and it seemed more convoluted than in scikit-learn, where it's very straightforward. I was reluctant to use anything outside of PySpark for coding and analysis as the project requirements were to use PySpark, which was not something I had used before starting this module.
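For the confusion matrix, one fairly light approach is to count the (actual, predicted) pairs and build the matrix from those four counts. The sketch below shows the idea in plain Python with hypothetical counts. The helper `confusion_summary` is made up for illustration; in PySpark the rows could come from something like `predictions.groupBy("is_fraud", "prediction").count().collect()`, assuming the predictions DataFrame has the `is_fraud` label column and Spark's default `prediction` column.

```python
def confusion_summary(rows):
    """Build a binary confusion matrix from (actual, predicted, count) rows
    and derive precision and recall for the fraud class (class 1)."""
    counts = {(0, 0): 0, (0, 1): 0, (1, 0): 0, (1, 1): 0}
    for actual, predicted, count in rows:
        counts[(int(actual), int(predicted))] += count

    tn, fp = counts[(0, 0)], counts[(0, 1)]
    fn, tp = counts[(1, 0)], counts[(1, 1)]

    # Guard against division by zero when a class is never predicted or absent
    precision = tp / (tp + fp) if (tp + fp) else 0.0
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    return {"tn": tn, "fp": fp, "fn": fn, "tp": tp,
            "fraud_precision": precision, "fraud_recall": recall}


# Hypothetical counts (NOT project results); predictions are floats as in Spark
rows = [(0, 0.0, 9900), (0, 1.0, 50), (1, 0.0, 10), (1, 1.0, 40)]
summary = confusion_summary(rows)

print(f"             predicted 0  predicted 1")
print(f"actual 0     {summary['tn']:>11}  {summary['fp']:>11}")
print(f"actual 1     {summary['fn']:>11}  {summary['tp']:>11}")
print(f"Fraud precision: {summary['fraud_precision']:.3f}")
print(f"Fraud recall:    {summary['fraud_recall']:.3f}")
```

Because the aggregation happens in Spark and only four rows are collected, this stays cheap even on the full data set.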

### References

1.	Card fraud - credit cards and debit cards combined - worldwide 2014-2021 [Internet]. Statista. [cited 2023 Sep 14]. Available from: https://www.statista.com/statistics/1394119/global-card-fraud-losses/
 	 
2.	UK Payments Markets Summary 2022 [Internet]. ukfinance.org.uk. 2022 [cited 2023 Sep 14]. Available from: https://www.ukfinance.org.uk/system/files/2022-08/UKF%20Payment%20Markets%20Summary%202022.pdf
 	 
3.	Wikipedia contributors. Credit card fraud [Internet]. Wikipedia, The Free Encyclopedia. 2023 [cited 2023 Sep 14]. Available from: https://en.wikipedia.org/w/index.php?title=Credit_card_fraud&oldid=1173126243
 	 
4.	Akin J. Identity theft is on the rise, both in incidents and losses [Internet]. Experian.com. Experian; 2022 [cited 2023 Sep 14]. Available from: https://www.experian.com/blogs/ask-experian/identity-theft-statistics/
 	 
5.	Rules-based fraud detection [Internet]. Fraud.net. 2019 [cited 2023 Sep 14]. Available from: https://fraud.net/d/rules-based-fraud-detection/
 	 
6.	Gillis AS. The 5 V’s of big data [Internet]. Data Management. TechTarget; 2021 [cited 2023 Sep 14]. Available from: https://www.techtarget.com/searchdatamanagement/definition/5-Vs-of-big-data
 	 
7.	Shenoy K. Credit card transactions fraud detection dataset [Internet]. 2020 [cited 2023 Sep 14]. Available from: https://www.kaggle.com/datasets/kartik2112/fraud-detection
 	 
8.	Egan J. Credit card fraud statistics [Internet]. Bankrate. Bankrate.com; 2023 [cited 2023 Sep 14]. Available from: https://www.bankrate.com/finance/credit-cards/credit-card-fraud-statistics/
 	 
9.	Beheshti N. Random forest classification [Internet]. Towards Data Science. 2022 [cited 2023 Sep 14]. Available from: https://towardsdatascience.com/random-forest-classification-678e551462f5
 	 
10.	Couronné R, Probst P, Boulesteix A-L. Random forest versus logistic regression: a large-scale benchmark experiment. BMC Bioinformatics [Internet]. 2018;19(1). Available from: http://dx.doi.org/10.1186/s12859-018-2264-5
 	 
11.	Selecting only numeric or string columns names from PySpark DataFrame [Internet]. GeeksforGeeks. 2021 [cited 2023 Sep 14]. Available from: https://www.geeksforgeeks.org/selecting-only-numeric-or-string-columns-names-from-pyspark-dataframe/
 	 
12.	Wikipedia contributors. Haversine formula [Internet]. Wikipedia, The Free Encyclopedia. 2023 [cited 2023 Sep 14]. Available from: https://en.wikipedia.org/w/index.php?title=Haversine_formula&oldid=1168079505
 	 
13.	Pavlov K. Spherical distance calcualtion based on latitude and longitude with Apache Spark [Internet]. https://gist.github.com/. [cited 2023 Sep 14]. Available from: https://gist.github.com/pavlov99/bd265be244f8a84e291e96c5656ceb5c
 	 
14.	PySpark to_timestamp() – Convert String to Timestamp type [Internet]. Sparkbyexamples.com. 2023 [cited 2023 Sep 14]. Available from: https://sparkbyexamples.com/spark/pyspark-to_timestamp-convert-string-to-timestamp-type/
 	 
15.	Spark Timestamp – Extract hour, minute and second [Internet]. Sparkbyexamples.com. 2022 [cited 2023 Sep 14]. Available from: https://sparkbyexamples.com/spark/spark-extract-hour-minute-and-second-from-timestamp/
 	 
16.	Spark equivalent of IF then ELSE [Internet]. Stack Overflow. [cited 2023 Sep 14]. Available from: https://stackoverflow.com/questions/39048229/spark-equivalent-of-if-then-else
 	 
17.	How to calculate correlation in PySpark [Internet]. ProjectPro. [cited 2023 Sep 14]. Available from: https://www.projectpro.io/recipes/calculate-correlation-pyspark
 	 
18.	Stratified sampling with pyspark [Internet]. Stack Overflow. [cited 2023 Sep 14]. Available from: https://stackoverflow.com/questions/47637760/stratified-sampling-with-pyspark
 	 
19.	Wan J. Oversampling and undersampling with PySpark - jun wan [Internet]. Medium. 2020 [cited 2023 Sep 14]. Available from: https://medium.com/@junwan01/oversampling-and-undersampling-with-pyspark-5dbc25cdf253
 	 
20.	Overview: estimators, transformers and pipelines - spark.ml - Spark 1.6.1 Documentation [Internet]. Apache.org. [cited 2023 Sep 14]. Available from: https://spark.apache.org/docs/1.6.1/ml-guide.html
 	 
21.	Gulati AP. Implementing a machine learning pipeline using PySpark library [Internet]. Analytics Vidhya. 2022 [cited 2023 Sep 14]. Available from: https://www.analyticsvidhya.com/blog/2022/09/implementing-a-machine-learning-pipeline-using-pyspark-library/
 	 
22.	StringIndexer — PySpark 3.4.1 documentation [Internet]. Apache.org. [cited 2023 Sep 14]. Available from: https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.StringIndexer.html
 	 
23.	OneHotEncoder — PySpark 3.1.1 documentation [Internet]. Apache.org. [cited 2023 Sep 14]. Available from: https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.ml.feature.OneHotEncoder.html
 	 
24.	VectorAssembler — PySpark 3.1.3 documentation [Internet]. Apache.org. [cited 2023 Sep 14]. Available from: https://spark.apache.org/docs/3.1.3/api/python/reference/api/pyspark.ml.feature.VectorAssembler.html
 	 
25.	StandardScaler — PySpark 3.4.1 documentation [Internet]. Apache.org. [cited 2023 Sep 14]. Available from: https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.StandardScaler.html
 	 
26.	CrossValidator — PySpark 3.4.1 documentation [Internet]. Apache.org. [cited 2023 Sep 14]. Available from: https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.tuning.CrossValidator.html
 	 
27.	ParamGridBuilder — PySpark 3.4.1 documentation [Internet]. Apache.org. [cited 2023 Sep 14]. Available from: https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.tuning.ParamGridBuilder.html
 	 
28.	How to get the best hyperparameter value after crossvalidation in Pyspark? [Internet]. Stack Overflow. [cited 2023 Sep 14]. Available from: https://stackoverflow.com/questions/52498970/how-to-get-the-best-hyperparameter-value-after-crossvalidation-in-pyspark
 	 
29.	Understanding PySpark Random Forest Classifier feature importance with column names [Internet]. Saturncloud.io. 2023 [cited 2023 Sep 14]. Available from: https://saturncloud.io/blog/understanding-pyspark-random-forest-classifier-feature-importance-with-column-names/
 	 
30.	Feature selection using feature importance score - creating a PySpark estimator [Internet]. Quasilinear Musings. 2018 [cited 2023 Sep 14]. Available from: https://www.timlrx.com/blog/feature-selection-using-feature-importance-score-creating-a-pyspark-estimator
 	 
31.	Fan D. [Python] Sorting and its Custom Key [Internet]. Medium. 2019 [cited 2023 Sep 14]. Available from: https://medium.com/@derekfan/python-sorting-and-its-custom-key-5a22fccc04f8
 	 
32.	Agrawal SK. Metrics to Evaluate your Classification Model to take the right decisions [Internet]. Analytics Vidhya. 2021 [cited 2023 Sep 14]. Available from: https://www.analyticsvidhya.com/blog/2021/07/metrics-to-evaluate-your-classification-model-to-take-the-right-decisions
 	 
33.	Babbar T. Boost your binary classification game: AUC-ROC vs AUC-PR — which one should you use? [Internet]. AlliedOffsets. 2023 [cited 2023 Sep 14]. Available from: https://medium.com/alliedoffsets/boost-your-binary-classification-game-auc-roc-vs-auc-pr-which-one-should-you-use-28f6518d7bda