In [0]:
# Installing Required Libraries
# NOTE(review): versions are unpinned, so a fresh run may install incompatible releases.
# Later cells need pyspark >= 3.0 (pyspark.ml.functions.vector_to_array); consider pinning each
# package, e.g. `%pip install -q pyspark==X.Y.Z`, so Restart & Run All stays reproducible.
%pip install pyspark
%pip install pyspark_dist_explore
%pip install imbalanced-learn
%pip install pandas

In [0]:
# Importing Required Libraries
%matplotlib inline 
import matplotlib.pyplot as plt
import pyspark.sql.functions as F
from pyspark.sql import Row
from pyspark.sql import SparkSession

# Initializing Spark Session: a local session using every core, with a 10g driver heap
spark = (
    SparkSession.builder
    .appName("Fraud Detection BigD")
    .master("local[*]")
    .config("spark.driver.memory", "10g")
    .getOrCreate()
)

# Low-level context, used later for RDD operations and broadcasting
sc = spark.sparkContext

In [0]:
# Loading Transaction and Identity Datasets as Spark Dataframes (header row + inferred column types)
identity, transaction = (
    spark.read.csv(path, inferSchema=True, header=True)
    for path in ("/mnt/bigd/train_identity.csv", "/mnt/bigd/train_transaction.csv")
)

# Joining Datasets on TransactionID; a left join keeps transactions without identity data
df = transaction.join(identity, on=['TransactionID'], how='left')
df.cache()

In [0]:
# Counting Number of Rows and Columns in Dataset (num_rows is reused later for NULL ratios)
num_rows, num_columns = df.count(), len(df.columns)
for label, value in (("Number of rows : ", num_rows), ("Number of columns : ", num_columns)):
  print(label, value)

In [0]:
# Preview the first five joined rows
df.show(n=5)

In [0]:
# Printing Schema: every column with the type Spark inferred when reading the CSVs (inferSchema=True)
df.printSchema()

In [0]:
# Data Exploration
from pyspark_dist_explore import hist
from pyspark.sql.types import StringType

# Visualising Class Ratio
def check_classes(df) :
  """Draw a bar chart of the number of rows per `isFraud` class.

  Args:
    df: Spark DataFrame containing an `isFraud` column.
  """
  # Cast the label to string so matplotlib treats the classes as categories, and sort them:
  # groupBy output order is not guaranteed, so unsorted bars would shuffle between runs.
  class_counts = (
    df.withColumn('isFraud', df['isFraud'].cast(StringType()))
      .groupBy('isFraud')
      .count()
      .orderBy('isFraud')
      .collect()
  )
  if not class_counts:
    print("No rows to plot for 'isFraud'.")
    return
  # A NULL label would otherwise become a None category that matplotlib cannot draw.
  x_values = [row['isFraud'] or 'null' for row in class_counts]
  y_values = [row['count'] for row in class_counts]

  fig, ax = plt.subplots()
  ax.bar(x_values, y_values)
  ax.set_title('isFraud')
  ax.set_xlabel('isFraud')
  ax.set_ylabel('count')
  plt.show()

check_classes(df)

In [0]:
# Visualising Distribution of each Column (disabled: draws one histogram or bar chart per column; uncomment the block below to run it)
# def check_features(df) :
#   for data_type in df.drop(*['isFraud', 'TransactionID', 'TransactionDT']).schema.fields:
#       if str(data_type.dataType) == "IntegerType" or str(data_type.dataType) == "DoubleType" :
#         fig, ax = plt.subplots()
#         ax.set_title(data_type.name)
#         hist(ax, df.select(data_type.name), bins = 20, color=['red'])
#       elif str(data_type.dataType) == "StringType" :
#         plt.figure()
#         temp = df.groupBy(data_type.name).count()
#         hist_elect = temp.select(data_type.name, "count").rdd.map(tuple).collect()
#         (x_values, y_values) = zip(*hist_elect)
#         convert = lambda i : i or 'null'
#         x_values = [convert(i) for i in x_values]
#         plt.title(data_type.name)
#         plt.bar(x_values, y_values)
#         plt.show()

