### Load relevant packages and data

In [2]:
import findspark
findspark.init()
import pyspark
from pyspark.sql import functions as F
from IPython.core.display import HTML
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline 
from pyspark.ml.classification import LogisticRegression 
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import BinaryClassificationEvaluator 

  
display(HTML("<style>pre { white-space: pre !important; }</style>"))

In [3]:
# Create SparkSession
spark = SparkSession.builder\
          .appName("wb")\
          .config("spark.executor.memory", "10g")\
          .config("spark.executor.cores", "12")\
          .getOrCreate()

# Read CSV File
data = spark.read.options(header='True', inferSchema='True', delimiter=',').csv("C:/Users/timur/WB Datasets/exercise_3_creditcard.csv")

In [4]:
data.toPandas()

Unnamed: 0,Time,V1,V2,V3,V4,V5,V6,V7,V8,V9,...,V21,V22,V23,V24,V25,V26,V27,V28,Amount,Class
0,0.0,-1.359807,-0.072781,2.536347,1.378155,-0.338321,0.462388,0.239599,0.098698,0.363787,...,-0.018307,0.277838,-0.110474,0.066928,0.128539,-0.189115,0.133558,-0.021053,149.62,0
1,0.0,1.191857,0.266151,0.166480,0.448154,0.060018,-0.082361,-0.078803,0.085102,-0.255425,...,-0.225775,-0.638672,0.101288,-0.339846,0.167170,0.125895,-0.008983,0.014724,2.69,0
2,1.0,-1.358354,-1.340163,1.773209,0.379780,-0.503198,1.800499,0.791461,0.247676,-1.514654,...,0.247998,0.771679,0.909412,-0.689281,-0.327642,-0.139097,-0.055353,-0.059752,378.66,0
3,1.0,-0.966272,-0.185226,1.792993,-0.863291,-0.010309,1.247203,0.237609,0.377436,-1.387024,...,-0.108300,0.005274,-0.190321,-1.175575,0.647376,-0.221929,0.062723,0.061458,123.50,0
4,2.0,-1.158233,0.877737,1.548718,0.403034,-0.407193,0.095921,0.592941,-0.270533,0.817739,...,-0.009431,0.798278,-0.137458,0.141267,-0.206010,0.502292,0.219422,0.215153,69.99,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
284802,172786.0,-11.881118,10.071785,-9.834783,-2.066656,-5.364473,-2.606837,-4.918215,7.305334,1.914428,...,0.213454,0.111864,1.014480,-0.509348,1.436807,0.250034,0.943651,0.823731,0.77,0
284803,172787.0,-0.732789,-0.055080,2.035030,-0.738589,0.868229,1.058415,0.024330,0.294869,0.584800,...,0.214205,0.924384,0.012463,-1.016226,-0.606624,-0.395255,0.068472,-0.053527,24.79,0
284804,172788.0,1.919565,-0.301254,-3.249640,-0.557828,2.630515,3.031260,-0.296827,0.708417,0.432454,...,0.232045,0.578229,-0.037501,0.640134,0.265745,-0.087371,0.004455,-0.026561,67.88,0
284805,172788.0,-0.240440,0.530483,0.702510,0.689799,-0.377961,0.623708,-0.686180,0.679145,0.392087,...,0.265245,0.800049,-0.163298,0.123205,-0.569159,0.546668,0.108821,0.104533,10.00,0


### Summarise data

In [5]:
data.describe().toPandas()

Unnamed: 0,summary,Time,V1,V2,V3,V4,V5,V6,V7,V8,...,V21,V22,V23,V24,V25,V26,V27,V28,Amount,Class
0,count,284807.0,284807.0,284807.0,284807.0,284807.0,284807.0,284807.0,284807.0,284807.0,...,284807.0,284807.0,284807.0,284807.0,284807.0,284807.0,284807.0,284807.0,284807.0,284807.0
1,mean,94813.85957508069,8.596158276780422e-16,2.778233647847305e-16,-6.833816099302568e-16,2.0206061387073364e-15,1.0522160769720545e-15,1.5392053083475877e-15,-7.149161585193282e-16,1.6565617296790687e-16,...,2.012323330058941e-16,-3.2732063092453886e-16,2.822142512971329e-16,4.481448546720649e-15,5.791978481359828e-16,1.6815005929174789e-15,-3.675246855537229e-16,-1.177555928326085e-16,88.3496192509295,0.00172748563062
2,stddev,47488.14595456624,1.9586958038574849,1.6513085794769962,1.5162550051777712,1.4158685749409203,1.3802467340314395,1.3322710897575745,1.2370935981826665,1.194352902669204,...,0.7345240143713125,0.7257015604409116,0.6244602955949904,0.6056470678271598,0.5212780705409424,0.4822270132610575,0.4036324949650308,0.3300832641602508,250.12010924018855,0.041527189635465
3,min,0.0,-56.407509631329,-72.7157275629303,-48.3255893623954,-5.68317119816995,-113.743306711146,-26.1605059358433,-43.5572415712451,-73.2167184552674,...,-34.8303821448146,-10.933143697655,-44.8077352037913,-2.83662691870341,-10.2953970749851,-2.60455055280817,-22.5656793207827,-15.4300839055349,0.0,0.0
4,max,172792.0,2.45492999121121,22.0577289904909,9.38255843282114,16.8753440335975,34.8016658766686,73.3016255459646,120.589493945238,20.0072083651213,...,27.2028391573154,10.5030900899454,22.5284116897749,4.58454913689817,7.51958867870916,3.5173456116238,31.6121981061363,33.8478078188831,25691.16,1.0


