In [0]:
# all spark imports
# Import SparkSession from pyspark.sql
from pyspark.sql import SparkSession
from pyspark.sql.types import *       # data types for spark
from pyspark.sql.functions import *   # some special manipulation of data functions

# Reuse the already-running Spark session if there is one (getOrCreate), so the
# same session is never started twice; otherwise create it under this app name.
spark = (
    SparkSession.builder
    .appName('Assignment 4')
    .getOrCreate()
)

# Shuffle partitions are set to 2 to match the 2 available CPU cores, so both
# cores are used in parallel during shuffles.
# TODO: look up guidance on tuning the partition count.
spark.conf.set('spark.sql.shuffle.partitions', 2)

# PART B:The purpose of this part is to work with a fraud detection. To do this, create a fraud detection system using Apache Spark. 

Data input For part B implementation, the dataset is provided to you, download it from Quercus. 

  • fraud_detection_data.csv 
  
Implementation:
Load the dataset and import the required libraries. Create a fraud detection system and answer the following questions. You can use Databricks Community Edition, Azure Databricks, or Azure Synapse for this part.

1.[Marks: 10] Explore your data and describe schema of your dataset. Show your code and output.

2.[Marks: 20] Explain which two Machine Learning models you have selected and why. Now split the dataset into train and test. Train your model on 70 percent of your data and test with the other 30 percent. Show your code and output.

3.[Marks: 20] Now tune parameters of the algorithm to get the best set of parameters. Explain different parameters of the algorithm which you have used for tuning your algorithm. Show your code and output.

4.[Marks: 10] Evaluate your models by comparing the Precision-Recall (PR) and Area under the ROC curve (AUC) metrics for the training and test sets. Show which one works better.

5.[Marks: 10] BONUS question: Display confusion metrics. Show your code and output. Explain which one works better.

In [0]:
# Where the data lives and which reader format to use.
file_location = "/FileStore/tables/fraud_detection_data.csv"
file_type = "csv"

# Reader options (passed to Spark as strings).
infer_schema = "True"
first_row_is_header = "True"
delimiter = ","

# Build the reader in a single parenthesised chain. These options are specific
# to CSV input; for other file types they are ignored.
df = (
    spark.read.format(file_type)
    .options(inferSchema=infer_schema, header=first_row_is_header, sep=delimiter)
    .load(file_location)
)

## 1.[Marks: 10] Explore your data and describe schema of your dataset. Show your code and output.
Useful link for data exploration: https://medium.com/@aieeshashafique/exploratory-data-analysis-using-pyspark-dataframe-in-python-bd55c02a2852

In [0]:
#display(df)

In [0]:
# Print the column names and the data types Spark inferred while loading the CSV.
df.printSchema()

In [0]:
#display(df.describe())

We will need to one-hot encode 'type'. The origin and destination columns may need to be dropped, as they are not numeric and encoding them would add too many dimensions.

In [0]:
# Total number of rows in the loaded dataset (shown as the cell output).
df.count()

In [0]:
# Build the distinct-values frame once, then preview it and report how many
# distinct transaction types exist.
type_values = df.select('type').distinct()
print('Distinct entries of Type col')
type_values.show()
print('total', type_values.count())

In [0]:
# Build the distinct-values frame once, preview the first 5 originating
# account names, then report how many distinct ones exist.
orig_values = df.select('nameOrig').distinct()
print('Distinct entries of nameOrig col')
orig_values.show(5)
print('total', orig_values.count())

In [0]:
# Build the distinct-values frame once, preview the first 5 destination
# account names, then report how many distinct ones exist.
dest_values = df.select('nameDest').distinct()
print('Distinct entries of nameDest col')
dest_values.show(5)
print('total', dest_values.count())

In [0]:
# Number of NULL entries per column. Only isNull() is tested here, so NaN
# values in numeric columns are not counted.
null_counts = [
    count(when(col(name).isNull(), name)).alias(name)
    for name in df.columns
]
df.select(null_counts).show()

#### Calculate proportions of true labels

In [0]:
# Row count for each value of the dataset's own isFraud flag.
df.groupby('isFraud').count().show()

In [0]:
# Share of transactions that the dataset itself marks as fraud (isFraud == 1).
fraud_cases = df.filter(df.isFraud == 1).count()
total_cases = df.count()
# Guard the ratio so an empty DataFrame reports 0.0 instead of raising
# ZeroDivisionError.
fraud_pct = 1.*fraud_cases/total_cases if total_cases else 0.0

# This cell counts the dataset's isFraud flag, not the output of any rule set,
# so the summary is worded accordingly.
print("The dataset labels %s (%s) of its %s transactions as fraud (isFraud == 1)." % (fraud_cases, fraud_pct, total_cases))


In [0]:
# Row count for each value of the isFlaggedFraud column.
df.groupby('isFlaggedFraud').count().show()

In [0]:
# Balance change on each side of the transaction (new balance minus old balance).
origin_delta = df["newbalanceOrig"] - df["oldbalanceOrg"]
dest_delta = df["newbalanceDest"] - df["oldbalanceDest"]
df = df.withColumn("orgDiff", origin_delta).withColumn("destDiff", dest_delta)

