In [2]:
# A simple extension for Jupyter Notebook and Jupyter Lab to beautify Python code automatically using Black.
%load_ext nb_black
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T

spark = SparkSession.builder.getOrCreate()

The nb_black extension is already loaded. To reload it, use:
  %reload_ext nb_black


<IPython.core.display.Javascript object>

In [3]:
# Show Spark version we are using
spark

<IPython.core.display.Javascript object>

In [4]:
import os
from os.path import isfile, join

loc = os.path.abspath("")
data_loc = f"{loc}/data/"

<IPython.core.display.Javascript object>

In [5]:
# Load data to Spark DataFrame
df = spark.read.csv(f"{data_loc}data.csv", inferSchema=True, header=True)

# Print schema
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- step: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- nameOrig: string (nullable = true)
 |-- oldbalanceOrg: double (nullable = true)
 |-- newbalanceOrig: double (nullable = true)
 |-- nameDest: string (nullable = true)
 |-- oldbalanceDest: double (nullable = true)
 |-- newbalanceDest: double (nullable = true)
 |-- isFraud: integer (nullable = true)



<IPython.core.display.Javascript object>

In [6]:
# Show some data records from Spark DataFrame
df.show(3)

+---+----+--------+-------+-----------+-------------+--------------+-----------+--------------+--------------+-------+
|_c0|step|    type| amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|
+---+----+--------+-------+-----------+-------------+--------------+-----------+--------------+--------------+-------+
|  0|   1| PAYMENT|9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|      0|
|  1|   1| PAYMENT|1864.28|C1666544295|      21249.0|      19384.72|M2044282225|           0.0|           0.0|      0|
|  2|   1|TRANSFER|  181.0|C1305486145|        181.0|           0.0| C553264065|           0.0|           0.0|      1|
+---+----+--------+-------+-----------+-------------+--------------+-----------+--------------+--------------+-------+
only showing top 3 rows



<IPython.core.display.Javascript object>

In [7]:
# Selecting features
df = df.select("type", "amount", "oldbalanceOrg", "newbalanceOrig", "isFraud")
df.show(2)

+-------+-------+-------------+--------------+-------+
|   type| amount|oldbalanceOrg|newbalanceOrig|isFraud|
+-------+-------+-------------+--------------+-------+
|PAYMENT|9839.64|     170136.0|     160296.36|      0|
|PAYMENT|1864.28|      21249.0|      19384.72|      0|
+-------+-------+-------------+--------------+-------+
only showing top 2 rows



<IPython.core.display.Javascript object>

In [8]:
# Split to train and test sets
train, test = df.randomSplit([0.75, 0.25], seed=77)

<IPython.core.display.Javascript object>

In [9]:
print(f"Train set length: {train.count()} records.")
print(f"Test set length: {test.count()} records.")

Train set length: 4772359 records.
Test set length: 1590261 records.


<IPython.core.display.Javascript object>

In [10]:
# Show something from training set
train.show(3)

+-------+------+-------------+--------------+-------+
|   type|amount|oldbalanceOrg|newbalanceOrig|isFraud|
+-------+------+-------------+--------------+-------+
|CASH_IN|  5.66|   5061561.06|    5061566.72|      0|
|CASH_IN|  14.4|1.143460813E7| 1.143462253E7|      0|
|CASH_IN| 14.54|    3347286.5|    3347301.03|      0|
+-------+------+-------------+--------------+-------+
only showing top 3 rows



<IPython.core.display.Javascript object>

In [11]:
# Datatypes (Dtypes)
train.dtypes

[('type', 'string'),
 ('amount', 'double'),
 ('oldbalanceOrg', 'double'),
 ('newbalanceOrig', 'double'),
 ('isFraud', 'int')]

<IPython.core.display.Javascript object>

In [12]:
# Select categorical and numerical columns
catCols = [x for (x, dataType) in train.dtypes if dataType == "string"]
numCols = [
    x for (x, dataType) in train.dtypes if ((dataType == "double") & (x != "isFraud"))
]

