In [1]:
# imports
import re
import ast
import time
import numpy as np
import pandas as pd
import seaborn as sns
import networkx as nx
import matplotlib.pyplot as plt
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
import sys
from pyspark.sql import SQLContext
from pyspark.ml.feature import VectorAssembler
import pyspark.sql.functions as F
from pyspark.mllib.util import MLUtils
from pyspark.ml.feature import StandardScaler
from pyspark.sql import SparkSession
from pyspark.ml.feature import OneHotEncoderEstimator, OneHotEncoderModel
from pyspark.ml.feature import OneHotEncoder, StringIndexer, StandardScaler, Imputer, VectorAssembler, SQLTransformer
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, CrossValidatorModel
from pyspark.mllib.evaluation import BinaryClassificationMetrics, MulticlassMetrics

In [2]:
%reload_ext autoreload
%autoreload 2

In [3]:
# store path to notebook
PWD = !pwd
PWD = PWD[0]

In [4]:
sc = SparkContext(appName="Final_Project")
sqlContext = SQLContext(sc)

In [5]:
# start Spark Session
app_name = "final_project_notebook"
master = "local[*]"
spark = SparkSession\
        .builder\
        .appName(app_name)\
        .master(master)\
        .getOrCreate()

In [6]:
spark

In [7]:
sc = spark.sparkContext
sqlContext = SQLContext(sc)

In [8]:
sc._conf.getAll()