# Register the enriched frame as the temporary view `financials` for the SQL cells.
df.createOrReplaceTempView("financials")

more on views and temp. views: https://docs.databricks.com/spark/latest/spark-sql/language-manual/sql-ref-syntax-ddl-create-view.html

#### Distribution of transactions type.

In [0]:
%sql
-- Number of transactions for each transaction type
SELECT type, count(1)
FROM financials
GROUP BY type

type,count(1)
PAYMENT,18694
DEBIT,583
TRANSFER,3809
CASH_OUT,10138
CASH_IN,7375


VS. non SQL

In [0]:
# Same per-type transaction count as the SQL cell, expressed with the DataFrame API.
display(df.groupBy('Type').count())

Type,count
PAYMENT,18694
DEBIT,583
TRANSFER,3809
CASH_OUT,10138
CASH_IN,7375


#### Quantify the dollar value distribution of transactions across types

In [0]:
%sql
-- Total transacted amount for each transaction type
SELECT type, sum(amount)
FROM financials
GROUP BY type

type,sum(amount)
PAYMENT,171958311.50000018
DEBIT,2130479.3699999982
TRANSFER,2731041314.630002
CASH_OUT,1840358775.439992
CASH_IN,1235706049.4100006


VS. non SQL

In [0]:
# Same per-type amount total via the DataFrame API, sorted by the total in ascending order.
display(df.groupBy('Type').sum('amount').sort('sum(amount)',ascending=True))

Type,sum(amount)
DEBIT,2130479.3699999982
PAYMENT,171958311.50000018
CASH_IN,1235706049.4100006
CASH_OUT,1840358775.439992
TRANSFER,2731041314.630002


### Creating set of rules to detect fraud based on given variables

In [0]:
# Show the current frame (now including orgDiff / destDiff) before labelling.
display(df)

In [0]:
from pyspark.sql import functions as F

# Rule-based label: a row gets label 1 when ANY of the three predicates below
# holds, otherwise 0. Intent per the author: the account is flushed via a
# transfer, or another big transaction occurs.

# Rule 1: TRANSFER from an account holding at most 100,000 into a destination
# whose resulting balance is at most 100.
transfer_to_near_empty_dest = (
    (df.oldbalanceOrg <= 100000)
    & (df.type == "TRANSFER")
    & (df.newbalanceDest <= 100)
)
# Rule 2: origin held more than 50,000 and is left with at most 10.
origin_drained = (df.oldbalanceOrg > 50000) & (df.newbalanceOrig <= 10)
# Rule 3: origin held more than 50,000, keeps more than 10, and the amount
# moved exceeds 1,000,000.
very_large_amount = (
    (df.oldbalanceOrg > 50000)
    & (df.newbalanceOrig > 10)
    & (df.amount > 1000000)
)

matches_any_rule = transfer_to_near_empty_dest | origin_drained | very_large_amount
df = df.withColumn("label", F.when(matches_any_rule, 1).otherwise(0))

# Proportion of rows the rules flagged.
fraud_cases = df.filter(df.label == 1).count()
total_cases = df.count()
fraud_pct = 1.*fraud_cases/total_cases

print("Based on these rules, we have flagged %s (%s) fraud cases out of a total of %s cases." % (fraud_cases, fraud_pct, total_cases))

# Register the labelled frame as a temporary view for the SQL review cells.
df.createOrReplaceTempView("financials_labeled")

I adjusted the rules to try to get approximately 5% fraud. This way the target variable will still be skewed, but the learning model has more of a chance to learn.

As seen from the previous analysis, the dataset actually has only 2.3% fraud, so this is a very conservative approximation.

#### Quantifying the dollar value of these fraudlent transactions

In [0]:
%sql
-- Transaction count and total amount for each rule-based label value
SELECT
  label,
  count(1) AS `Transactions`,
  sum(amount) AS `Total Amount`
FROM financials_labeled
GROUP BY label

label,Transactions,Total Amount
0,39007,5094903474.009965
1,1592,886291456.3400002


The flagged share of the dollar amount is around 3x the flagged share of transactions.

#### To see what type of transactions our rule based model is classifying as fraud

In [0]:
%sql
-- Transaction count for each (transaction type, rule-based label) pair
SELECT
  type,
  label,
  count(1) AS `Transactions`
FROM financials_labeled
GROUP BY type, label

type,label,Transactions
PAYMENT,0,18693
CASH_OUT,0,9349
DEBIT,0,582
TRANSFER,0,3011
CASH_OUT,1,789
CASH_IN,1,3
DEBIT,1,1
PAYMENT,1,1
TRANSFER,1,798
CASH_IN,0,7372


Vs. non SQL

In [0]:
# Same (type, label) breakdown as the SQL cell, expressed with the DataFrame API.
display(df.groupby('type','label').count())

type,label,count
PAYMENT,0,18693
CASH_OUT,0,9349
DEBIT,0,582
TRANSFER,0,3011
CASH_OUT,1,789
CASH_IN,1,3
DEBIT,1,1
PAYMENT,1,1
TRANSFER,1,798
CASH_IN,0,7372


