In [1]:
mlflow_experiment_id = 866112
import mlflow
import mlflow.spark
import os
print("MLflow Version: %s" % mlflow.__version__)

In [2]:
%fs ls dbfs:/FileStore/tables

path,name,size
dbfs:/FileStore/tables/fraud.csv,fraud.csv,142557
dbfs:/FileStore/tables/insurance.csv,insurance.csv,55628
dbfs:/FileStore/tables/vgsales.csv,vgsales.csv,1355781


In [3]:
fraud_data = spark.read.format('csv').options(header='true', inferSchema='true').load('/FileStore/tables/fraud.csv')
#display(fraud_data)

In [4]:
fraud_data.printSchema()

In [5]:
# Calculate the differences between originating and destination balances
fraud_data = fraud_data.withColumn("orgDiff", fraud_data.newbalanceOrig - fraud_data.oldbalanceOrg).withColumn("destDiff", fraud_data.newbalanceDest - fraud_data.oldbalanceDest)

# Create temporary view
fraud_data.createOrReplaceTempView("financials")

In [6]:
#display(fraud_data)

In [7]:
%sql
-- Organize by Type
select type, count(1) from financials group by type

type,count(1)
TRANSFER,227
CASH_IN,312
CASH_OUT,349
PAYMENT,1030
DEBIT,117


In [8]:
%sql
select type, sum(amount) from financials group by type

type,sum(amount)
TRANSFER,107907988.56
CASH_IN,49491919.38
CASH_OUT,58413168.860000014
PAYMENT,6210510.64
DEBIT,533043.1399999997


In [9]:
from pyspark.sql import functions as F

# Rules to Identify Known Fraud-based
fraud_data = fraud_data.withColumn("label", 
                   F.when(
                     (
                       (fraud_data.oldbalanceOrg <= 56900) & (fraud_data.type == "TRANSFER") & (fraud_data.newbalanceDest <= 105)) | 
                       (
                         (fraud_data.oldbalanceOrg > 56900) & (fraud_data.newbalanceOrig <= 12)) | 
                           (
                             (fraud_data.oldbalanceOrg > 56900) & (fraud_data.newbalanceOrig > 12) & (fraud_data.amount > 1160000)
                           ), 1
                   ).otherwise(0))

# Calculate proportions
fraud_cases = fraud_data.filter(fraud_data.label == 1).count()
total_cases = fraud_data.count()
fraud_pct = 1.*fraud_cases/total_cases

# Provide quick statistics
print("Based on these rules, we have flagged %s (%s) fraud cases out of a total of %s cases." % (fraud_cases, fraud_pct, total_cases))

# Create temporary view to review data
fraud_data.createOrReplaceTempView("financials_labeled")

In [10]:
%sql
select label, count(1) as `Transactions`, sum(amount) as `Total Amount` from financials_labeled group by label

label,Transactions,Total Amount
1,41,14049087.340000002
0,1994,208507543.2400001


In [11]:
%sql
-- where sum(destDiff) >= 10000000.00
select nameOrig, nameDest, label, TotalOrgDiff, TotalDestDiff
  from (
     select nameOrig, nameDest, label, sum(OrgDiff) as TotalOrgDiff, sum(destDiff) as TotalDestDiff 
       from financials_labeled 
      group by nameOrig, nameDest, label 
     ) a
 where TotalDestDiff >= 1000000
 limit 100

nameOrig,nameDest,label,TotalOrgDiff,TotalDestDiff
C708613859,C451111351,0,193161.04000000004,1498911.27
C1639765351,C1789550256,0,311449.3800000008,3906285.79
C2066892165,C662736689,0,-5039.0,4871072.56
C1011044643,C451111351,0,-12154.7,1888797.63
C1768127248,C1234776885,0,-16418.22,1593142.0299999998
C588449070,C11003494,0,54988.5700000003,10539130.0
C203409950,C97730845,0,44708.0199999999,9888770.12
C203819996,C1262822392,0,194900.16000000003,3866004.24
C466032056,C1234776885,0,0.0,1379219.9
C407493402,C1262822392,0,0.0,3991223.380000001


In [12]:
%sql
select type, label, count(1) as `Transactions` from financials_labeled group by type, label

type,label,Transactions
PAYMENT,0,1030
CASH_OUT,0,341
TRANSFER,1,33
CASH_OUT,1,8
DEBIT,0,117
CASH_IN,0,312
TRANSFER,0,194


In [13]:
# Initially split our dataset between training and test datasets
(train, test) = fraud_data.randomSplit([0.8, 0.2], seed=12345)

# Cache the training and test datasets
train.cache()
test.cache()

# Print out dataset counts
print("Total rows: %s, Training rows: %s, Test rows: %s" % (fraud_data.count(), train.count(), test.count()))

In [14]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier

# Encodes a string column of labels to a column of label indices
indexer = StringIndexer(inputCol = "type", outputCol = "typeIndexed")

# VectorAssembler is a transformer that combines a given list of columns into a single vector column
va = VectorAssembler(inputCols = ["typeIndexed", "amount", "oldbalanceOrg", "newbalanceOrig", "oldbalanceDest", "newbalanceDest", "orgDiff", "destDiff"], outputCol = "features")

# Using the DecisionTree classifier model
dt = DecisionTreeClassifier(labelCol = "label", featuresCol = "features", seed = 54321, maxDepth = 5)

# Create our pipeline stages
pipeline = Pipeline(stages=[indexer, va, dt])

In [15]:
# View the Decision Tree model (prior to CrossValidator)
dt_model = pipeline.fit(train)
display(dt_model.stages[-1])

treeNode
"{""index"":9,""featureType"":""continuous"",""prediction"":null,""threshold"":-39820.0,""categories"":null,""feature"":6,""overflow"":false}"
"{""index"":5,""featureType"":""continuous"",""prediction"":null,""threshold"":26.575,""categories"":null,""feature"":3,""overflow"":false}"
"{""index"":3,""featureType"":""continuous"",""prediction"":null,""threshold"":63219.535,""categories"":null,""feature"":2,""overflow"":false}"
"{""index"":1,""featureType"":""continuous"",""prediction"":null,""threshold"":1291322.535,""categories"":null,""feature"":4,""overflow"":false}"
"{""index"":0,""featureType"":null,""prediction"":0.0,""threshold"":null,""categories"":null,""feature"":null,""overflow"":false}"
"{""index"":2,""featureType"":null,""prediction"":1.0,""threshold"":null,""categories"":null,""feature"":null,""overflow"":false}"
"{""index"":4,""featureType"":null,""prediction"":1.0,""threshold"":null,""categories"":null,""feature"":null,""overflow"":false}"
"{""index"":7,""featureType"":""continuous"",""prediction"":null,""threshold"":6075081.395,""categories"":null,""feature"":4,""overflow"":false}"
"{""index"":6,""featureType"":null,""prediction"":0.0,""threshold"":null,""categories"":null,""feature"":null,""overflow"":false}"
"{""index"":8,""featureType"":null,""prediction"":1.0,""threshold"":null,""categories"":null,""feature"":null,""overflow"":false}"
