# W261 Final Project: Click Through Rate Prediction
__`MIDS w261: Machine Learning at Scale | UC Berkeley School of Information | Fall 2018`__

__`Rutika Banakar, Stanley Opara, Subha Vadakkumkkor`__


# Section 1: Introduction

Online advertising is a major business for internet companies, and one of its core problems is matching the right advertisement to the right user at the right time. For ads on most websites and search engines, advertisers pay only for measurable user responses, such as clicks on the ad and other actions users take on the website (registering, purchasing, etc.). An ad platform such as a search engine should therefore choose ads by expected value: the price of a click times the probability that the ad will be clicked. To maximize expected value, the platform needs to accurately predict the probability that a given ad will be clicked, known as the click-through rate (CTR).
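A tiny worked example of ranking by expected value follows. The ads, prices and click probabilities are hypothetical numbers chosen only for illustration. The point is that the ad with the highest price per click does not necessarily have the highest expected value.

```python
# Hypothetical ads: (price paid per click, predicted click probability)
ads = {
    "ad_a": (2.00, 0.010),
    "ad_b": (0.50, 0.050),
    "ad_c": (1.00, 0.015),
}

def expected_value(price_per_click, ctr):
    """Expected revenue from showing an ad once: price per click x P(click)."""
    return price_per_click * ctr

# Rank ads by expected value, highest first
ranked = sorted(ads, key=lambda name: expected_value(*ads[name]), reverse=True)
print(ranked)  # ['ad_b', 'ad_a', 'ad_c']
```

Here ad_a has the highest price per click (0.020 expected per impression), but ad_b wins (0.025) because its click probability is five times higher. This is why an accurate CTR estimate matters as much as the bid.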

# Notebook Set-Up


In [1]:
# imports
import re
import ast
import time
import numpy as np
import pandas as pd
import seaborn as sns
import networkx as nx
import matplotlib.pyplot as plt
from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel
from pyspark.mllib.regression import LabeledPoint

In [2]:
%reload_ext autoreload
%autoreload 2

In [3]:
# store path to notebook
PWD = !pwd
PWD = PWD[0]

In [4]:
# start Spark Session
from pyspark.sql import SparkSession
app_name = "hw5_notebook"
master = "local[*]"
spark = SparkSession\
        .builder\
        .appName(app_name)\
        .master(master)\
        .getOrCreate()
sc = spark.sparkContext

# Section 2: Problem definition and Dataset

Given a user and the page she is visiting, what is the probability that she will click on a given ad?

## About the data

__Source:__

The data we use in this project was made available by Criteo Labs as part of a Kaggle competition. They shared a week’s worth of data to develop models predicting ad click-through rate (CTR). The goal of the challenge was to benchmark the most accurate ML algorithms for CTR estimation.

__File descriptions:__

* __train.csv__ - The training set consists of a portion of Criteo's traffic over a period of 7 days. Each row corresponds to a display ad served by Criteo. Positive (clicked) and negative (non-clicked) examples have both been subsampled at different rates to reduce the dataset size. The examples are chronologically ordered.
* __test.csv__ - The test set is computed in the same way as the training set but for events on the day following the training period.
* __random_submission.csv__ - A sample submission file in the correct format.

__Data fields:__

* __Label__ - Target variable that indicates if an ad was clicked (1) or not (0).
* __I1-I13__ - A total of 13 columns of integer features (mostly count features).
* __C1-C26__ - A total of 26 columns of categorical features. The values of these features have been hashed onto 32 bits for anonymization purposes. The semantics of the features are undisclosed.
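The field layout above maps directly onto the tab-separated lines of the data file: column 0 is the label, columns 1-13 are I1-I13 and columns 14-39 are C1-C26, with missing values left as empty strings. The following is a minimal pure-Python sketch of that split. The helper name `parse_criteo_line` is hypothetical, and using `None` for missing values is an assumption made for illustration.

```python
def parse_criteo_line(line):
    """Split one tab-separated Criteo line into (label, ints, cats).

    Missing values (empty strings) become None.
    """
    fields = line.rstrip('\n').split('\t')
    fields += [''] * (40 - len(fields))   # pad rows whose trailing fields are absent
    label = int(fields[0])
    ints = [int(v) if v != '' else None for v in fields[1:14]]   # I1-I13
    cats = [v if v != '' else None for v in fields[14:40]]       # C1-C26 (hex strings)
    return label, ints, cats

# Example: label 1, I2 missing, I3 = -1, only C1 present
example = "\t".join(["1", "5", "", "-1"] + ["0"] * 10 + ["68fd1e64"] + [""] * 25)
label, ints, cats = parse_criteo_line(example)
```

Only `rstrip('\n')` is used, not `strip()`, so trailing tabs (empty categorical fields at the end of a row) are not lost.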

In [56]:
# !wget https://www.kaggle.com/c/3934/download-all

In [5]:
TOY_EXAMPLE = PWD + '/data/toy_example.txt'
TEST_DATA = PWD + '/data/test.txt'
TRAIN_DATA = PWD + '/data/train.txt'

While developing the code, we test each step on a small toy example before running it on the full dataset. The toy example is simply the first 50 lines of the training data, created below.

In [6]:
!head -n 50 {TRAIN_DATA} > {TOY_EXAMPLE}
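The training examples are chronologically ordered, so `head` draws the toy set from the earliest events only. If a more representative toy set is wanted, reservoir sampling (Algorithm R) picks a uniform random sample of k lines in a single pass using O(k) memory. This is a swapped-in technique, not what this notebook does, and `reservoir_sample` with its parameters is hypothetical.

```python
import random

def reservoir_sample(lines, k, seed=None):
    """Return a uniform random sample of k items from an iterable, in one pass."""
    rng = random.Random(seed)
    sample = []
    for i, line in enumerate(lines):
        if i < k:
            sample.append(line)        # fill the reservoir with the first k items
        else:
            j = rng.randint(0, i)      # uniform in [0, i], both ends inclusive
            if j < k:
                sample[j] = line       # keep item i with probability k / (i + 1)
    return sample
```

Usage on the training file would look like `with open(TRAIN_DATA) as f: toy_lines = reservoir_sample(f, 50, seed=42)`. This still reads every line of the file, which is slow for the full dataset, but it never holds more than 50 lines in memory.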

In [7]:
# Look at the first 5 lines of toy example
!head -n 5 {TOY_EXAMPLE}

0	1	1	5	0	1382	4	15	2	181	1	2		2	68fd1e64	80e26c9b	fb936136	7b4723c4	25c83c98	7e0ccccf	de7995b8	1f89b562	a73ee510	a8cd5504	b2cb9c98	37c9c164	2824a5f6	1adce6ef	8ba8b39a	891b62e7	e5ba7672	f54016b9	21ddcdc9	b1252a9d	07b5194c		3a171ecb	c5c50484	e8b83407	9727dd16
0	2	0	44	1	102	8	2	2	4	1	1		4	68fd1e64	f0cf0024	6f67f7e5	41274cd7	25c83c98	fe6b92e5	922afcc0	0b153874	a73ee510	2b53e5fb	4f1b46f3	623049e6	d7020589	b28479f6	e6c5b5cd	c92f3b61	07c540c4	b04e4670	21ddcdc9	5840adea	60f6221e		3a171ecb	43f13e8b	e8b83407	731c3655
0	2	0	1	14	767	89	4	2	245	1	3	3	45	287e684f	0a519c5c	02cf9876	c18be181	25c83c98	7e0ccccf	c78204a1	0b153874	a73ee510	3b08e48b	5f5e6091	8fe001f4	aa655a2f	07d13a8f	6dc710ed	36103458	8efede7f	3412118d			e587c466	ad3062eb	3a171ecb	3b183c5c		
0		893			4392		0	0	0		0			68fd1e64	2c16a946	a9a87e68	2e17d6f6	25c83c98	fe6b92e5	2e8a689b	0b153874	a73ee510	efea433b	e51ddf94	a30567ca	3516f6e6	07d13a8f	18231224	52b8680f	1e88c74f	74ef3502			6b3a5ca6		3a171ecb	9117a34a		
0	3	-1		0	2	0	3	0	0	1	1	

In [8]:
# count the lines (examples) in the toy file
!wc -l {TOY_EXAMPLE}

50 /media/notebooks/w261_final_project/data/toy_example.txt


In [9]:
# Spark RDDs: testRDD is the 50-line toy example (not the Kaggle test set); dataRDD is the full training set
testRDD = sc.textFile(TOY_EXAMPLE) 
dataRDD = sc.textFile(TRAIN_DATA)

# Section 3: EDA

First, we count the ads that were clicked (label = 1) and not clicked (label = 0), and compute the fraction of ads in the dataset that were clicked. Because the label is binary, that fraction is simply the mean of the label column.

In [10]:
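def EDA1(rdd):
    """Return (number of examples, number of clicked examples)."""
    # the label is the first tab-separated field; convert it to int once
    labels = rdd.map(lambda line: int(line.split('\t')[0])).cache()

    total = labels.count()
    total_clicked = labels.sum()  # labels are 0/1, so their sum counts the clicks

    return total, total_clicked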

In [11]:
# test run on the toy example
start = time.time()
total, total_clicked = EDA1(testRDD)
print("Wall time: {} seconds".format(time.time() - start))

Wall time: 1.3171603679656982 seconds


In [12]:
# EDA1 on the whole train data
start = time.time()
total, total_clicked = EDA1(dataRDD)
print("Wall time: {} seconds".format(time.time() - start))

Wall time: 88.06063556671143 seconds


In [13]:
total_unclicked = total - total_clicked
mean = total_clicked / float(total)

In [14]:
print("Training dataset size: ", total)
print("Total examples with positive Label: ", total_clicked)
print("Total negative examples: ", total_unclicked)
print("Fraction of ads in the data that were clicked (CTR): ", mean)

Training dataset size:  45840617
Total examples with positive Label:  11745438
Total negative examples:  34095179
Fraction of ads in the data that were clicked (CTR):  0.2562233837297609


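About 25.62% of the ads in the training data were clicked, i.e. the CTR of this sample is 25.62%, and negatives outnumber positives roughly 3 to 1. Because positive and negative examples were subsampled at different rates, this is the CTR of the released sample rather than of Criteo's live traffic.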
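Given this imbalance, it helps to know what a model with no features would score, so later results have a reference point. The quick calculation below uses the counts printed above. Always predicting "not clicked" gives an error rate equal to the CTR. Predicting the constant probability CTR for every ad gives the log loss computed below. The variable names are new to this sketch and are chosen so they do not overwrite the notebook's `total` and `total_clicked`.

```python
import math

# counts from the EDA1 run on the full training set
n_examples = 45840617
n_clicked = 11745438
ctr = n_clicked / n_examples

# baseline 1: always predict "not clicked" -> every clicked ad is an error
baseline_error = ctr

# baseline 2: predict probability `ctr` for every ad -> average log loss
baseline_log_loss = -(ctr * math.log(ctr) + (1 - ctr) * math.log(1 - ctr))

print(baseline_error, baseline_log_loss)
```

A useful model needs to beat both numbers: a training error below about 0.256 and a log loss below about 0.569.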

Next, let's see how many unique values each categorical variable has.

In [None]:
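def EDA2(rdd):
    """Count the distinct values of each categorical column."""
    def parseLine(line):
        values = line.replace('\n', '').split('\t')
        cf = values[14:]
        # columns are numbered from 0 here, so C0 corresponds to C1 in the data description;
        # an empty (missing) value is counted as one distinct value
        for i in range(len(cf)):
            key = f"C{i}_{cf[i]}"
            yield (key, 1)

    def gatherCounts(record):
        # record is ("C<i>_<value>", count); keep only the column name
        values = record[0].split('_')
        yield (values[0], 1)

    result = rdd.flatMap(parseLine) \
                .reduceByKey(lambda x, y: x + y) \
                .flatMap(gatherCounts) \
                .reduceByKey(lambda x, y: x + y) \
                .collect()
    return result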
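For the 50-line toy file, the same counts can be cross-checked locally without Spark, using one Python set per column. The helper `count_distinct_categorical` is hypothetical. It mirrors EDA2's conventions: columns are named C0-C25, and an empty string counts as a value.

```python
from collections import defaultdict

def count_distinct_categorical(lines):
    """Return {column name: number of distinct values} for C0..C25."""
    distinct = defaultdict(set)
    for line in lines:
        values = line.rstrip('\n').split('\t')
        for i, v in enumerate(values[14:]):
            distinct["C{}".format(i)].add(v)   # same key scheme as EDA2
    return {col: len(vals) for col, vals in distinct.items()}
```

For example, `count_distinct_categorical(open(TOY_EXAMPLE))` should match `dict(EDA2(testRDD))`. The set-based version only works when every distinct value fits in driver memory, which is exactly what the Spark version avoids.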


In [18]:
# test run on the toy example
start = time.time()
categorical_feature_values = EDA2(testRDD)
print(categorical_feature_values)
print("Wall time: {} seconds".format(time.time() - start))


[('C0', 12), ('C14', 47), ('C21', 5), ('C1', 33), ('C2', 45), ('C10', 45), ('C15', 44), ('C17', 44), ('C25', 16), ('C19', 4), ('C22', 10), ('C8', 2), ('C3', 41), ('C5', 6), ('C6', 47), ('C9', 34), ('C11', 45), ('C16', 8), ('C18', 8), ('C20', 45), ('C24', 7), ('C7', 7), ('C12', 44), ('C13', 8), ('C23', 36), ('C4', 6)]
Wall time: 0.2873992919921875 seconds


In [20]:
start = time.time()
categorical_feature_values = EDA2(dataRDD)
print(categorical_feature_values)
print("Wall time: {} seconds".format(time.time() - start))

[('C25', 142572), ('C14', 14992), ('C6', 12517), ('C18', 2173), ('C22', 15), ('C8', 3), ('C9', 93145), ('C2', 10131227), ('C1', 583), ('C5', 24), ('C23', 286181), ('C15', 5461306), ('C7', 633), ('C19', 4), ('C3', 2202608), ('C20', 7046547), ('C0', 1460), ('C16', 10), ('C21', 18), ('C11', 8351593), ('C24', 105), ('C4', 305), ('C17', 5652), ('C12', 3194), ('C13', 27), ('C10', 5683)]
Wall time: 512.3944199085236 seconds
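These counts show why the categorical columns cannot be naively one-hot encoded. C2 alone has 10,131,227 distinct values, and the 26 columns together have over 33 million (counting the empty, missing value as a category, as EDA2 does). A common remedy is the hashing trick, which maps each (column, value) pair to one of a fixed number of buckets and accepts occasional collisions in exchange for a bounded feature space. The following is a minimal pure-Python sketch. `hash_features` and the default `num_buckets = 2 ** 20` are illustrative assumptions, not tuned choices.

```python
import zlib

def hash_features(categorical_values, num_buckets=2 ** 20):
    """Map raw categorical values to {bucket index: count} via the hashing trick."""
    counts = {}
    for col, value in enumerate(categorical_values):
        if value == '':
            continue                      # skip missing values
        # prefix the column so equal strings in different columns hash differently
        key = "C{}_{}".format(col, value).encode("utf-8")
        idx = zlib.crc32(key) % num_buckets
        counts[idx] = counts.get(idx, 0) + 1
    return counts
```

`zlib.crc32` is used instead of Python's built-in `hash()`, because string hashing is salted per process (`PYTHONHASHSEED`), so different Spark workers could otherwise send the same value to different buckets. The resulting dict can be passed directly to MLlib as `SparseVector(num_buckets, counts)`.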


# Section 4: Algorithm 
    
## Algorithm description with math
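The toy example fits logistic regression with MLlib's L-BFGS solver. The following sketch shows the model and the loss it minimizes, written as standard binary logistic regression with an optional L2 penalty $\lambda$. How MLlib scales the penalty internally may differ. Each example $i$ has features $\mathbf{x}_i$ and label $y_i \in \{0, 1\}$:

$$\hat{p}_i = \sigma(\mathbf{w}^\top \mathbf{x}_i + b), \qquad \sigma(z) = \frac{1}{1 + e^{-z}}$$

$$J(\mathbf{w}, b) = -\frac{1}{n} \sum_{i=1}^{n} \left[ y_i \log \hat{p}_i + (1 - y_i) \log (1 - \hat{p}_i) \right] + \frac{\lambda}{2} \lVert \mathbf{w} \rVert_2^2$$

$$\nabla_{\mathbf{w}} J = \frac{1}{n} \sum_{i=1}^{n} (\hat{p}_i - y_i)\, \mathbf{x}_i + \lambda \mathbf{w}, \qquad \frac{\partial J}{\partial b} = \frac{1}{n} \sum_{i=1}^{n} (\hat{p}_i - y_i)$$

The gradient depends on the data only through the per-example terms $(\hat{p}_i - y_i)\,\mathbf{x}_i$, which are summed. This is what makes it natural to compute as a map (per example) followed by a reduce (sum) in Spark.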
## Toy example
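The MLlib cell in this section uses L-BFGS. To make the optimization concrete, here is a minimal sketch of plain batch gradient descent on the average log loss plus an optional L2 penalty (reg / 2) * ||w||². This is a swapped-in optimizer, not what MLlib runs. The function names, learning rate and epoch count are illustrative assumptions, and the code is pure Python so it runs without Spark.

```python
import math

def sigmoid(z):
    """Numerically stable logistic function."""
    if z >= 0:
        return 1.0 / (1.0 + math.exp(-z))
    e = math.exp(z)
    return e / (1.0 + e)

def predict_proba(w, b, x):
    return sigmoid(sum(wj * xj for wj, xj in zip(w, x)) + b)

def log_loss(w, b, X, y):
    eps = 1e-12   # clip probabilities to avoid log(0)
    total = 0.0
    for xi, yi in zip(X, y):
        p = min(max(predict_proba(w, b, xi), eps), 1 - eps)
        total += -(yi * math.log(p) + (1 - yi) * math.log(1 - p))
    return total / len(X)

def train_logreg(X, y, lr=0.1, epochs=500, reg=0.0):
    n, d = len(X), len(X[0])
    w, b = [0.0] * d, 0.0
    for _ in range(epochs):
        grad_w, grad_b = [0.0] * d, 0.0
        for xi, yi in zip(X, y):          # "map": per-example gradient terms
            err = predict_proba(w, b, xi) - yi
            for j in range(d):
                grad_w[j] += err * xi[j] / n
            grad_b += err / n             # "reduce": sum over examples
        w = [wj - lr * (gj + reg * wj) for wj, gj in zip(w, grad_w)]
        b -= lr * grad_b
    return w, b

# toy usage: one feature, negatives below zero, positives above
X_toy = [[-2.0], [-1.0], [1.0], [2.0]]
y_toy = [0, 0, 1, 1]
w, b = train_logreg(X_toy, y_toy)
```

Each epoch makes one full pass over the data. On the real dataset, the inner loop over examples would be a Spark `map` followed by a `reduce`, with only the small weight vector broadcast to the workers.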

In [19]:
type(testRDD)

pyspark.rdd.RDD

In [22]:
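# takeSample collects up to 2000 random lines to the driver (all 50 for the toy file);
# parallelize turns them back into an RDD
data = testRDD.takeSample(False, 2000, 42)
sampleData = sc.parallelize(data)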

In [32]:
# Load and parse the data.
# LabeledPoint needs numeric features: passing the raw categorical hex strings
# (e.g. '5a9ed9b0') raises "ValueError: could not convert string to float".
# The categorical values are therefore hashed into a fixed-size sparse vector.
import zlib
from pyspark.mllib.linalg import SparseVector

NUM_FEATURES = 2 ** 18  # size of the hashed feature space (tunable)

def parsePoint(line):
    values = line.rstrip('\n').split('\t')
    label = int(values[0])
    features = {}
    # integer features I1-I13 -> indices 0-12 (missing values stay 0)
    for i, f in enumerate(values[1:14]):
        if f != '':
            features[i] = float(f)
    # categorical features C1-C26 -> hashed indices 13 .. NUM_FEATURES-1;
    # crc32 is deterministic across Spark workers, unlike Python's salted hash()
    for i, f in enumerate(values[14:]):
        if f != '':
            key = "C{}_{}".format(i, f).encode('utf-8')
            idx = 13 + zlib.crc32(key) % (NUM_FEATURES - 13)
            features[idx] = features.get(idx, 0.0) + 1.0
    return LabeledPoint(label, SparseVector(NUM_FEATURES, features))

parsedData = sampleData.map(parsePoint).cache()

# Build the model (MLlib's LBFGS trainer defaults to intercept=False)
model = LogisticRegressionWithLBFGS.train(parsedData, intercept=True)

# Evaluating the model on training data
labelsAndPreds = parsedData.map(lambda p: (p.label, model.predict(p.features)))
trainErr = labelsAndPreds.filter(lambda lp: lp[0] != lp[1]).count() / float(parsedData.count())
print("Training Error = " + str(trainErr))

# Save and load model (save fails if the target directory already exists)
model.save(sc, "target/tmp/pythonLogisticRegressionWithLBFGSModel")
sameModel = LogisticRegressionModel.load(sc, "target/tmp/pythonLogisticRegressionWithLBFGSModel")


# Section 5: Implementation