#### A rules-based model is of limited use because fraud detection is a dynamic problem: the rules would need to be updated and re-learned over time. For that reason, ML is a better approach.

##2.[Marks: 20] Explain which two Machine Learning models you have selected and why. Now split dataset into train and test. Train your model on 70percent of your data and test with the other 30 percent. Show your code and output.

###Explain models of choice;

Referring to the following research paper: https://doi.org/10.1186/s40537-018-0145-4

Titled: "Intrusion detection model using machine learning algorithm on Big Data environment"

SVM was referenced as the best classification method, followed by logistic regression. Considering that, I will test both of these models, and I will also test the decision tree approach covered in class for comparison.

Side note: read the following for more info on the distinction between SVM and regularized logistic regression: https://stats.stackexchange.com/questions/58684/regularized-logistic-regression-and-support-vector-machine

###Split dataset into train and test.

In [0]:
# Seeded 70/30 random split into training and test sets.
train, test = df.randomSplit([0.7, 0.3], seed=12345)

# Mark both splits for caching so repeated computations on them are faster.
for split in (train, test):
    split.cache()

# Report the size of the full dataset and of each split.
print("Total rows: %s, Training rows: %s, Test rows: %s" % (df.count(), train.count(), test.count()))

In [0]:
# Preview the training split.
display(train)

step,type,amount,nameOrig,oldbalanceOrg,newbalanceOrig,nameDest,oldbalanceDest,newbalanceDest,isFraud,isFlaggedFraud,orgDiff,destDiff,label
1,CASH_IN,270.78,C619985571,4184966.65,4185237.43,C875917495,3019.0,0.0,0,0,270.7800000002608,-3019.0,0
1,CASH_IN,484.57,C1859216983,5422437.76,5422922.33,C657736958,5638778.53,5579568.65,0,0,484.570000000298,-59209.87999999989,0
1,CASH_IN,783.31,C1585711807,8150331.93,8151115.24,C284686302,2013.12,1229.81,0,0,783.3100000005215,-783.31,0
1,CASH_IN,863.08,C1554118033,9290756.54,9291619.62,C5592464,5577.88,4714.8,0,0,863.0800000000745,-863.0799999999999,0
1,CASH_IN,911.76,C566760932,1335635.48,1336547.24,C1364913072,48321.6,47409.85,0,0,911.7600000000092,-911.75,0
1,CASH_IN,1076.27,C217389263,3538789.28,3539865.55,C838411509,22774.25,23539.55,0,0,1076.2700000000186,765.2999999999993,0
1,CASH_IN,2099.59,C685934,7096554.61,7098654.2,C1854778591,40471.79,0.0,0,0,2099.589999999851,-40471.79,0
1,CASH_IN,2643.45,C1574509514,6434890.26,6437533.71,C215145189,49974.0,1891.79,0,0,2643.4500000001863,-48082.21,0
1,CASH_IN,2673.64,C1164394344,768815.91,771489.55,C1330400026,135676.32,817.21,0,0,2673.640000000014,-134859.11000000002,0
1,CASH_IN,4865.48,C691096777,7395911.97,7400777.45,C1577213552,274039.11,217862.21,0,0,4865.480000000447,-56176.9,0


### Train your model on 70percent of your data and test with the other 30 percent.

#### Logistic Regression

https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.classification.LogisticRegression.html

class pyspark.ml.classification.LogisticRegression(*, featuresCol='features', labelCol='label', predictionCol='prediction', maxIter=100, regParam=0.0, elasticNetParam=0.0, tol=1e-06, fitIntercept=True, threshold=0.5, thresholds=None, probabilityCol='probability', rawPredictionCol='rawPrediction', standardization=True, weightCol=None, aggregationDepth=2, family='auto', lowerBoundsOnCoefficients=None, upperBoundsOnCoefficients=None, lowerBoundsOnIntercepts=None, upperBoundsOnIntercepts=None, maxBlockSizeInMB=0.0)[source]¶

A cool feature is this: standardization = Param(parent='undefined', name='standardization', doc='whether to standardize the training features before fitting the model.') And is set to true by default so dont need to worry about standardization!

In [0]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import StringIndexer, VectorAssembler

# Stage 1: turn the string `type` column into a numeric index column.
indexer = StringIndexer(inputCol="type", outputCol="typeIndexed")

# Stage 2: combine the indexed type and the numeric columns into one
# `features` vector. `typeIndexed` is included as a plain number (it is not
# one-hot encoded).
va = VectorAssembler(
    inputCols=[
        "typeIndexed",
        "amount",
        "oldbalanceOrg",
        "newbalanceOrig",
        "oldbalanceDest",
        "newbalanceDest",
        "orgDiff",
        "destDiff",
    ],
    outputCol="features",
)

# Stage 3: logistic regression left at its default hyper-parameters.
lr = LogisticRegression(labelCol="label", featuresCol="features")

# Chain the three stages into a single estimator.
lr_pipeline = Pipeline(stages=[indexer, va, lr])