[('spark.app.id', 'local-1576041156192'),
 ('spark.app.name', 'final_project_notebook'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.master', 'local[*]'),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.driver.port', '44759'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.driver.host', 'docker.w261')]

# Toy Example

In [9]:
toyDF = spark.read.csv("toy_example.txt", header=True)
toyRDD = toyDF.rdd

In [10]:
toyDF.show()

+---+----+-----+-----+-----+--------+--------+--------+--------+
|  y|  x1|   x2|   x6|   x8|     x14|     x19|     x20|     x35|
+---+----+-----+-----+-----+--------+--------+--------+--------+
|1.0| 2.0|671.0|145.0| 12.0|05db9164|fbad5c96|6c5e14ec|    null|
|0.0| 0.0| -1.0|  9.0|  0.0|05db9164|6f6d9be8|2f5788d6|    null|
|0.0|null|  0.0|100.0|  0.0|8cf07265|7e0ccccf|2cc59e2b|ad3062eb|
|0.0|null| -1.0| null|  0.0|05db9164|fbad5c96|d356c7e6|    null|
|0.0|null|  1.0|203.0|  5.0|68fd1e64|fbad5c96|d5f62b87|    null|
|0.0| 1.0| -1.0|  0.0|  0.0|05db9164|    null|1b76cf1e|    null|
|1.0|null| 39.0| null|117.0|5bfa8ab5|7e0ccccf|af0809a5|    null|
|0.0|null|  0.0| 66.0|  7.0|05db9164|    null|da33ebe6|    null|
|1.0|10.0|  1.0| 66.0| 27.0|05db9164|fbad5c96|ce4f7f55|    null|
|0.0|null|  1.0| 16.0|  7.0|68fd1e64|7e0ccccf|5e64ce5f|    null|
+---+----+-----+-----+-----+--------+--------+--------+--------+



Based on our EDA, we decided to convert numeric features into categorical features.

In [11]:
categoricalToImpute = toyDF.columns[1:]

In [12]:
def imputeValues(df, categoricalToImpute):
    
    # Impute categorical features
    for col in categoricalToImpute:
        mostCommon = df.select(col).groupby(col).count()\
                            .orderBy('count', ascending=False) \
                            .limit(1).collect()[0][0]
        if mostCommon == "" or mostCommon is None:
            mostCommon = "EMPTY"
        
        print(f"Column {col} has most common {mostCommon}")
        
        df = df.withColumn(col, F.when((df[col].isNull() | (df[col] == '')), mostCommon) \
                                .otherwise(df[col]))

    return (df)

In [13]:
toyDF = imputeValues(toyDF,categoricalToImpute)

Column x1 has most common EMPTY
Column x2 has most common 1.0
Column x6 has most common EMPTY
Column x8 has most common 0.0
Column x14 has most common 05db9164
Column x19 has most common fbad5c96
Column x20 has most common d356c7e6
Column x35 has most common EMPTY


After imputation, we can see that the null values have converted a new string value 'EMPTY' for columns x1 and x35, as the null value was the most common value in those features. The null values originally present in columns x6 and x19 have been imputed to the most common value, 66.0 and fbad5c96, respectively.

In [14]:
toyDF.show()

+---+-----+-----+-----+-----+--------+--------+--------+--------+
|  y|   x1|   x2|   x6|   x8|     x14|     x19|     x20|     x35|
+---+-----+-----+-----+-----+--------+--------+--------+--------+
|1.0|  2.0|671.0|145.0| 12.0|05db9164|fbad5c96|6c5e14ec|   EMPTY|
|0.0|  0.0| -1.0|  9.0|  0.0|05db9164|6f6d9be8|2f5788d6|   EMPTY|
|0.0|EMPTY|  0.0|100.0|  0.0|8cf07265|7e0ccccf|2cc59e2b|ad3062eb|
|0.0|EMPTY| -1.0|EMPTY|  0.0|05db9164|fbad5c96|d356c7e6|   EMPTY|
|0.0|EMPTY|  1.0|203.0|  5.0|68fd1e64|fbad5c96|d5f62b87|   EMPTY|
|0.0|  1.0| -1.0|  0.0|  0.0|05db9164|fbad5c96|1b76cf1e|   EMPTY|
|1.0|EMPTY| 39.0|EMPTY|117.0|5bfa8ab5|7e0ccccf|af0809a5|   EMPTY|
|0.0|EMPTY|  0.0| 66.0|  7.0|05db9164|fbad5c96|da33ebe6|   EMPTY|
|1.0| 10.0|  1.0| 66.0| 27.0|05db9164|fbad5c96|ce4f7f55|   EMPTY|
|0.0|EMPTY|  1.0| 16.0|  7.0|68fd1e64|7e0ccccf|5e64ce5f|   EMPTY|
+---+-----+-----+-----+-----+--------+--------+--------+--------+



Once imputed, the categorical features will need to get one hot encoded. For example, column x1 has 5 unique values: 0.0, 1.0, 2.0, 10.0, and EMPTY. This one column will then get mapped to 4 columns, such the first column would be an indicator of whether the value is 0.0, the second column would be an indicator of whether the value is 1.0, and so on. The last value is omitted as it does not provide any additional value. Thus, a value of EMPTY would have 0 values for all 4 columns.

In [15]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

In [16]:
categorical_columns= categoricalToImpute

# The index of string values multiple columns
indexers = [
    StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c))
    for c in categorical_columns
]

# The encode of indexed values multiple columns
encoders = [OneHotEncoder(dropLast=False,inputCol=indexer.getOutputCol(),
            outputCol="{0}_encoded".format(indexer.getOutputCol())) 
    for indexer in indexers
]

# Vectorizing encoded values
assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders],outputCol="features")

In [17]:
pipeline = Pipeline(stages=indexers + encoders+[assembler])
model=pipeline.fit(toyDF)
transformed = model.transform(toyDF)

In [18]:
transformed.take(1)