# check_features(df)

In [0]:
# Check Number of NULL in Each Column
from pyspark.sql.functions import col,isnan, when, count
# One aggregate per column: count() ignores NULLs, so count(when(isNull)) counts the NULL rows.
null_count_exprs = [count(when(col(name).isNull(), name)).alias(name) for name in df.columns]
null_counts = df.select(*null_count_exprs).cache()
null_counts.show()

In [0]:
# Drop Columns with >= 90% of NULL Values
NULL_FRACTION_THRESHOLD = 0.9
null_count_by_column = null_counts.first().asDict()
# Guard against an empty dataset (num_rows == 0) and never drop the label column.
column_to_drop = [
  name for name, number_of_null in null_count_by_column.items()
  if num_rows and name != 'isFraud' and number_of_null / num_rows >= NULL_FRACTION_THRESHOLD
]
print("Number of columns to drop : ", len(column_to_drop))
print("Columns to drop : ", column_to_drop)
# The ID and raw timestamp columns are removed as well; they are not used as features.
df_dropped = df.drop(*column_to_drop, 'TransactionID', 'TransactionDT')

In [0]:
# Imputing all NULL Values in Categorical Columns with "Unknown"
# na.fill with a string value only affects string-typed columns; numeric NULLs are imputed later.
df_categorical = df_dropped.na.fill("Unknown")
df_categorical.cache()
# show() computes only a few partitions; fill the whole cache before the parent `df` is released,
# otherwise later full passes recompute df_categorical from the CSVs.
df_categorical.count()
df_categorical.show()
df.unpersist()
null_counts.unpersist()

In [0]:
#  Convert Categorical Variables to Numeric Variables
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

categorical_columns = ['ProductCD', 'card4', 'card6', 'P_emaildomain', 'R_emaildomain', 'M1', 'M2', 'M3', 'M4', 'M5', 'M6', 'M7', 'M8', 'M9', 'id_12', 'id_15', 'id_16', 'id_28', 'id_29', 'id_30', 'id_31', 'id_33', 'id_34', 'id_35', 'id_36', 'id_37', 'id_38', 'DeviceType', 'DeviceInfo']

# One multi-column StringIndexer (Spark >= 3.0) fits all columns in a single estimator instead of
# running a separate fit job per column; each `<col>_index` column is appended in list order.
indexer = StringIndexer(
  inputCols=categorical_columns,
  outputCols=[column + "_index" for column in categorical_columns],
)
df_numerical = indexer.fit(df_categorical).transform(df_categorical).drop(*categorical_columns)
df_numerical.cache()
df_numerical.show()

In [0]:
# Imputing NULL Values in Numerical Columns with the Column Mean
from pyspark.ml.feature import Imputer
from collections import Counter
# Columns excluded from mean imputation: the StringIndexer outputs (suffix `_index`) and the label.
# Derived from the DataFrame so the cell stays correct if the categorical list changes or is re-run.
categorical_columns = [c for c in df_numerical.columns if c.endswith('_index')] + ['isFraud']
excluded_columns = set(categorical_columns)
# Preserve the DataFrame's column order; iterating a set of strings can give a different order on
# every Python run (hash randomisation), which made the feature order irreproducible.
numerical_columns = [c for c in df_numerical.columns if c not in excluded_columns]

imputer = Imputer(
  inputCols=numerical_columns,
  outputCols=["{}_imputed".format(c) for c in numerical_columns]
).setStrategy("mean")

# NOTE(review): the imputer is fitted before the train/test split, so test rows contribute to the
# means (mild leakage) — consider fitting on the training split only.
df_imputed = imputer.fit(df_numerical).transform(df_numerical).drop(*numerical_columns)
df_imputed.cache()
df_imputed.show()