In [0]:
# Fit the logistic-regression pipeline on the training split.
lr_model = lr_pipeline.fit(train)

# The fitted classifier is the last pipeline stage; report its coefficients
# and intercept.
lr_fitted = lr_model.stages[-1]
print("Coefficients: " + str(lr_fitted.coefficients))
print("Intercept: " + str(lr_fitted.intercept))


Based on these coefficients, it looks like the first feature/variable has by far the greatest influence on this model.

In [0]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluators as originally configured: they read the hard 0/1 `prediction`
# column, so each "area" is derived from the single operating point set by the
# model's current threshold rather than from a curve swept over all thresholds.
evaluatorPR = BinaryClassificationEvaluator(labelCol = "label", rawPredictionCol = "prediction", metricName = "areaUnderPR")
evaluatorAUC = BinaryClassificationEvaluator(labelCol = "label", rawPredictionCol = "prediction", metricName = "areaUnderROC")

# Score-based evaluators: `rawPrediction` carries the model's continuous score,
# which lets the evaluator trace the full PR / ROC curves.
lr_scorePR = BinaryClassificationEvaluator(labelCol = "label", rawPredictionCol = "rawPrediction", metricName = "areaUnderPR")
lr_scoreAUC = BinaryClassificationEvaluator(labelCol = "label", rawPredictionCol = "rawPrediction", metricName = "areaUnderROC")

# Score the held-out test split and preview a few rows.
lr_predictions = lr_model.transform(test)
lr_predictions.select("prediction", "label", "features").show(5)

# Score the training split too, so training and test metrics can be compared.
training_pred= lr_model.transform(train)

# Report the original (threshold-level) lines first, then the score-based
# values, for the training split and then the test split.
for banner, scored in (
    ("+----------------------------TRAINING-----------------------------+", training_pred),
    ("+------------------------------TEST-------------------------------+", lr_predictions),
):
    print(banner)
    print("Area under PR = %s" % evaluatorPR.evaluate(scored))
    print("Area under ROC = %s" % evaluatorAUC.evaluate(scored))
    print("Area under PR (score-based) = %s" % lr_scorePR.evaluate(scored))
    print("Area under ROC (score-based) = %s" % lr_scoreAUC.evaluate(scored))

Clearly severe underfitting, with both the training and testing evaluation metrics doing poorly.

#### SVM

https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.classification.LinearSVC.html

pyspark.ml.classification.LinearSVC(*, featuresCol='features', labelCol='label', predictionCol='prediction', maxIter=100, regParam=0.0, tol=1e-06, rawPredictionCol='rawPrediction', fitIntercept=True, standardization=True, threshold=0.0, weightCol=None, aggregationDepth=2, maxBlockSizeInMB=0.0)

A cool feature is this:
standardization = Param(parent='undefined', name='standardization', doc='whether to standardize the training features before fitting the model.')
And is set to true by default so dont need to worry about standardization!

In [0]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LinearSVC
from pyspark.ml.feature import StringIndexer, VectorAssembler

# Stage 1: turn the string `type` column into a numeric index column.
indexer = StringIndexer(inputCol="type", outputCol="typeIndexed")

# Stage 2: combine the indexed type and the numeric columns into one
# `features` vector. `typeIndexed` is included as a plain number (it is not
# one-hot encoded).
va = VectorAssembler(
    inputCols=[
        "typeIndexed",
        "amount",
        "oldbalanceOrg",
        "newbalanceOrig",
        "oldbalanceDest",
        "newbalanceDest",
        "orgDiff",
        "destDiff",
    ],
    outputCol="features",
)

# Stage 3: linear support-vector classifier left at its default hyper-parameters.
sv = LinearSVC(labelCol="label", featuresCol="features")

# Chain the three stages into a single estimator.
sv_pipeline = Pipeline(stages=[indexer, va, sv])

In [0]:
# Fit the linear-SVC pipeline on the training split.
sv_model = sv_pipeline.fit(train)

# The fitted classifier is the last pipeline stage; report its coefficients
# and intercept.
sv_fitted = sv_model.stages[-1]
print("Coefficients: " + str(sv_fitted.coefficients))
print("Intercept: " + str(sv_fitted.intercept))

In [0]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluators as originally configured: they read the hard 0/1 `prediction`
# column, so each "area" is derived from the single operating point set by the
# model's current threshold rather than from a curve swept over all thresholds.
evaluatorPR = BinaryClassificationEvaluator(labelCol = "label", rawPredictionCol = "prediction", metricName = "areaUnderPR")
evaluatorAUC = BinaryClassificationEvaluator(labelCol = "label", rawPredictionCol = "prediction", metricName = "areaUnderROC")

# Score-based evaluators: `rawPrediction` carries the SVC's continuous margin,
# which lets the evaluator trace the full PR / ROC curves.
sv_scorePR = BinaryClassificationEvaluator(labelCol = "label", rawPredictionCol = "rawPrediction", metricName = "areaUnderPR")
sv_scoreAUC = BinaryClassificationEvaluator(labelCol = "label", rawPredictionCol = "rawPrediction", metricName = "areaUnderROC")

