In [33]:
# from google.colab import drive
# drive.mount('/content/drive')

In [34]:
# import libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, unix_timestamp, to_date, lit
from pyspark.sql.types import DoubleType

In [35]:
# start a spark session
spark = SparkSession.builder.appName('Bank_fraud').getOrCreate()

In [36]:
fraud_train= spark.read.csv('/content/drive/Othercomputers/My Laptop - Personal/Files/Data_Files/Portfolio Projects/Bank_Fraud_Detection/fraudTrain.csv',
                   header=True, inferSchema=True)
fraud_test = spark.read.csv('/content/drive/Othercomputers/My Laptop - Personal/Files/Data_Files/Portfolio Projects/Bank_Fraud_Detection/fraudTest.csv',
                   header=True, inferSchema=True)
# load train data
fraud_train.show(5)

+---+---------------------+----------------+--------------------+-------------+------+---------+-------+------+--------------------+--------------+-----+-----+-------+---------+--------+--------------------+----------+--------------------+----------+------------------+-----------+--------+
|_c0|trans_date_trans_time|          cc_num|            merchant|     category|   amt|    first|   last|gender|              street|          city|state|  zip|    lat|     long|city_pop|                 job|       dob|           trans_num| unix_time|         merch_lat| merch_long|is_fraud|
+---+---------------------+----------------+--------------------+-------------+------+---------+-------+------+--------------------+--------------+-----+-----+-------+---------+--------+--------------------+----------+--------------------+----------+------------------+-----------+--------+
|  0|  2019-01-01 00:00:18|2703186189652095|fraud_Rippin, Kub...|     misc_net|  4.97| Jennifer|  Banks|     F|      561 Perry 

In [37]:
# load test data
fraud_test.show(5)

+---+---------------------+----------------+--------------------+--------------+-----+------+--------+------+--------------------+----------+-----+-----+-------+------------------+--------+--------------------+----------+--------------------+----------+------------------+-----------+--------+
|_c0|trans_date_trans_time|          cc_num|            merchant|      category|  amt| first|    last|gender|              street|      city|state|  zip|    lat|              long|city_pop|                 job|       dob|           trans_num| unix_time|         merch_lat| merch_long|is_fraud|
+---+---------------------+----------------+--------------------+--------------+-----+------+--------+------+--------------------+----------+-----+-----+-------+------------------+--------+--------------------+----------+--------------------+----------+------------------+-----------+--------+
|  0|  2020-06-21 12:14:25|2291163933867244|fraud_Kirlin and ...| personal_care| 2.86|  Jeff| Elliott|     M|   351 Da

# Data Understanding

In [38]:
# count the number of rows
print(f"There are {fraud_train.count()} rows in the fraud_train dataset")
print(f"There are {fraud_test.count()} rows in the fraud_test dataset")

There are 1296675 rows in the fraud_train dataset
There are 555719 rows in the fraud_test dataset


In [39]:
# check duplicates - fraud_train
fraud_train.exceptAll(fraud_train.dropDuplicates()).show()


+---+---------------------+------+--------+--------+---+-----+----+------+------+----+-----+---+---+----+--------+---+---+---------+---------+---------+----------+--------+
|_c0|trans_date_trans_time|cc_num|merchant|category|amt|first|last|gender|street|city|state|zip|lat|long|city_pop|job|dob|trans_num|unix_time|merch_lat|merch_long|is_fraud|
+---+---------------------+------+--------+--------+---+-----+----+------+------+----+-----+---+---+----+--------+---+---+---------+---------+---------+----------+--------+
+---+---------------------+------+--------+--------+---+-----+----+------+------+----+-----+---+---+----+--------+---+---+---------+---------+---------+----------+--------+



In [40]:
# check duplicates - fraud_test
fraud_test.exceptAll(fraud_test.dropDuplicates()).show()

+---+---------------------+------+--------+--------+---+-----+----+------+------+----+-----+---+---+----+--------+---+---+---------+---------+---------+----------+--------+
|_c0|trans_date_trans_time|cc_num|merchant|category|amt|first|last|gender|street|city|state|zip|lat|long|city_pop|job|dob|trans_num|unix_time|merch_lat|merch_long|is_fraud|
+---+---------------------+------+--------+--------+---+-----+----+------+------+----+-----+---+---+----+--------+---+---+---------+---------+---------+----------+--------+
+---+---------------------+------+--------+--------+---+-----+----+------+------+----+-----+---+---+----+--------+---+---+---------+---------+---------+----------+--------+