In [0]:
# Check if NULL values are All Imputed: every column should report 0
null_check_exprs = [count(when(col(name).isNull(), name)).alias(name) for name in df_imputed.columns]
df_imputed.select(*null_check_exprs).show()

In [0]:
# Convert All Data to Float Except isFraud
# Select the feature columns by name rather than by position (`columns[1:]`), which silently
# assumed isFraud was the first column.
float_columns = [c for c in df_imputed.columns if c != 'isFraud']
df_float = df_imputed.select('isFraud', *(F.col(c).cast('float').alias(c) for c in float_columns))
df_float.cache()
df_float.show()

In [0]:
# Unpersisting Preprocessing Cached Dataframes
# df_float.show() only computed a few partitions; fill its cache completely first, otherwise later
# stages would recompute it through the whole preprocessing lineage once these caches are released.
df_float.count()
df_categorical.unpersist()
df_numerical.unpersist()
df_imputed.unpersist()

In [0]:
# Splitting Dataset into Train and Test Sets (80/20)
SPLIT_SEED = 42  # fixed seed so the split, and every metric computed from it, is reproducible
df_train, df_test = df_float.randomSplit([0.8, 0.2], seed=SPLIT_SEED)
df_train.cache()
df_test.cache()

In [0]:
# Vectorising Train and Test Sets
from pyspark.ml.feature import VectorAssembler, StandardScaler, PCA
# Every column except the label becomes part of the feature vector.
feature_cols = [c for c in df_train.columns if c != 'isFraud']
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

df_train_vector = assembler.transform(df_train).select('features', 'isFraud')
df_train_vector.cache()
df_train_vector.show(5)

df_test_vector = assembler.transform(df_test).select('features', 'isFraud')
df_test_vector.cache()
df_test_vector.show(5)

# show(5) only computed a few partitions; fill both caches before releasing their parents,
# otherwise later stages recompute them through the full upstream lineage.
df_train_vector.count()
df_test_vector.count()

df_float.unpersist()
df_train.unpersist()
df_test.unpersist()

In [0]:
# Scaling Train and Test Sets Before Application of PCA
# The scaler is fitted on the training split only and then applied to both splits.
scaler = StandardScaler(
    inputCol = 'features',
    outputCol = 'scaledFeatures',
    withMean = True,
    withStd = True
).fit(df_train_vector)

df_train_scaled = scaler.transform(df_train_vector).select('scaledFeatures', 'isFraud')
df_train_scaled.cache()
df_train_scaled.show(5)

df_test_scaled = scaler.transform(df_test_vector).select('scaledFeatures', 'isFraud')
df_test_scaled.cache()
df_test_scaled.show(5)

# show(5) only computed a few partitions; fill both caches before releasing the unscaled vectors,
# otherwise later stages recompute them through the full upstream lineage.
df_train_scaled.count()
df_test_scaled.count()

df_train_vector.unpersist()
df_test_vector.unpersist()

In [0]:
# Applying PCA to Train and Test Sets (fitted on the training split only)
n_components = 40
pca = PCA(
    k = n_components,
    inputCol = 'scaledFeatures',
    outputCol = 'pcaFeatures'
).fit(df_train_scaled)

# explainedVariance holds each component's proportion of variance; the sum is the total retained.
print('Explained Variance Ratio', sum(pca.explainedVariance.toArray()))

df_train_pca = pca.transform(df_train_scaled).select('pcaFeatures', 'isFraud')
df_train_pca.cache()
df_train_pca.show(5)

df_test_pca = pca.transform(df_test_scaled).select('pcaFeatures', 'isFraud')
df_test_pca.cache()
df_test_pca.show(5)

# show(5) only computed a few partitions; fill both caches before releasing the scaled inputs,
# otherwise later stages recompute them through the full upstream lineage.
df_train_pca.count()
df_test_pca.count()

