In [1]:
import os

# All later paths ('transformed-data/...', 'models/...') are relative to the
# project root. Step up one level only while the data directory is not yet
# visible from the working directory, so re-running this cell does not keep
# climbing the directory tree.
if not os.path.isdir('transformed-data'):
    os.chdir('../')
print(os.listdir())
import findspark

# findspark.init() must run before pyspark is imported: it makes the local
# Spark installation importable from this kernel.
findspark.init()
import pyspark
from pyspark.sql import SparkSession

from pyspark.ml.feature import VectorIndexer, VectorAssembler

# Last expression of the cell: display the Spark home that findspark resolved.
findspark.find()

['.git', '.gitignore', '.idea', 'architecture-design', 'docker', 'drive-download-20211215T213754Z-001.zip', 'drive-download-20211221T190522Z-001.zip', 'evaluated-data', 'models', 'notebooks', 'README.md', 'scripts', 'simulated-data-raw', 'spark-warehouse', 'transformed-data', 'venv']


'D:\\spark\\spark-3.1.2-bin-hadoop2.7\\spark-3.1.2-bin-hadoop2.7'

In [2]:
from pyspark.ml.feature import VectorIndexer, VectorAssembler
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder


In [3]:
# Create (or reuse) the notebook's Spark session, running in local mode.
spark = (
    SparkSession.builder
    .appName('Spark')
    .master("local[*]")
    .getOrCreate()
)


In [4]:
print(os.listdir())
# Load the training set through Spark's own Parquet reader. There is no
# pandas-style `engine` argument here: DataFrameReader.parquet does not
# recognise such an option.
data = spark.read.parquet("transformed-data/training/training_set.parquet")

['.git', '.gitignore', '.idea', 'architecture-design', 'docker', 'drive-download-20211215T213754Z-001.zip', 'drive-download-20211221T190522Z-001.zip', 'evaluated-data', 'models', 'notebooks', 'README.md', 'scripts', 'simulated-data-raw', 'spark-warehouse', 'transformed-data', 'venv']


In [5]:
# Print the column names, types and nullability of the loaded training set.
data.printSchema()

root
 |-- index: long (nullable = true)
 |-- TRANSACTION_ID: long (nullable = true)
 |-- TX_DATETIME: timestamp (nullable = true)
 |-- CUSTOMER_ID: long (nullable = true)
 |-- TERMINAL_ID: long (nullable = true)
 |-- TX_AMOUNT: double (nullable = true)
 |-- TX_TIME_SECONDS: long (nullable = true)
 |-- TX_TIME_DAYS: long (nullable = true)
 |-- TX_FRAUD: long (nullable = true)
 |-- TX_FRAUD_SCENARIO: long (nullable = true)
 |-- TX_DURING_NIGHT: integer (nullable = true)
 |-- TX_DURING_WEEKEND: integer (nullable = true)
 |-- CUSTOMER_ID_AVG_AMOUNT_1DAY_WINDOW: double (nullable = true)
 |-- CUSTOMER_ID_AVG_AMOUNT_7DAY_WINDOW: double (nullable = true)
 |-- CUSTOMER_ID_AVG_AMOUNT_30DAY_WINDOW: double (nullable = true)
 |-- CUSTOMER_ID_NB_TX_1DAY_WINDOW: long (nullable = true)
 |-- CUSTOMER_ID_NB_TX_7DAY_WINDOW: long (nullable = true)
 |-- CUSTOMER_ID_NB_TX_30DAY_WINDOW: long (nullable = true)
 |-- CUSTOMER_ID_TERMINAL_ID_NB_TX_1DAY_WINDOW: long (nullable = true)
 |-- CUSTOMER_ID_TERMINAL_ID_

In [6]:
# Input columns handed to every model in this notebook (order is preserved
# when they are assembled into the feature vector).
features = [
    # Per-transaction attributes
    'TX_AMOUNT',
    'TX_DURING_NIGHT',
    'TX_DURING_WEEKEND',
    # Customer average amount over 1/7/30-day windows
    'CUSTOMER_ID_AVG_AMOUNT_1DAY_WINDOW',
    'CUSTOMER_ID_AVG_AMOUNT_7DAY_WINDOW',
    'CUSTOMER_ID_AVG_AMOUNT_30DAY_WINDOW',
    # Customer transaction counts over 1/7/30-day windows
    'CUSTOMER_ID_NB_TX_1DAY_WINDOW',
    'CUSTOMER_ID_NB_TX_7DAY_WINDOW',
    'CUSTOMER_ID_NB_TX_30DAY_WINDOW',
    # Customer-terminal transaction counts over 1/7/30-day windows
    'CUSTOMER_ID_TERMINAL_ID_NB_TX_1DAY_WINDOW',
    'CUSTOMER_ID_TERMINAL_ID_NB_TX_7DAY_WINDOW',
    'CUSTOMER_ID_TERMINAL_ID_NB_TX_30DAY_WINDOW',
    # Customer maximum amount over 1/7/30-day windows
    'CUSTOMER_ID_MAX_AMOUNT_1DAY_WINDOW',
    'CUSTOMER_ID_MAX_AMOUNT_7DAY_WINDOW',
    'CUSTOMER_ID_MAX_AMOUNT_30DAY_WINDOW',
]