In [41]:
# check schema
print('Schema of fraud_train:')
fraud_train.printSchema()

Schema of fraud_train:
root
 |-- _c0: integer (nullable = true)
 |-- trans_date_trans_time: timestamp (nullable = true)
 |-- cc_num: long (nullable = true)
 |-- merchant: string (nullable = true)
 |-- category: string (nullable = true)
 |-- amt: double (nullable = true)
 |-- first: string (nullable = true)
 |-- last: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- street: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zip: integer (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- city_pop: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- dob: date (nullable = true)
 |-- trans_num: string (nullable = true)
 |-- unix_time: integer (nullable = true)
 |-- merch_lat: double (nullable = true)
 |-- merch_long: double (nullable = true)
 |-- is_fraud: integer (nullable = true)



In [42]:
# check schema
print('Schema of fraud_test:')
fraud_test.printSchema()

Schema of fraud_test:
root
 |-- _c0: integer (nullable = true)
 |-- trans_date_trans_time: timestamp (nullable = true)
 |-- cc_num: long (nullable = true)
 |-- merchant: string (nullable = true)
 |-- category: string (nullable = true)
 |-- amt: double (nullable = true)
 |-- first: string (nullable = true)
 |-- last: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- street: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zip: integer (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- city_pop: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- dob: date (nullable = true)
 |-- trans_num: string (nullable = true)
 |-- unix_time: integer (nullable = true)
 |-- merch_lat: double (nullable = true)
 |-- merch_long: double (nullable = true)
 |-- is_fraud: integer (nullable = true)



In [43]:
# Check for null values in each column
# cast nulls to int: 1 for null, 0 for non-null. Summarize the count of these values.
print('Null values in fraud_train:')
fraud_train.select([sum(col(c).isNull().cast('int')).alias(c) for c in fraud_train.columns]).show()


Null values in fraud_train:
+---+---------------------+------+--------+--------+---+-----+----+------+------+----+-----+---+---+----+--------+---+---+---------+---------+---------+----------+--------+
|_c0|trans_date_trans_time|cc_num|merchant|category|amt|first|last|gender|street|city|state|zip|lat|long|city_pop|job|dob|trans_num|unix_time|merch_lat|merch_long|is_fraud|
+---+---------------------+------+--------+--------+---+-----+----+------+------+----+-----+---+---+----+--------+---+---+---------+---------+---------+----------+--------+
|  0|                    0|     0|       0|       0|  0|    0|   0|     0|     0|   0|    0|  0|  0|   0|       0|  0|  0|        0|        0|        0|         0|       0|
+---+---------------------+------+--------+--------+---+-----+----+------+------+----+-----+---+---+----+--------+---+---+---------+---------+---------+----------+--------+



In [44]:
print('Null values in fraud_test:')
fraud_test.select([sum(col(c).isNull().cast("int")).alias(c) for c in fraud_test.columns]).show()

Null values in fraud_test:
+---+---------------------+------+--------+--------+---+-----+----+------+------+----+-----+---+---+----+--------+---+---+---------+---------+---------+----------+--------+
|_c0|trans_date_trans_time|cc_num|merchant|category|amt|first|last|gender|street|city|state|zip|lat|long|city_pop|job|dob|trans_num|unix_time|merch_lat|merch_long|is_fraud|
+---+---------------------+------+--------+--------+---+-----+----+------+------+----+-----+---+---+----+--------+---+---+---------+---------+---------+----------+--------+
|  0|                    0|     0|       0|       0|  0|    0|   0|     0|     0|   0|    0|  0|  0|   0|       0|  0|  0|        0|        0|        0|         0|       0|
+---+---------------------+------+--------+--------+---+-----+----+------+------+----+-----+---+---+----+--------+---+---+---------+---------+---------+----------+--------+



In [45]:
# check number of unique customers (cc_num) and unique transactions (trans_num) from fraud_train
# Count unique customers (cc_num)
unique_customers = fraud_train.select("cc_num").distinct().count()
print(f"Number of unique customers: {unique_customers}")

# Count unique transactions (trans_num)
unique_transactions = fraud_train.select("trans_num").distinct().count()
print(f"Number of unique transactions: {unique_transactions}")

Number of unique customers: 983
Number of unique transactions: 1296675


In [46]:
# filter rows where is_fraud is 0 - Nn-fraudulent transaction
non_fraud = fraud_train.filter(fraud_train["is_fraud"] == 0)
print(f"Number of non-fraudulent transactions: {non_fraud.count()}")

# filter rows where is_fraud is 1 - Fraudulent transaction
fraud = fraud_train.filter(fraud_train["is_fraud"] == 1)
print(f"Number of fraudulent transactions: {fraud.count()}")

Number of non-fraudulent transactions: 1289169
Number of fraudulent transactions: 7506


- Fraud cases are 7506 in number, making that 0.5% iof the total transaction. The data is extremely imbalnced

# Data Cleaning
- Rename the columns for better understanding
  - `first` to `first_name`
  - `last` to `last_name`
  - `city_pop` to `city_population`
  - `dob` to `date_of_birth`


In [47]:
# rename the columns for both train and test datasets
# train dataset
fraud_train = fraud_train.withColumnRenamed('first', 'first_name') \
                         .withColumnRenamed('last', 'last_name') \
                         .withColumnRenamed('city_pop', 'city_population') \
                         .withColumnRenamed('dob', 'date_of_birth')

# test dataset
fraud_test = fraud_test.withColumnRenamed('first', 'first_name') \
                       .withColumnRenamed('last', 'last_name') \
                       .withColumnRenamed('city_pop', 'city_population') \
                       .withColumnRenamed('dob', 'date_of_birth')

# Verify the columns were dropped
fraud_train.printSchema()
fraud_test.printSchema()


root
 |-- _c0: integer (nullable = true)
 |-- trans_date_trans_time: timestamp (nullable = true)
 |-- cc_num: long (nullable = true)
 |-- merchant: string (nullable = true)
 |-- category: string (nullable = true)
 |-- amt: double (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- street: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zip: integer (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- city_population: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- date_of_birth: date (nullable = true)
 |-- trans_num: string (nullable = true)
 |-- unix_time: integer (nullable = true)
 |-- merch_lat: double (nullable = true)
 |-- merch_long: double (nullable = true)
 |-- is_fraud: integer (nullable = true)

root
 |-- _c0: integer (nullable = true)
 |-- trans_date_trans_time: timestamp 

# Feature Engineering

In [48]:
from pyspark.sql.functions import col, current_date, year, month, dayofweek, hour, sqrt, pow

# Create 'age' column from 'date_of_birth'
fraud_train = fraud_train.withColumn('age', year(current_date()) - year(col('date_of_birth')))
fraud_test = fraud_test.withColumn('age', year(current_date()) - year(col('date_of_birth')))

# Create 'transaction_hour' and 'transaction_day', 'transaction_month'
# train dataset
fraud_train = fraud_train.withColumn('transaction_month', month(col('trans_date_trans_time')))
fraud_train = fraud_train.withColumn('transaction_hour', hour(col('trans_date_trans_time')))
fraud_train = fraud_train.withColumn('transaction_day', dayofweek(col('trans_date_trans_time')))

# test dataset
fraud_test = fraud_test.withColumn('transaction_month', month(col('trans_date_trans_time')))
fraud_test = fraud_test.withColumn('transaction_hour', hour(col('trans_date_trans_time')))
fraud_test = fraud_test.withColumn('transaction_day', dayofweek(col('trans_date_trans_time')))

# Calculate distance between customer and merchant in km. 111 is usied to get the estimate distance
# train dataset
fraud_train = fraud_train.withColumn(
    'distance_cust_to_merch(km)',
    sqrt(pow(col('lat') - col('merch_lat'), 2) + pow(col('long') - col('merch_long'), 2)) * 111)
# test dataset
fraud_test = fraud_test.withColumn(
    'distance_cust_to_merch(km)',
    sqrt(pow(col('lat') - col('merch_lat'), 2) + pow(col('long') - col('merch_long'), 2)) * 111)

# drop unnecessary columns
fraud_train = fraud_train.drop('_c0', 'cc_num', 'trans_num', 'unix_time', 'first_name', 'last_name', 'street', 'city')
fraud_test = fraud_test.drop('_c0', 'cc_num', 'trans_num', 'unix_time', 'first_name', 'last_name', 'street', 'city')

# Verify updated data
fraud_train.show(5)
fraud_test.show(5)

+---------------------+--------------------+-------------+------+------+-----+-----+-------+---------+---------------+--------------------+-------------+------------------+-----------+--------+---+-----------------+----------------+---------------+--------------------------+
|trans_date_trans_time|            merchant|     category|   amt|gender|state|  zip|    lat|     long|city_population|                 job|date_of_birth|         merch_lat| merch_long|is_fraud|age|transaction_month|transaction_hour|transaction_day|distance_cust_to_merch(km)|
+---------------------+--------------------+-------------+------+------+-----+-----+-------+---------+---------------+--------------------+-------------+------------------+-----------+--------+---+-----------------+----------------+---------------+--------------------------+
|  2019-01-01 00:00:18|fraud_Rippin, Kub...|     misc_net|  4.97|     F|   NC|28654|36.0788| -81.1781|           3495|Psychologist, cou...|   1988-03-09|         36.011293|

In [49]:
fraud_train.printSchema()
fraud_test.printSchema()

root
 |-- trans_date_trans_time: timestamp (nullable = true)
 |-- merchant: string (nullable = true)
 |-- category: string (nullable = true)
 |-- amt: double (nullable = true)
 |-- gender: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zip: integer (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- city_population: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- date_of_birth: date (nullable = true)
 |-- merch_lat: double (nullable = true)
 |-- merch_long: double (nullable = true)
 |-- is_fraud: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- transaction_month: integer (nullable = true)
 |-- transaction_hour: integer (nullable = true)
 |-- transaction_day: integer (nullable = true)
 |-- distance_cust_to_merch(km): double (nullable = true)

root
 |-- trans_date_trans_time: timestamp (nullable = true)
 |-- merchant: string (nullable = true)
 |-- category: string (nullable = true)
 |-- amt:

In [53]:
# drop the 'trans_date_trans_time' column in both fraud_train and fraud_test datasets
fraud_train = fraud_train.drop('trans_date_trans_time')
fraud_test = fraud_test.drop('trans_date_trans_time')

# Data Encoding

In [50]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.functions import col

# define categorical columns
# categorical_columns = ['merchant', 'category', 'gender', 'state', 'job']
categorical_columns = ['category','gender', 'state', 'job']

# list to hold stages for the pipeline
indexers = []
encoders = []

# stringIndexer for each categorical column
for col_name in categorical_columns:
    indexer = StringIndexer(inputCol=col_name, outputCol=col_name + '_index', handleInvalid="skip")  # handle invalid categories
    encoder = OneHotEncoder(inputCol=col_name + '_index', outputCol=col_name + '_onehot')
    indexers.append(indexer)
    encoders.append(encoder)

# list of numerical columns
numerical_columns = ['amt', 'city_population', 'age', 'transaction_month', 'transaction_hour',
                     'transaction_day', 'distance_cust_to_merch(km)']

# combine all encoded columns with numerical columns
assembler = VectorAssembler(
    inputCols=[col_name + '_onehot' for col_name in categorical_columns] + numerical_columns,
    outputCol="features"
)

# create the Pipeline
pipeline = Pipeline(stages=indexers + encoders + [assembler])

# Fit the pipeline on the training data
pipeline_model = pipeline.fit(fraud_train)

# Transform both the training and test datasets
fraud_train_transformed = pipeline_model.transform(fraud_train)
fraud_test_transformed = pipeline_model.transform(fraud_test)


In [51]:
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import functions as F

# Define the list of features you want to check correlation for
features = ['amt', 'city_population', 'age', 'transaction_month', 'transaction_hour',
            'transaction_day', 'distance_cust_to_merch(km)']

# Drop the existing 'features' column if it exists
fraud_train_transformed = fraud_train_transformed.drop("features")

# Assemble features into a new vector column
assembler = VectorAssembler(inputCols=features, outputCol="new_features")
fraud_train_transformed = assembler.transform(fraud_train_transformed)

# Compute correlations between each feature and 'is_fraud'
for feature in features:
    correlation = fraud_train_transformed.stat.corr(feature, 'is_fraud')
    print(f"Correlation between {feature} and is_fraud: {correlation}")

Correlation between amt and is_fraud: 0.21940388895887128
Correlation between city_population and is_fraud: 0.0021359024181982463
Correlation between age and is_fraud: 0.012378101674716485
Correlation between transaction_month and is_fraud: -0.012409331585155019
Correlation between transaction_hour and is_fraud: 0.01379937052344759
Correlation between transaction_day and is_fraud: 0.009620213899482673
Correlation between distance_cust_to_merch(km) and is_fraud: 0.00043417047602957134


#### Oversampling the Minority Class (Fraudulent transactions)
- First convert the data to pandas dataframe, apply the SMOTE and then convert back to pyspark dataframe.

In [57]:
from imblearn.over_sampling import SMOTE
from pyspark.ml.linalg import Vectors
from pyspark.sql import Row
import pandas as pd

# Convert features and target column to Pandas
train_data = fraud_train_transformed.select("new_features", "is_fraud").toPandas()

# Separate features (as dense arrays) and target variable
X = train_data["new_features"].apply(lambda x: x.toArray()).tolist()  # Convert SparseVector to dense
y = train_data["is_fraud"]

# Initialize and apply SMOTE
smote = SMOTE(random_state=42)
X_resampled, y_resampled = smote.fit_resample(X, y)

# Initialize Spark session if not already done
spark = SparkSession.builder.getOrCreate()

# Create Pandas DataFrame for the resampled data
resampled_data = pd.DataFrame(X_resampled, columns=[f"new_features_{i}" for i in range(len(X_resampled[0]))])
resampled_data["is_fraud"] = y_resampled

# Convert to PySpark DataFrame
resampled_spark = spark.createDataFrame(
    resampled_data.apply(lambda row: Row(features=Vectors.dense(row[:-1]), is_fraud=int(row[-1])), axis=1).tolist()
)

  resampled_data.apply(lambda row: Row(features=Vectors.dense(row[:-1]), is_fraud=int(row[-1])), axis=1).tolist()


# Modelling

In [59]:
# linear regressio
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator

# Select only 'amt' and 'is_fraud' columns
fraud_train_single_feature = fraud_train_transformed.select("amt", "is_fraud")
fraud_test_single_feature = fraud_test_transformed.select("amt", "is_fraud")

# Assemble 'amt' into a feature vector
assembler = VectorAssembler(inputCols=["amt"], outputCol="features")
fraud_train_single_feature = assembler.transform(fraud_train_single_feature)
fraud_test_single_feature = assembler.transform(fraud_test_single_feature)

# Initialize the Linear Regression model
lr = LinearRegression(featuresCol="features", labelCol="is_fraud")

# Train the model
lr_model = lr.fit(fraud_train_single_feature)

# Make predictions
predictions = lr_model.transform(fraud_test_single_feature)

# Evaluate the model
evaluator = RegressionEvaluator(labelCol="is_fraud", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)

print(f"Root Mean Squared Error (RMSE) on test data: {rmse}")


Root Mean Squared Error (RMSE) on test data: 0.06081441120345995


In [60]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import VectorAssembler

# 'amt' as the only feature
assembler = VectorAssembler(inputCols=["amt"], outputCol="features")

# Transform the data with the assembler
fraud_train_transformed = assembler.transform(fraud_train)
fraud_test_transformed = assembler.transform(fraud_test)

# Initialize Random Forest Classifier
rf_classifier = RandomForestClassifier(labelCol="is_fraud", featuresCol="features", numTrees=100)

# Train the model
rf_model = rf_classifier.fit(fraud_train_transformed)

# Make predictions
fraud_predictions = rf_model.transform(fraud_test_transformed)

# Evaluate the model
evaluator = BinaryClassificationEvaluator(labelCol="is_fraud", rawPredictionCol="prediction")

# Calculate AUC (Area Under ROC curve)
auc = evaluator.evaluate(fraud_predictions)
print(f"Model AUC: {auc}")


Model AUC: 0.5