<IPython.core.display.Javascript object>

In [13]:
print(f"Numerical features in the dataset:\n{numCols}\n")
print(f"Categorical features in the dataset:\n{catCols}\n")

Numerical features in the dataset:
['amount', 'oldbalanceOrg', 'newbalanceOrig']

Categorical features in the dataset:
['type']



<IPython.core.display.Javascript object>

In [14]:
# One-Hot-Encoding
# -- StringIndexer: converts a single feature to an index feature
# -- OneHotEncoder

# How many categories we have in the dataset?
train.agg(F.countDistinct("type")).show()

+-----------+
|count(type)|
+-----------+
|          5|
+-----------+



<IPython.core.display.Javascript object>

In [15]:
# What is the distribution of feature <type>?
train.groupBy("type").count().show()

+--------+-------+
|    type|  count|
+--------+-------+
|TRANSFER| 399406|
| CASH_IN|1049549|
|CASH_OUT|1677930|
| PAYMENT|1614346|
|   DEBIT|  31128|
+--------+-------+



<IPython.core.display.Javascript object>

In [16]:
# Import required dependencies for OneHotEncoder and StringIndexer
from pyspark.ml.feature import OneHotEncoder, StringIndexer

<IPython.core.display.Javascript object>

In [18]:
string_indexer = [
    StringIndexer(inputCol=x, outputCol=x + "_StringIndexer", handleInvalid="skip")
    for x in catCols
]

<IPython.core.display.Javascript object>

In [19]:
one_hot_encoder = [
    OneHotEncoder(
        inputCols=[f"{x}_StringIndexer" for x in catCols],
        outputCols=[f"{x}_OneHotEncoder" for x in catCols],
    )
]

<IPython.core.display.Javascript object>

In [22]:
# Vector assembling with CodeAssembler
from pyspark.ml.feature import VectorAssembler

<IPython.core.display.Javascript object>

In [25]:
# Set the inputs for the assembler
assemblerInput = [x for x in numCols]
assemblerInput += [f"{x}_OneHotEncoder" for x in catCols]

print(f"Assembler Inputs are:\n{assemblerInput}")

Assembler Inputs are:
['amount', 'oldbalanceOrg', 'newbalanceOrig', 'type_OneHotEncoder']


<IPython.core.display.Javascript object>

In [26]:
# We can easily assemble that vector by providing the input column and the output column.
# Then we have vector assembler features column that will consist all of those inputted features into a vector type,
# as it is required for any classification or regression problem in Spark.
vector_assembler = VectorAssembler(
    inputCols=assemblerInput, outputCol="VectorAssembler_features"
)

<IPython.core.display.Javascript object>

In [27]:
# We can use Vector assembler to create a pipeline (another high level concept in Spark that let's you build a full
# pre-processing pipeline and model training as well)
stages = []
stages += string_indexer
stages += one_hot_encoder
stages += [vector_assembler]

print(stages)

[StringIndexer_c668f2bd7e88, OneHotEncoder_e6daaab1143a, VectorAssembler_60b902e55599]


<IPython.core.display.Javascript object>

In [28]:
%%time
from pyspark.ml import Pipeline

pipeline = Pipeline().setStages(stages)

# Setup a model
model = pipeline.fit(train)   # Fit the pipeline on train set

pp_df = model.transform(test) # Test set is unknown for the model

CPU times: user 56.8 ms, sys: 24.7 ms, total: 81.5 ms
Wall time: 11.5 s


<IPython.core.display.Javascript object>

In [30]:
pp_df.select(
    "type", "amount", "oldbalanceOrg", "newbalanceOrig", "VectorAssembler_features"
).show(truncate=False)