df_train_scaled.unpersist()
df_test_scaled.unpersist()

In [0]:
# Plotting Scatterplot of Principal Components
import numpy as np
# Pull the projected training vectors to the driver as one NumPy matrix (rows = samples).
X_pca = np.array(df_train_pca.rdd.map(lambda row: row.pcaFeatures).collect())

# Global plot styling for the figures below.
plt.style.use('fivethirtyeight')
plt.rcParams.update({'figure.figsize': (8, 6), 'font.size': 12})

def plot(X_pca, y):
    markers = 's', 'x', 'o'
    colors = list(plt.rcParams['axes.prop_cycle'])
    target = np.unique(y)
    for idx, (t, m) in enumerate(zip(target, markers)):
        subset = X_pca[y == t]
        plt.scatter(subset[:, 0], subset[:, 1], s = 50,
                    c = colors[idx]['color'], label = t, marker = m)

    plt.xlabel('PC 1')
    plt.ylabel('PC 2')
    plt.legend(loc = 'lower left')
    plt.tight_layout()
    plt.show()
    
# Labels are read through the same df_train_pca.rdd path as X_pca, so they come back in the same
# partition/row order; collect_list gives no ordering guarantee and could not be paired row by row.
y_pca = np.array(df_train_pca.rdd.map(lambda row: row.isFraud).collect())
plot(X_pca, y_pca)

# Plot bar graph of the variance proportion explained by each principal component
explained_variance = pca.explainedVariance.toArray()
fig, ax = plt.subplots()
ax.bar(range(1, len(explained_variance) + 1), explained_variance)
ax.set_title('Explained variance per principal component')
ax.set_xlabel('Principal component')
ax.set_ylabel('Explained variance ratio')
plt.show()


In [0]:
#Unvectorising Train and Test Sets
from pyspark.ml.functions import vector_to_array
def _unvectorise(df_pca):
  """Expand the `pcaFeatures` vector into columns `pcaFeatures[0]` ... and keep `isFraud`.

  Uses `n_components` from the PCA cell so the column count always matches the fitted PCA.
  """
  return (
    df_pca.withColumn('pcaFeatures', vector_to_array('pcaFeatures'))
          .select([col("pcaFeatures")[i] for i in range(n_components)] + ['isFraud'])
  )

df_train = _unvectorise(df_train_pca)
df_test = _unvectorise(df_test_pca)
df_train.cache()
df_test.cache()
# Column order reused by toPandas_partition when building pandas frames.
columns = df_train.columns

In [0]:
# Splitting Train Set by label: fraud (minority, isFraud == 1) and not fraud (majority, isFraud == 0)
train_fraud, train_notfraud = (df_train.filter(df_train.isFraud == label) for label in (1, 0))

In [0]:
# Partitioning the Not Fraud (majority class) rows into RDD partitions; one model is trained per partition.
# Keep exactly one of the partition counts below uncommented.
numpartitions = 11
# numpartitions = 15
# numpartitions = 19

train_rdd = train_notfraud.rdd.repartition(numpartitions).cache()
test_rdd = df_test.rdd.repartition(10)
test_rdd.cache()

In [0]:
# Broadcasting Fraud (Minority Class) rows; broadcast_fraud_data appends them to every partition.
fraud_rows = train_fraud.collect()
train_fraud_bc = sc.broadcast(fraud_rows)

In [0]:
# Fill the df_train / df_test caches completely before releasing the PCA outputs they are derived
# from; otherwise later RDD work recomputes them through the full upstream lineage.
df_train.count()
df_test.count()
df_train_pca.unpersist()
df_test_pca.unpersist()