# Score the held-out test split and preview a few rows.
sv_predictions = sv_model.transform(test)
sv_predictions.select("prediction", "label", "features").show(5)

# Score the training split too, so training and test metrics can be compared.
training_pred= sv_model.transform(train)

# Report the original (threshold-level) lines first, then the score-based
# values, for the training split and then the test split.
for banner, scored in (
    ("+----------------------------TRAINING-----------------------------+", training_pred),
    ("+------------------------------TEST-------------------------------+", sv_predictions),
):
    print(banner)
    print("Area under PR = %s" % evaluatorPR.evaluate(scored))
    print("Area under ROC = %s" % evaluatorAUC.evaluate(scored))
    print("Area under PR (score-based) = %s" % sv_scorePR.evaluate(scored))
    print("Area under ROC (score-based) = %s" % sv_scoreAUC.evaluate(scored))

Clearly underfitting, the same as logistic regression, just slightly better.

#### Decision Trees


class pyspark.ml.classification.DecisionTreeClassifier(*, featuresCol='features', labelCol='label', predictionCol='prediction', probabilityCol='probability', rawPredictionCol='rawPrediction', maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, impurity='gini', seed=None, weightCol=None, leafCol='', minWeightFractionPerNode=0.0)

In [0]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, VectorAssembler

# Stage 1: turn the string `type` column into a numeric index column.
indexer = StringIndexer(inputCol="type", outputCol="typeIndexed")

# Stage 2: combine the indexed type and the numeric columns into one
# `features` vector.
va = VectorAssembler(
    inputCols=[
        "typeIndexed",
        "amount",
        "oldbalanceOrg",
        "newbalanceOrig",
        "oldbalanceDest",
        "newbalanceDest",
        "orgDiff",
        "destDiff",
    ],
    outputCol="features",
)

# Stage 3: decision-tree classifier left at its default hyper-parameters.
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features")

# Chain the three stages into a single estimator.
dt_pipeline = Pipeline(stages=[indexer, va, dt])

In [0]:
# Fit the decision-tree pipeline on the training split.
dt_model = dt_pipeline.fit(train)

# The fitted tree is the last pipeline stage.
dt_fitted = dt_model.stages[-1]
print(dt_fitted)     # short text summary
display(dt_fitted)   # visual rendering in the notebook

treeNode
"{""index"":17,""featureType"":""continuous"",""prediction"":null,""threshold"":-42082.0,""categories"":null,""feature"":6,""overflow"":false}"
"{""index"":7,""featureType"":""continuous"",""prediction"":null,""threshold"":6.565,""categories"":null,""feature"":3,""overflow"":false}"
"{""index"":5,""featureType"":""continuous"",""prediction"":null,""threshold"":47906.79,""categories"":null,""feature"":2,""overflow"":false}"
"{""index"":3,""featureType"":""continuous"",""prediction"":null,""threshold"":-364753.01,""categories"":null,""feature"":7,""overflow"":false}"
"{""index"":1,""featureType"":""categorical"",""prediction"":null,""threshold"":null,""categories"":[1.0],""feature"":0,""overflow"":false}"
"{""index"":0,""featureType"":null,""prediction"":0.0,""threshold"":null,""categories"":null,""feature"":null,""overflow"":false}"
"{""index"":2,""featureType"":null,""prediction"":1.0,""threshold"":null,""categories"":null,""feature"":null,""overflow"":false}"
"{""index"":4,""featureType"":null,""prediction"":0.0,""threshold"":null,""categories"":null,""feature"":null,""overflow"":false}"
"{""index"":6,""featureType"":null,""prediction"":1.0,""threshold"":null,""categories"":null,""feature"":null,""overflow"":false}"
"{""index"":9,""featureType"":""continuous"",""prediction"":null,""threshold"":782450.4550000001,""categories"":null,""feature"":1,""overflow"":false}"


In [0]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluators as originally configured: they read the hard 0/1 `prediction`
# column, so each "area" is derived from a single operating point rather than
# from a curve swept over all thresholds.
evaluatorPR = BinaryClassificationEvaluator(labelCol = "label", rawPredictionCol = "prediction", metricName = "areaUnderPR")
evaluatorAUC = BinaryClassificationEvaluator(labelCol = "label", rawPredictionCol = "prediction", metricName = "areaUnderROC")

# Score-based evaluators for the tree.
# NOTE(review): for tree models `rawPrediction` presumably holds unnormalised
# per-leaf class counts, so the normalised `probability` column is used as the
# ranking score here — confirm against the Spark version in use.
dt_scorePR = BinaryClassificationEvaluator(labelCol = "label", rawPredictionCol = "probability", metricName = "areaUnderPR")
dt_scoreAUC = BinaryClassificationEvaluator(labelCol = "label", rawPredictionCol = "probability", metricName = "areaUnderROC")

# Score the held-out test split and preview a few rows.
dt_predictions = dt_model.transform(test)
dt_predictions.select("prediction", "label", "features").show(5)

# Score the training split too, so training and test metrics can be compared.
training_pred= dt_model.transform(train)

