In [1]:
# Install PySpark on Deepnote instance
# Takes ~2 minutes
! sudo apt-get update
! sudo mkdir -p /usr/share/man/man1
! sudo apt-get install -y openjdk-11-jdk
! pip install pyspark

Hit:1 http://security.debian.org/debian-security buster/updates InRelease
Hit:2 http://deb.debian.org/debian buster InRelease
Hit:3 http://deb.debian.org/debian buster-updates InRelease




openjdk-11-jdk is already the newest version (11.0.12+7-2~deb10u1).
0 upgraded, 0 newly installed, 0 to remove and 5 not upgraded.
You should consider upgrading via the '/root/venv/bin/python -m pip install --upgrade pip' command.[0m


In [2]:
import pandas as pd
import numpy as np

import seaborn as sns
import matplotlib.pyplot as plt

import sqlite3

import pyspark
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.classification import (DecisionTreeClassifier, RandomForestClassifier, GBTClassifier, LogisticRegression)
from pyspark.ml.evaluation import (BinaryClassificationEvaluator, MulticlassClassificationEvaluator)
from pyspark.mllib.evaluation import (BinaryClassificationMetrics, MulticlassMetrics)
from pyspark.ml import Pipeline
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql.functions import lit
import pyspark.sql.functions as F

import warnings
warnings.filterwarnings('ignore')

In [3]:
spark = pyspark.sql.SparkSession.builder.getOrCreate()

In [4]:
filepath = 'final_data.csv'

In [5]:
spark_df = spark.read.csv(filepath,
                     sep=',',
                     inferSchema=True,
                     header=True,)

spark_df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- amt: double (nullable = true)
 |-- gender: integer (nullable = true)
 |-- city_pop: integer (nullable = true)
 |-- age: double (nullable = true)
 |-- tran_day_of_week: integer (nullable = true)
 |-- tran_day_of_month: integer (nullable = true)
 |-- tran_month_of_year: integer (nullable = true)
 |-- tran_hour_of_day: integer (nullable = true)
 |-- cat_food_dining: integer (nullable = true)
 |-- cat_gas_transport: integer (nullable = true)
 |-- cat_grocery_net: integer (nullable = true)
 |-- cat_grocery_pos: integer (nullable = true)
 |-- cat_health_fitness: integer (nullable = true)
 |-- cat_home: integer (nullable = true)
 |-- cat_kids_pets: integer (nullable = true)
 |-- cat_misc_net: integer (nullable = true)
 |-- cat_misc_pos: integer (nullable = true)
 |-- cat_personal_care: integer (nullable = true)
 |-- cat_shopping_net: integer (nullable = true)
 |-- cat_shopping_pos: integer (nullable = true)
 |-- cat_travel: integer (nullable = tru

In [6]:
spark_df = spark_df.drop('_c0')

In [7]:
spark_df.show(10)

+------+------+--------+----+----------------+-----------------+------------------+----------------+---------------+-----------------+---------------+---------------+------------------+--------+-------------+------------+------------+-----------------+----------------+----------------+----------+--------+
|   amt|gender|city_pop| age|tran_day_of_week|tran_day_of_month|tran_month_of_year|tran_hour_of_day|cat_food_dining|cat_gas_transport|cat_grocery_net|cat_grocery_pos|cat_health_fitness|cat_home|cat_kids_pets|cat_misc_net|cat_misc_pos|cat_personal_care|cat_shopping_net|cat_shopping_pos|cat_travel|is_fraud|
+------+------+--------+----+----------------+-----------------+------------------+----------------+---------------+-----------------+---------------+---------------+------------------+--------+-------------+------------+------------+-----------------+----------------+----------------+----------+--------+
|  2.86|     1|  333497|52.0|               6|               21|               

In [8]:
spark_df.dtypes

[('amt', 'double'),
 ('gender', 'int'),
 ('city_pop', 'int'),
 ('age', 'double'),
 ('tran_day_of_week', 'int'),
 ('tran_day_of_month', 'int'),
 ('tran_month_of_year', 'int'),
 ('tran_hour_of_day', 'int'),
 ('cat_food_dining', 'int'),
 ('cat_gas_transport', 'int'),
 ('cat_grocery_net', 'int'),
 ('cat_grocery_pos', 'int'),
 ('cat_health_fitness', 'int'),
 ('cat_home', 'int'),
 ('cat_kids_pets', 'int'),
 ('cat_misc_net', 'int'),
 ('cat_misc_pos', 'int'),
 ('cat_personal_care', 'int'),
 ('cat_shopping_net', 'int'),
 ('cat_shopping_pos', 'int'),
 ('cat_travel', 'int'),
 ('is_fraud', 'int')]

In [9]:
spark_df.createOrReplaceTempView('data')

In [10]:
fraud_counts = spark.sql(r"""SELECT is_fraud, COUNT(*) AS cnt 
                            FROM data
                            GROUP BY is_fraud""")
fraud_counts.show()

+--------+-------+
|is_fraud|    cnt|
+--------+-------+
|       1|   9651|
|       0|1842743|
+--------+-------+



In [11]:
gender_counts = spark.sql(r"""SELECT CASE when gender = 1 then 'male' 
                                          when gender = 2 then 'female' 
                                          end as gender 
                                          , COUNT(*) AS cnt 
                            FROM data
                            GROUP BY gender""")
gender_counts.show()
# male - 1, female - 2

+------+-------+
|gender|    cnt|
+------+-------+
|  male| 837645|
|female|1014749|
+------+-------+



In [12]:
gender_amounts = spark.sql(r"""SELECT CASE when gender = 1 then 'male' 
                                           when gender = 2 then 'female' 
                                           end as gender 
                                          , SUM(amt) AS amt 
                            FROM data
                            GROUP BY gender""")
gender_amounts.show()
# male - 1, female - 2

+------+-------------------+
|gender|                amt|
+------+-------------------+
|  male|5.879701363000017E7|
|female|7.098831837999928E7|
+------+-------------------+



In [13]:
fraud_by_gender_cnt = spark.sql(r"""SELECT CASE when gender = 1 then 'male' 
                                           when gender = 2 then 'female' 
                                           end as gender
                                           , is_fraud
                                           , COUNT(*) AS cnt 
                            FROM data
                            GROUP BY gender, is_fraud""")
fraud_by_gender_cnt.show()
# male - 1, female - 2

+------+--------+-------+
|gender|is_fraud|    cnt|
+------+--------+-------+
|  male|       0| 832893|
|  male|       1|   4752|
|female|       1|   4899|
|female|       0|1009850|
+------+--------+-------+



In [14]:
fraud_by_gender_amt = spark.sql(r"""SELECT CASE when gender = 1 then 'male' 
                                           when gender = 2 then 'female' 
                                           end as gender
                                           , is_fraud
                                           , sum(amt) AS amt 
                            FROM data
                            GROUP BY gender, is_fraud""")
fraud_by_gender_amt.show()
# male - 1, female - 2

+------+--------+-------------------+
|gender|is_fraud|                amt|
+------+--------+-------------------+
|  male|       0|5.607834692000026E7|
|  male|       1| 2718666.7100000037|
|female|       1|         2402746.58|
|female|       0| 6.85855717999996E7|
+------+--------+-------------------+



In [15]:
spark_df.show()

+------+------+--------+----+----------------+-----------------+------------------+----------------+---------------+-----------------+---------------+---------------+------------------+--------+-------------+------------+------------+-----------------+----------------+----------------+----------+--------+
|   amt|gender|city_pop| age|tran_day_of_week|tran_day_of_month|tran_month_of_year|tran_hour_of_day|cat_food_dining|cat_gas_transport|cat_grocery_net|cat_grocery_pos|cat_health_fitness|cat_home|cat_kids_pets|cat_misc_net|cat_misc_pos|cat_personal_care|cat_shopping_net|cat_shopping_pos|cat_travel|is_fraud|
+------+------+--------+----+----------------+-----------------+------------------+----------------+---------------+-----------------+---------------+---------------+------------------+--------+-------------+------------+------------+-----------------+----------------+----------------+----------+--------+
|  2.86|     1|  333497|52.0|               6|               21|               

### split data into train, val, and test (stratified)

In [16]:
fractions = spark_df.select("is_fraud").distinct().withColumn("fraction", lit(0.6)).rdd.collectAsMap()                                                         
train_df = spark_df.stat.sampleBy("is_fraud", fractions, seed = 42)

In [17]:
test_df = spark_df.subtract(train_df)

In [18]:
fractions = test_df.select("is_fraud").distinct().withColumn("fraction", lit(0.5)).rdd.collectAsMap()                                                         
val_df = test_df.stat.sampleBy("is_fraud", fractions, seed = 42)

In [19]:
test_df = test_df.subtract(val_df)

In [20]:
train_df.groupBy('is_fraud').count().show()

+--------+-------+
|is_fraud|  count|
+--------+-------+
|       1|   5765|
|       0|1106662|
+--------+-------+



In [21]:
val_df.groupBy('is_fraud').count().show()

+--------+------+
|is_fraud| count|
+--------+------+
|       1|  1961|
|       0|368407|
+--------+------+



In [22]:
test_df.groupBy('is_fraud').count().show()

+--------+------+
|is_fraud| count|
+--------+------+
|       1|  1924|
|       0|367670|
+--------+------+



### oversample the minority class

In [23]:
# Implement oversampling method
import pyspark.sql.functions as F

# calculate ratio
major_df = train_df.filter(train_df.is_fraud == 0)
minor_df = train_df.filter(train_df.is_fraud == 1)
ratio = int(major_df.count()/minor_df.count())
print("ratio: {}".format(ratio))
a = range(ratio)

# duplicate the minority rows
oversampled_df = minor_df.withColumn("dummy", F.explode(F.array([F.lit(x) for x in a]))).drop('dummy')

# combine both oversampled minority rows and previous majority rows 
train_df_os = major_df.unionAll(oversampled_df)

ratio: 191


In [24]:
train_df_os.groupby('is_fraud').count().show()

+--------+-------+
|is_fraud|  count|
+--------+-------+
|       0|1106662|
|       1|1101115|
+--------+-------+



### Modeling

In [25]:
train_df_os.schema.names#[:-1]

['amt',
 'gender',
 'city_pop',
 'age',
 'tran_day_of_week',
 'tran_day_of_month',
 'tran_month_of_year',
 'tran_hour_of_day',
 'cat_food_dining',
 'cat_gas_transport',
 'cat_grocery_net',
 'cat_grocery_pos',
 'cat_health_fitness',
 'cat_home',
 'cat_kids_pets',
 'cat_misc_net',
 'cat_misc_pos',
 'cat_personal_care',
 'cat_shopping_net',
 'cat_shopping_pos',
 'cat_travel',
 'is_fraud']

In [26]:
# the feature columns
features = train_df_os.schema.names[:-1]

assembler = VectorAssembler(inputCols=features, outputCol='features')

train_pack = assembler.transform(train_df_os)
val_pack = assembler.transform(val_df)
test_pack = assembler.transform(test_df)

In [27]:
# scaling
minmaxscale = MinMaxScaler(inputCol='features', outputCol='features_scaled')
minmaxscale = minmaxscale.fit(train_pack)

train_pack = minmaxscale.transform(train_pack)
val_pack  = minmaxscale.transform(val_pack)
test_pack  = minmaxscale.transform(test_pack)

In [28]:
# logistic regression
lr = LogisticRegression(labelCol='is_fraud', 
                        featuresCol='features_scaled',
                        predictionCol='prediction')

lr_model = lr.fit(train_pack)
lr_pred = lr_model.transform(val_pack)

In [29]:
# Random Forest (depth = 2)
rf = RandomForestClassifier(maxDepth=2, 
                            labelCol='is_fraud',
                            featuresCol='features',
                            predictionCol='prediction')

rf_model = rf.fit(train_pack)
rf_pred = rf_model.transform(val_pack)

In [30]:
# Gradient Boosted Tree (depth = 2)
gbt = GBTClassifier(maxDepth=2, 
                    labelCol='is_fraud', 
                    featuresCol='features',
                    predictionCol='prediction')

gbt_model = gbt.fit(train_pack)
gbt_pred = gbt_model.transform(val_pack)

In [31]:
accuracy = MulticlassClassificationEvaluator(labelCol='is_fraud', 
                                             predictionCol='prediction',
                                             metricName='accuracy')

precision = MulticlassClassificationEvaluator(labelCol='is_fraud', 
                                              predictionCol='prediction',
                                              metricName='weightedPrecision')

recall = MulticlassClassificationEvaluator(labelCol='is_fraud', 
                                           predictionCol='prediction',
                                           metricName='weightedRecall')

f1 = MulticlassClassificationEvaluator(labelCol='is_fraud', 
                                       predictionCol='prediction',
                                       metricName='f1')

areaROC = BinaryClassificationEvaluator(labelCol='is_fraud',
                                        rawPredictionCol='prediction',
                                        metricName='areaUnderROC')

areaPR = BinaryClassificationEvaluator(labelCol='is_fraud',
                                       rawPredictionCol='prediction',
                                       metricName='areaUnderPR')

In [32]:
# the error metrics
metrics = [accuracy, precision, recall, f1, areaROC, areaPR]
metric_labels = ['accuracy', 'precision', 'recall', 'f1', 'areaROC', 'areaPR']

# the predictions from each model
predictions = [lr_pred, rf_pred, gbt_pred]
predict_labels = ['LR', 'RF', 'GBT']

eval_list = list()

# for each model's predictions, calculate error metrics
# and add to a Pandas series
for pred in zip(predict_labels, predictions):
    name = pred[0]
    predict = pred[1]
    
    metric_vals = pd.Series(dict([(x[0], x[1].evaluate(predict)) 
                                 for x in zip(metric_labels, metrics)]),
                            name=name)
    eval_list.append(metric_vals)
    
# combine all the series into a dataframe
eval_df = pd.concat(eval_list, axis=1).T
eval_df = eval_df[metric_labels]
eval_df

Unnamed: 0,accuracy,precision,recall,f1,areaROC,areaPR
LR,0.887798,0.99348,0.887798,0.935686,0.82567,0.031591
RF,0.966992,0.993912,0.966992,0.97897,0.857615,0.097663
GBT,0.923611,0.994646,0.923611,0.955607,0.928379,0.059157


### Grid Search CV to tune the model

In [50]:
# Tuning the GradientBoostedTree model
gbt_2 = GBTClassifier(featuresCol='features',
                     labelCol='is_fraud', 
                     predictionCol='prediction')
 
# the pipeline
pipeline = Pipeline(stages=[assembler, gbt_2]) 

# the parameter grid
paramgrid = (ParamGridBuilder().addGrid(gbt_2.maxDepth, [2, 4, 6])
                               .addGrid(gbt_2.stepSize, [0.001, 0.01, 0.1, 1])
# commented out, training takes too much time                               .addGrid(gbt_2.maxBins,  [10, 20, 40])
# commented out, training takes too much time                               .addGrid(gbt_2.maxIter,  [5, 10, 20])
                               .build())

# use f1 score as the evaluation metric for best model 
evaluator = MulticlassClassificationEvaluator(labelCol='is_fraud', 
                                              predictionCol='prediction', 
                                              metricName='f1') 

In [49]:
# 5 folds CV
crossval = CrossValidator(estimator=pipeline, 
                          estimatorParamMaps=paramgrid, 
                          evaluator=evaluator, 
                          numFolds=5) 

gbt_tuned_model = crossval.fit(train_df_os) 

In [None]:
# return the best model based on f1 score
best_pipeline = gbt_tuned_model.bestModel

best_gbt_model = best_pipeline.stages[1]

In [None]:
# number of trees
len(best_gbt_model.trees)

In [None]:
# list some of the trees
best_gbt_model.trees[:10]

In [None]:
# weights
best_gbt_model.treeWeights[:10]

In [None]:
# extract feature importances
feature_importances = best_gbt_model.featureImportances.toArray()

# extract feature names, except for the predictor
feature_names = train_df_os.columns[:-1]

feature_series = (pd.Series(dict(zip(feature_names, feature_importances)))
                  .sort_values(ascending=True))

feature_series

In [None]:
sns.set_palette('dark')
sns.set_context('notebook')
sns.set_style('white')

ax = feature_series.plot(kind='barh')
_ = ax.set(xlabel='Relative Importance', 
           ylabel='Features', 
           title='Feature Importances for Best GradientBoostedTree Model')

fig = plt.gcf()

### Model testing

In [None]:
gbt_pred_test = best_pipeline.transform(test_df)

evaluate_model_predictions([gbt_pred_test], ['GBT_GridSearch'])

<a style='text-decoration:none;line-height:16px;display:flex;color:#5B5B62;padding:10px;justify-content:end;' href='https://deepnote.com?utm_source=created-in-deepnote-cell&projectId=1ffa185a-4bcd-432d-b0bf-f70e3dc06cbd' target="_blank">
 </img>
Created in <span style='font-weight:600;margin-left:4px;'>Deepnote</span></a>