### Perform data quality checks

In [6]:
# Calculate how many null values exist in data 
data.select(*(F.sum(F.col(c).isNull().cast("int")).alias(c) for c in data.columns)).toPandas()

Unnamed: 0,Time,V1,V2,V3,V4,V5,V6,V7,V8,V9,...,V21,V22,V23,V24,V25,V26,V27,V28,Amount,Class
0,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0


In [67]:
# Find transactions with invalid amounts (in this case any amount less than or equal to 0)
zero_value_transactions = data.filter(F.col("Amount")<=0)
print("Number of transactions with invalid amounts: "+str(zero_value_transactions.count()))

Number of transactions with invalid amounts: 1825


In [7]:
# Remove rows where the amount is 0, as there shouldn't be 0 amount transactions (this is just to demonstrate filtering in spark)
data = data.filter(F.col("Amount")>0)
data.toPandas()

Unnamed: 0,Time,V1,V2,V3,V4,V5,V6,V7,V8,V9,...,V21,V22,V23,V24,V25,V26,V27,V28,Amount,Class
0,0.0,-1.359807,-0.072781,2.536347,1.378155,-0.338321,0.462388,0.239599,0.098698,0.363787,...,-0.018307,0.277838,-0.110474,0.066928,0.128539,-0.189115,0.133558,-0.021053,149.62,0
1,0.0,1.191857,0.266151,0.166480,0.448154,0.060018,-0.082361,-0.078803,0.085102,-0.255425,...,-0.225775,-0.638672,0.101288,-0.339846,0.167170,0.125895,-0.008983,0.014724,2.69,0
2,1.0,-1.358354,-1.340163,1.773209,0.379780,-0.503198,1.800499,0.791461,0.247676,-1.514654,...,0.247998,0.771679,0.909412,-0.689281,-0.327642,-0.139097,-0.055353,-0.059752,378.66,0
3,1.0,-0.966272,-0.185226,1.792993,-0.863291,-0.010309,1.247203,0.237609,0.377436,-1.387024,...,-0.108300,0.005274,-0.190321,-1.175575,0.647376,-0.221929,0.062723,0.061458,123.50,0
4,2.0,-1.158233,0.877737,1.548718,0.403034,-0.407193,0.095921,0.592941,-0.270533,0.817739,...,-0.009431,0.798278,-0.137458,0.141267,-0.206010,0.502292,0.219422,0.215153,69.99,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
282977,172786.0,-11.881118,10.071785,-9.834783,-2.066656,-5.364473,-2.606837,-4.918215,7.305334,1.914428,...,0.213454,0.111864,1.014480,-0.509348,1.436807,0.250034,0.943651,0.823731,0.77,0
282978,172787.0,-0.732789,-0.055080,2.035030,-0.738589,0.868229,1.058415,0.024330,0.294869,0.584800,...,0.214205,0.924384,0.012463,-1.016226,-0.606624,-0.395255,0.068472,-0.053527,24.79,0
282979,172788.0,1.919565,-0.301254,-3.249640,-0.557828,2.630515,3.031260,-0.296827,0.708417,0.432454,...,0.232045,0.578229,-0.037501,0.640134,0.265745,-0.087371,0.004455,-0.026561,67.88,0
282980,172788.0,-0.240440,0.530483,0.702510,0.689799,-0.377961,0.623708,-0.686180,0.679145,0.392087,...,0.265245,0.800049,-0.163298,0.123205,-0.569159,0.546668,0.108821,0.104533,10.00,0


