In [1]:
%load_ext nb_black
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T

# Configure the session step by step, then reuse an already-running session
# if one exists (getOrCreate) instead of always starting a new one.
session_builder = SparkSession.builder.appName("Python Spark SQL basic example")
session_builder = session_builder.config("spark.executor.memory", "4G")
spark = session_builder.getOrCreate()

<IPython.core.display.Javascript object>

In [2]:
# Show the active SparkSession as this cell's output.
spark

<IPython.core.display.Javascript object>

In [4]:
import os
from os.path import isfile, join

# Absolute path of the current working directory (abspath of the empty string),
# and the "data/" folder directly beneath it, kept with a trailing slash.
loc = os.path.abspath("")
data_loc = "{}/data/".format(loc)

<IPython.core.display.Javascript object>

Dataset: https://www.kaggle.com/ntnu-testimon/paysim1/data (Randomly sampled 10% of the dataset)

In [5]:
# Load the CSV from the working directory. The first row supplies the column
# names; column types are inferred from the data rather than declared.
df = (
    spark.read.option("header", True)
    .option("inferSchema", True)
    .csv("SyntheticFinancialData.csv")
)

<IPython.core.display.Javascript object>

In [6]:
# Print the inferred column names and types to check the CSV was parsed as expected.
df.printSchema()

root
 |-- step: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- nameOrig: string (nullable = true)
 |-- oldbalanceOrg: double (nullable = true)
 |-- newbalanceOrig: double (nullable = true)
 |-- nameDest: string (nullable = true)
 |-- oldbalanceDest: double (nullable = true)
 |-- newbalanceDest: double (nullable = true)
 |-- isFraud: integer (nullable = true)
 |-- isFlaggedFraud: integer (nullable = true)



<IPython.core.display.Javascript object>

In [7]:
# Peek at the first two rows of the raw frame (all columns).
df.show(2)

+----+-------+-------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|step|   type| amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+-------+-------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|   1|PAYMENT|9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|      0|             0|
|   1|PAYMENT|1864.28|C1666544295|      21249.0|      19384.72|M2044282225|           0.0|           0.0|      0|             0|
+----+-------+-------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
only showing top 2 rows



<IPython.core.display.Javascript object>

In [8]:
# Keep only the transaction type, the amount, the originator balances and the
# fraud label; all other columns are dropped from `df` from here on.
kept_columns = ["type", "amount", "oldbalanceOrg", "newbalanceOrig", "isFraud"]
df = df.select(*kept_columns)

<IPython.core.display.Javascript object>

In [9]:
# Peek at the first two rows again to confirm the column projection.
df.show(2)

+-------+-------+-------------+--------------+-------+
|   type| amount|oldbalanceOrg|newbalanceOrig|isFraud|
+-------+-------+-------------+--------------+-------+
|PAYMENT|9839.64|     170136.0|     160296.36|      0|
|PAYMENT|1864.28|      21249.0|      19384.72|      0|
+-------+-------+-------------+--------------+-------+
only showing top 2 rows



<IPython.core.display.Javascript object>

### Train/test split

In [10]:
# 70/30 random split; the seed is fixed so the split is requested the same way
# on every run.
split_weights = [0.7, 0.3]
train, test = df.randomSplit(split_weights, seed=7)

<IPython.core.display.Javascript object>

In [11]:
# Each count() triggers a Spark job; compute both sizes, then report them.
train_size = train.count()
test_size = test.count()
print(f"Train set length: {train_size} records")
print(f"Test set length: {test_size} records")

Train set length: 4452929 records
Test set length: 1909691 records


<IPython.core.display.Javascript object>

In [12]:
# Peek at the first two rows of the training split.
train.show(2)

+-------+------+-------------+--------------+-------+
|   type|amount|oldbalanceOrg|newbalanceOrig|isFraud|
+-------+------+-------------+--------------+-------+
|CASH_IN|  1.42|   1270713.41|    1270714.83|      0|
|CASH_IN|  4.71|      50198.0|      50202.71|      0|
+-------+------+-------------+--------------+-------+
only showing top 2 rows



<IPython.core.display.Javascript object>

### Dtypes
In this dataset, any column of type string is treated as a categorical feature, but sometimes we might have numeric features we want treated as categorical or vice versa. We’ll need to carefully identify which columns are numeric and which are categorical.

In [12]:
# List (column name, type name) pairs; the next cell uses these type names to
# separate categorical from numeric columns.
train.dtypes

[('type', 'string'),
 ('amount', 'double'),
 ('oldbalanceOrg', 'double'),
 ('newbalanceOrig', 'double'),
 ('isFraud', 'int')]

<IPython.core.display.Javascript object>

In [13]:
# Partition the columns by Spark type name: "string" columns are treated as
# categorical, "double" columns as numeric. The label column "isFraud" is
# excluded explicitly so it can never be picked up as a model input.
# The two conditions are combined with the logical `and` (short-circuiting)
# rather than the bitwise `&`, which is meant for integers/bit masks.
catCols = [name for name, dtype in train.dtypes if dtype == "string"]
numCols = [
    name for name, dtype in train.dtypes if dtype == "double" and name != "isFraud"
]

<IPython.core.display.Javascript object>

In [14]:
# Show the numeric column list first, then the categorical one, one per line.
print(numCols, catCols, sep="\n")

['amount', 'oldbalanceOrg', 'newbalanceOrig']
['type']


<IPython.core.display.Javascript object>

### One hot encoding

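StringIndexer:
Encodes a string column of labels into a column of label indices (by default the most frequent label gets index 0).
http://spark.apache.org/docs/latest/ml-features#stringindexer


OneHotEncoder:
Maps a column of category indices to a column of binary (one-hot) vectors.
http://spark.apache.org/docs/latest/ml-features#onehotencoder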

For more info: http://spark.apache.org/docs/latest/ml-features

In [15]:
# Number of distinct transaction types in the training split, i.e. how many
# categories the encoder will have to handle.
distinct_type_count = F.countDistinct("type")
train.agg(distinct_type_count).show()

+-----------+
|count(type)|
+-----------+
|          5|
+-----------+



<IPython.core.display.Javascript object>

In [16]:
# Row count per transaction type in the training split.
train.groupBy("type").count().show()

+--------+-------+
|    type|  count|
+--------+-------+
|TRANSFER| 372786|
| CASH_IN| 978628|
|CASH_OUT|1566305|
| PAYMENT|1506235|
|   DEBIT|  28975|
+--------+-------+



<IPython.core.display.Javascript object>

In [16]:
from pyspark.ml.feature import (
    OneHotEncoder,
    StringIndexer,
)

<IPython.core.display.Javascript object>

In [17]:
# One StringIndexer per categorical column, writing to "<column>_StringIndexer".
# handleInvalid="skip" is passed through as-is; see the StringIndexer docs for
# how rows with labels unseen during fit are treated at transform time.
string_indexer = []
for cat_col in catCols:
    indexer = StringIndexer(
        inputCol=cat_col,
        outputCol=cat_col + "_StringIndexer",
        handleInvalid="skip",
    )
    string_indexer.append(indexer)

<IPython.core.display.Javascript object>

In [18]:
# Show the list of indexer stages built above.
string_indexer

[StringIndexer_e4ce59e19f9f]

<IPython.core.display.Javascript object>

In [19]:
# A single multi-column OneHotEncoder: it reads every "<column>_StringIndexer"
# column and writes the matching "<column>_OneHotEncoder" column. It is wrapped
# in a list so it can be concatenated with the other pipeline stages.
indexed_cols = [f"{x}_StringIndexer" for x in catCols]
encoded_cols = [f"{x}_OneHotEncoder" for x in catCols]
one_hot_encoder = [OneHotEncoder(inputCols=indexed_cols, outputCols=encoded_cols)]

<IPython.core.display.Javascript object>

In [20]:
# Show the list holding the encoder stage built above.
one_hot_encoder

[OneHotEncoder_555ca3dc0c05]

<IPython.core.display.Javascript object>

### Vector assembling

VectorAssembler:
Combines the values of input columns into a single vector.
http://spark.apache.org/docs/latest/ml-features#vectorassembler


In [21]:
from pyspark.ml.feature import VectorAssembler

<IPython.core.display.Javascript object>

In [22]:
# Assembler inputs: the numeric columns first, followed by the one-hot encoded
# version of each categorical column.
assemblerInput = list(numCols) + [f"{x}_OneHotEncoder" for x in catCols]

<IPython.core.display.Javascript object>

In [23]:
# Show the ordered list of columns that will be fed to the VectorAssembler.
assemblerInput

['amount', 'oldbalanceOrg', 'newbalanceOrig', 'type_OneHotEncoder']

<IPython.core.display.Javascript object>

In [24]:
# Combine all assembler input columns into one vector column named
# "VectorAssembler_features".
vector_assembler = (
    VectorAssembler()
    .setInputCols(assemblerInput)
    .setOutputCol("VectorAssembler_features")
)

<IPython.core.display.Javascript object>

In [25]:
# Pipeline order matters: index the strings, one-hot encode the indices, then
# assemble everything into the feature vector.
stages = [*string_indexer, *one_hot_encoder, vector_assembler]


<IPython.core.display.Javascript object>

In [26]:
# Show the final, ordered list of pipeline stages.
stages

[StringIndexer_e4ce59e19f9f,
 OneHotEncoder_555ca3dc0c05,
 VectorAssembler_4cf552bdad45]

<IPython.core.display.Javascript object>

In [27]:
%%time
from pyspark.ml import Pipeline

pipeline = Pipeline().setStages(stages)
model = pipeline.fit(train)

pp_df = model.transform(test)

CPU times: user 29.8 ms, sys: 28.2 ms, total: 58.1 ms
Wall time: 9.09 s


<IPython.core.display.Javascript object>

In [28]:
# Show the original input columns next to the assembled feature vector,
# without truncating the vector text.
preview_cols = [
    "type",
    "amount",
    "oldbalanceOrg",
    "newbalanceOrig",
    "VectorAssembler_features",
]
pp_df.select(*preview_cols).show(truncate=False)

+-------+------+-------------+--------------+--------------------------------------------------+
|type   |amount|oldbalanceOrg|newbalanceOrig|VectorAssembler_features                          |
+-------+------+-------------+--------------+--------------------------------------------------+
|CASH_IN|5.44  |0.0          |5.44          |(7,[0,2,5],[5.44,5.44,1.0])                       |
|CASH_IN|8.27  |8428410.94   |8428419.21    |[8.27,8428410.94,8428419.21,0.0,0.0,1.0,0.0]      |
|CASH_IN|9.02  |2416260.59   |2416269.61    |[9.02,2416260.59,2416269.61,0.0,0.0,1.0,0.0]      |
|CASH_IN|12.79 |601743.0     |601755.79     |[12.79,601743.0,601755.79,0.0,0.0,1.0,0.0]        |
|CASH_IN|13.2  |106204.0     |106217.2      |[13.2,106204.0,106217.2,0.0,0.0,1.0,0.0]          |
|CASH_IN|14.4  |1.143460813E7|1.143462253E7 |[14.4,1.143460813E7,1.143462253E7,0.0,0.0,1.0,0.0]|
|CASH_IN|15.52 |4368030.06   |4368045.59    |[15.52,4368030.06,4368045.59,0.0,0.0,1.0,0.0]     |
|CASH_IN|16.3  |2.140511936E7|

<IPython.core.display.Javascript object>

### Logistic Regression

In [29]:
from pyspark.ml.classification import LogisticRegression

<IPython.core.display.Javascript object>

In [31]:
# Rename the assembled vector to "features" and the fraud flag to "label",
# the column names LogisticRegression looks for by default.
features_col = F.col("VectorAssembler_features").alias("features")
label_col = F.col("isFraud").alias("label")
data = pp_df.select(features_col, label_col)

<IPython.core.display.Javascript object>

In [32]:
# Peek at the first five (features, label) rows without truncating the vectors.
data.show(5, truncate=False)

+--------------------------------------------+-----+
|features                                    |label|
+--------------------------------------------+-----+
|(7,[0,2,5],[5.44,5.44,1.0])                 |0    |
|[8.27,8428410.94,8428419.21,0.0,0.0,1.0,0.0]|0    |
|[9.02,2416260.59,2416269.61,0.0,0.0,1.0,0.0]|0    |
|[12.79,601743.0,601755.79,0.0,0.0,1.0,0.0]  |0    |
|[13.2,106204.0,106217.2,0.0,0.0,1.0,0.0]    |0    |
+--------------------------------------------+-----+
only showing top 5 rows



<IPython.core.display.Javascript object>

In [33]:
%%time
model = LogisticRegression().fit(data)

CPU times: user 25.3 ms, sys: 12.2 ms, total: 37.5 ms
Wall time: 26.8 s


<IPython.core.display.Javascript object>

In [34]:
# `model.summary` only describes the rows the model was fitted on. Scoring
# `data` (the features derived from the test split) explicitly ties the
# reported ROC AUC to that split, whatever the model was trained on.
model.evaluate(data).areaUnderROC

0.9893146184131564

<IPython.core.display.Javascript object>

In [35]:
# Precision/recall curve computed by scoring `data` (the features derived from
# the test split) instead of reading the training summary, so the curve refers
# to that split, whatever the model was trained on.
model.evaluate(data).pr.show()

+------------------+--------------------+
|            recall|           precision|
+------------------+--------------------+
|               0.0| 0.10797143006099787|
|0.8310593900481541| 0.10797143006099787|
|0.8796147672552167|0.057433317612534715|
|0.9177367576243981|  0.0400174978127734|
|0.9498394863563403|0.031090328766763427|
|0.9719101123595506| 0.02546337668345301|
|0.9931781701444623|0.021690358088093525|
|0.9931781701444623|0.018592945948991472|
|0.9931781701444623|0.016267162678199373|
|0.9935794542536116|0.014464982590610614|
|0.9935794542536116|0.013018081252595992|
|0.9935794542536116|0.011833923595677464|
|0.9939807383627608|0.010851850553764194|
|0.9939807383627608|0.010017187342028106|
|0.9939807383627608| 0.00930122526839123|
|0.9939807383627608|0.008681176181964742|
|0.9939807383627608|0.008394959635054803|
|0.9939807383627608|0.007886048666184866|
|0.9943820224719101|0.007438515178713...|
|0.9943820224719101|0.007036454399345763|
+------------------+--------------

<IPython.core.display.Javascript object>