# Report the original (threshold-level) lines first, then the score-based
# values, for the training split and then the test split.
for banner, scored in (
    ("+----------------------------TRAINING-----------------------------+", training_pred),
    ("+------------------------------TEST-------------------------------+", dt_predictions),
):
    print(banner)
    print("Area under PR = %s" % evaluatorPR.evaluate(scored))
    print("Area under ROC = %s" % evaluatorAUC.evaluate(scored))
    print("Area under PR (score-based) = %s" % dt_scorePR.evaluate(scored))
    print("Area under ROC (score-based) = %s" % dt_scoreAUC.evaluate(scored))

Already looking great. I think this is because the target itself is rule-based, and decision trees work on the same rule-based splitting logic, greedily choosing the split that gives the greatest entropy reduction. So basically the tree can easily re-create the rules we used to create the labels.

## 3.[Marks: 20] Now tune parameters of the algorithm to get the best set of parameters. Explain different parameters of the algorithm which you have used for tuning your algorithm. Show your code and output.

#### Logistic Regression

I will tune the threshold to alter the classification boundary and compensate for the underfitting (this means we can capture more true positives at the expense of introducing more false positives).

In [0]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# These evaluators read the hard 0/1 `prediction` column, so the score they
# produce changes with the decision threshold searched below.
evaluatorPR = BinaryClassificationEvaluator(labelCol = "label", rawPredictionCol = "prediction", metricName = "areaUnderPR")
evaluatorAUC = BinaryClassificationEvaluator(labelCol = "label", rawPredictionCol = "prediction", metricName = "areaUnderROC")

# Search grid: optimiser iteration cap x decision threshold (2 x 3 combinations).
paramGrid = (
    ParamGridBuilder()
    .addGrid(lr.maxIter, [20, 30])
    .addGrid(lr.threshold, [0.2, 0.3, 0.325])
    .build()
)

# 3-fold cross-validation ranked by the PR evaluator. The seed pins the fold
# assignment so that re-running the notebook evaluates the same folds.
crossval = CrossValidator(estimator = lr,
                          estimatorParamMaps = paramGrid,
                          evaluator = evaluatorPR,
                          numFolds = 3,
                          seed = 12345)

# Indexing and vector assembly run once; the cross-validated classifier is the
# final stage.
pipelineCV = Pipeline(stages=[indexer, va, crossval])

# Fit the whole pipeline (including the grid search) on the training split.
cvModel_u = pipelineCV.fit(train)

For more details on how to access the tuned parameters, rules, and coefficients of a fitted model: https://kb.databricks.com/machine-learning/extract-feature-info.html

In [0]:
# Best logistic-regression model found by the cross-validated search (the
# search is the last stage of the fitted pipeline).
lrCV = cvModel_u.stages[-1].bestModel

print("Coefficients: " + str(lrCV.coefficients))
print("Intercept: " + str(lrCV.intercept))

# Read the selected hyper-parameters from the underlying JVM model object.
lr_java_model = lrCV._java_obj
print('Cutoof Threshold:', lr_java_model.getThreshold())
print('Max Iters:', lr_java_model.getMaxIter())

# Last expression of the cell: shown as the cell output.
lrCV.coefficientMatrix

In [0]:
# Build the best model (training and test datasets)
train_pred = cvModel_u.transform(train)
test_pred = cvModel_u.transform(test)

# Evaluate the model on training datasets
pr_train = evaluatorPR.evaluate(train_pred)
auc_train = evaluatorAUC.evaluate(train_pred)

# Evaluate the model on test datasets
pr_test = evaluatorPR.evaluate(test_pred)
auc_test = evaluatorAUC.evaluate(test_pred)

# Print out the PR and AUC values

print("+----------------------------TRAINING-----------------------------+")
print("PR train:", pr_train)
print("AUC train:", auc_train)

print("+------------------------------TEST-------------------------------+")
print("PR test:", pr_test)
print("AUC test:", auc_test)

As compared to the pre-hyperparameter-tuning (baseline) model:

+----------------------------TRAINING-----------------------------+

Area under PR = 0.5536211141614146

Area under ROC = 0.6884500383316062

+------------------------------TEST-------------------------------+

Area under PR = 0.5464549235400695

Area under ROC = 0.684582223423937

Very slight improvements, to be discussed further in the next section.

#### SVM

I will tune the threshold to alter the classification boundary and compensate for the underfitting (this means we can capture more true positives at the expense of introducing more false positives).

In [0]:
# Reference only: the LinearSVC constructor signature kept as a bare string (no estimator is created here).
'''pyspark.ml.classification.LinearSVC(*, featuresCol='features', labelCol='label', predictionCol='prediction', maxIter=100, regParam=0.0, tol=1e-06, rawPredictionCol='rawPrediction', fitIntercept=True, standardization=True, threshold=0.0, weightCol=None, aggregationDepth=2, maxBlockSizeInMB=0.0)'''

threshold = Param(parent='undefined', name='threshold', doc='The threshold in binary classification applied to the linear model prediction. This threshold can be any real number, where Inf will make all predictions 0.0 and -Inf will make all predictions 1.0.')