In [8]:
# Remove columns with null values
data = data.na.drop() 
data.toPandas()

Unnamed: 0,Time,V1,V2,V3,V4,V5,V6,V7,V8,V9,...,V21,V22,V23,V24,V25,V26,V27,V28,Amount,Class
0,0.0,-1.359807,-0.072781,2.536347,1.378155,-0.338321,0.462388,0.239599,0.098698,0.363787,...,-0.018307,0.277838,-0.110474,0.066928,0.128539,-0.189115,0.133558,-0.021053,149.62,0
1,0.0,1.191857,0.266151,0.166480,0.448154,0.060018,-0.082361,-0.078803,0.085102,-0.255425,...,-0.225775,-0.638672,0.101288,-0.339846,0.167170,0.125895,-0.008983,0.014724,2.69,0
2,1.0,-1.358354,-1.340163,1.773209,0.379780,-0.503198,1.800499,0.791461,0.247676,-1.514654,...,0.247998,0.771679,0.909412,-0.689281,-0.327642,-0.139097,-0.055353,-0.059752,378.66,0
3,1.0,-0.966272,-0.185226,1.792993,-0.863291,-0.010309,1.247203,0.237609,0.377436,-1.387024,...,-0.108300,0.005274,-0.190321,-1.175575,0.647376,-0.221929,0.062723,0.061458,123.50,0
4,2.0,-1.158233,0.877737,1.548718,0.403034,-0.407193,0.095921,0.592941,-0.270533,0.817739,...,-0.009431,0.798278,-0.137458,0.141267,-0.206010,0.502292,0.219422,0.215153,69.99,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
282977,172786.0,-11.881118,10.071785,-9.834783,-2.066656,-5.364473,-2.606837,-4.918215,7.305334,1.914428,...,0.213454,0.111864,1.014480,-0.509348,1.436807,0.250034,0.943651,0.823731,0.77,0
282978,172787.0,-0.732789,-0.055080,2.035030,-0.738589,0.868229,1.058415,0.024330,0.294869,0.584800,...,0.214205,0.924384,0.012463,-1.016226,-0.606624,-0.395255,0.068472,-0.053527,24.79,0
282979,172788.0,1.919565,-0.301254,-3.249640,-0.557828,2.630515,3.031260,-0.296827,0.708417,0.432454,...,0.232045,0.578229,-0.037501,0.640134,0.265745,-0.087371,0.004455,-0.026561,67.88,0
282980,172788.0,-0.240440,0.530483,0.702510,0.689799,-0.377961,0.623708,-0.686180,0.679145,0.392087,...,0.265245,0.800049,-0.163298,0.123205,-0.569159,0.546668,0.108821,0.104533,10.00,0


### Generate summary statistics 

In [9]:
# Summarise transaction amounts and counts by class. 
data.groupBy("Class").agg(F.count(F.col("Amount")).alias("count"), F.avg(F.col("Amount")).alias("avgAmount"), F.sum(F.col("Amount")).alias("sumAmount")).toPandas()

Unnamed: 0,Class,count,avgAmount,sumAmount
0,1,465,129.307462,60127.97
1,0,282517,88.852926,25102460.0


### Build ML model to predict whether a transaction is a fraudulent or not

In [10]:
# Build logistic regression 

# Create a log transformation of the transaction amount
data = data.withColumn("Log Amount", F.log(F.col("Amount")))

# Given we have a highly imbalanced dataset we will under-sample the majority class. 
under_sample = data.filter(F.col("Class")==1).union(data.filter(F.col("Class")==0).sample(fraction=0.05, seed=1))

# Reshuffle data to ensure classes are evenly distributed across nodes
under_sample = under_sample.sample(fraction=1.0, seed=1)

under_sample.toPandas()

