In this notebook we will be implementing machine learning models to predict fraudulent transactions. Due to the size of the data we are working with, we will use Dask, a library for parallel computing that interacts well with pandas and numpy. This should solve some core problems that arise when working with large datasets:

- If the data that you want to analyze is larger than the capacity of your RAM, you simply can't load that data into a pandas DataFrame or NumPy array.

- Even if you are able to load a large dataset into a DataFrame, some operations will take a lot of time.

- When training a machine-learning model on a large dataset, you will be confronted with seemingly never-ending training. One solution to this problem is to run your code across several cores in parallel, and scikit-learn offers single-machine parallelization. However, scikit-learn alone can't scale this parallelization to more than one computer.


That last line is important. Dask is built to run across hundreds or thousands of machines known as a cluster. Although for this project I am only using one machine, with a few lines of code I could easily scale it up.



In [1]:
from dask.distributed import Client, progress
import dask.dataframe as dd

from sklearn.model_selection import cross_validate, GridSearchCV
from sklearn.metrics import roc_auc_score
import joblib
from dask_ml.model_selection import train_test_split
import pandas as pd
import warnings

warnings.filterwarnings("ignore")

client = Client(n_workers=8, threads_per_worker=1, memory_limit='2GB')
client

0,1
Client  Scheduler: tcp://127.0.0.1:54832  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 8  Cores: 8  Memory: 16.00 GB


In [2]:
# load credit fraud data into a Dask DataFrame
df = dd.read_csv('https://tf-assets-prod.s3.amazonaws.com/tf-curric/data-science/creditcard.csv', dtype={'Time': 'float64'})

In [3]:
df.head()

Unnamed: 0,Time,V1,V2,V3,V4,V5,V6,V7,V8,V9,...,V21,V22,V23,V24,V25,V26,V27,V28,Amount,Class
0,0.0,-1.359807,-0.072781,2.536347,1.378155,-0.338321,0.462388,0.239599,0.098698,0.363787,...,-0.018307,0.277838,-0.110474,0.066928,0.128539,-0.189115,0.133558,-0.021053,149.62,0
1,0.0,1.191857,0.266151,0.16648,0.448154,0.060018,-0.082361,-0.078803,0.085102,-0.255425,...,-0.225775,-0.638672,0.101288,-0.339846,0.16717,0.125895,-0.008983,0.014724,2.69,0
2,1.0,-1.358354,-1.340163,1.773209,0.37978,-0.503198,1.800499,0.791461,0.247676,-1.514654,...,0.247998,0.771679,0.909412,-0.689281,-0.327642,-0.139097,-0.055353,-0.059752,378.66,0
3,1.0,-0.966272,-0.185226,1.792993,-0.863291,-0.010309,1.247203,0.237609,0.377436,-1.387024,...,-0.1083,0.005274,-0.190321,-1.175575,0.647376,-0.221929,0.062723,0.061458,123.5,0
4,2.0,-1.158233,0.877737,1.548718,0.403034,-0.407193,0.095921,0.592941,-0.270533,0.817739,...,-0.009431,0.798278,-0.137458,0.141267,-0.20601,0.502292,0.219422,0.215153,69.99,0


You will notice that the majority of these columns don't make much sense to us. Because this is confidential financial data, the authors have chosen to include only the principal components instead of the real variables. 

In [4]:
df.groupby("Class")["Time"].count().compute()

Class
0    284315
1       492
Name: Time, dtype: int64

Out of a total of 284807 transactions, only 492 were fraudulent. This class imbalance outlines one of the major difficulties when trying to model and predict fraudulent behavior.

In [5]:
df[df.Class == 1]["Amount"].max().compute()

2125.87

The largest case of fraud amounted to ~$2000 usd

## Modeling

We will train using all 28 principal components and perform a gridsearch to tune our best model. Normally this would take a ridiculous amount of time but I am confident that Dask will help us. If it doesn't, you likely won't be reading this.

In [4]:
# We will train a random forest, logistic regression, and xgboost model

from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.linear_model import LogisticRegression

In [5]:
# This is the feature set
X = df.drop(["Time", "Amount", "Class"], axis=1)

# This is the target variable
Y = df["Class"]

X_train, X_test, y_train, y_test = train_test_split(X, Y, test_size=0.2)

# Because your data can fit into memory,
# persist it to the RAM
X_train.persist()
X_test.persist()
y_train.persist()
y_test.persist()

Dask Series Structure:
npartitions=3
    int64
      ...
      ...
      ...
Name: Class, dtype: int64
Dask Name: split, 3 tasks

When using Dask as the backend of the joblib library, we need to put our machine-learning code inside the context manager of joblib. That is, we need to put the code that we want to parallelize inside the following with statement:

```python
with joblib.parallel_backend('dask'):
    Scikit-learn code here
```

In [13]:
%%time
lr = LogisticRegression()

with joblib.parallel_backend('dask'):
    lr.fit(X_train.compute(), y_train.compute())
    
preds_train = lr.predict(X_train.values.compute())
preds_test = lr.predict(X_test.values.compute())

print("Logistic regression training score is: ", roc_auc_score(preds_train, y_train.values.compute()))
print("Logistic regression test score is: ", roc_auc_score(preds_test, y_test.values.compute()))

Logistic regression training score is:  0.9380739377219164
Logistic regression test score is:  0.9524217594356836
Wall time: 5min 52s


In [15]:
%%time
gbc = GradientBoostingClassifier()

with joblib.parallel_backend('dask'):
    gbc.fit(X_train.compute(), y_train.compute())
    
preds_train = gbc.predict(X_train.values.compute())
preds_test = gbc.predict(X_test.values.compute())

print("Gradient boosting tree training score is: ", roc_auc_score(preds_train, y_train.values.compute()))
print("Gradient boosting tree test score is: ", roc_auc_score(preds_test, y_test.values.compute()))

Gradient boosting tree training score is:  0.9573567735254457
Gradient boosting tree test score is:  0.9397102721685688
Wall time: 11min 2s


In [16]:
%%time
rfc = RandomForestClassifier()

with joblib.parallel_backend('dask'):
    rfc.fit(X_train.compute(), y_train.compute())
    
preds_train = rfc.predict(X_train.values.compute())
preds_test = rfc.predict(X_test.values.compute())

print("Random forest training score is: ", roc_auc_score(preds_train, y_train.values.compute()))
print("Random forest test score is: ", roc_auc_score(preds_test, y_test.values.compute()))

Random forest training score is:  1.0
Random forest test score is:  0.9751154697359156
Wall time: 6min 58s


Our random forest model performed the best, and took a very reasonable amount of time to train. I will have to do a more in depth analysis, but this did feel faster than scikit alone would've been. Even though we are dealing with a 'large' dataset, it really is nothing compared to the terabytes that Dask is built to handle. So I am curious as to whether Dask is actually faster at this level. But that is a question for another time. Let's tune our random forest model then call it a day.

In [6]:
%%time
rf_params = {"max_depth": [4, 8, 16]}

rf_model = RandomForestClassifier()

grid_search_rf = GridSearchCV(rf_model,
                           param_grid=rf_params,
                           return_train_score=True,
                           iid=True,
                           cv=4,
                           n_jobs=-1, 
                           scoring='roc_auc')

with joblib.parallel_backend('dask'):
    grid_search_rf.fit(X_train.compute(), y_train.compute())

print("The best value is: ", grid_search_rf.best_params_)
print("The test AUC score is: ", grid_search_rf.score(X_test.compute(), y_test.compute()))

The best value is:  {'max_depth': 8}
The test AUC score is:  0.9767044689975546
Wall time: 8min 11s