In [0]:
#Function for Converting each RDD Partition into a single pandas DataFrame
def toPandas_partition(instances, column_names=None):
  """Convert an iterable of Spark Rows into one pandas DataFrame.

  Meant for `RDD.mapPartitions`, so the DataFrame is returned wrapped in a one-element list.

  Args:
    instances: iterable of objects exposing `asDict()` (e.g. `pyspark.sql.Row`).
    column_names: column order of the result; defaults to the notebook-level `columns`.
  Returns:
    [pandas.DataFrame] with one row per instance (empty frame with those columns if none).
  """
  import pandas as pd
  if column_names is None:
    column_names = columns
  # Build the frame once from all records: DataFrame.append was removed in pandas 2.0, and
  # appending row by row copied the frame each time (quadratic).
  records = [instance.asDict() for instance in instances]
  panda_df = pd.DataFrame(records, columns=column_names)
  return [panda_df]

#Function for Appending the Broadcast Fraud (Minority Class) Rows to Each Partition
def broadcast_fraud_data(partition):
  """Append every broadcast fraud row to one (non-fraud) pandas partition.

  Args:
    partition: pandas DataFrame produced by `toPandas_partition`.
  Returns:
    A new DataFrame: the partition's rows followed by the rows of `train_fraud_bc`.
  """
  import pandas as pd
  pd_train_fraud = toPandas_partition(train_fraud_bc.value)[0]
  # DataFrame.append no longer exists in pandas 2.0. ignore_index=True also avoids the duplicate
  # 0..n index labels that the two frames would otherwise share.
  return pd.concat([partition, pd_train_fraud], ignore_index=True)

#Function to Perform ADASYN Oversampling on Each Partition
def adasyn(partition, random_state=None):
  """Oversample the minority class of one pandas partition with ADASYN.

  Args:
    partition: pandas DataFrame with `pcaFeatures[i]` feature columns and an `isFraud` label.
    random_state: optional seed forwarded to `ADASYN` for reproducible resampling.
  Returns:
    DataFrame of the resampled feature columns with the resampled `isFraud` column appended.
  """
  import pandas as pd
  from imblearn.over_sampling import ADASYN
  # Pick the feature columns by prefix instead of a hard-coded 'pcaFeatures[0]':'pcaFeatures[39]'
  # slice, so a different number of PCA components keeps working.
  feature_columns = [c for c in partition.columns if c.startswith('pcaFeatures')]
  ada = ADASYN(random_state=random_state)
  X_resampled, y_resampled = ada.fit_resample(partition[feature_columns], partition['isFraud'])
  return pd.concat([X_resampled, y_resampled], axis = 1)

#Function to Build a GradientBoostingClassifier Model For Each Partition
def build_model_Boost(partition):
  from sklearn.ensemble import GradientBoostingClassifier
  X_train = partition.loc[:,'pcaFeatures[0]':'pcaFeatures[39]']
  y_train = partition['isFraud']
  cl = GradientBoostingClassifier()
  model = cl.fit(X_train,y_train)
  return model

#Function to Build a SVC Model For Each Partition
def build_model_SVM(partition):
  from sklearn.svm import SVC
  X_train = partition.loc[:,'pcaFeatures[0]':'pcaFeatures[39]']
  y_train = partition['isFraud']
  cl = SVC()
  model = cl.fit(X_train,y_train)
  return model

#Function to Build a GaussianNB Model For Each Partition
def build_model_GauNB(partition):
  from sklearn.naive_bayes import GaussianNB
  X_train = partition.loc[:,'pcaFeatures[0]':'pcaFeatures[39]']
  y_train = partition['isFraud']
  cl = GaussianNB()
  model = cl.fit(X_train,y_train)
  return model


In [0]:
#Mapping Functions to RDD, Producing an Ensemble of Classifier Models
# Resample each partition once and cache it, so all three ensembles train on the same ADASYN output
# instead of repeating the pandas conversion and (unseeded) resampling once per model type.
resampled_rdd = (
  train_rdd.mapPartitions(toPandas_partition)
           .map(broadcast_fraud_data)
           .map(adasyn)
           .cache()
)
models_Boost = resampled_rdd.map(build_model_Boost).collect()
models_SVM = resampled_rdd.map(build_model_SVM).collect()
models_GauNB = resampled_rdd.map(build_model_GauNB).collect()
resampled_rdd.unpersist()