+-------+------+-------------+--------------+----------------------------------------------------+
|type   |amount|oldbalanceOrg|newbalanceOrig|VectorAssembler_features                            |
+-------+------+-------------+--------------+----------------------------------------------------+
|CASH_IN|13.86 |6868100.18   |6868114.04    |[13.86,6868100.18,6868114.04,0.0,0.0,1.0,0.0]       |
|CASH_IN|15.59 |1.64294897E7 |1.642950528E7 |[15.59,1.64294897E7,1.642950528E7,0.0,0.0,1.0,0.0]  |
|CASH_IN|35.47 |3796691.21   |3796726.68    |[35.47,3796691.21,3796726.68,0.0,0.0,1.0,0.0]       |
|CASH_IN|37.11 |1452790.24   |1452827.35    |[37.11,1452790.24,1452827.35,0.0,0.0,1.0,0.0]       |
|CASH_IN|57.98 |1290788.6    |1290846.57    |[57.98,1290788.6,1290846.57,0.0,0.0,1.0,0.0]        |
|CASH_IN|57.98 |9021204.76   |9021262.74    |[57.98,9021204.76,9021262.74,0.0,0.0,1.0,0.0]       |
|CASH_IN|71.85 |1.688345136E7|1.688352321E7 |[71.85,1.688345136E7,1.688352321E7,0.0,0.0,1.0,0.0] |
|CASH_IN|7

<IPython.core.display.Javascript object>

In [31]:
# Logistic Regression (training procedure)
from pyspark.ml.classification import LogisticRegression

<IPython.core.display.Javascript object>

In [32]:
# Construct a special dataframe with features and label upon Spark ML requirements
data = pp_df.select(
    F.col("VectorAssembler_features").alias("features"), F.col("isFraud").alias("label")
)

data.show(5, truncate=False)

+--------------------------------------------------+-----+
|features                                          |label|
+--------------------------------------------------+-----+
|[13.86,6868100.18,6868114.04,0.0,0.0,1.0,0.0]     |0    |
|[15.59,1.64294897E7,1.642950528E7,0.0,0.0,1.0,0.0]|0    |
|[35.47,3796691.21,3796726.68,0.0,0.0,1.0,0.0]     |0    |
|[37.11,1452790.24,1452827.35,0.0,0.0,1.0,0.0]     |0    |
|[57.98,1290788.6,1290846.57,0.0,0.0,1.0,0.0]      |0    |
+--------------------------------------------------+-----+
only showing top 5 rows



<IPython.core.display.Javascript object>

In [33]:
%%time
# Train the model on the train data
model = LogisticRegression().fit(data)

CPU times: user 27.6 ms, sys: 12.9 ms, total: 40.5 ms
Wall time: 27.9 s


<IPython.core.display.Javascript object>

In [34]:
print(f"Model:\n{model}")

Model:
LogisticRegressionModel: uid=LogisticRegression_4c57f4c4b7b7, numClasses=2, numFeatures=7


<IPython.core.display.Javascript object>

In [35]:
# Model results
model.summary.areaUnderROC

0.9916611200079948

<IPython.core.display.Javascript object>

In [36]:
# Check the Precision and Recall
model.summary.pr.show()

+-------------------+-------------------+
|             recall|          precision|
+-------------------+-------------------+
|                0.0| 0.9171741778319124|
|0.36660175267770206| 0.9171741778319124|
|0.47857838364167476| 0.6570855614973262|
| 0.5345666991236612| 0.5057577153385536|
| 0.5754625121713729|0.41531974701335206|
| 0.6119766309639727| 0.3570008520306731|
| 0.6416747809152873|0.31410867492850336|
| 0.6669912366114897|  0.281256415520427|
|  0.692794547224927| 0.2565813198701767|
| 0.7161635832521909|0.23645716122809837|
| 0.7351509250243428|0.21896751740139211|
|  0.752190847127556|0.20406815480121515|
| 0.7692307692307693| 0.1916080523890371|
| 0.7828627069133398|0.18024885102566976|
| 0.7964946445959105| 0.1704877032096707|
| 0.7989289191820838| 0.1597702268523026|
|  0.801363193768257|0.15037456605152566|
| 0.8062317429406037|0.14250064538335772|
|  0.810126582278481|0.13532856213402733|
| 0.8125608568646543| 0.1286518153087181|
+-------------------+-------------



<IPython.core.display.Javascript object>