In [0]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# These evaluators read the hard 0/1 `prediction` column, so the score they
# produce changes with the decision threshold searched below.
evaluatorPR = BinaryClassificationEvaluator(labelCol = "label", rawPredictionCol = "prediction", metricName = "areaUnderPR")
evaluatorAUC = BinaryClassificationEvaluator(labelCol = "label", rawPredictionCol = "prediction", metricName = "areaUnderROC")

# Search grid: a single iteration cap x two candidate decision thresholds.
paramGrid = (
    ParamGridBuilder()
    .addGrid(sv.maxIter, [50])
    .addGrid(sv.threshold, [-0.7, -0.55])
    .build()
)

# 3-fold cross-validation ranked by the PR evaluator. The seed pins the fold
# assignment so that re-running the notebook evaluates the same folds.
crossval = CrossValidator(estimator = sv,
                          estimatorParamMaps = paramGrid,
                          evaluator = evaluatorPR,
                          numFolds = 3,
                          seed = 12345)

# Indexing and vector assembly run once; the cross-validated classifier is the
# final stage.
pipelineCV = Pipeline(stages=[indexer, va, crossval])

# Fit the whole pipeline (including the grid search) on the training split.
cvModel_u = pipelineCV.fit(train)

For more details on how to access the tuned parameters, rules, and coefficients of a fitted model: https://kb.databricks.com/machine-learning/extract-feature-info.html

In [0]:
# Best linear-SVC model found by the cross-validated search (the search is the
# last stage of the fitted pipeline).
svCV = cvModel_u.stages[-1].bestModel

print("Coefficients: " + str(svCV.coefficients))
print("Intercept: " + str(svCV.intercept))

# Read the selected hyper-parameters from the underlying JVM model object.
sv_java_model = svCV._java_obj
print('Cutoof Threshold:', sv_java_model.getThreshold())
print('Max Iters:', sv_java_model.getMaxIter())
#svCV.coefficientMatrix

In [0]:
# Build the best model (training and test datasets)
train_pred = cvModel_u.transform(train)
test_pred = cvModel_u.transform(test)

# Evaluate the model on training datasets
pr_train = evaluatorPR.evaluate(train_pred)
auc_train = evaluatorAUC.evaluate(train_pred)

# Evaluate the model on test datasets
pr_test = evaluatorPR.evaluate(test_pred)
auc_test = evaluatorAUC.evaluate(test_pred)

# Print out the PR and AUC values

print("+----------------------------TRAINING-----------------------------+")
print("PR train:", pr_train)
print("AUC train:", auc_train)

print("+------------------------------TEST-------------------------------+")
print("PR test:", pr_test)
print("AUC test:", auc_test)

As compared to the pre-hyperparameter-tuning (baseline) model:

+----------------------------TRAINING-----------------------------+

Area under PR = 0.5754599066202166

Area under ROC = 0.7040790389297479

+------------------------------TEST-------------------------------+

Area under PR = 0.5682850129522884

Area under ROC = 0.6975897443441756

Almost a 15% improvement, which is a good result. I will discuss it further in the next section.

#### Decision Trees

In [0]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# These evaluators read the hard 0/1 `prediction` column.
evaluatorPR = BinaryClassificationEvaluator(labelCol = "label", rawPredictionCol = "prediction", metricName = "areaUnderPR")
evaluatorAUC = BinaryClassificationEvaluator(labelCol = "label", rawPredictionCol = "prediction", metricName = "areaUnderROC")

# Search grid: tree depth limit x number of bins (3 x 3 combinations).
paramGrid = (
    ParamGridBuilder()
    .addGrid(dt.maxDepth, [5, 10, 15])
    .addGrid(dt.maxBins, [10, 20, 30])
    .build()
)

# 3-fold cross-validation ranked by the PR evaluator. The seed pins the fold
# assignment so that re-running the notebook evaluates the same folds.
crossval = CrossValidator(estimator = dt,
                          estimatorParamMaps = paramGrid,
                          evaluator = evaluatorPR,
                          numFolds = 3,
                          seed = 12345)

# Indexing and vector assembly run once; the cross-validated classifier is the
# final stage.
pipelineCV = Pipeline(stages=[indexer, va, crossval])

# Fit the whole pipeline (including the grid search) on the training split.
cvModel_u = pipelineCV.fit(train)

For more details on how to access the tuned parameters, rules, and coefficients of a DT: https://kb.databricks.com/machine-learning/extract-feature-info.html

In [0]:
# Fitted vector-assembler stage (second to last) and the best tree selected by
# the cross-validated search (last stage).
va = cvModel_u.stages[-2]
treeCV = cvModel_u.stages[-1].bestModel

#visualize the best decision tree model
display(treeCV)
#print(treeCV.toDebugString) #print the nodes of the decision tree model
print('+------------Params Selected via tuning-----------------+')
# extractParamMap is a method: it must be called, otherwise only the
# bound-method repr is printed instead of the parameter values.
print("Tuned params",treeCV.extractParamMap())