In [0]:
#Model Testing Functions
def test_model_Boost(partition):
  predictions = []
  X_test = partition.loc[:,'pcaFeatures[0]':'pcaFeatures[39]']
  for m in models_Boost:
      predictions.append(m.predict(X_test).tolist())
  predictions = np.array(predictions, dtype=np.int32).transpose()
  majority_pred = []
  for i in predictions:
    u, c = np.unique(i, return_counts = True)
    majority_pred.append(u[c == c.max()][0])
  return majority_pred

def test_model_SVM(partition):
  predictions = []
  X_test = partition.loc[:,'pcaFeatures[0]':'pcaFeatures[39]']
  for m in models_SVM:
      predictions.append(m.predict(X_test).tolist())
  predictions = np.array(predictions, dtype=np.int32).transpose()
  majority_pred = []
  for i in predictions:
    u, c = np.unique(i, return_counts = True)
    majority_pred.append(u[c == c.max()][0])
  return majority_pred

def test_model_GauNB(partition):
  predictions = []
  X_test = partition.loc[:,'pcaFeatures[0]':'pcaFeatures[39]']
  for m in models_GauNB:
      predictions.append(m.predict(X_test).tolist())
  predictions = np.array(predictions, dtype=np.int32).transpose()
  majority_pred = []
  for i in predictions:
    u, c = np.unique(i, return_counts = True)
    majority_pred.append(u[c == c.max()][0])
  return majority_pred

In [0]:
#Testing Classifier Models
def _label_and_predictions(partition):
  """Return (label, Boost, SVM, GauNB) tuples for every row of one pandas test partition."""
  labels = partition['isFraud'].astype(int).tolist()
  return list(zip(labels,
                  test_model_Boost(partition),
                  test_model_SVM(partition),
                  test_model_GauNB(partition)))

# Labels and all three ensembles' predictions are produced together in one pass, so they stay
# row-aligned. test_rdd was repartitioned (a shuffle), so its row order is not guaranteed to match
# df_test's; collecting y_test from df_test separately would pair predictions with the wrong labels.
scored_rows = test_rdd.mapPartitions(toPandas_partition).flatMap(_label_and_predictions).collect()
if scored_rows:
  y_test, results_Boost, results_SVM, results_GauNB = (list(values) for values in zip(*scored_rows))
else:
  y_test, results_Boost, results_SVM, results_GauNB = [], [], [], []

In [0]:
#Display Confusion Matrix and Classifier Performance Based on Accuracy, F1, Precision and Recall
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, confusion_matrix
def report_classifier(name, y_true, y_pred):
  """Print confusion-matrix counts plus accuracy, F1, precision and recall for one ensemble.

  Args:
    name: label printed above the metrics.
    y_true: true isFraud labels.
    y_pred: predicted labels, row-aligned with y_true.
  Returns:
    The 2x2 confusion matrix.
  """
  # labels=[0, 1] forces a 2x2 matrix, so ravel() yields four counts even if a class is absent.
  matrix = confusion_matrix(y_true, y_pred, labels=[0, 1])
  tn, fp, fn, tp = matrix.ravel()
  print(name)
  print("TN: ", tn, "FP: ", fp, "FN: ", fn, "TP: ", tp)
  print("Accuracy: ", accuracy_score(y_true, y_pred))
  print("F1: ", f1_score(y_true, y_pred))
  print("Precision: ", precision_score(y_true, y_pred))
  print("Recall: ", recall_score(y_true, y_pred))
  return matrix

# con_matrix ends up holding the last (GauNB) matrix.
for name, predictions in (("Boost", results_Boost), ("SVM", results_SVM), ("GauNB", results_GauNB)):
  con_matrix = report_classifier(name, y_test, predictions)