Unnamed: 0,Time,V1,V2,V3,V4,V5,V6,V7,V8,V9,...,V22,V23,V24,V25,V26,V27,V28,Amount,Class,Log Amount
0,472.0,-3.043541,-3.157307,1.088463,2.288644,1.359805,-1.064823,0.325574,-0.067794,-0.270953,...,0.435477,1.375966,-0.293803,0.279798,-0.145362,-0.252773,0.035764,529.00,1,6.270988
1,4462.0,-2.303350,1.759247,-0.359745,2.330243,-0.821628,-0.075788,0.562320,-0.399147,-0.238253,...,-0.932391,0.172726,-0.087330,-0.156114,-0.542628,0.039566,-0.153029,239.93,1,5.480347
2,6986.0,-4.397974,1.358367,-2.592844,2.679787,-1.128131,-1.706536,-3.496197,-0.248778,-0.247768,...,0.176968,-0.436207,-0.053502,0.252405,-0.657488,-0.827136,0.849573,59.00,1,4.077537
3,7519.0,1.234235,3.019740,-4.304597,4.732795,3.624201,-1.357746,1.713445,-0.496358,-1.282858,...,-0.704181,-0.656805,-1.632653,1.488901,0.566797,-0.010016,0.146793,1.00,1,0.000000
4,7526.0,0.008430,4.137837,-6.240697,6.675732,0.768307,-3.353060,-1.631735,0.154612,-2.795892,...,-0.608057,-0.539528,0.128940,1.488481,0.507963,0.735822,0.513574,1.00,1,0.000000
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
14564,172734.0,-1.248847,2.279478,-1.144824,-0.263061,0.070912,-0.323414,-1.454139,-5.069379,-0.827733,...,0.261945,0.490272,-0.112758,-0.453183,-0.694933,-0.106693,0.082714,9.99,0,2.301585
14565,172734.0,1.763157,-0.103980,0.199911,3.948933,-0.642361,0.257215,-0.679898,0.291004,0.187648,...,0.372182,0.189281,-0.115979,-0.370147,0.036638,-0.008578,-0.029708,52.34,0,3.957761
14566,172735.0,-0.899559,-0.047744,0.717555,-3.262816,-0.384415,-0.164529,-0.227184,0.272726,-1.835745,...,-0.464731,-0.434248,0.009776,0.803066,-0.129513,-0.176856,-0.190917,10.00,0,2.302585
14567,172759.0,1.894910,-0.427906,-2.184267,0.159979,0.587740,-0.557966,0.510524,-0.281590,0.785446,...,0.202373,-0.125329,0.017737,0.396759,-0.515948,-0.039485,-0.040441,124.99,0,4.828234


In [11]:
# Vectorise the data into a new column "features", this will be the input feature set for the logistic regression
assembler = VectorAssembler(inputCols= data.columns[1:-3]+["Log Amount"],  outputCol='Features')

# Create logistic regression model
log_reg = LogisticRegression(featuresCol='Features', labelCol='Class') 
  
# Create the pipeline
pipeline = Pipeline(stages=[ assembler, log_reg]) 

# Create train and test dataset
train_data, test_data = data.randomSplit([0.7, .3]) 
  
# Fit the model to the training dataset
fit_model = pipeline.fit(train_data) 
  
# Append the predicted values to the test dataset
results = fit_model.transform(test_data) 
  
# Showing the results 
results.select("Class", "Probability", "prediction").toPandas()

Unnamed: 0,Class,Probability,prediction
0,0,"[0.9992928269349429, 0.0007071730650570762]",0.0
1,0,"[0.9998858924831054, 0.00011410751689455711]",0.0
2,0,"[0.9999521612628116, 4.783873718838816e-05]",0.0
3,0,"[0.9989186410224268, 0.0010813589775732213]",0.0
4,0,"[0.9999697483964923, 3.0251603507669422e-05]",0.0
...,...,...,...
84732,0,"[0.9996889899794791, 0.00031101002052091875]",0.0
84733,0,"[0.9998299837875345, 0.0001700162124654847]",0.0
84734,0,"[0.999775175317423, 0.00022482468257700727]",0.0
84735,0,"[0.9991766810983095, 0.0008233189016905174]",0.0


In [84]:
evaluator = BinaryClassificationEvaluator(rawPredictionCol='prediction',labelCol='Class') 
  
# Get the area under curve
print(evaluator.evaluate(results))

0.7968098811877672


In [87]:
# 

tpr = results.filter( (F.col("prediction")==F.col("Class")) & (F.col("Class")==1)).count()/results.filter(F.col("Class")==1).count()

print("The true positive rate is " +str(tpr)+"\n")

tnr = results.filter( (F.col("prediction")==F.col("Class")) & (F.col("Class")==0)).count()/results.filter(F.col("Class")==0).count()
print("The true negative rate is " +str(tnr)+"\n")


fnr = results.filter( (F.col("prediction")!=F.col("Class")) & (F.col("Class")==1)).count()/results.filter(F.col("Class")==1).count()
print("The false negative rate is " +str(fnr)+"\n")


fpr = results.filter( (F.col("prediction")!=F.col("Class")) & (F.col("Class")==0)).count()/results.filter(F.col("Class")==0).count()
print("The false positive rate is " +str(fpr)+"\n")


The true positive rate is 0.59375

The true negative rate is 0.9998697623755343

The false negative rate is 0.40625

The false positive rate is 0.00013023762446572976