## Random Forest Pipeline

In [8]:
# Random forest: tune numTrees with 5-fold cross-validation, once selecting on
# area under ROC and once on area under the precision-recall curve. Each fitted
# CrossValidatorModel is saved under models/.

# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="TX_FRAUD", outputCol="indexedLabel").fit(data)

# Pack the selected feature columns into the single vector column the
# classifier reads.
featureIndexer = VectorAssembler(inputCols=features, outputCol="indexedFeatures")

# Explicit seed so the forest's sampling is repeatable across kernel restarts.
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", seed=42)
# Map predicted label indices back to the original TX_FRAUD values.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=labelIndexer.labels)
pipeline = Pipeline(stages=[featureIndexer, labelIndexer, rf, labelConverter])

# Score the classifier's continuous "rawPrediction" output. Evaluating the
# hard 0/1 "prediction" column instead collapses each curve to a single
# operating point, so the areas no longer reflect how well the model ranks.
evaluator_pr = BinaryClassificationEvaluator(
    labelCol="indexedLabel", rawPredictionCol="rawPrediction", metricName="areaUnderPR")

evaluator_roc = BinaryClassificationEvaluator(
    labelCol="indexedLabel", rawPredictionCol="rawPrediction", metricName="areaUnderROC")

paramGrid = (ParamGridBuilder()
    .addGrid(rf.numTrees, [5, 10, 15, 20])
    .build())

# Explicit seed on the cross-validators keeps the fold assignment the same
# between runs, so the two searches are comparable and reproducible.
crossval_roc = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator_roc,
    numFolds=5,
    seed=42)

model = crossval_roc.fit(data)
model.write().overwrite().save('models/cv-random-forest-roc')

crossval_pr = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator_pr,
    numFolds=5,
    seed=42)

# `model` is rebound here: after this cell it refers to the PR-selected model.
model = crossval_pr.fit(data)
model.write().overwrite().save('models/cv-random-forest-pr')

## Binomial logistic regression

In [9]:
from pyspark.ml.classification import LogisticRegression

# Binomial logistic regression: grid-search regParam, elasticNetParam and
# maxIter with 5-fold cross-validation, once selecting on area under ROC and
# once on area under the precision-recall curve. Each fitted
# CrossValidatorModel is saved under models/.

# Index labels; fit on the whole dataset so every label value is in the index.
labelIndexer = StringIndexer(inputCol="TX_FRAUD", outputCol="indexedLabel").fit(data)

# Pack the selected feature columns into the single vector column the
# classifier reads.
featureIndexer = VectorAssembler(inputCols=features, outputCol="indexedFeatures")

lr = LogisticRegression(labelCol="indexedLabel", featuresCol="indexedFeatures")
# Map predicted label indices back to the original TX_FRAUD values.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=labelIndexer.labels)
pipeline = Pipeline(stages=[featureIndexer, labelIndexer, lr, labelConverter])

# Score the classifier's continuous "rawPrediction" output. Evaluating the
# hard 0/1 "prediction" column instead collapses each curve to a single
# operating point, so the areas no longer reflect how well the model ranks.
evaluator_pr = BinaryClassificationEvaluator(
    labelCol="indexedLabel", rawPredictionCol="rawPrediction", metricName="areaUnderPR")

evaluator_roc = BinaryClassificationEvaluator(
    labelCol="indexedLabel", rawPredictionCol="rawPrediction", metricName="areaUnderROC")

paramGrid = (ParamGridBuilder()
    .addGrid(lr.regParam, [0.01, 0.1])
    .addGrid(lr.elasticNetParam, [0.3, 0.8, 1.0])
    .addGrid(lr.maxIter, [5, 10])
    .build())

