<a href="https://colab.research.google.com/github/troncosofranco/IoT-Spark/blob/main/Fraud_Detection_w_MLFlow_Spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Data obtained from [Credit Card Fraud Detection](https://www.kaggle.com/datasets/mlg-ulb/creditcardfraud).

# 1. Import modules

In [1]:
!pip install pyspark
import pyspark #
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
from pyspark.sql.types import *
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
import pyspark.sql.functions as F
import os
import seaborn as sns
import sklearn #
from sklearn.metrics import confusion_matrix
from sklearn.metrics import roc_auc_score, accuracy_score
import matplotlib #
import matplotlib.pyplot as plt
!pip install mlflow
import mlflow
import mlflow.spark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m5.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 KB[0m [31m19.6 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845512 sha256=4b67e91ad63ee8bb3b04cd31725b8c69db907342b6911f1204ec80172abd85d0
  Stored in directory: /root/.cache/pip/wheels/43/dc/11/ec201cd671da62fa9c5cc77078235e40722170ceba231d7598
Successfully built pyspark
Installing collected packages: py4j, pyspa

# 2. Define Spark context

In [2]:
# Make Spark bind to the loopback interface so the local driver does not
# depend on the notebook host's network configuration.
os.environ["SPARK_LOCAL_IP"] = "127.0.0.1"

# Local-mode session using every available core ("local[*]").
spark = SparkSession.builder.master("local[*]").getOrCreate()

# Show the effective configuration as (key, value) pairs. Go through the
# public getConf() accessor instead of the private `_conf` attribute.
spark.sparkContext.getConf().getAll()

[('spark.driver.extraJavaOptions',
  '-XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED'),
 ('spark.app.id', 'local-1673558278697'),
 ('spark.app.submitTime', '1673558276425'),
 ('spark.app.startTime', '1673558276610'),
 ('spark.executor.id', 'driver'),
 ('spark.driver.host', 'localhost'),
 ('spark.app.name', 'pyspark-shel

Print the version of each imported library.

In [3]:
# Collect one "<library>: <version>" line per dependency, then emit them in a
# single print so the versions used for this run are recorded in the output.
version_report = [
    "pyspark: {}".format(pyspark.__version__),
    "matplotlib: {}".format(matplotlib.__version__),
    "seaborn: {}".format(sns.__version__),
    "sklearn: {}".format(sklearn.__version__),
    "mlflow: {}".format(mlflow.__version__),
]
print("\n".join(version_report))


pyspark: 3.3.1
matplotlib: 3.2.2
seaborn: 0.11.2
sklearn: 1.0.2
mlflow: 2.1.1


# 3. Load data

In [4]:
# Read the CSV with a header row; inferSchema asks Spark to infer each
# column's type from the file contents instead of loading everything as strings.
df = spark.read.csv('creditcard.csv', header=True, inferSchema=True)

labelColumn = "Class"
columns = df.columns

# Feature columns = every column except "Time" and the label. Build a new
# list rather than aliasing `columns` and calling .remove() on it, which
# would silently strip the same entries from `columns` as well.
numericCols = [name for name in columns if name not in ("Time", labelColumn)]
print(numericCols)

['V1', 'V2', 'V3', 'V4', 'V5', 'V6', 'V7', 'V8', 'V9', 'V10', 'V11', 'V12', 'V13', 'V14', 'V15', 'V16', 'V17', 'V18', 'V19', 'V20', 'V21', 'V22', 'V23', 'V24', 'V25', 'V26', 'V27', 'V28', 'Amount']


In [5]:
# Preview the first five rows. Apply limit(5) in Spark before converting, so
# only those rows are sent to the driver; toPandas() on the full DataFrame
# would collect every row just to display five of them.
df.limit(5).toPandas()

Unnamed: 0,Time,V1,V2,V3,V4,V5,V6,V7,V8,V9,...,V21,V22,V23,V24,V25,V26,V27,V28,Amount,Class
0,0.0,-1.359807,-0.072781,2.536347,1.378155,-0.338321,0.462388,0.239599,0.098698,0.363787,...,-0.018307,0.277838,-0.110474,0.066928,0.128539,-0.189115,0.133558,-0.021053,149.62,0
1,0.0,1.191857,0.266151,0.16648,0.448154,0.060018,-0.082361,-0.078803,0.085102,-0.255425,...,-0.225775,-0.638672,0.101288,-0.339846,0.16717,0.125895,-0.008983,0.014724,2.69,0
2,1.0,-1.358354,-1.340163,1.773209,0.37978,-0.503198,1.800499,0.791461,0.247676,-1.514654,...,0.247998,0.771679,0.909412,-0.689281,-0.327642,-0.139097,-0.055353,-0.059752,378.66,0
3,1.0,-0.966272,-0.185226,1.792993,-0.863291,-0.010309,1.247203,0.237609,0.377436,-1.387024,...,-0.1083,0.005274,-0.190321,-1.175575,0.647376,-0.221929,0.062723,0.061458,123.5,0
4,2.0,-1.158233,0.877737,1.548718,0.403034,-0.407193,0.095921,0.592941,-0.270533,0.817739,...,-0.009431,0.798278,-0.137458,0.141267,-0.20601,0.502292,0.219422,0.215153,69.99,0


Select the feature columns from the DataFrame, then separate the normal (class 0) rows from the anomalous (class 1) rows.

In [6]:
# Single pipeline stage: pack all feature columns into one "features" vector.
assemblerInputs = numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages = [assembler]

# Keep only the label (renamed from `labelColumn` to "label") and the features.
dfFeatures = df.select(F.col(labelColumn).alias('label'), *numericCols)

# Split by class using the column that actually exists in dfFeatures.
# The original filtered on "Class", which is no longer part of this frame's
# schema after the rename and only resolved because Spark's analyzer can look
# a missing filter column up in the parent plan.
# Normal rows are down-sampled to 50% (without replacement, fixed seed);
# every anomalous row is kept.
normal = dfFeatures.filter(F.col("label") == 0).sample(withReplacement=False, fraction=0.5, seed=2020)
anomaly = dfFeatures.filter(F.col("label") == 1)

Define train and test sets.

In [7]:
# Split each class separately with the same 80/20 weights and the same seed,
# so both classes are represented in the training and the test portion.
normal_train, normal_test = normal.randomSplit(weights=[0.8, 0.2], seed=2020)
anomaly_train, anomaly_test = anomaly.randomSplit(weights=[0.8, 0.2], seed=2020)

In [8]:
# Preview the first five rows of the label + feature frame. limit(5) keeps
# the conversion to pandas from collecting the whole DataFrame on the driver.
dfFeatures.limit(5).toPandas()

Unnamed: 0,label,V1,V2,V3,V4,V5,V6,V7,V8,V9,...,V20,V21,V22,V23,V24,V25,V26,V27,V28,Amount
0,0,-1.359807,-0.072781,2.536347,1.378155,-0.338321,0.462388,0.239599,0.098698,0.363787,...,0.251412,-0.018307,0.277838,-0.110474,0.066928,0.128539,-0.189115,0.133558,-0.021053,149.62
1,0,1.191857,0.266151,0.16648,0.448154,0.060018,-0.082361,-0.078803,0.085102,-0.255425,...,-0.069083,-0.225775,-0.638672,0.101288,-0.339846,0.16717,0.125895,-0.008983,0.014724,2.69
2,0,-1.358354,-1.340163,1.773209,0.37978,-0.503198,1.800499,0.791461,0.247676,-1.514654,...,0.52498,0.247998,0.771679,0.909412,-0.689281,-0.327642,-0.139097,-0.055353,-0.059752,378.66
3,0,-0.966272,-0.185226,1.792993,-0.863291,-0.010309,1.247203,0.237609,0.377436,-1.387024,...,-0.208038,-0.1083,0.005274,-0.190321,-1.175575,0.647376,-0.221929,0.062723,0.061458,123.5
4,0,-1.158233,0.877737,1.548718,0.403034,-0.407193,0.095921,0.592941,-0.270533,0.817739,...,0.408542,-0.009431,0.798278,-0.137458,0.141267,-0.20601,0.502292,0.219422,0.215153,69.99


In [9]:
# Recombine the per-class splits: normal rows first, anomalous rows appended.
train_set, test_set = (
    normal_train.union(anomaly_train),
    normal_test.union(anomaly_test),
)

Create the feature vector that the logistic regression model will use. Define the pipeline and create your final train and test sets.

In [10]:
pipeline = Pipeline(stages=stages)

# Fit the pipeline on the training rows only. With a lone VectorAssembler the
# fit learns nothing from the data, but fitting on the full frame (as before)
# would leak test rows into any estimator stage (e.g. a scaler) added later.
pipelineModel = pipeline.fit(train_set)

# Apply the fitted pipeline to both splits; this adds the "features" column.
train_set = pipelineModel.transform(train_set)
test_set = pipelineModel.transform(test_set)

# Keep the label, the assembled vector and the raw feature columns.
selectedCols = ['label', 'features'] + numericCols
train_set = train_set.select(selectedCols)
test_set = test_set.select(selectedCols)

print("Training Dataset Count: ", train_set.count())
print("Test Dataset Count: ", test_set.count())

Training Dataset Count:  114138
Test Dataset Count:  28608


# 4. Model

Define the function to train the model and calculate the relevant metrics.

In [11]:
#Train function
def train(spark_model, train_set):
    """Fit ``spark_model`` on ``train_set`` and report its training metrics.

    The fitted model's ``summary`` supplies the training accuracy and the
    area under the ROC curve. Both values are logged through
    ``mlflow.log_metric`` (as ``train_acc`` and ``train_AUC``) and printed.

    Args:
        spark_model: Object exposing ``fit(train_set)``; the result must
            provide ``summary.accuracy`` and ``summary.areaUnderROC``.
        train_set: Training data handed to ``spark_model.fit``.

    Returns:
        The fitted model returned by ``spark_model.fit``.
    """
    fitted = spark_model.fit(train_set)
    summary = fitted.summary
    train_auc = summary.areaUnderROC

    # Log first, then echo the same values to the cell output.
    mlflow.log_metric("train_acc", summary.accuracy)
    mlflow.log_metric("train_AUC", train_auc)
    print("Training Accuracy: ", summary.accuracy)
    print("Training AUC:", train_auc)

    return fitted

In [12]:
#Evaluation function
def evaluate(spark_model, test_set):
    """Evaluate a fitted model on ``test_set`` and report the metrics.

    Accuracy and area under the ROC curve are read from the summary returned
    by ``spark_model.evaluate``, logged through ``mlflow.log_metric`` (as
    ``eval_acc`` and ``eval_AUC``) and printed. Nothing is returned.

    Args:
        spark_model: Fitted model exposing ``evaluate(test_set)``; the result
            must provide ``accuracy`` and ``areaUnderROC``.
        test_set: Held-out data handed to ``spark_model.evaluate``.
    """
    summary = spark_model.evaluate(test_set)
    accuracy, auc = summary.accuracy, summary.areaUnderROC

    # Log first, then echo the same values to the cell output.
    mlflow.log_metric("eval_acc", accuracy)
    mlflow.log_metric("eval_AUC", auc)
    print("Evaluation Accuracy: ", accuracy)
    print("Evaluation AUC: ", auc)

# 5. MLFlow Run – Training/UI

In [13]:
# Logistic regression on the assembled "features" vector, capped at 10 iterations.
lr = LogisticRegression(featuresCol='features', labelCol='label', maxIter=10)

# Select the experiment by name (the captured output shows MLflow creating it
# when it does not exist yet).
mlflow.set_experiment("PySpark_CreditCard")

# Train, evaluate and log the fitted model inside a single run. The context
# manager ends the run when the block exits, so no explicit
# mlflow.end_run() call is needed afterwards.
with mlflow.start_run():
    trainedLR = train(lr, train_set)
    evaluate(trainedLR, test_set)
    mlflow.spark.log_model(trainedLR, "creditcard_model_pyspark")

2023/01/12 21:18:43 INFO mlflow.tracking.fluent: Experiment with name 'PySpark_CreditCard' does not exist. Creating a new experiment.


Training Accuracy:  0.9988347439064992
Training AUC: 0.9850628280031647
Evaluation Accuracy:  0.9990212527964206
Evaluation AUC:  0.9762576637420227


# 6. Load Model

In [14]:
#model = mlflow.spark.load_model("runs:/YOUR_RUN_ID/creditcard_model_pyspark")

Obtain predictions and metrics.

In [15]:
# predictions = model.transform(test_set)
# y_true = predictions.select(['label']).collect()
# y_pred = predictions.select(['prediction']).collect()

Print out the evaluation accuracy and the AUC score.

In [16]:
# print(f"AUC Score: {roc_auc_score(y_true, y_pred):.3%}")
# print(f"Accuracy Score: {accuracy_score(y_true, y_pred):.3%}")

Construct the confusion matrix.

In [17]:
# conf_matrix = confusion_matrix(y_true, y_pred)
# ax = sns.heatmap(conf_matrix, annot=True,fmt='g')
# ax.invert_xaxis()
# ax.invert_yaxis()
# plt.ylabel('Actual')
# plt.xlabel('Predicted')

# 7. Deploy model locally

In [18]:
# import pandas as pd
# import mlflow
# import mlflow.sklearn
# import seaborn as sns
# import matplotlib.pyplot as plt
# from sklearn.preprocessing import StandardScaler
# from sklearn.model_selection import train_test_split
# from sklearn.metrics import roc_auc_score, accuracy_score, confusion_matrix
# import numpy as np
# import subprocess
# import json

In [19]:
# mlflow models serve --model-uri runs:/YOUR_MODEL_RUN/YOUR_MODEL_NAME -p 1235

## Querying the Model

In [20]:
# df = pd.read_csv("creditcard.csv")

Select the first 80 rows from your data frame to query your model with. Transform the input data to JSON format.

In [21]:
# input_json = df.iloc[:80].drop(["Time", "Class"], axis=1).to_json(orient="split")

Send data to the model and receive predictions back.

In [22]:
# proc = subprocess.run(["curl", "-X", "POST", "-H", "Content-Type:application/json; format=pandas-split", "--data", input_json, "http://127.0.0.1:1235/invocations"], stdout=subprocess.PIPE, encoding='utf-8')
# output = proc.stdout
# df2 = pd.DataFrame([json.loads(output)])
# df2

## Querying with Scaling

Recreate the data that you used to fit the scaler when training the model originally.

In [23]:
# normal = df[df.Class == 0].sample(frac=0.5, random_state=2020).reset_index(drop=True)
# anomaly = df[df.Class == 1]
# normal_train, normal_test = train_test_split(normal, test_size = 0.2, random_state = 2020)
# anomaly_train, anomaly_test = train_test_split (anomaly, test_size = 0.2, random_state = 2020)
# scaler = StandardScaler()
# scaler.fit(pd.concat((normal, anomaly)).drop(["Time", "Class"], axis=1))

Transform your data selection

In [24]:
# scaled_selection = scaler.transform(df.iloc[:80].drop(["Time", "Class"], axis=1))
# input_json = pd.DataFrame(scaled_selection).to_json(orient="split")

In [25]:
# proc = subprocess.run(["curl", "-X", "POST", "-H", "Content-Type:application/json; format=pandas-split", "--data", input_json, "http://127.0.0.1:1235/invocations"],  stdout=subprocess.PIPE, encoding='utf-8')
# output = proc.stdout
# preds = pd.DataFrame([json.loads(output)])
# preds

Check the model output.

In [26]:
# y_true = df.iloc[:80].Class
# preds = preds.T
# eval_acc = accuracy_score(y_true, preds)
# y_true.iloc[-1] = 1
# eval_auc = roc_auc_score(y_true, preds)
# print("Eval Acc", eval_acc)
# print("Eval AUC", eval_auc)

## Batch Querying

Run multiple queries. Select the first 8000 samples from the data frame. Since the batch size is 80, you have 100 batches that you are passing to the model. Data scaling is also required.

In [27]:
# test = df.iloc[:8000]
# true = test.Class
# test = scaler.transform(test.drop(["Time", "Class"], axis=1))
# preds = []
# batch_size = 80
# for f in range(100):
#     sample = pd.DataFrame(test[f*batch_size:(f+1)*batch_size]).to_json(orient="split")
#     proc = subprocess.run(["curl", "-X", "POST", "-H", "Content-Type:application/json;format=pandas-split", "--data", sample, "http://127.0.0.1:1235/invocations"], stdout=subprocess.PIPE, encoding='utf-8')
#     output = proc.stdout
#     resp = pd.DataFrame([json.loads(output)])
#     preds = np.concatenate((preds, resp.values[0]))

# eval_acc = accuracy_score(true, preds)
# eval_auc = roc_auc_score(true, preds)
# print("Eval Acc", eval_acc)
# print("Eval AUC", eval_auc)