[Row(y='1.0', x1='2.0', x2='671.0', x6='145.0', x8='12.0', x14='05db9164', x19='fbad5c96', x20='6c5e14ec', x35='EMPTY', x1_indexed=3.0, x2_indexed=4.0, x6_indexed=6.0, x8_indexed=2.0, x14_indexed=0.0, x19_indexed=0.0, x20_indexed=5.0, x35_indexed=0.0, x1_indexed_encoded=SparseVector(5, {3: 1.0}), x2_indexed_encoded=SparseVector(5, {4: 1.0}), x6_indexed_encoded=SparseVector(8, {6: 1.0}), x8_indexed_encoded=SparseVector(6, {2: 1.0}), x14_indexed_encoded=SparseVector(4, {0: 1.0}), x19_indexed_encoded=SparseVector(3, {0: 1.0}), x20_indexed_encoded=SparseVector(10, {5: 1.0}), x35_indexed_encoded=SparseVector(2, {0: 1.0}), features=SparseVector(43, {3: 1.0, 9: 1.0, 16: 1.0, 20: 1.0, 24: 1.0, 28: 1.0, 36: 1.0, 41: 1.0}))]

In [19]:
#find the number of unique values for each column
uniqueVals = []
for i, col in enumerate(transformed.columns[1:9]):
    uniqueVals.append(len(transformed.select(col).groupby(col).count().distinct().collect()))

In [20]:
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import lit, udf

def ith_(v, i):
    try:
        return float(v[i])
    except ValueError:
        return None

ith = udf(ith_, DoubleType())

In [21]:
#from the indexed_encoded columns, create one hot encoded columns 
for i, col in enumerate(transformed.columns[-9:-1]):
    for j in range(uniqueVals[i]-1):
        transformed = transformed.withColumn(f'oh_{i}_{j}', ith(col, lit(j)))

In [22]:
#remove features so that only one hot encoded features are left
for col in transformed.columns[1:26]:
    transformed = transformed.drop(col)

We then add a bias term to our data with a value of 1.

In [23]:
transformed = transformed.withColumn('x0', lit(1))

In [24]:
toyRDD = transformed.rdd

Now we have converted our toy example CSV into something we can perform gradient descent using map reduce paradigm.

In [25]:
toyRDD.take(1)

[Row(y='1.0', oh_0_0=0.0, oh_0_1=0.0, oh_0_2=0.0, oh_0_3=1.0, oh_1_0=0.0, oh_1_1=0.0, oh_1_2=0.0, oh_1_3=0.0, oh_2_0=0.0, oh_2_1=0.0, oh_2_2=0.0, oh_2_3=0.0, oh_2_4=0.0, oh_2_5=0.0, oh_2_6=1.0, oh_3_0=0.0, oh_3_1=0.0, oh_3_2=1.0, oh_3_3=0.0, oh_3_4=0.0, oh_4_0=1.0, oh_4_1=0.0, oh_4_2=0.0, oh_5_0=1.0, oh_5_1=0.0, oh_6_0=0.0, oh_6_1=0.0, oh_6_2=0.0, oh_6_3=0.0, oh_6_4=0.0, oh_6_5=1.0, oh_6_6=0.0, oh_6_7=0.0, oh_6_8=0.0, oh_7_0=1.0, x0=1)]

## Perform Gradient Descent with Logistic Regression

To perform gradient descent with logistic regression, we defined five helper functions:

1. The predict function will return a probability for each observation by taking the dot product of the features and the weights, as defined by the linear model, $ y = \theta_0 + \theta_1 x + ... + \theta_l x_l = \theta^T x$. 
2. Then the sigmoid function maps that output to a probability value between 0 and 1 for class 1: $ g(z) = \frac{1}{1+e^{-z}} $. 
3. The calcCost functions calculates the log loss $ L(\hat{y},y) = -(ylog\hat{y} + (1-y)log(1-\hat{y})) $. The first part of the equation $-ylog\hat{y}$ is the cost when $y=1$ and the second part of the equation $-(1-y)log(1-\hat{y})$ is the cost when $y=0$. In the case of training with class weights for imbalanced data, the log loss becomes $ L(\hat{y},y) = -(rylog\hat{y} + (1-r)(1-y)log(1-\hat{y})) $, where $r$ is the balancing ratio. The balancing ratio is the ratio of the number of observations in the training data in class 0 over the total number of observations. In our click through rate data set, we have an imbalance with class 0 as the majority class. Thus, the observations in class 1 will be penalized more if classified incorrectly.
4. The gradients function calculates the gradient to be subtracted from the weights at each iteration. The gradient is the derivative of the loss function and is simplified to $ L' = x(\hat{y}-y)$, where $\hat{y}$ is the prediction and $y$ is the actual class label. In the case of training with class weights, the derivative becomes $ L' = x[(1-r)(1-y)\hat{y} - (ry\hat{y})]$. 
5. Finally, the decision function takes a threshold, typically 0.5, and predicts class 1 if the probability is greater or equal than the threshold, or predicts class 0 if the probability is lower.