# Explicit seed on the cross-validators keeps the fold assignment the same
# between runs, so the two searches are comparable and reproducible.
crossval_roc = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator_roc,
    numFolds=5,
    seed=42)

model = crossval_roc.fit(data)
model.write().overwrite().save('models/cv-logistic-regression-roc')

crossval_pr = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator_pr,
    numFolds=5,
    seed=42)

# `model` is rebound here: after this cell it refers to the PR-selected model.
model = crossval_pr.fit(data)
model.write().overwrite().save('models/cv-logistic-regression-pr')

## Gradient-boosted tree classifier

In [10]:
from pyspark.ml.classification import GBTClassifier

# Gradient-boosted trees: tune maxIter with 5-fold cross-validation, once
# selecting on area under ROC and once on area under the precision-recall
# curve. Each fitted CrossValidatorModel is saved under models/.

# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="TX_FRAUD", outputCol="indexedLabel").fit(data)

# Pack the selected feature columns into the single vector column the
# classifier reads.
featureIndexer = VectorAssembler(inputCols=features, outputCol="indexedFeatures")

# Explicit seed so the boosted trees are repeatable across kernel restarts.
gbt = GBTClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", seed=42)
# Map predicted label indices back to the original TX_FRAUD values.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=labelIndexer.labels)
pipeline = Pipeline(stages=[featureIndexer, labelIndexer, gbt, labelConverter])

# Score the classifier's continuous "rawPrediction" output. Evaluating the
# hard 0/1 "prediction" column instead collapses each curve to a single
# operating point, so the areas no longer reflect how well the model ranks.
evaluator_pr = BinaryClassificationEvaluator(
    labelCol="indexedLabel", rawPredictionCol="rawPrediction", metricName="areaUnderPR")

evaluator_roc = BinaryClassificationEvaluator(
    labelCol="indexedLabel", rawPredictionCol="rawPrediction", metricName="areaUnderROC")

paramGrid = (ParamGridBuilder()
    .addGrid(gbt.maxIter, [5, 10])
    .build())

# Explicit seed on the cross-validators keeps the fold assignment the same
# between runs, so the two searches are comparable and reproducible.
crossval_roc = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator_roc,
    numFolds=5,
    seed=42)

model = crossval_roc.fit(data)
model.write().overwrite().save('models/cv-gbt-roc')

crossval_pr = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator_pr,
    numFolds=5,
    seed=42)

# `model` is rebound here: after this cell it refers to the PR-selected model.
model = crossval_pr.fit(data)
model.write().overwrite().save('models/cv-gbt-pr')

## Linear Support Vector Machine

In [11]:
from pyspark.ml.classification import LinearSVC

# Linear SVM: grid-search maxIter and regParam with 5-fold cross-validation,
# once selecting on area under ROC and once on area under the precision-recall
# curve. Each fitted CrossValidatorModel is saved under models/.

# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="TX_FRAUD", outputCol="indexedLabel").fit(data)

# Pack the selected feature columns into the single vector column the
# classifier reads.
featureIndexer = VectorAssembler(inputCols=features, outputCol="indexedFeatures")

svm = LinearSVC(labelCol="indexedLabel", featuresCol="indexedFeatures")
# Map predicted label indices back to the original TX_FRAUD values.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=labelIndexer.labels)
pipeline = Pipeline(stages=[featureIndexer, labelIndexer, svm, labelConverter])

# Score the classifier's continuous "rawPrediction" output. Evaluating the
# hard 0/1 "prediction" column instead collapses each curve to a single
# operating point, so the areas no longer reflect how well the model ranks.
evaluator_pr = BinaryClassificationEvaluator(
    labelCol="indexedLabel", rawPredictionCol="rawPrediction", metricName="areaUnderPR")

evaluator_roc = BinaryClassificationEvaluator(
    labelCol="indexedLabel", rawPredictionCol="rawPrediction", metricName="areaUnderROC")

paramGrid = (ParamGridBuilder()
    .addGrid(svm.maxIter, [5, 10])
    .addGrid(svm.regParam, [0.01, 0.1])
    .build())

# Explicit seed on the cross-validators keeps the fold assignment the same
# between runs, so the two searches are comparable and reproducible.
crossval_roc = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator_roc,
    numFolds=5,
    seed=42)

model = crossval_roc.fit(data)
model.write().overwrite().save('models/cv-svm-roc')

crossval_pr = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator_pr,
    numFolds=5,
    seed=42)

# `model` is rebound here: after this cell it refers to the PR-selected model.
model = crossval_pr.fit(data)
model.write().overwrite().save('models/cv-svm-pr')