# Final Project
__`MIDS w261: Machine Learning at Scale | UC Berkeley School of Information | Fall 2018`__  
Throughout this course you’ve engaged with key principles required to develop scalable machine learning analyses for structured and unstructured data. Working in Hadoop Streaming and Spark, you’ve learned to translate common machine learning algorithms into Map-Reduce style implementations. You’ve developed the ability to evaluate machine learning approaches in terms of both their predictive performance and their scalability. For the final project you will demonstrate these skills by solving a machine learning challenge on a new dataset. Your job is to perform click-through rate prediction on a large dataset of Criteo advertising data made public as part of a Kaggle competition a few years back. As you perform your analysis, keep in mind that we are not grading you on the final performance of your model or on how ‘advanced’ your techniques are, but rather on your ability to explain and develop a scalable machine learning approach to answering a real question.

More about the dataset:
https://www.kaggle.com/c/criteo-display-ad-challenge

# Notebook Set-Up
Before starting your homework, run the following cells to confirm your setup.

In [1]:
import re
import ast
import time
import itertools
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.sql import SQLContext, Row
from pyspark.sql.functions import col,sum,when,isnan,count
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator


In [2]:
%reload_ext autoreload
%autoreload 2
%matplotlib inline

In [3]:
# store path to notebook
PWD = !pwd
PWD = PWD[0]

In [4]:
# start Spark Session
app_name = "fproj_notebook"
master = "local[*]"
spark = SparkSession\
        .builder\
        .appName(app_name)\
        .master(master)\
        .getOrCreate()
sc = spark.sparkContext

In [5]:
# create a data directory (RUN THIS CELL AS IS)
# !mkdir data

In [6]:
# grab the tar.gz file from kaggle

# !curl https://s3-eu-west-1.amazonaws.com/kaggle-display-advertising-challenge-dataset/dac.tar.gz -o data/dac.tar.gz

In [7]:
# I couldn't get this to unpack the tarball, so I just did it in Windows
# (tar needs -f to name the archive file; -C extracts into data/)
# !tar -xvzf data/dac.tar.gz -C data

In [8]:
# take a look at the training data set
# !head data/train.txt

In [9]:
# ! ls ../Assignments/FinalProject/

# Change Data Source Path

In [10]:
projectRDD = sc.textFile('../Assignments/FinalProject/data/train.txt')
# projectRDD = projectRDD.sample(False,.001)
# sampleRDD = sc.textFile('../Assignments/FinalProject/data/train1-500.txt')
projectRDD.count()

45935

__`REMINDER:`__ If you are running this notebook on the course docker container, you can monitor the progress of your jobs using the Spark UI at: http://localhost:4040/jobs/

# Question 1: Question Formulation 
Introduce the goal of your analysis. What questions will you seek to answer? Why do people perform this kind of analysis on this kind of data? Preview what level of performance your model would need to achieve to be practically useful.

# Question 2: Algorithm Explanation
Create your own toy example that matches the dataset provided and use this toy example to explain the math behind the algorithm that you will perform.

# Question 3: EDA & Discussion of Challenges
Determine 2-3 relevant EDA tasks that will help you make decisions about how you implement the algorithm to be scalable. Discuss any challenges that you anticipate based on the EDA you perform.

In [11]:
sqlContext = SQLContext(sc)

# Load a text file and convert each line to a Row.
# lines = sc.textFile("../Assignments/FinalProject/data/train.txt")

projectRDD = projectRDD.map(lambda l: l.split("\t"))\
                        .map(lambda p: Row(label=int(p[0]), I1=p[1], I2=p[2],\
                        I3=p[3], I4=p[4], I5=p[5], I6=p[6],\
                        I7=p[7], I8=p[8], I9=p[9], I10=p[10],\
                        I11=p[11], I12=p[12], I13=p[13], C1=p[14], C2=p[15], C3=p[16],\
                        C4=p[17], C5=p[18], C6=p[19], C7=p[20], C8=p[21], C9=p[22],\
                        C10=p[23], C11=p[24], C12=p[25], C13=p[26], C14=p[27], C15=p[28],\
                        C16=p[29], C17=p[30], C18=p[31], C19=p[32], C20=p[33], C21=p[34],\
                        C22=p[35], C23=p[36], C24=p[37], C25=p[38], C26=p[39]))

# Infer the schema, and register the DataFrame as a table.
projectDF = sqlContext.createDataFrame(projectRDD)
# createOrReplaceTempView replaces registerTempTable, which is deprecated since Spark 2.0
projectDF.createOrReplaceTempView("projectTable")


In [12]:
projectDF.printSchema()
projectDF.head()

root
 |-- C1: string (nullable = true)
 |-- C10: string (nullable = true)
 |-- C11: string (nullable = true)
 |-- C12: string (nullable = true)
 |-- C13: string (nullable = true)
 |-- C14: string (nullable = true)
 |-- C15: string (nullable = true)
 |-- C16: string (nullable = true)
 |-- C17: string (nullable = true)
 |-- C18: string (nullable = true)
 |-- C19: string (nullable = true)
 |-- C2: string (nullable = true)
 |-- C20: string (nullable = true)
 |-- C21: string (nullable = true)
 |-- C22: string (nullable = true)
 |-- C23: string (nullable = true)
 |-- C24: string (nullable = true)
 |-- C25: string (nullable = true)
 |-- C26: string (nullable = true)
 |-- C3: string (nullable = true)
 |-- C4: string (nullable = true)
 |-- C5: string (nullable = true)
 |-- C6: string (nullable = true)
 |-- C7: string (nullable = true)
 |-- C8: string (nullable = true)
 |-- C9: string (nullable = true)
 |-- I1: string (nullable = true)
 |-- I10: string (nullable = true)
 |-- I11: string (nullabl

Row(C1='05db9164', C10='c31e5ea3', C11='8b94178b', C12='7a473dbc', C13='025225f2', C14='07d13a8f', C15='520cb89e', C16='4adb3127', C17='e5ba7672', C18='fb299884', C19='', C2='6c2cbbdc', C20='', C21='1d1b3e07', C22='', C23='32c7478e', C24='145ae095', C25='', C26='', C3='9199713a', C4='8b3b6b2e', C5='25c83c98', C6='7e0ccccf', C7='4b3c7cfe', C8='0b153874', C9='7cc72ec2', I1='', I10='', I11='5', I12='', I13='4', I2='1', I3='6', I4='4', I5='23940', I6='70', I7='21', I8='5', I9='126', label=0)

In [13]:
# Cast the 13 integer feature columns (I1..I13) from string to int;
# empty strings become null and are filled in later
for i in range(1, 14):
    name = "I{}".format(i)
    projectDF = projectDF.withColumn(name, projectDF[name].cast("int"))

In [17]:
def handle_missing(data):
    """
    Replaces missing values: empty or blank strings ('', ' ') become 'EMPTY'
    and nulls in numeric columns become 0
    Args:
        data - Spark DF with missing values
    Returns:
        data - Spark DF with missing values filled in
    """
    
    for c in data.columns:
        data = data.replace(' ',str('EMPTY'), c)
        data = data.replace('',str('EMPTY'), c)
    data = data.na.fill(0)
    return data

def pipeline_stages(categorical_features,numeric_features,OHE=True):
    """Constructs a list of pipeline stages which will transform
       categorical features into one-hot vectors (or plain string indices
       when OHE is False) and assemble them with the numeric features
       into a single "features" vector column
       
       Args:
           categorical_features - list of categorical feature column names
           numeric_features - list of numeric feature column names
           OHE - Bool - if True, apply one-hot encoding to the categorical features;
                 use False for tree-based models
       Returns: 
           stages - list of pipeline stages (indexers, optional encoders,
               and a final VectorAssembler)
    """
    stages = []
    
    
    if OHE:
        for cat_col in categorical_features:
            #Convert Strings to numeric labels
            stringIndexer = StringIndexer(inputCol = cat_col, outputCol = cat_col + 'Index')
            #Convert to One Hot Encoded vectors
            encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[cat_col + "classVec"],handleInvalid='keep')
            stages += [stringIndexer, encoder]
        #Vector assembler to combine features into a single column
        assemblerInputs = [c + "classVec" for c in categorical_features] + numeric_features
    else:
        for cat_col in categorical_features:
            stringIndexer = StringIndexer(inputCol = cat_col, outputCol = cat_col + 'Index')

            stages += [stringIndexer]
        assemblerInputs = [c + "Index" for c in categorical_features] + numeric_features

    assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
    stages += [assembler]
    
    return stages


def apply_pipeline(data,stages):
    """
    Applies pipeline stages to data
    Args: 
        data - Spark DF of data to be modified
        stages - list of pipeline stages to be applied to data
    returns:
        preppedDataDF - Spark DF with variable transformations applied

    """
    partialPipeline = Pipeline().setStages(stages)
    pipelineModel = partialPipeline.fit(data)
    preppedDataDF = pipelineModel.transform(data)

    
    return preppedDataDF
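
To make the indexing and encoding steps concrete, here is a minimal plain-Python sketch of what the `StringIndexer` and one-hot stages do to a single categorical column. The helper names `fit_string_index` and `one_hot` and the toy values are hypothetical, chosen only for illustration. The sketch assumes indices are assigned by descending frequency and keeps a full-width dense vector, whereas Spark's encoder returns sparse vectors and by default drops the last category.

```python
from collections import Counter

def fit_string_index(values):
    """Map each distinct string to an integer, most frequent value first."""
    counts = Counter(values)
    # Sort by descending count, then alphabetically so ties are deterministic
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {value: idx for idx, value in enumerate(ordered)}

def one_hot(index, size):
    """Return a dense 0/1 list with a single 1 at position `index`."""
    vec = [0] * size
    vec[index] = 1
    return vec

# Toy categorical column (values are made up for illustration)
toy_column = ["a", "b", "a", "EMPTY", "a", "b"]
mapping = fit_string_index(toy_column)
encoded = [one_hot(mapping[v], len(mapping)) for v in toy_column]

print(mapping)
print(encoded[0], encoded[1], encoded[3])
```

With `OHE=False` only the integer index would be passed on, which is why the tree models see one numeric column per categorical feature instead of one column per category.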

In [18]:
numeric_features = ['I4','I13','I7','I11', 'I1']
categorical_features =  ['C6','C9','C14','C17','C20','C22','C23']
label_and_numeric = ["label", 'I4','I13','I7','I11', 'I1']

## Prep the Data

In [21]:

weights = [.8, .2]
seed = 4

stages = []
stages = pipeline_stages(categorical_features,numeric_features)

keep_cols = categorical_features + label_and_numeric
projectDF = projectDF.select([c for c in projectDF.columns if c in keep_cols])
#Handle empty strings by imputing 'EMPTY' and set NA's to 0
projectDF = handle_missing(projectDF)

#OHE -> VectorAssembler
t_lr_prep = time.time()
preppedDF = apply_pipeline(projectDF,stages)
train,test = preppedDF.randomSplit(weights,seed)
print('Time to prep LR Data :', time.time() - t_lr_prep)

Time to prep LR Data : 83.95914077758789


In [22]:
print('Train Size - ',train.count())
print('Test Size - ',test.count())

# show() prints the table itself and returns None, so print the caption separately
print('Train Label Distribution -')
train.groupBy('label').count().show()
print('Test Label Distribution -')
test.groupBy('label').count().show()

Train Size -  36730
Test Size -  9205
Train Label Distribution -
+-----+-----+
|label|count|
+-----+-----+
|    0|27253|
|    1| 9477|
+-----+-----+

Test Label Distribution -
+-----+-----+
|label|count|
+-----+-----+
|    0| 6904|
|    1| 2301|
+-----+-----+
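
The label counts above also give a useful reference point for the models that follow. The sketch below is a hedged illustration, not part of the original analysis: it computes the observed click-through rate of each split and the accuracy of a constant predictor that always outputs the majority label. The helper names are hypothetical.

```python
# Label counts copied from the train and test tables above
train_counts = {0: 27253, 1: 9477}
test_counts = {0: 6904, 1: 2301}

def positive_rate(counts):
    """Fraction of rows with label 1 (the observed click-through rate)."""
    return counts[1] / (counts[0] + counts[1])

def majority_baseline_accuracy(counts):
    """Accuracy of a model that always predicts the most common label."""
    return max(counts.values()) / sum(counts.values())

print("Train CTR:", round(positive_rate(train_counts), 4))
print("Test CTR:", round(positive_rate(test_counts), 4))
print("Always-predict-0 accuracy on test:",
      round(majority_baseline_accuracy(test_counts), 4))
```

Roughly three quarters of the test rows are non-clicks, so an accuracy near 0.75 on this split is what a constant predictor would already achieve.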


## Logistic Regression

In [23]:
#Logistic Regression
t_lr = time.time()
lrModel = LogisticRegression().fit(train)

print('Total training time - Logistic Regression : ',time.time()-t_lr)
predictions_lr = lrModel.transform(test)
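# Evaluate model
# Note: with rawPredictionCol="prediction" the evaluator scores the hard 0/1
# predictions, so the AUC reflects a single operating point; passing
# "rawPrediction" would give the usual threshold-free AUC
evaluator = BinaryClassificationEvaluator(rawPredictionCol="prediction",labelCol="label")
print('AUC ROC', evaluator.evaluate(predictions_lr))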


Total training time : 23.617695569992065
AUC ROC 0.5456343166329517


In [24]:
#LR Predictions
predictions_lr.groupBy('prediction').count().show()

TP = predictions_lr.select("label", "prediction").filter("label = 1 and prediction = 1").count()

TN = predictions_lr.select("label", "prediction").filter("label = 0 and prediction = 0").count()
FP = predictions_lr.select("label", "prediction").filter("label = 0 and prediction = 1").count()
FN = predictions_lr.select("label", "prediction").filter("label = 1 and prediction = 0").count()
total = predictions_lr.select("label").count()
total = float(total)

print("Total :",total)
print("TN :",TN)
print("FP :",FP)
print("FN :",FN)
print("TP :",TP)

print("\nConfusion Matrix:")
# Rows are the predicted class (1, then 0); columns are the actual class (1, then 0)
print(TP,'\t',FP)
print(FN,'\t',TN)
print("\n")

accuracy  = (TP + TN) / total
precision = TP / (TP + FP)
recall    = TP / (TP + FN)
F1        = 2 / (1/precision + 1/recall)


print('AUC ROC -',evaluator.evaluate(predictions_lr))
print("Accuracy :",accuracy)
print("Precision :",precision)
print("Recall :",recall)
print("F1 :",F1)

+----------+-----+
|prediction|count|
+----------+-----+
|       0.0| 8743|
|       1.0|  462|
+----------+-----+

Total : 9205.0
TN : 6715
FP : 189
FN : 2028
TP : 273

Confusion Matrix:
273 	 189
2028 	 6715


AUC ROC - 0.5456343166329517
Accuracy : 0.7591526344378056
Precision : 0.5909090909090909
Recall : 0.11864406779661017
F1 : 0.1976112920738328


In [25]:
#Prep the data for Trees (skip the one hot encoding step)
t_tree_prep = time.time()
stages = []
stages = pipeline_stages(categorical_features,numeric_features,OHE=False)
preppedDF = apply_pipeline(projectDF,stages)
train,test = preppedDF.randomSplit(weights,seed)
print('Total time to prep tree data :',time.time()-t_tree_prep)

Total time to prep tree data : 80.91630029678345


## Decision Tree

In [26]:
# Create initial Decision Tree Model
t_dt = time.time()
dtModel = DecisionTreeClassifier(
    labelCol="label", featuresCol="features", maxDepth=3).fit(train)
print('Total Training Time - Decision Tree : ',time.time()-t_dt)
#make predictions
predictions_dt = dtModel.transform(test)
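# Note: with rawPredictionCol="prediction" the evaluator scores hard 0/1 predictions,
# so the AUC is computed from a single operating point
evaluator = BinaryClassificationEvaluator(
    rawPredictionCol="prediction",labelCol="label")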


Total Training Time : 49.28633713722229


In [27]:
# DT Predictions
predictions_dt.groupBy('prediction').count().show()

TP = predictions_dt.select("label", "prediction").filter("label = 1 and prediction = 1").count()

TN = predictions_dt.select("label", "prediction").filter("label = 0 and prediction = 0").count()
FP = predictions_dt.select("label", "prediction").filter("label = 0 and prediction = 1").count()
FN = predictions_dt.select("label", "prediction").filter("label = 1 and prediction = 0").count()
total = predictions_dt.select("label").count()
total = float(total)

print("Total :",total)
print("TN :",TN)
print("FP :",FP)
print("FN :",FN)
print("TP :",TP)

print("\nConfusion Matrix:")
# Rows are the predicted class (1, then 0); columns are the actual class (1, then 0)
print(TP,'\t',FP)
print(FN,'\t',TN)
print("\n")

accuracy  = (TP + TN) / total
precision = TP / (TP + FP)
recall    = TP / (TP + FN)
F1        = 2 / (1/precision + 1/recall)


print('AUC ROC -',evaluator.evaluate(predictions_dt))
print("Accuracy :",accuracy)
print("Precision :",precision)
print("Recall :",recall)
print("F1 :",F1)

+----------+-----+
|prediction|count|
+----------+-----+
|       0.0| 8577|
|       1.0|  628|
+----------+-----+

Total : 9205.0
TN : 6616
FP : 288
FN : 1961
TP : 340

Confusion Matrix:
340 	 288
1961 	 6616


AUC ROC - 0.553023447410391
Accuracy : 0.7556762629005975
Precision : 0.5414012738853503
Recall : 0.1477618426770969
F1 : 0.23216114714919767


## Random Forest

In [28]:
# Create an initial RandomForest model.
t_rf = time.time()
rfModel = RandomForestClassifier(
    labelCol="label", featuresCol="features").fit(train)

print('Total training time - Random Forest : ',time.time()-t_rf)

#Make Predictions
predictions_rf = rfModel.transform(test)
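# Note: with rawPredictionCol="prediction" the evaluator scores hard 0/1 predictions,
# so the AUC is computed from a single operating point
evaluator = BinaryClassificationEvaluator(
    rawPredictionCol="prediction",labelCol="label")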


Total training time : 52.869446754455566


In [29]:
#RF Predictions
predictions_rf.groupBy('prediction').count().show()

TP = predictions_rf.select("label", "prediction").filter("label = 1 and prediction = 1").count()

TN = predictions_rf.select("label", "prediction").filter("label = 0 and prediction = 0").count()
FP = predictions_rf.select("label", "prediction").filter("label = 0 and prediction = 1").count()
FN = predictions_rf.select("label", "prediction").filter("label = 1 and prediction = 0").count()
total = predictions_rf.select("label").count()
total = float(total)

print("Total :",total)
print("TN :",TN)
print("FP :",FP)
print("FN :",FN)
print("TP :",TP)

print("\nConfusion Matrix:")
# Rows are the predicted class (1, then 0); columns are the actual class (1, then 0)
print(TP,'\t',FP)
print(FN,'\t',TN)
print("\n")

accuracy  = (TP + TN) / total
precision = TP / (TP + FP)
recall    = TP / (TP + FN)
F1        = 2 / (1/precision + 1/recall)

print('AUC ROC -',evaluator.evaluate(predictions_rf))
print("Accuracy :",accuracy)
print("Precision :",precision)
print("Recall :",recall)
print("F1 :",F1)

+----------+-----+
|prediction|count|
+----------+-----+
|       0.0| 8865|
|       1.0|  340|
+----------+-----+

Total : 9205.0
TN : 6777
FP : 127
FN : 2088
TP : 213

Confusion Matrix:
213 	 127
2088 	 6777


AUC ROC - 0.5370866576222842
Accuracy : 0.759369907658881
Precision : 0.6264705882352941
Recall : 0.09256844850065189
F1 : 0.16130253691783417


# Question 4: Algorithm Implementation 
Develop a 'homegrown' implementation of the algorithm, apply it to the training dataset and evaluate your results on the test set. 

# Question 5: Application of Course Concepts
Pick 3-5 key course concepts and discuss how your work on this assignment illustrates an understanding of these concepts. 