## Overview

This notebook walks through a data science project built with Apache Spark. The dataset was uploaded to DBFS. [DBFS](https://docs.databricks.com/user-guide/dbfs-databricks-file-system.html) is the Databricks File System, which lets you store data for querying inside Databricks. This notebook assumes that the files you want to read are already in DBFS.

This notebook is written in **Python** so the default cell type is Python. The dataset has been taken from [Kaggle](https://www.kaggle.com/c/santander-customer-transaction-prediction/overview). 

The project aims to predict whether a Santander customer will make a specific transaction in the future, irrespective of the amount of money transacted. It covers data loading, exploratory analysis, feature selection and, finally, building a model that predicts this behaviour.

## Loading the dataset

The dataset was uploaded to DBFS. The following code reads the training and test CSV files from the file system, with options to infer the schema, treat the first row as a header and use a comma as the delimiter. We'll keep working with Spark DataFrames instead of converting them to pandas, because `toPandas()` collects the entire dataset onto the driver node. With a dataset of this size, it is better to keep the data distributed across the cluster.

In [3]:
file_type = 'csv'
train_file_location = "/FileStore/tables/train.csv"
train = spark.read.format(file_type) \
  .option("inferSchema", True) \
  .option("header", True) \
  .option("sep", ',') \
  .load(train_file_location)

In [4]:
test_file_location = "/FileStore/tables/test.csv"
test = spark.read.format(file_type) \
  .option("inferSchema", True) \
  .option("header", True) \
  .option("sep", ',') \
  .load(test_file_location)

We'll now take a look at the dimensions of the training and test dataset.

In [6]:
print('training dataset shape - ', train.count(), 'x', len(train.columns), '\ntest dataset shape - ', test.count(), 'x', len(test.columns))

In [7]:
print('Columns for the training dataset are: ', train.columns)

In [8]:
print('Columns for the test dataset are - ', test.columns)

## Exploratory Analysis

We'll first start the exploratory analysis by visualizing the target variable and checking if the dataset is imbalanced or not. 

To do this, we'll select the `target` column from the train DataFrame and convert it to an RDD so that we can use `map` to extract the raw values. Because `collect()` brings every selected value back to the driver, we first take a 10% stratified sample with `sampleBy`, which keeps roughly 10% of each class. This greatly reduces the amount of data sent to the driver while still giving an idea of the class balance.
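`sampleBy` keeps each row independently with the probability assigned to its stratum (strata missing from the dictionary get probability 0), so the sample contains roughly, not exactly, 10% of each class. A minimal pure-Python sketch of that per-stratum Bernoulli sampling, using a hypothetical helper `stratified_bernoulli_sample` and synthetic labels rather than the Santander data:

```python
import random

def stratified_bernoulli_sample(labels, fractions, seed=None):
    """Keep each row independently with the probability assigned to its label.

    Hypothetical helper illustrating the idea behind DataFrame.sampleBy;
    labels missing from `fractions` are treated as fraction 0.
    """
    rng = random.Random(seed)
    return [i for i, y in enumerate(labels) if rng.random() < fractions.get(y, 0.0)]

# Synthetic, imbalanced labels: 9,000 negatives and 1,000 positives
labels = [0] * 9000 + [1] * 1000
kept = stratified_bernoulli_sample(labels, {0: 0.1, 1: 0.1}, seed=42)

n_neg = sum(1 for i in kept if labels[i] == 0)
n_pos = sum(1 for i in kept if labels[i] == 1)
print(n_neg, n_pos)  # close to 900 and 100, but usually not exactly
```

Fixing the seed makes the sample reproducible; without one, every run draws a different sample.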

In [10]:
import seaborn as sns
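target_values = train.select('target').sampleBy('target', {0: 0.1, 1: 0.1}).rdd.map(lambda x: x.target).collect()
# Pass the values as x= (newer seaborn versions expect keyword arguments) and display the figure
ax = sns.countplot(x=target_values)
display(ax.figure)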

The plot above is for the sampled dataset. We'll compute the actual class counts below to measure the imbalance.

In [12]:
train.select('target').groupby('target').count().toPandas()

|   | target | count |
|---|---|---|
| 0 | 1 | 20098 |
| 1 | 0 | 179902 |
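The counts above are enough to quantify the imbalance. A small sketch of the arithmetic (the dictionary simply copies the table): about 10% of the rows are positive, there are roughly nine negatives per positive, and a model that always predicts 0 would already reach about 90% accuracy, which is one reason to evaluate models with the area under the ROC curve rather than accuracy.

```python
# Class counts copied from the groupby('target').count() output above
counts = {1: 20098, 0: 179902}

total = sum(counts.values())
positive_rate = counts[1] / total
imbalance_ratio = counts[0] / counts[1]
# Accuracy of a trivial model that always predicts the majority class (0)
majority_baseline_accuracy = counts[0] / total

print(total, round(positive_rate, 4), round(imbalance_ratio, 2), round(majority_baseline_accuracy, 4))
```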


The counts confirm that the dataset is highly imbalanced: only 20,098 of the 200,000 rows (about 10%) belong to class 1. Next, we'll check the dataset for missing values. To do this, we'll run a single aggregation over all columns using our own function `null_check`, which builds, for a given column, an aggregate expression that counts its missing values.
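Spark distinguishes SQL NULL (a missing value) from NaN (a floating-point "not a number"), and a check for one does not catch the other, so a complete missing-value count has to look for both. A minimal pure-Python analogue of the per-column check, with `None` standing in for NULL and `float('nan')` for NaN; the `count_missing` helper and the toy rows are hypothetical:

```python
import math

def count_missing(rows, columns):
    """Count None (NULL-like) or NaN values per column in a list of dicts."""
    counts = {c: 0 for c in columns}
    for row in rows:
        for c in columns:
            v = row.get(c)
            # NaN only exists for floats, so strings are only checked for None
            if v is None or (isinstance(v, float) and math.isnan(v)):
                counts[c] += 1
    return counts

rows = [
    {"ID_code": "a", "var_0": 1.5, "var_1": float("nan")},
    {"ID_code": "b", "var_0": None, "var_1": 2.0},
    {"ID_code": None, "var_0": 3.0, "var_1": float("nan")},
]
print(count_missing(rows, ["ID_code", "var_0", "var_1"]))
```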

In [14]:
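from pyspark.sql.functions import isnan, isnull, col, sum as spark_sum

def null_check(c, dtype):
  # Count NULLs in column c, plus NaNs for double/float columns (isnan is not applied to strings such as ID_code)
  missing = isnull(col(c))
  if dtype in ('double', 'float'):
    missing = missing | isnan(col(c))
  # Spark's aggregate sum; Python's builtin sum() cannot aggregate a Column
  return spark_sum(missing.cast('integer')).alias(c)

train.agg(*[null_check(c, t) for c, t in train.dtypes]).first()

Careful examination shows that no column in the training dataset has missing values. We'll run the same check on the test dataset.

In [16]:
test.agg(*[null_check(c, t) for c, t in test.dtypes]).first()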

The test dataset has no missing values either. The next step is to examine the distributions of the values: whether the columns are skewed, and whether there is any way to detect collinearity between them.

In [18]:
sampled_train = train.stat.sampleBy('target', {0:0.1, 1:0.1})

In [19]:
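import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np

var_columns = [c for c in sampled_train.columns if c.startswith('var')]
# The 10% sample is small enough to bring to the driver once, instead of one collect() per column
sampled_pdf = sampled_train.select(var_columns).toPandas()
columns = np.array(var_columns).reshape(40, 5)  # 200 var_* columns laid out on a 40 x 5 grid
fig, axes = plt.subplots(40, 5, squeeze=True, figsize=(30, 100))
for i, a in enumerate(axes):
  for j, s in enumerate(a):
    s.title.set_text('Dist of: %s' % columns[i, j])
    # histplot with kde=True replaces the deprecated distplot
    sns.histplot(x=sampled_pdf[columns[i, j]], kde=True, ax=s)

display(fig)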

We can see from the plots that most columns look approximately normally distributed, but a few of them are skewed. We'll measure the skewness of the columns to decide whether anything has to be done about it.

In [21]:
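from pyspark.sql.functions import skewness
# Compute the skewness of all var_* columns in a single aggregation (one Spark job)
var_cols = [c for c in train.columns if c.startswith('var')]
skew = np.array(train.agg(*[skewness(c) for c in var_cols]).first())

In [22]:
# Largest and smallest skewness, and the number of columns with |skewness| > 0.5
skew.max(), skew.min(), (np.abs(skew) > 0.5).sum()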

The maximum skewness in the data is 0.26, which is relatively low. We'll check the kurtosis now.

In [24]:
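from pyspark.sql.functions import kurtosis
# Compute the kurtosis of all var_* columns in a single aggregation (one Spark job)
var_cols = [c for c in train.columns if c.startswith('var')]
kurt = np.array(train.agg(*[kurtosis(c) for c in var_cols]).first())

In [25]:
# Largest and smallest kurtosis, and the number of columns with |kurtosis| > 0.5
kurt.max(), kurt.min(), (np.abs(kurt) > 0.5).sum()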
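As far as we can tell from Spark's implementation, the `skewness` and `kurtosis` aggregates use population (divide-by-n) central moments, and `kurtosis` reports excess kurtosis, so a normal distribution scores about 0 on both. A pure-Python sketch of those formulas, with hypothetical helper names, can be used to sanity-check values on small inputs; the |value| > 0.5 cut-off used above is a rule of thumb, not a statistical test.

```python
import math

def central_moment(xs, k):
    """k-th central moment using the population (divide-by-n) convention."""
    mean = sum(xs) / len(xs)
    return sum((x - mean) ** k for x in xs) / len(xs)

def pop_skewness(xs):
    """Population skewness m3 / m2**1.5 (0 for a symmetric sample)."""
    return central_moment(xs, 3) / central_moment(xs, 2) ** 1.5

def pop_excess_kurtosis(xs):
    """Excess kurtosis m4 / m2**2 - 3 (about 0 for normally distributed data)."""
    return central_moment(xs, 4) / central_moment(xs, 2) ** 2 - 3.0

symmetric = [1, 2, 3, 4, 5]
right_skewed = [0, 0, 0, 1]
print(pop_skewness(symmetric), pop_excess_kurtosis(symmetric))
print(pop_skewness(right_skewed))
```

The uniform-looking sample has negative excess kurtosis (lighter tails than a normal distribution), while the sample with a single large value is skewed to the right.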

## Feature Selection

We'll now check whether PCA can be used for dimensionality reduction here. Because we do not know the scale of each column, we first scale the features, and then look at how much of the variability is explained by the principal components. We'll use a sampled DataFrame for this, created so that the two target classes are roughly balanced.

In [27]:
sampled_train = train.stat.sampleBy('target', {0:0.11, 1:1})

In [28]:
sampled_train = sampled_train.drop('ID_code')

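Spark ML's `MinMaxScaler` operates on a single vector column rather than on individual numeric columns, so we cannot apply it to the float columns directly. We'll first combine the feature columns into one vector with `VectorAssembler` and then apply `MinMaxScaler` to that vector. The output keeps the original columns and adds the `features` and `scaledfeatures` vector columns.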
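Spark's `MinMaxScaler` rescales each dimension of the assembled vector independently with (x - min) / (max - min), mapped onto the output range [0, 1] by default, and maps a constant dimension to the middle of the range. The min and max are learned when the scaler is fitted, which is why they should come from the training data. A per-column pure-Python sketch of the same formula (the helper name is hypothetical):

```python
def min_max_scale(column, new_min=0.0, new_max=1.0):
    """Rescale values linearly to [new_min, new_max].

    A constant column is mapped to the midpoint of the target range,
    the convention Spark's MinMaxScaler documents for max == min.
    """
    lo, hi = min(column), max(column)
    if hi == lo:
        return [0.5 * (new_max + new_min)] * len(column)
    return [(x - lo) / (hi - lo) * (new_max - new_min) + new_min for x in column]

print(min_max_scale([2.0, 4.0, 6.0]))
print(min_max_scale([7.0, 7.0]))
```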

In [30]:
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.feature import PCA

columns = [c for c in sampled_train.columns if c != 'target']

assembler = VectorAssembler(inputCols=columns, outputCol='features')
scaler = MinMaxScaler(inputCol='features', outputCol='scaledfeatures')

pipeline = Pipeline(stages=[assembler, scaler])
sampled_train = pipeline.fit(sampled_train).transform(sampled_train)

In [31]:
pca = PCA(k=100, inputCol='scaledfeatures', outputCol='pcafeatures')
model = pca.fit(sampled_train)

In [32]:
sum(model.explainedVariance.values)

Even with 100 principal components, only about 57% of the variability is explained. We'll therefore drop the idea of using PCA and instead use a model-based measure of feature importance to guide model building.
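`PCAModel.explainedVariance` holds the proportion of the total variance captured by each of the k components, so the sum in the cell above is the cumulative fraction for k = 100. The two-dimensional sketch below (a hypothetical helper using the closed-form eigenvalues of a 2 x 2 covariance matrix) shows the two extremes: perfectly correlated features put all of the variance in the first component, while uncorrelated features with equal variance split it evenly. With 200 nearly uncorrelated, similarly scaled features, each component would explain roughly 1/200 of the variance, so 100 components explaining about 57% is consistent with, though not proof of, weak correlation between the columns.

```python
import math

def explained_variance_ratio_2d(points):
    """Fraction of total variance captured by each principal component of 2-D data.

    Builds the sample covariance matrix [[a, b], [b, c]] and uses the
    closed-form eigenvalues of a symmetric 2x2 matrix.
    """
    n = len(points)
    mx = sum(p[0] for p in points) / n
    my = sum(p[1] for p in points) / n
    a = sum((p[0] - mx) ** 2 for p in points) / (n - 1)
    c = sum((p[1] - my) ** 2 for p in points) / (n - 1)
    b = sum((p[0] - mx) * (p[1] - my) for p in points) / (n - 1)
    half_trace = (a + c) / 2
    spread = math.sqrt(((a - c) / 2) ** 2 + b ** 2)
    eigenvalues = [half_trace + spread, half_trace - spread]
    total = a + c  # the trace equals the sum of the eigenvalues
    return [ev / total for ev in eigenvalues]

collinear = [(1, 2), (2, 4), (3, 6)]
uncorrelated = [(1, 0), (-1, 0), (0, 1), (0, -1)]
print(explained_variance_ratio_2d(collinear))
print(explained_variance_ratio_2d(uncorrelated))
```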

In [34]:
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(labelCol="target", featuresCol="scaledfeatures",impurity='gini')
rf_model = rf.fit(sampled_train)
rf_model.featureImportances

In [35]:
import pandas as pd

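# Convert the importance vector to a dense numpy array so pandas can build a column from it
feature_imp_df = pd.DataFrame({'colname': columns, 'imp': rf_model.featureImportances.toArray()}).sort_values('imp', ascending=False)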

In [36]:
feature_imp_df.head(25).T

|  | 81 | 139 | 110 | 26 | 12 | 0 | 166 | 6 | 2 | 80 | 146 | 44 | 78 | 190 | 22 | 165 | 21 | 174 | 13 | 76 | 198 | 133 | 191 | 99 | 169 |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| colname | var_81 | var_139 | var_110 | var_26 | var_12 | var_0 | var_166 | var_6 | var_2 | var_80 | var_146 | var_44 | var_78 | var_190 | var_22 | var_165 | var_21 | var_174 | var_13 | var_76 | var_198 | var_133 | var_191 | var_99 | var_169 |
| imp | 0.11002 | 0.0749469 | 0.0571374 | 0.0563614 | 0.0518402 | 0.0490933 | 0.0448938 | 0.0434187 | 0.0401545 | 0.0384796 | 0.0319045 | 0.0295492 | 0.0254687 | 0.0231182 | 0.022986 | 0.0205869 | 0.0186514 | 0.0185214 | 0.0174395 | 0.0169846 | 0.0143446 | 0.0131662 | 0.0119092 | 0.0117959 | 0.0108433 |


In [37]:
feature_imp_df[feature_imp_df['imp'] == 0.0].shape

So 58 columns have an importance of zero in this random forest, i.e. they contribute nothing to this model's predictions.

Now we'll select the 25 most important features and use them going forward for hyper-parameter tuning and for building the final model.

In [40]:
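imp_features = feature_imp_df['colname'].iloc[:25]
# Positions (in the assembled feature vector) of the 25 most important features, as plain Python ints
ind = feature_imp_df.index.values[:25].tolist()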

Here we'll use `VectorSlicer` to extract the values of these 25 features from the scaled feature vector.

In [42]:
from pyspark.ml.feature import VectorSlicer

slicer = VectorSlicer(inputCol='scaledfeatures', outputCol='impfeatures', indices=ind)
sampled_train = slicer.transform(sampled_train)
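The positions in `ind` line up with `scaledfeatures` only because `feature_imp_df` was built from `columns`, the same list (in the same order) that was passed to `VectorAssembler`. `VectorSlicer` then outputs the selected entries in the order the indices are given. A pure-Python sketch of both steps on a hypothetical four-feature example (the helper names and values are illustrative only):

```python
def top_k_indices(importances, k):
    """Positions of the k largest importances, largest first (ties keep original order)."""
    return sorted(range(len(importances)), key=lambda i: importances[i], reverse=True)[:k]

def slice_vector(vector, indices):
    """Select entries by position, in the order the indices are given."""
    return [vector[i] for i in indices]

demo_columns = ["var_0", "var_1", "var_2", "var_3"]  # hypothetical assembler order
demo_importances = [0.10, 0.00, 0.55, 0.35]           # hypothetical importances
scaled_row = [0.2, 0.9, 0.4, 0.7]                     # one hypothetical scaled feature vector

idx = top_k_indices(demo_importances, 2)
print(idx, [demo_columns[i] for i in idx], slice_vector(scaled_row, idx))
```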

## Splitting the data for training and validation

Now, we have the set of important features and their values in the column 'impfeatures'. We'll now proceed to building various models.

In [44]:
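# Stratified 80% sample; the seed makes it reproducible and cache() keeps subtract() from recomputing it
X_train = train.sampleBy('target', {0: 0.8, 1: 0.8}, seed=42).cache()
# The remaining rows form the validation set (ID_code keeps every row distinct)
X_test = train.subtract(X_train)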
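Because `sampleBy` keeps each row with probability 0.8, the split sizes are only approximately 80/20, and since Spark evaluates lazily, `subtract` recomputes `X_train` unless it has been cached. If an exact, reproducible stratified split were wanted instead, one option is to shuffle each class with a fixed seed and cut it at 80%. A pure-Python sketch of that idea (hypothetical helper, synthetic labels):

```python
import random
from collections import defaultdict

def stratified_split(labels, train_fraction, seed=0):
    """Split row positions into disjoint train/validation sets, class by class."""
    rng = random.Random(seed)
    by_class = defaultdict(list)
    for i, y in enumerate(labels):
        by_class[y].append(i)
    train_idx, valid_idx = [], []
    for rows in by_class.values():
        rng.shuffle(rows)
        cut = int(round(train_fraction * len(rows)))  # exact per-class cut point
        train_idx.extend(rows[:cut])
        valid_idx.extend(rows[cut:])
    return sorted(train_idx), sorted(valid_idx)

labels = [0] * 900 + [1] * 100
train_idx, valid_idx = stratified_split(labels, 0.8, seed=42)
print(len(train_idx), len(valid_idx))
```

Every row ends up in exactly one of the two sets, and each class keeps the same 80/20 proportion.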

In [45]:
X_train.groupby('target').count().toPandas()

|   | target | count |
|---|---|---|
| 0 | 1 | 16134 |
| 1 | 0 | 143925 |


In [46]:
X_test.groupby('target').count().toPandas()

|   | target | count |
|---|---|---|
| 0 | 1 | 3991 |
| 1 | 0 | 35948 |


## Transforming the training and validation data 

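We'll create a function that takes the training split and builds a roughly balanced training set from it by unioning five stratified samples, each containing every positive row and about 11% of the negative rows. It then fits the pipeline on that set and uses the slicer to keep only the important features. The fitted pipeline is returned too, so that the validation data is scaled with the minimum and maximum learned from the training data instead of refitting the scaler on the validation data.

In [48]:
from functools import reduce
from pyspark.sql import DataFrame

def transformer(train, pipeline, slicer):
  # Five samples: all positives (fraction 1) and ~11% of the negatives in each
  samples = [train.sampleBy('target', {0: 0.11, 1: 1.0}) for _ in range(5)]
  df = reduce(DataFrame.union, samples)
  # Fit the assembler + scaler on the training data only
  pipeline_model = pipeline.fit(df)
  df = slicer.transform(pipeline_model.transform(df))
  return df, pipeline_model

In [49]:
def transformer_test(df, pipeline_model, slicer):
  # Reuse the pipeline fitted on the training data instead of refitting it on this data
  df = pipeline_model.transform(df)
  df = slicer.transform(df)
  return df

We'll transform the training data to the required format first.

In [51]:
X_train_1, fitted_pipeline = transformer(X_train, pipeline, slicer)

Next, we'll transform the validation data with the pipeline fitted on the training data.

In [53]:
X_test = transformer_test(X_test, fitted_pipeline, slicer)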
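The `transformer` function unions five `sampleBy` draws, each keeping every positive row and about 11% of the negative rows. Using the `X_train` class counts from the tables above, a quick sketch of the expected size of the resulting training set: positives are repeated five times (oversampling by duplication), the sampled negative rows add up to about 55% of the negatives, and some negatives appear in more than one draw.

```python
# Class counts of X_train copied from the groupby output above
pos, neg = 16134, 143925
n_samples, neg_fraction = 5, 0.11

expected_pos_rows = n_samples * pos                  # every positive is kept in each draw
expected_neg_rows = n_samples * neg_fraction * neg   # about 11% of the negatives per draw
# A negative row appears at least once unless it is skipped in all five draws
expected_distinct_neg = neg * (1 - (1 - neg_fraction) ** n_samples)

print(expected_pos_rows, round(expected_neg_rows), round(expected_distinct_neg))
```

The two classes end up close to 50/50, but the positive rows are exact repeats, which can make a model fit them more tightly than the negatives.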

## Building a model

Now we have the data in the required format. We'll look for the model that performs best on the validation dataset, measured by the area under the ROC curve, and do some hyper-parameter tuning to get the desired results.
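`BinaryClassificationEvaluator` reports the area under the ROC curve by default. That value equals the probability that a randomly chosen positive row receives a higher score than a randomly chosen negative one, with ties counted as one half. A brute-force pure-Python sketch of this definition on toy scores (Spark builds the ROC curve from possibly binned thresholds, so its value can differ slightly on large data):

```python
def roc_auc(labels, scores):
    """ROC AUC as the probability that a random positive outranks a random negative.

    Ties between a positive and a negative count as half a win.
    O(P * N) pairwise version, suitable for small illustrations only.
    """
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))

labels = [0, 0, 1, 1, 0, 1]
scores = [0.1, 0.4, 0.35, 0.8, 0.2, 0.9]
print(roc_auc(labels, scores))
```

Because AUC only depends on how the scores rank the rows, it does not require choosing a decision threshold.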

#### Logistic Regression

In [56]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator(labelCol='target')
lg = LogisticRegression(featuresCol='scaledfeatures', labelCol='target')

lg_model = lg.fit(X_train_1)
training_result_lr = lg_model.summary
predictions_lr = lg_model.transform(X_test)
print("Area Under ROC for test dataset: " + str(evaluator.evaluate(predictions_lr)))

#### Random Forest Classifier

In [58]:
rf = RandomForestClassifier(labelCol="target", featuresCol="scaledfeatures", numTrees=50)
rf_model = rf.fit(X_train_1)
#training_result_rf = rf_model.summary
predictions_rf = rf_model.transform(X_test)
print("Area Under ROC for test dataset: " + str(evaluator.evaluate(predictions_rf)))

#### GBTClassifier

The classifier gave its best performance at a step size of 0.45; performance deteriorated with other hyper-parameter values.

In [60]:
from pyspark.ml.classification import GBTClassifier

gbt = GBTClassifier(labelCol="target", featuresCol="scaledfeatures", stepSize=0.45)
gbt_model = gbt.fit(X_train_1)
predictions_gbt = gbt_model.transform(X_test)
print("Area Under ROC for test dataset: " + str(evaluator.evaluate(predictions_gbt)))

#### Multi Layer Perceptron Classifier

In [62]:
from pyspark.ml.classification import MultilayerPerceptronClassifier

layers = [25, 20, 10, 2]

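# The input layer (25) must match the length of impfeatures; the output layer (2) is the number of classes
mp = MultilayerPerceptronClassifier(labelCol='target', featuresCol='impfeatures', maxIter=10, layers=layers, blockSize=128)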
mp_model = mp.fit(X_train_1)
predictions_mp = mp_model.transform(X_test)
print("Area Under ROC for test dataset: " + str(evaluator.evaluate(predictions_mp)))
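In Spark's `MultilayerPerceptronClassifier`, the first entry of `layers` must equal the length of the input feature vector and the last must equal the number of classes; hidden layers use the sigmoid function and the output layer uses softmax. For [25, 20, 10, 2] that means 25 inputs (the sliced important features) and 2 outputs. A sketch counting the trainable weights and biases of such a fully connected network (the helper name is hypothetical):

```python
def mlp_parameter_count(layer_sizes):
    """Weights plus biases of a fully connected feed-forward network."""
    # Each layer n_in -> n_out has n_in * n_out weights and n_out biases
    return sum((n_in + 1) * n_out for n_in, n_out in zip(layer_sizes, layer_sizes[1:]))

mlp_layers = [25, 20, 10, 2]  # same topology as the cell above
print(mlp_parameter_count(mlp_layers))
```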

In [63]:
import matplotlib.pyplot as plt
# ROC curve of the logistic regression model on its training data (from the model summary)
fig = plt.figure(figsize=(3, 3))
roc = training_result_lr.roc.toPandas()
plt.plot(roc['FPR'], roc['TPR'])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve')
display(fig)

In [64]:
predictions_lr.select('target', 'prediction').groupby(['target', 'prediction']).count().toPandas()

|   | target | prediction | count |
|---|---|---|---|
| 0 | 1 | 0.0 | 722 |
| 1 | 0 | 0.0 | 117433 |
| 2 | 1 | 1.0 | 3387 |
| 3 | 0 | 1.0 | 46808 |
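The (target, prediction) counts above form a confusion matrix, from which the usual threshold-dependent metrics follow. A small sketch with the counts copied from the table: recall is high (about 82% of actual positives are caught), but precision is low (under 7% of predicted positives are actual positives), which is a common outcome when a model trained on rebalanced data is evaluated on the original class distribution.

```python
# Counts copied from the (target, prediction) table above
tp, fn = 3387, 722      # target = 1, predicted 1 / predicted 0
fp, tn = 46808, 117433  # target = 0, predicted 1 / predicted 0

recall = tp / (tp + fn)          # share of actual positives that were caught
precision = tp / (tp + fp)       # share of predicted positives that are correct
specificity = tn / (tn + fp)     # share of actual negatives predicted as 0
accuracy = (tp + tn) / (tp + fn + fp + tn)

print(f"recall={recall:.3f} precision={precision:.3f} specificity={specificity:.3f} accuracy={accuracy:.3f}")
```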