https://ml-cheatsheet.readthedocs.io/en/latest/logistic_regression.html
http://www.scalaformachinelearning.com/2016/03/weighting-logistic-loss-for-imbalanced_7.html
https://stats.stackexchange.com/questions/278771/how-is-the-cost-function-from-logistic-regression-derivated

In [26]:
import math
from functools import partial

In [27]:
def sigmoid(z):
    return 1.0 / (1 + np.exp(-z))

In [28]:
#makes a prediction by taking the dot product of the values and the weights and passes it through the sigmoid function
def predict(weights, line):
    y = np.array(line[0]).astype(dtype='float')
    x = np.array(line[1:]).astype(dtype='float')
    z = np.dot(x, weights)
    
    return sigmoid(z)

In [29]:
def gradients(weights, lr, balancingRatio, line):
    y = np.array(line[0]).astype(dtype='float')
    x = np.array(line[1:]).astype(dtype='float')
    z = np.dot(x, weights)
    
    predictions = sigmoid(z)
    if balancingRatio:
        gradient = np.dot(x.T, (1-balancingRatio)*(1-y)*predictions - (balancingRatio*y*(1-predictions)))
    else:
        gradient = np.dot(x.T, predictions - y)
    gradient = gradient/len(theta)
    gradient = gradient*lr
    
    return gradient

In [30]:
def calcCost(weights, eps, balancingRatio, line):
    y = np.array(line[0]).astype(dtype='float')
    x = np.array(line[1:]).astype(dtype='float')
    z = np.dot(x, weights)
    pred = sigmoid(z)
    
    #Take the error when label=1
    class1_cost = -y*np.log(pred+eps) # Add epsilon to prevent nan

    #Take the error when label=0
    class0_cost = (1-y)*np.log(1-pred+eps) # Add epsilon to prevent nan
    
    #If training with class weights, multiply costs by the balancing ratio (majority class 0/total observations)
    if balancingRatio:
        class1_cost *= balancingRatio
        class0_cost *= (1-balancingRatio)
    
    cost = class1_cost - class0_cost

    return cost

In [31]:
def train(toyRDD, theta, ITERATIONS, LR, EPSILON, verbose=False, reg_param=0, class_weights=False):
    cost_history = []
    
    if class_weights == True:
        total_count = toyRDD.count()
        class0_count = toyRDD.map(lambda x: x[0]).filter(lambda x: x=="0.0").count()
        balancingRatio = class0_count/total_count
    else:
        balancingRatio = None

    for i in range(ITERATIONS+1):
        
        preds = toyRDD.map(partial(predict, theta)).collect()
        preds = ["%.2f" % v for v in preds] # Format to 2 decimals
        
        grads = toyRDD.map(partial(gradients, theta, LR, balancingRatio)).mean() + reg_param*2*np.append(theta[:-1],[0.0]) # Add in Regularization
        
        theta = theta - grads
        
        #Calculate Error for Tracking
        cost = toyRDD.map(partial(calcCost, theta, EPSILON, balancingRatio)).mean()
        cost_history.append(cost)

        # Log Progress
        if verbose:
            if i % 50 == 0:
                print(f"==========================================================")
                print(f"Iter: {i}, Cost: {cost:.3}")
                print(f"==========================================================")
                print(f"Preds: {preds}")
                print(f"==========================================================")
                print(f"Weights: {theta}")

    return theta, cost_history, preds

