In [1]:
# Date - May 05 2020
# Explore Finance Data Simulated Credit Card Fraud
# Data Source - https://www.kaggle.com/ntnu-testimon/paysim1

# Ensure that all of the specified libraries are installed.
# NOTE(review): no version is pinned for mlflow, so separate runs may resolve to
# different releases — consider pinning one for reproducibility.

dbutils.library.installPyPI("mlflow")
# Restart the Python process right after the install — presumably so the freshly
# installed package can be imported by the following cells (confirm against the
# Databricks library-utility docs). Keep this as the last statement of the cell.
dbutils.library.restartPython()

In [2]:
# MLflow (plus its Spark flavour) and os
import mlflow
import mlflow.spark

import os

# Hard-coded MLflow experiment ID.
# NOTE(review): not referenced in the cells shown here — presumably consumed by
# later tracking calls; confirm.
mlflow_experiment_id = 866112

# Report the MLflow version that was actually imported
print("MLflow Version: %s" % mlflow.__version__)


In [3]:
# Load the simulated financial fraud detection dataset into `df`, keeping only
# the columns used by the rest of the notebook.
# (The literal is split across lines for readability; the adjacent string
# literals concatenate to exactly the original single-line query.)
df = spark.sql(
    "select step, type, amount, nameOrig, oldbalanceOrg, newbalanceOrig, "
    "nameDest, oldbalanceDest, newbalanceDest "
    "from sim_fin_fraud_detection"
)

# Print the column names and types of the loaded DataFrame.
# (A bare `df` expression used to sit above this line; it was not the last
# expression of the cell, so it displayed nothing and has been removed.)
df.printSchema()


In [4]:
# Balance change on each side of the transaction: new balance minus old balance
org_diff = df["newbalanceOrig"] - df["oldbalanceOrg"]
dest_diff = df["newbalanceDest"] - df["oldbalanceDest"]

# Attach both differences as new columns
df = (
    df
    .withColumn("orgDiff", org_diff)
    .withColumn("destDiff", dest_diff)
)

# Expose the enriched DataFrame to SQL cells as the temporary view `financials`
df.createOrReplaceTempView("financials")

In [5]:
%sql

-- Number of rows per transaction type in the source table
select type, count(1)
  from sim_fin_fraud_detection
 group by type


type,count(1)
TRANSFER,532909
CASH_IN,1399284
CASH_OUT,2237500
PAYMENT,2151495
DEBIT,41432


In [6]:
%sql

-- Total transacted amount per transaction type in the source table
select type, sum(amount)
  from sim_fin_fraud_detection
 group by type


type,sum(CAST(amount AS DOUBLE))
TRANSFER,485291987263.1668
CASH_IN,236367391912.45865
CASH_OUT,394412995224.4882
PAYMENT,28093371138.37033
DEBIT,227199221.27999964


In [7]:
from pyspark.sql import functions as F

# Rule-based labelling of known fraud: a row gets label = 1 when it matches ANY
# of the three rules below, otherwise label = 0.
low_opening_balance = df.oldbalanceOrg <= 56900
high_opening_balance = df.oldbalanceOrg > 56900

# Rule 1: originator opened at or below 56,900, the transaction is a TRANSFER,
#         and the destination's closing balance is at or below 105
rule_low_balance_transfer = (
    low_opening_balance
    & (df.type == "TRANSFER")
    & (df.newbalanceDest <= 105)
)

# Rule 2: originator opened above 56,900 and closed at or below 12
rule_account_drained = high_opening_balance & (df.newbalanceOrig <= 12)

# Rule 3: originator opened above 56,900, closed above 12,
#         and the transaction amount exceeds 1,160,000
rule_large_amount = (
    high_opening_balance
    & (df.newbalanceOrig > 12)
    & (df.amount > 1160000)
)

is_fraud = rule_low_balance_transfer | rule_account_drained | rule_large_amount
df = df.withColumn("label", F.when(is_fraud, 1).otherwise(0))

# Share of flagged rows. Note that fraud_pct is a fraction between 0 and 1,
# not a value scaled to 100.
fraud_cases = df.filter(df.label == 1).count()
total_cases = df.count()
fraud_pct = float(fraud_cases) / total_cases

# Quick summary of how many rows the rules flagged
summary_values = (fraud_cases, fraud_pct, total_cases)
print("Based on these rules, we have flagged %s (%s) fraud cases out of a total of %s cases." % summary_values)

