## Section 1a) Mount Storage (and Upload Data)

1. Mount the Azure storage blob container.

2. Upload your data to your storage container.

  _Note: You may use any dataset that you find online (as long as you're allowed to use it) or use a dataset from `"/databricks-datasets"`_

In [None]:
# Mount Blob to DBFS
# The storage account key is read from a Databricks secret scope rather than
# hard-coded in the notebook; "<SCOPE>" and "<KEY>" are placeholders to fill in.
dbutils.fs.mount(
  source = "wasbs://datalake@dsba6190storageks.blob.core.windows.net/",
  mount_point = "/mnt/zeta/",
  extra_configs = {"fs.azure.account.key.dsba6190storageks.blob.core.windows.net":
                   dbutils.secrets.get(scope = "<SCOPE>", key = "<KEY>")})
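Re-running a mount cell against a mount point that already exists raises an error, so it can help to check the current mounts first. The sketch below is one possible guard, not part of the original notebook: `mount_if_absent` is a hypothetical helper name, and `dbutils` is passed in as a parameter so the function can also be exercised outside Databricks.

```python
def mount_if_absent(dbutils, source, mount_point, extra_configs):
    """Mount `source` at `mount_point` unless something is already mounted there.

    Hypothetical helper: returns True if a mount was performed, False otherwise.
    """
    # Compare without trailing slashes so "/mnt/zeta/" and "/mnt/zeta" match.
    wanted = mount_point.rstrip("/")
    existing = {m.mountPoint.rstrip("/") for m in dbutils.fs.mounts()}
    if wanted in existing:
        return False
    dbutils.fs.mount(source=source,
                     mount_point=mount_point,
                     extra_configs=extra_configs)
    return True
```

In a notebook this would be called with the built-in `dbutils` object and the same `source`, `mount_point`, and `extra_configs` values used in the mount cell.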

In [0]:
display(dbutils.fs.ls("/mnt/zeta"))

path,name,size,modificationTime
dbfs:/mnt/zeta/NYCTaxiCompanies.csv,NYCTaxiCompanies.csv,233,1675009673000
dbfs:/mnt/zeta/financial_crime.csv,financial_crime.csv,493534783,1675296871000
dbfs:/mnt/zeta/lrcvModel/,lrcvModel/,0,0
dbfs:/mnt/zeta/pipeline/,pipeline/,0,0
dbfs:/mnt/zeta/pipelineMM/,pipelineMM/,0,0
dbfs:/mnt/zeta/pipelineModel/,pipelineModel/,0,0
dbfs:/mnt/zeta/rfcvModel/,rfcvModel/,0,0


## Section 1b) Read in Your Data

1. Read in your data as a Spark dataframe.

2. Describe your dataset below and provide a link or location where you retrieved the data.

Description of your Data: 
Synthetic Financial Datasets For Fraud Detection (PaySim). The target variable is `isFraud`.
- step:integer - maps a unit of time in the real world. In this case 1 step is 1 hour of time. Total steps 744 (30 days simulation).
- type:string - CASH_IN, CASH_OUT, DEBIT, PAYMENT and TRANSFER.
- amount:double - amount of the transaction in local currency.
- nameOrig:string - customer who started the transaction.
- oldbalanceOrg:double - initial balance of the originator before the transaction.
- newbalanceOrig:double - new balance of the originator after the transaction.
- nameDest:string - customer who is the recipient of the transaction.
- oldbalanceDest:double - initial balance of the recipient before the transaction. Note that there is no information for customers whose names start with M (Merchants).
- newbalanceDest:double - new balance of the recipient after the transaction. Note that there is no information for customers whose names start with M (Merchants).
- isFraud:integer - marks the transactions made by the fraudulent agents inside the simulation. In this specific dataset, the fraudulent agents aim to profit by taking control of customers' accounts and trying to empty the funds by transferring them to another account and then cashing out of the system.
- isFlaggedFraud:integer - the business model aims to control massive transfers from one account to another and flags illegal attempts. An illegal attempt in this dataset is an attempt to transfer more than 200.000 in a single transaction.

Data Source: https://www.kaggle.com/datasets/ealaxi/paysim1

In [0]:
# Read the CSV from the mounted container; `spark` is the notebook's SparkSession
dataset = spark.read.format('csv') \
                    .options(header='true', inferSchema='true', delimiter=',') \
                    .load('/mnt/zeta/financial_crime.csv')
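Schema inference needs an extra pass over the file, which is noticeable on a CSV of roughly 490 MB. A possible alternative is to pass the schema explicitly, using the column names and types listed in the description above. This is a sketch under that assumption; `PAYSIM_SCHEMA_DDL` and `read_paysim` are hypothetical names introduced here for illustration.

```python
# Column names and types taken from the dataset description in this section.
PAYSIM_SCHEMA_DDL = ", ".join([
    "step INT", "type STRING", "amount DOUBLE", "nameOrig STRING",
    "oldbalanceOrg DOUBLE", "newbalanceOrig DOUBLE", "nameDest STRING",
    "oldbalanceDest DOUBLE", "newbalanceDest DOUBLE",
    "isFraud INT", "isFlaggedFraud INT",
])


def read_paysim(spark, path):
    """Read the CSV with an explicit schema instead of inferSchema (hypothetical helper)."""
    return spark.read.csv(path, header=True, schema=PAYSIM_SCHEMA_DDL)
```

In the notebook this would be called as `read_paysim(spark, '/mnt/zeta/financial_crime.csv')`.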

In [0]:
display(dataset)

step,type,amount,nameOrig,oldbalanceOrg,newbalanceOrig,nameDest,oldbalanceDest,newbalanceDest,isFraud,isFlaggedFraud
1,PAYMENT,9839.64,C1231006815,170136.0,160296.36,M1979787155,0.0,0.0,0,0
1,PAYMENT,1864.28,C1666544295,21249.0,19384.72,M2044282225,0.0,0.0,0,0
1,TRANSFER,181.0,C1305486145,181.0,0.0,C553264065,0.0,0.0,1,0
1,CASH_OUT,181.0,C840083671,181.0,0.0,C38997010,21182.0,0.0,1,0
1,PAYMENT,11668.14,C2048537720,41554.0,29885.86,M1230701703,0.0,0.0,0,0
1,PAYMENT,7817.71,C90045638,53860.0,46042.29,M573487274,0.0,0.0,0,0
1,PAYMENT,7107.77,C154988899,183195.0,176087.23,M408069119,0.0,0.0,0,0
1,PAYMENT,7861.64,C1912850431,176087.23,168225.59,M633326333,0.0,0.0,0,0
1,PAYMENT,4024.36,C1265012928,2671.0,0.0,M1176932104,0.0,0.0,0,0
1,DEBIT,5337.77,C712410124,41720.0,36382.23,C195600860,41898.0,40348.79,0,0


## Section 2) Shape your Data for Machine Learning

1. Transform your data for use in machine learning. This includes One Hot Encoding, Vectorization, etc.

  _Don't forget to use pipelines as this will make operationalizing your code much easier._

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

In [0]:
label = "isFraud"

categoricalColumns = ["type"]

numericalColumns = ["amount",
                    "oldbalanceOrg",
                    "newbalanceOrig",
                    "oldbalanceDest",
                    "newbalanceDest",
                    "isFlaggedFraud"]

#categoricalColumnsclassVec = ["col1classVec",
#                              "col2classVec"]
categoricalColumnsclassVec = [c + "classVec" for c in categoricalColumns]

In [0]:
stages = []

In [0]:
for categoricalColumn in categoricalColumns:
  print(categoricalColumn)
  ## Category Indexing with StringIndexer
  stringIndexer = StringIndexer(inputCol=categoricalColumn, outputCol = categoricalColumn+"Index").setHandleInvalid("skip")
  ## Use OneHotEncoder to convert categorical variables into binary SparseVectors
  encoder = OneHotEncoder(inputCol=categoricalColumn+"Index", outputCol=categoricalColumn+"classVec")
  ## Add stages
  stages += [stringIndexer, encoder]

type
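To make the indexing and encoding step concrete, the following pure-Python sketch mimics what the two stages are expected to do with their default settings: `StringIndexer` assigns index 0 to the most frequent category (ties are assumed to be broken alphabetically, as in recent Spark versions), and `OneHotEncoder` drops the last category so that it is encoded as an all-zero vector. It is an illustration only, not Spark's implementation, and the function names are made up for this example.

```python
from collections import Counter


def fit_string_index(values):
    """Map each category to an index, most frequent first (ties broken alphabetically)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {label: i for i, label in enumerate(ordered)}


def one_hot_drop_last(index, num_categories):
    """Dense one-hot vector of length num_categories - 1; the last index is all zeros."""
    vec = [0.0] * (num_categories - 1)
    if index < num_categories - 1:
        vec[index] = 1.0
    return vec


# `type` values of the ten rows displayed in Section 1b
sample_types = ["PAYMENT", "PAYMENT", "TRANSFER", "CASH_OUT", "PAYMENT",
                "PAYMENT", "PAYMENT", "PAYMENT", "PAYMENT", "DEBIT"]

index = fit_string_index(sample_types)
encoded = {label: one_hot_drop_last(i, len(index)) for label, i in index.items()}
for label, vec in encoded.items():
    print(label, index[label], vec)
```

Spark stores the encoded column as a sparse vector; the dense list here is only for readability.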


In [0]:
## Convert label into label indices using the StringIndexer
label_stringIndexer = StringIndexer(inputCol = label, outputCol = "label").setHandleInvalid("skip")
stages += [label_stringIndexer]

In [0]:
assemblerInputs = categoricalColumnsclassVec + numericalColumns
assembler = VectorAssembler(inputCols = assemblerInputs,
                            outputCol = "features")
stages += [assembler]

In [0]:
from pyspark.ml.feature import StandardScaler

scaler = StandardScaler(inputCol = "features",
                        outputCol = "scaledFeatures",
                        withStd = True,
                        withMean = True)
stages += [scaler]
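With `withMean = True` and `withStd = True`, each feature is centered and divided by its standard deviation. The sketch below shows that arithmetic on a single column in plain Python, assuming the corrected sample standard deviation (divisor n - 1) that Spark documents for `StandardScaler`. The amounts are the first five values shown in Section 1b; `standardize` is a hypothetical helper name.

```python
from statistics import mean, stdev


def standardize(values):
    """Center to zero mean and scale to unit sample standard deviation."""
    mu = mean(values)
    sigma = stdev(values)  # sample standard deviation (divides by n - 1)
    return [(v - mu) / sigma for v in values]


amounts = [9839.64, 1864.28, 181.0, 181.0, 11668.14]
scaled = standardize(amounts)
print(scaled)
```

Note that the models in Section 3a are trained on the `features` column, so the `scaledFeatures` column produced by this stage only takes effect if `featuresCol` is pointed at it.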

In [0]:
prepPipeline = Pipeline().setStages(stages)
# spark.conf.set('spark.kryoserializer.buffer.max.mb', '2048')
pipelineModel = prepPipeline.fit(dataset)
dataset = pipelineModel.transform(dataset)

In [None]:
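# Overwrite any pipeline previously saved here; a plain save() fails if the path already exists
pipelineModel.write().overwrite().save("/mnt/zeta/pipelineMM")
display(dbutils.fs.ls("/mnt/zeta/pipelineMM"))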

In [None]:
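from pyspark.ml import PipelineModel
# Reload the fitted pipeline to confirm it was saved correctly.
pipelineModel = PipelineModel.load("/mnt/zeta/pipelineMM")
# `dataset` already holds the transformed data; transforming it a second time
# would fail because the pipeline's output columns already exist.
display(dataset)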

## Section 3a) Create a Machine Learning Model


1. Pick at least 2 machine learning algorithms from Spark MLlib and train models using your data.

  Make sure you use parameter sweeping and cross-validation in your model training to achieve the best model.

2. Explain what you're trying to predict below.

### Description of the Predictive Use Case: I'm using two binary classification models, Logistic Regression and Decision Tree, to predict whether a transaction is fraudulent (1) or non-fraudulent (0).

In [0]:
#Logistic Regression
#Split Data into Training and Test Datasets
train, test = dataset.randomSplit([0.75, 0.25], seed = 1337)

In [0]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.mllib.evaluation import BinaryClassificationMetrics

In [0]:
lr = LogisticRegression(labelCol="label", featuresCol="features")

In [0]:
lrparamGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.1])
             .addGrid(lr.elasticNetParam, [0.0, 0.25])
             .addGrid(lr.maxIter, [1, 5])
             .build())

In [0]:
lrevaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", metricName = "areaUnderROC")

In [0]:
# Create 5-fold CrossValidator
lrcv = CrossValidator(estimator = lr,
                    estimatorParamMaps = lrparamGrid,
                    evaluator = lrevaluator,
                    numFolds = 5)
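The grid and fold settings above determine how many models cross-validation trains: one per parameter combination per fold, plus a final refit of the best combination on the full training set. A small pure-Python sketch of that count, with the grid values copied from `lrparamGrid` (`count_fits` is a hypothetical helper name):

```python
from itertools import product


def count_fits(grid, num_folds):
    """Return (parameter combinations, total model fits including the final refit)."""
    combos = list(product(*grid.values()))
    return len(combos), len(combos) * num_folds + 1


# Values copied from lrparamGrid above
lr_grid = {
    "regParam": [0.01, 0.1],
    "elasticNetParam": [0.0, 0.25],
    "maxIter": [1, 5],
}

lr_combos, lr_fits = count_fits(lr_grid, num_folds=5)
print("combinations:", lr_combos, "total fits:", lr_fits)
```

The same function applies to any other grid expressed as a dictionary of value lists.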

In [0]:
lrcvModel = lrcv.fit(train)
print(lrcvModel)

CrossValidatorModel_78fe8c64c084


In [0]:
lrpredictions = lrcvModel.transform(test)

In [0]:
#Decision Tree
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [0]:
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features")

In [0]:
dtparamGrid = (ParamGridBuilder()
             .addGrid(dt.maxDepth, [2, 5, 10])
             .addGrid(dt.maxBins, [10, 20])
             .build())

In [0]:
dtevaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")

In [0]:
# Create 5-fold CrossValidator
dtcv = CrossValidator(estimator = dt,
                      estimatorParamMaps = dtparamGrid,
                      evaluator = dtevaluator,
                      numFolds = 5)

In [0]:
dtcvModel = dtcv.fit(train)
print(dtcvModel)

CrossValidatorModel_f1a6e43e41fc


In [0]:
dtpredictions = dtcvModel.transform(test)

## Section 3b) Evaluate the Model(s)

1. Evaluate the models and pick which of the algorithms worked the best and why.

2. Report the best model and its parameters below.

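### Best Model: Area under the ROC curve (AUC), computed by `BinaryClassificationEvaluator` on `rawPrediction`, is the metric used to compare the two models. Decision Tree is the best model for this dataset because it has a higher AUC of 0.940252743825891 vs. 0.902430510939695 for Logistic Regression, and a far higher F1 score (0.7974077204846435 vs. 0.0019323671497584543). The values 0.974948173279357 and 0.5993530983070539 printed by `BinaryClassificationMetrics` are computed from the hard `prediction` column with the columns passed in (label, prediction) order, so they are not used for the comparison.

### Model Parameters: 
The hyperparameters selected by cross-validation were not printed in this notebook. Test-set metrics for the Decision Tree:
Accuracy: 0.9995479889279291
Recall: 0.6868932038834952
Precision:  0.9503022162525184
F1 score: 0.7974077204846435
AUC: 0.940252743825891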
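One way to report the winning hyperparameters is to pair each parameter map with its average cross-validation metric and pick the best. The sketch below is written against plain lists so it runs anywhere; `best_param_map` is a hypothetical helper, and the demo values are placeholders rather than results from this notebook.

```python
def best_param_map(param_maps, avg_metrics, larger_is_better=True):
    """Return the (param_map, metric) pair with the best average CV metric."""
    pick = max if larger_is_better else min
    best_index = pick(range(len(avg_metrics)), key=lambda i: avg_metrics[i])
    return param_maps[best_index], avg_metrics[best_index]


# Placeholder inputs for illustration only (not results from this notebook)
demo_maps = [{"maxDepth": 2}, {"maxDepth": 5}, {"maxDepth": 10}]
demo_metrics = [0.61, 0.74, 0.70]
print(best_param_map(demo_maps, demo_metrics))
```

With a fitted `CrossValidatorModel`, the inputs would be `dtcvModel.getEstimatorParamMaps()` and `dtcvModel.avgMetrics`, and `dtcvModel.bestModel` holds the refitted model itself.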

In [0]:
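#Logistic Regression
# lrevaluator uses metricName="areaUnderROC", so this value is AUC from rawPrediction, not accuracy
print('AUC (evaluator):', lrevaluator.evaluate(lrpredictions))
# Caution: BinaryClassificationMetrics expects (score, label) pairs, but the columns are passed
# as (label, prediction), so the next two values are not the model's true ROC/PR areas
print('AUC:', BinaryClassificationMetrics(lrpredictions['label','prediction'].rdd).areaUnderROC)
print('PR:', BinaryClassificationMetrics(lrpredictions['label','prediction'].rdd).areaUnderPR)

AUC (evaluator): 0.902430510939695
AUC: 0.5993530983070539
PR: 0.0005850389370578503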


In [0]:
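#Decision Tree
# dtevaluator uses the default metric (areaUnderROC), so this value is AUC from rawPrediction, not accuracy
print('AUC (evaluator):', dtevaluator.evaluate(dtpredictions))
# Caution: BinaryClassificationMetrics expects (score, label) pairs, but the columns are passed
# as (label, prediction), so the next two values are not the model's true ROC/PR areas
print('AUC:', BinaryClassificationMetrics(dtpredictions['label','prediction'].rdd).areaUnderROC)
print('PR:', BinaryClassificationMetrics(dtpredictions['label','prediction'].rdd).areaUnderPR)

AUC (evaluator): 0.940252743825891
AUC: 0.974948173279357
PR: 0.6698479295846088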


In [0]:
# Confusion-matrix metrics for each model's test-set predictions
for model, df in [("lrpredictions", lrpredictions), ("dtpredictions", dtpredictions)]:
    tp = df[(df.label == 1) & (df.prediction == 1)].count()
    tn = df[(df.label == 0) & (df.prediction == 0)].count()
    fp = df[(df.label == 0) & (df.prediction == 1)].count()
    fn = df[(df.label == 1) & (df.prediction == 0)].count()
    total = df.count()
    a = ((tp + tn)/total)

    # Guard each ratio on its own denominator so neither can divide by zero
    r = float(tp) / (tp + fn) if (tp + fn) > 0 else 0.0
    p = float(tp) / (tp + fp) if (tp + fp) > 0 else 0.0

    if(p + r == 0):
        f1 = 0
    else:
        f1 = 2 * ((p * r)/(p + r))

    print("Model:", model)
    print("True Positives:", tp)
    print("True Negatives:", tn)
    print("False Positives:", fp)
    print("False Negatives:", fn)
    print("Total:", total)
    print("Accuracy:", a)
    print("Recall:", r)
    print("Precision: ", p)
    print("F1 score:", f1)
    # Caution: BinaryClassificationMetrics expects (score, label) pairs; the columns are passed
    # as (label, prediction), so this value is not the model's true ROC AUC
    print('AUC:', BinaryClassificationMetrics(df['label','prediction'].rdd).areaUnderROC)
print("\n")

Model: lrpredictions
True Positives: 2
True Negatives: 1588601
False Positives: 8
False Negatives: 2058
Total: 1590669
Accuracy: 0.9987011754173872
Recall: 0.000970873786407767
Precision:  0.2
F1 score: 0.0019323671497584543
AUC: 0.5993530983070539
Model: dtpredictions
True Positives: 1415
True Negatives: 1588535
False Positives: 74
False Negatives: 645
Total: 1590669
Accuracy: 0.9995479889279291
Recall: 0.6868932038834952
Precision:  0.9503022162525184
F1 score: 0.7974077204846435
AUC: 0.974948173279357
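The Decision Tree figures above can be re-derived by hand from the four confusion-matrix counts, which is a useful sanity check. The sketch below does that in plain Python and also shows what the area under a ROC curve built from hard 0/1 predictions looks like, both with the columns in the order `BinaryClassificationMetrics` expects (score first, then label) and in the (label, prediction) order used in the cells above. Function names are hypothetical, and the three-point trapezoid formula is an assumption about how such a curve is integrated.

```python
def confusion_metrics(tp, tn, fp, fn):
    """Accuracy, recall, precision, specificity and F1 from confusion-matrix counts."""
    total = tp + tn + fp + fn
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    precision = tp / (tp + fp) if (tp + fp) else 0.0
    specificity = tn / (tn + fp) if (tn + fp) else 0.0
    f1 = 2 * precision * recall / (precision + recall) if (precision + recall) else 0.0
    return {"accuracy": (tp + tn) / total, "recall": recall,
            "precision": precision, "specificity": specificity, "f1": f1}


def auc_from_hard_predictions(tpr, fpr):
    """Area under the three-point ROC curve (0,0), (fpr,tpr), (1,1)."""
    return (tpr + 1.0 - fpr) / 2.0


# Counts copied from the dtpredictions output above
tp, tn, fp, fn = 1415, 1588535, 74, 645
dt = confusion_metrics(tp, tn, fp, fn)

# Score = prediction, label = label (the order the API expects)
auc_intended = auc_from_hard_predictions(dt["recall"], 1.0 - dt["specificity"])
# Score = label, label = prediction (the order used in the cells above)
auc_swapped = auc_from_hard_predictions(dt["precision"], fn / (tn + fn))

print(dt)
print("AUC, (prediction, label) order:", auc_intended)
print("AUC, (label, prediction) order:", auc_swapped)
```

The swapped-order value reproduces the 0.9749 printed above, which is why the evaluator's AUC on `rawPrediction` is the more reliable number for comparing the models.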




## Section 4) Save Your Transformation Pipeline and Model(s)


1. Save your pipeline out to your mounted storage under `/mnt/<GROUP>/<NAME>/pipeline/`

2. Save your model(s) out to your mounted storage under `/mnt/<GROUP>/<NAME>/models/`

In [0]:
pipelineModel.save("/mnt/zeta/MarcieMatejka/pipeline")
display(dbutils.fs.ls("/mnt/zeta/MarcieMatejka/pipeline"))

path,name,size,modificationTime
dbfs:/mnt/zeta/MarcieMatejka/pipeline/metadata/,metadata/,0,0
dbfs:/mnt/zeta/MarcieMatejka/pipeline/stages/,stages/,0,0


In [0]:
lrcvModel.save("/mnt/zeta/MarcieMatejka/models/lr")
dtcvModel.save("/mnt/zeta/MarcieMatejka/models/dt")
display(dbutils.fs.ls("/mnt/zeta/MarcieMatejka/models/"))

path,name,size,modificationTime
dbfs:/mnt/zeta/MarcieMatejka/models/dt/,dt/,0,0
dbfs:/mnt/zeta/MarcieMatejka/models/lr/,lr/,0,0