In [32]:
def decision(preds, threshold):
    outcome = ['1.0' if float(x) >= threshold else '0.0' for x in preds]
    return outcome

# Gradient Descent on Toy Example with no Regularization

In [33]:
# Here We Created Our Weight Matrix Initialized to 0's
theta = np.zeros(len(transformed.columns)-1).astype(dtype='float')
LR = 0.5
ITERATIONS = 1000
EPSILON = 1e-6

In [34]:
theta, cost_history, preds = train(toyRDD, theta, ITERATIONS, LR, EPSILON, verbose=True, reg_param=0, class_weights=False)

Iter: 0, Cost: 0.689
Preds: ['0.50', '0.50', '0.50', '0.50', '0.50', '0.50', '0.50', '0.50', '0.50', '0.50']
Weights: [-0.00277778 -0.00069444  0.00069444  0.00069444 -0.00208333 -0.00069444
 -0.00138889  0.00069444  0.          0.         -0.00069444 -0.00069444
 -0.00069444 -0.00069444  0.00069444 -0.00277778 -0.00138889  0.00069444
  0.00069444  0.00069444 -0.00138889 -0.00138889 -0.00069444 -0.00138889
 -0.00069444 -0.00069444 -0.00069444 -0.00069444  0.00069444  0.00069444
  0.00069444 -0.00069444 -0.00069444 -0.00069444 -0.00208333 -0.00277778]