# Expose the labelled DataFrame to SQL cells as `financials_labeled`
df.createOrReplaceTempView("financials_labeled")

In [8]:
%sql
-- Row count and total amount for each rule-based label (1 = flagged, 0 = not flagged)
select label,
       count(1) as `Transactions`,
       sum(amount) as `Total Amount`
  from financials_labeled
 group by label

label,Transactions,Total Amount
1,255287,126346754314.6704
0,6107333,1018046190445.1278


In [9]:
%sql
-- Originator/destination pairs (per label) whose summed destination-balance
-- change is at least 1,000,000, together with the summed originator change.
-- Note: there is no ORDER BY, so `limit 100` returns an arbitrary 100 of the
-- matching rows and the result can differ between runs.
select nameOrig, nameDest, label, TotalOrgDiff, TotalDestDiff
  from (
     -- orgDiff is spelled exactly as the column was created, so the query does
     -- not depend on case-insensitive identifier resolution
     select nameOrig, nameDest, label, sum(orgDiff) as TotalOrgDiff, sum(destDiff) as TotalDestDiff
       from financials_labeled
      group by nameOrig, nameDest, label
     ) a
 where TotalDestDiff >= 1000000
 limit 100

nameOrig,nameDest,label,TotalOrgDiff,TotalDestDiff
C752686443,C1816757085,0,-51178.0,1037297.459999999
C1034899044,C306206744,0,0.0,2308448.1800000006
C1337803451,C1778801068,0,0.0,3155647.91
C1570013189,C1945802665,1,-63916.0,1179067.2799999998
C1379731404,C413177525,0,0.0,2375262.37
C608080514,C2006081398,1,-101396.29,3208776.46
C1567963311,C1141049797,0,136177.75999999978,1243657.54
C582291919,C461640598,0,0.0,3381134.63
C689413511,C1988852187,0,0.0,1161908.05
C122861681,C1754722089,0,-22291.0,1512938.6399999997


In [10]:
%sql
-- Row count for every (transaction type, label) combination
select type, label, count(1) as `Transactions`
  from financials_labeled
 group by type, label

type,label,Transactions
PAYMENT,0,2150909
CASH_OUT,0,2052932
DEBIT,1,31
TRANSFER,1,70100
CASH_OUT,1,184568
PAYMENT,1,586
DEBIT,0,41401
CASH_IN,0,1399282
TRANSFER,0,462809
CASH_IN,1,2


In [11]:
# 80/20 random split of the labelled data, with a fixed seed
train, test = df.randomSplit([0.8, 0.2], seed=12345)

# Mark both splits for caching
train.cache()
test.cache()

# Row counts for the full dataset and for each split (each count runs a Spark job)
row_counts = (df.count(), train.count(), test.count())
print("Total rows: %s, Training rows: %s, Test rows: %s" % row_counts)

In [12]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoderEstimator
from pyspark.ml.feature import VectorAssembler

#from pyspark.ml.feature import HashingTF, Tokenizer

from pyspark.ml.classification import DecisionTreeClassifier

# Stage 1: map the string column `type` to a numeric index column `typeIndexed`
indexer = StringIndexer(inputCol="type", outputCol="typeIndexed")

# Stage 2: pack the indexed type and the numeric columns into one `features` vector
feature_columns = [
    "typeIndexed",
    "amount",
    "oldbalanceOrg",
    "newbalanceOrig",
    "oldbalanceDest",
    "newbalanceDest",
    "orgDiff",
    "destDiff",
]
va = VectorAssembler(inputCols=feature_columns, outputCol="features")

# Stage 3: decision tree classifier predicting `label` from `features`
dt = DecisionTreeClassifier(
    labelCol="label",
    featuresCol="features",
    seed=54321,
    maxDepth=5,
)

# Chain the three stages: index -> assemble -> classify
pipeline = Pipeline(stages=[indexer, va, dt])


In [13]:
# Fit the pipeline on the training split (prior to CrossValidator); the result
# is bound to `model`.
model = pipeline.fit(train)
# To view the fitted decision tree, display the last stage of the fitted pipeline.
# (The original commented-out call referenced `dt_model`, which is not defined in
# the cells shown here; the fitted pipeline is `model`.)
#display(model.stages[-1])