# Read the selected hyper-parameters from the underlying JVM model object.
print('Max Depth:',treeCV._java_obj.getMaxDepth())
print('Max Bins:',treeCV._java_obj.getMaxBins())

print('+------------feat importances-----------------+')
# Pair each assembled input column with its importance (shown as the cell output).
list(zip(va.getInputCols(), treeCV.featureImportances))# get feature importances

treeNode
"{""index"":17,""featureType"":""continuous"",""prediction"":null,""threshold"":-38765.56999999999,""categories"":null,""feature"":6,""overflow"":false}"
"{""index"":7,""featureType"":""continuous"",""prediction"":null,""threshold"":6.565,""categories"":null,""feature"":3,""overflow"":false}"
"{""index"":5,""featureType"":""continuous"",""prediction"":null,""threshold"":50344.5,""categories"":null,""feature"":2,""overflow"":false}"
"{""index"":3,""featureType"":""continuous"",""prediction"":null,""threshold"":58527.095,""categories"":null,""feature"":5,""overflow"":false}"
"{""index"":1,""featureType"":""categorical"",""prediction"":null,""threshold"":null,""categories"":[0.0,1.0],""feature"":0,""overflow"":false}"
"{""index"":0,""featureType"":null,""prediction"":0.0,""threshold"":null,""categories"":null,""feature"":null,""overflow"":false}"
"{""index"":2,""featureType"":null,""prediction"":1.0,""threshold"":null,""categories"":null,""feature"":null,""overflow"":false}"
"{""index"":4,""featureType"":null,""prediction"":0.0,""threshold"":null,""categories"":null,""feature"":null,""overflow"":false}"
"{""index"":6,""featureType"":null,""prediction"":1.0,""threshold"":null,""categories"":null,""feature"":null,""overflow"":false}"
"{""index"":9,""featureType"":""continuous"",""prediction"":null,""threshold"":743064.4199999999,""categories"":null,""feature"":1,""overflow"":false}"


'orgDiff' (the difference between the originating account's new and old balances) is our most important feature, with an importance of almost 50%. Features like oldbalanceDest and destDiff do not add any value from the DT's greedy perspective of feature importance.

I really don't know what brought about the improvement in accuracy. I would love to get your input if you are marking this! I am new to the field and still learning.

The only change in hyperparameters is a reduction in max bins from the default of 32 to 30. Depth is still the same at 5, but either way, in both cases we had fewer than 30 nodes, so I am assuming that maxBins does not change the result.

Maybe it was due to better / more general training via cross-validation and not hyperparameter tuning? Is that a possibility?

In [0]:
# Build the best model (training and test datasets)
train_pred = cvModel_u.transform(train)
test_pred = cvModel_u.transform(test)

# Evaluate the model on training datasets
pr_train = evaluatorPR.evaluate(train_pred)
auc_train = evaluatorAUC.evaluate(train_pred)

# Evaluate the model on test datasets
pr_test = evaluatorPR.evaluate(test_pred)
auc_test = evaluatorAUC.evaluate(test_pred)

# Print out the PR and AUC values

print("+----------------------------TRAINING-----------------------------+")
print("PR train:", pr_train)
print("AUC train:", auc_train)

print("+------------------------------TEST-------------------------------+")
print("PR test:", pr_test)
print("AUC test:", auc_test)

As compared to the pre-hyperparameter-tuning (baseline) model:

+----------------------------TRAINING-----------------------------+

Area under PR = 0.9430052903389915

Area under ROC = 0.9835269627046106

+------------------------------TEST-------------------------------+

Area under PR = 0.9432071534583562

Area under ROC = 0.9841538503942746

Very slight improvements, of around 5%.

## 4.[Marks: 10] Evaluate your models by comparing the Precision-Recall (PR) and Area under the ROC curve (AUC) metrics for the training and test sets. Show which one works better.

This was done in the previous section. 

For logistic regression and SVM, I needed to alter the decision-boundary cutoff threshold to improve the models' results. With the boundary set at Prob.fraud = 50%, the area under the PR curve (score) was low. Using grid search and evaluating the models based on this score, I was able to fine-tune the thresholds for both models.

For logistic regression:
- Prior to the threshold change, we were underfitting severely, with both training and testing scores below 0.6.
  - With the tuned threshold (from 0.5 to 0.3), we got both our training and test scores just above 0.6, showing slight improvement.
    - The model is still underfitting and is proving not ideal for the data.

For SVM:
- Prior to the threshold change, we were underfitting severely, with both training and testing scores below 0.6 but slightly higher than the scores of the untuned logistic regression.
  - With the tuned threshold (from 0 to -0.7), we got both our training and test scores just above 0.7. This 15% improvement is good; however, given the superiority of the decision tree model, we can't choose this one.
    - Therefore the model is slightly underfitting and is proving not ideal for the data.

Decision Tree:
- Even prior to any tuning, both training and testing scores were around 0.95; after minor tuning we were able to achieve near 0.99 for both.
- It is the clear winner.

Area under the ROC curve was generally higher than the area under the PR curve, but I decided to go with area under the PR curve as my metric, as I would like to optimize the tradeoff between recall and precision.