Iter: 50, Cost: 0.559
Preds: ['0.47', '0.38', '0.38', '0.35', '0.38', '0.37', '0.45', '0.38', '0.46', '0.37']
Weights: [-0.11495621 -0.03079023  0.03730521  0.03680068 -0.09075631 -0.02410253
 -0.06133872  0.03741829  0.00663218  0.00779024 -0.0306657  -0.03085775
 -0.03033804 -0.03054998  0.03680068 -0.12142201 -0.06122301  0.03680068
  0.03730521  0.03741829 -0.04732346 -0.06140773 -0.0306657  -0.04739098
 -0.02379739 -0.03079023 -0.0296

In [35]:
outcome = decision(preds, 0.5)
outcome

['1.0', '0.0', '0.0', '0.0', '0.0', '0.0', '1.0', '0.0', '1.0', '0.0']

In [36]:
truth = toyDF.select('y').rdd.map(lambda x: x[0]).collect()
truth

['1.0', '0.0', '0.0', '0.0', '0.0', '0.0', '1.0', '0.0', '1.0', '0.0']

In [37]:
outcome == truth

True

After 1000 iterations, we take the probability and compare it to the threshold of 0.5. If it is above 0.5, then we predict class 1, otherwise, we predict class 0. In this toy example, we find that our predictions matched the truth.

# Gradient Descent on Toy Example with L2 Regularization

In [38]:
# Here We Created Our Weight Matrix Initialized to 0's
theta = np.zeros(len(transformed.columns)-1).astype(dtype='float')
LR = 0.5
ITERATIONS = 1000
EPSILON = 1e-6

In [39]:
theta_2, cost_history_2, prob_2 = train(toyRDD, theta, ITERATIONS, LR, EPSILON, verbose=True, reg_param=0.0001, class_weights=False)

Iter: 0, Cost: 0.689
Preds: ['0.50', '0.50', '0.50', '0.50', '0.50', '0.50', '0.50', '0.50', '0.50', '0.50']
Weights: [-0.00277778 -0.00069444  0.00069444  0.00069444 -0.00208333 -0.00069444
 -0.00138889  0.00069444  0.          0.         -0.00069444 -0.00069444
 -0.00069444 -0.00069444  0.00069444 -0.00277778 -0.00138889  0.00069444
  0.00069444  0.00069444 -0.00138889 -0.00138889 -0.00069444 -0.00138889
 -0.00069444 -0.00069444 -0.00069444 -0.00069444  0.00069444  0.00069444
  0.00069444 -0.00069444 -0.00069444 -0.00069444 -0.00208333 -0.00277778]
Iter: 50, Cost: 0.559
Preds: ['0.47', '0.38', '0.38', '0.35', '0.38', '0.37', '0.45', '0.38', '0.46', '0.38']
Weights: [-0.11440197 -0.03064016  0.03711983  0.03661857 -0.0903163  -0.02398889
 -0.06104014  0.03723218  0.0065961   0.00774689 -0.03051642 -0.03070727
 -0.03019084 -0.03040145  0.03661857 -0.12083271 -0.06092517  0.03661857
  0.03711983  0.03723218 -0.04710162 -0.06110872 -0.03051642 -0.04716873
 -0.02368568 -0.03064016 -0.0294

In [40]:
outcome = decision(preds, 0.5)
outcome

['1.0', '0.0', '0.0', '0.0', '0.0', '0.0', '1.0', '0.0', '1.0', '0.0']

In [41]:
outcome == truth

True

After 1000 iterations, we take the probability and compare it to the threshold of 0.5. If it is above 0.5, then we predict class 1, otherwise, we predict class 0. In this toy example, we find that our predictions matched the truth.

# Gradient Descent With Class Weighting of the Logistic Loss

In [42]:
# Here We Created Our Weight Matrix Initialized to 0's
theta = np.zeros(len(transformed.columns)-1).astype(dtype='float')
LR = 0.5
ITERATIONS = 1000
EPSILON = 1e-6

In [43]:
theta, cost_history, preds = train(toyRDD, theta, ITERATIONS, LR, EPSILON, verbose=True, class_weights=True)

Iter: 0, Cost: 0.291
Preds: ['0.50', '0.50', '0.50', '0.50', '0.50', '0.50', '0.50', '0.50', '0.50', '0.50']
Weights: [-5.55555556e-04 -2.08333333e-04  4.86111111e-04  4.86111111e-04
 -6.25000000e-04  6.94444444e-05 -4.16666667e-04  4.86111111e-04
  2.77777778e-04  2.77777778e-04 -2.08333333e-04 -2.08333333e-04
 -2.08333333e-04 -2.08333333e-04  4.86111111e-04 -8.33333333e-04
 -4.16666667e-04  4.86111111e-04  4.86111111e-04  4.86111111e-04
  1.38888889e-04 -4.16666667e-04 -2.08333333e-04  1.38888889e-04
  6.94444444e-05 -2.08333333e-04 -2.08333333e-04 -2.08333333e-04
  4.86111111e-04  4.86111111e-04  4.86111111e-04 -2.08333333e-04
 -2.08333333e-04 -2.08333333e-04  2.08333333e-04 -2.43945489e-19]
Iter: 50, Cost: 0.273
Preds: ['0.53', '0.48', '0.47', '0.48', '0.49', '0.48', '0.52', '0.49', '0.53', '0.48']
Weights: [-0.0278569  -0.01042161  0.02410596  0.02406257 -0.03128963  0.00318188
 -0.02085309  0.02434972  0.01356819  0.01392028 -0.01031532 -0.01049321
 -0.01043858 -0.01043088  0.024

In [44]:
outcome = decision(preds, 0.5)
outcome

['1.0', '0.0', '0.0', '0.0', '0.0', '0.0', '1.0', '0.0', '1.0', '0.0']

In [45]:
outcome == truth

True

Again, we found that our predictions match the truth. The cost at the 1000 iteration was smaller than when training without class weights, but the probabilities are further away from 0 and 1.