# Click-Through Rate Prediction
### Notebook created by [Wenyi Xu](https://github.com/xuwenyihust)
### Create a [click-through rate](https://www.kaggle.com/c/criteo-display-ad-challenge) (CTR) prediction pipeline.

In [2]:
import numpy as np
from pyspark.mllib.linalg import SparseVector

### 1. Parse CTR Data

View Criteo's terms for using the dataset.

In [4]:
from IPython.lib.display import IFrame

IFrame("http://labs.criteo.com/downloads/2014-kaggle-display-advertising-challenge-dataset/", 600, 350)

#### Load the data

In [6]:
# Hidden path
rawData = sc.textFile('/FileStore/tables/86sw321e1469469485636/dac_sample.txt').map(lambda x: x.replace('\t', ','))
print rawData.take(1)
print type(rawData)

#### Split the dataset into training, validation and test sets

Specify the weights and seed for the `randomSplit` method.

**Training : Validation : Test** will be **8 : 1 : 1**.

In [8]:
weights = [.8, .1, .1]
seed = 42

In [9]:
rawTrainData, rawValidationData, rawTestData = rawData.randomSplit(weights, seed)
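`randomSplit` makes an independent random draw for each record, so with weights 8:1:1 the three splits come out close to 80/10/10 but not exactly (compare the counts printed in the data preview). The sketch below is a hypothetical plain-Python 3 helper, `random_split`, that only illustrates this idea. It is not Spark's sampler, and with the same seed it will not reproduce Spark's split.

```python
import random

def random_split(records, weights, seed):
    """Hypothetical helper: assign each record to one split via one uniform draw."""
    total = float(sum(weights))
    # Cumulative upper bounds, e.g. [.8, .1, .1] -> [0.8, 0.9, 1.0]
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for record in records:
        u = rng.random()
        for i, bound in enumerate(bounds):
            # The last split also catches floating-point rounding near 1.0
            if u < bound or i == len(bounds) - 1:
                splits[i].append(record)
                break
    return splits

train, val, test = random_split(range(10000), [.8, .1, .1], seed=42)
print(len(train), len(val), len(test))  # roughly 8000 1000 1000, not exact
```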

**Cache** the split datasets, since we will be using them repeatedly.

In [11]:
rawTrainData.cache()
rawValidationData.cache()
rawTestData.cache()

**Data preview**.

In [13]:
nTrain = rawTrainData.count()
nVal = rawValidationData.count()
nTest = rawTestData.count()
print nTrain, nVal, nTest, nTrain + nVal + nTest
print rawData.take(1)

#### Extract features

Split each data point (a comma-separated string) into its fields.

Drop the first field -> **label** (clicked or not)

Save the remaining fields -> **features**

Define a *parsePoint* function that takes a data point (row) and returns a list of **(featureID, value)** tuples.

In [16]:
def parsePoint(point):
    """Converts a comma separated string into a list of (featureID, value) tuples.

    Note:
        featureIDs should start at 0 and increase to the number of features - 1.

    Args:
        point (str): A comma separated string where the first value is the label and the rest
            are features.

    Returns:
        list: A list of (featureID, value) tuples.
    """
    features = point.split(',')[1:]
    return [(idx, value) for (idx, value) in enumerate(features)]

In [17]:
parsedTrainFeat = rawTrainData.map(parsePoint)
print parsedTrainFeat.take(1)

The string of features has now been split into a list of (featureID, value) tuples.

Count the **number of distinct values for each feature**.

In [19]:
numCategories = (parsedTrainFeat
                 # Flatten all the elements in the list
                 .flatMap(lambda x: x)
                 # Drop the duplicated values for features
                 .distinct()
                 # Set feature value to 1 for the convenience of counting
                 .map(lambda x: (x[0], 1))
                 # Count how many times each key (featureID) occurs
                 .reduceByKey(lambda x, y: x + y)
                 .sortByKey()
                 .collect())

print numCategories[2][1]
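The chain of RDD transformations above can be mirrored on a small in-memory list to see what each step does. A plain-Python 3 sketch, where `count_categories` and the toy `parsed_rows` are hypothetical:

```python
from collections import defaultdict

def count_categories(rows):
    """Return sorted (featureID, number of distinct values) pairs."""
    # flatMap + distinct: every (featureID, value) tuple, kept once
    distinct_pairs = set(pair for row in rows for pair in row)
    # map to (featureID, 1) + reduceByKey: count distinct values per featureID
    counts = defaultdict(int)
    for feat_id, _ in distinct_pairs:
        counts[feat_id] += 1
    # sortByKey + collect
    return sorted(counts.items())

# Hypothetical toy rows, already in parsePoint's (featureID, value) form
parsed_rows = [[(0, '1'), (1, 'a'), (2, 'x')],
               [(0, '1'), (1, 'b'), (2, 'x')],
               [(0, '2'), (1, 'a'), (2, 'x')]]
print(count_categories(parsed_rows))  # [(0, 2), (1, 2), (2, 1)]
```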

### 2. Generate OHE Features

#### Construct an OHE dictionary

Categorical feature:  **(featureID, category)** tuple.

OHE dictionary:  **Map** each tuple **to** a distinct **integer**.

Function:

- Input the lists of (featureID, category) tuples.
- Flatten the lists to get all the unique (featureID, category) tuples.
- Attach a unique integer to each distinct feature to create the dictionary.
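A plain-Python 3 sketch of these three steps on a hypothetical toy dataset. The helper `create_one_hot_dict` is illustrative only: it sorts the distinct tuples so the indices are reproducible, whereas with `distinct()` followed by `zipWithIndex()` the indices are unique but their order depends on how Spark partitions the data.

```python
def create_one_hot_dict(rows):
    """Map every distinct (featureID, value) tuple in rows to a unique integer."""
    # Flatten the per-observation lists and keep each tuple once
    distinct_feats = sorted(set(pair for row in rows for pair in row))
    # Attach consecutive integers 0, 1, 2, ... to the distinct tuples
    return {pair: idx for idx, pair in enumerate(distinct_feats)}

# Hypothetical toy observations, each a list of (featureID, value) tuples
rows = [[(0, 'mouse'), (1, 'black')],
        [(0, 'cat'), (1, 'tabby'), (2, 'mouse')],
        [(0, 'bear'), (1, 'black'), (2, 'salmon')]]
ohe_dict = create_one_hot_dict(rows)
print(len(ohe_dict))         # 7 distinct (featureID, value) tuples
print(ohe_dict[(0, 'cat')])  # 1
```

Note that `(0, 'mouse')` and `(2, 'mouse')` get different indices: the same value under a different featureID is a different OHE feature.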

In [22]:
def createOneHotDict(inputData):
    """Creates a one-hot-encoder dictionary based on the input data.

    Args:
        inputData (RDD of lists of (int, str)): An RDD of observations where each observation is
            made up of a list of (featureID, value) tuples.

    Returns:
        dict: A dictionary where the keys are (featureID, value) tuples and map to values that are
            unique integers.
    """
    distinctFeats = (inputData
                       .flatMap(lambda row: row)
                       .distinct())
    
    return (distinctFeats
                           # Zips this RDD with its element indices
                           # (featureID, value) -> ((featureID, value), int)
                           .zipWithIndex()
                           # Return the key-value pairs in this RDD to the driver as a dictionary.
                           .collectAsMap())

Create an **OHE dictionary** from **parsedTrainFeat**, whose first record looks like:

    [[(0, u'1'), (1, u'1'), (2, u'5'), (3, u'0'), (4, u'1382'), (5, u'4'), (6, u'15'), ...

In [24]:
ctrOHEDict = createOneHotDict(parsedTrainFeat)
numCtrOHEFeats = len(ctrOHEDict.keys())
print 'Number of elements in the dictionary: ', numCtrOHEFeats

In [25]:
print "(0, '1'): ", ctrOHEDict[(0, '1')]
print "(0, '3'): ", ctrOHEDict[(0, '3')]
print "(1, '4'): ", ctrOHEDict[(1, '4')]
print "(3, '1'): ", ctrOHEDict[(3, '1')]
print "(9, '3'): ", ctrOHEDict[(9, '3')]

#### Define an OHE function

Use it to generate **OHE features** from the original categorical data.

The OHE features should be in **SparseVector** format to reduce storage and computation costs.

In [27]:
def oneHotEncoding(rawFeats, OHEDict, numOHEFeats):
    """Produce a one-hot-encoding from a list of features and an OHE dictionary.

    Note:
        You should ensure that the indices used to create a SparseVector are sorted.

    Args:
        rawFeats (list of (int, str)): The features corresponding to a single observation.  Each
            feature consists of a tuple of featureID and the feature's value. (e.g. sampleOne)
        OHEDict (dict): A mapping of (featureID, value) to unique integer.
        numOHEFeats (int): The total number of unique OHE features (combinations of featureID and
            value).

    Returns:
        SparseVector: A SparseVector of length numOHEFeats with indices equal to the unique
            identifiers for the (featureID, value) combinations that occur in the observation and
            with values equal to 1.0.
    """
    return SparseVector(numOHEFeats, [(OHEDict[(featID, value)],1) for (featID, value) in rawFeats])

#### Apply OHE to the dataset

For each data point (sample):

**data point -> one-hot encoded (categorical to numerical) -> sparse vector -> labeled point**

In [29]:
from pyspark.mllib.regression import LabeledPoint

In [30]:
def parseOHEPoint(point, OHEDict, numOHEFeats):
    """Obtain the label and feature vector for this raw observation.

    Note:
        You must use the function `oneHotEncoding` in this implementation or later portions
        of this lab may not function as expected.

    Args:
        point (str): A comma separated string where the first value is the label and the rest
            are features.
        OHEDict (dict of (int, str) to int): Mapping of (featureID, value) to unique integer.
        numOHEFeats (int): The number of unique features in the training dataset.

    Returns:
        LabeledPoint: Contains the label for the observation and the one-hot-encoding of the
            raw features based on the provided OHE dictionary.
    """
    # Use the numOHEFeats argument rather than the global numCtrOHEFeats
    return LabeledPoint(point.split(',')[0], oneHotEncoding(parsePoint(point), OHEDict, numOHEFeats))

In [31]:
OHETrainData = rawTrainData.map(lambda point: parseOHEPoint(point, ctrOHEDict, numCtrOHEFeats))
OHETrainData.cache()
print OHETrainData.take(1)

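To explain the format of the resulting data points: `SparseVector(10, [(1,1), (6,1)])` is a vector of length 10 whose only nonzero entries are 1.0 at indices 1 and 6, and `LabeledPoint(0, ...)` attaches the label 0 (no click) to such a vector.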

In [33]:
SparseVector(10, [(1,1), (6,1)])

In [34]:
LabeledPoint(0, SparseVector(10, [(1,1), (6,1)]))
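To make the sparse representation concrete, the hypothetical helper `to_dense` below (plain Python 3, no Spark) expands a size plus a list of (index, value) pairs into the equivalent dense list:

```python
def to_dense(size, pairs):
    """Hypothetical helper: expand (size, [(index, value), ...]) into a dense list."""
    dense = [0.0] * size
    for index, value in pairs:
        dense[index] = float(value)
    return dense

print(to_dense(10, [(1, 1), (6, 1)]))
# [0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0]
```

Only the nonzero entries need to be stored, which is why the sparse form suits one-hot features where nearly every entry is 0.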

#### Handle unseen features

Compute OHE features for the validation and test datasets. 

**Some categorical values will likely appear in new data that did not exist in the training data.**

To deal with this situation, update the oneHotEncoding() function to ignore previously unseen categories, and then compute OHE features for the validation data.

In [36]:
def oneHotEncoding(rawFeats, OHEDict, numOHEFeats):
    """Produce a one-hot-encoding from a list of features and an OHE dictionary.

    Note:
        If a (featureID, value) tuple doesn't have a corresponding key in OHEDict it should be
        ignored.

    Args:
        rawFeats (list of (int, str)): The features corresponding to a single observation.  Each
            feature consists of a tuple of featureID and the feature's value. (e.g. sampleOne)
        OHEDict (dict): A mapping of (featureID, value) to unique integer.
        numOHEFeats (int): The total number of unique OHE features (combinations of featureID and
            value).

    Returns:
        SparseVector: A SparseVector of length numOHEFeats with indices equal to the unique
            identifiers for the (featureID, value) combinations that occur in the observation and
            with values equal to 1.0.
    """
    validFeatureTuples = []
    for (featID, value) in rawFeats:
        try:
            validFeatureTuples.append((OHEDict[(featID, value)],1))
        except KeyError:
            pass
    return SparseVector(numOHEFeats, validFeatureTuples)
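An equivalent way to skip unseen categories is to test membership before the lookup instead of catching `KeyError`. A plain-Python 3 sketch with a hypothetical toy dictionary; the illustrative `one_hot_encode` returns the kind of sorted (index, value) pair list that the code above passes to `SparseVector`:

```python
def one_hot_encode(raw_feats, ohe_dict):
    """Return sorted (index, 1.0) pairs, dropping tuples not present in ohe_dict."""
    return sorted((ohe_dict[pair], 1.0) for pair in raw_feats if pair in ohe_dict)

# Hypothetical dictionary built from "training" data
ohe_dict = {(0, 'bear'): 0, (0, 'cat'): 1, (1, 'black'): 2}
print(one_hot_encode([(0, 'cat'), (1, 'black')], ohe_dict))  # [(1, 1.0), (2, 1.0)]
print(one_hot_encode([(0, 'dog'), (1, 'black')], ohe_dict))  # [(2, 1.0)]: (0, 'dog') is unseen
```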

In [37]:
OHEValidationData = rawValidationData.map(lambda point: parseOHEPoint(point, ctrOHEDict, numCtrOHEFeats))
OHEValidationData.cache()
print OHEValidationData.take(1)

### 3. Model Construction

Choose **logistic regression**. 

It models the **probability of a click-through event** rather than returning a binary response, and when working with rare events, probabilistic predictions are useful.

In [39]:
from pyspark.mllib.classification import LogisticRegressionWithSGD

Set the **hyperparameters**.

In [41]:
numIters = 50
stepSize = 10.
regParam = 1e-6
regType = 'l2'
includeIntercept = True

**Train** the logistic regression model.

In [43]:
model0 = LogisticRegressionWithSGD.train(OHETrainData, iterations=numIters,
                                         step=stepSize, regParam=regParam,
                                         regType=regType, intercept=includeIntercept)
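To show roughly what training optimizes, here is a plain-Python 3 sketch of stochastic gradient descent for L2-regularized logistic regression on sparse features. The names `sigmoid` and `train_logreg_sgd` and the toy data are hypothetical. This is a simplified illustration, not MLlib's implementation: MLlib's optimizer differs in its details (it works on batches of the data and uses its own step-size schedule), so this will not reproduce `model0`'s weights.

```python
import math
import random

def sigmoid(t):
    """Logistic function with the raw score bounded to [-20, 20]."""
    t = max(min(t, 20.0), -20.0)
    return 1.0 / (1.0 + math.exp(-t))

def train_logreg_sgd(data, num_features, iterations, step, reg_param, seed=0):
    """Per-example SGD for L2-regularized logistic regression with an intercept.

    data: list of (label, {index: value}) pairs with labels 0 or 1, a toy
    stand-in for an RDD of LabeledPoint with SparseVector features.
    """
    w = [0.0] * num_features
    b = 0.0
    rng = random.Random(seed)
    order = list(range(len(data)))
    for _ in range(iterations):           # one shuffled pass per iteration
        rng.shuffle(order)
        for i in order:
            label, feats = data[i]
            p = sigmoid(b + sum(w[j] * v for j, v in feats.items()))
            err = p - label               # d(log loss)/d(raw score)
            for j in range(num_features):
                w[j] -= step * reg_param * w[j]   # L2 shrinkage on every weight
            for j, v in feats.items():
                w[j] -= step * err * v            # gradient step on active features
            b -= step * err               # intercept left unregularized here
    return w, b

# Hypothetical toy data: feature 0 always means "click", feature 1 "no click"
data = [(1, {0: 1.0}), (0, {1: 1.0})] * 20
w, b = train_logreg_sgd(data, num_features=2, iterations=20, step=0.5, reg_param=1e-6)
print(sigmoid(b + w[0]), sigmoid(b + w[1]))  # close to 1 and close to 0
```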

In [44]:
sortedWeights = sorted(model0.weights)
print sortedWeights[:5], model0.intercept

### 4. Model Evaluation

Use **log loss** to evaluate the quality of models.

Write a function to compute **log loss**.
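For a predicted click probability $p$ and a label $y \in \{0, 1\}$, log loss is defined as follows; `computeLogLoss` evaluates the $y = 1$ and $y = 0$ terms as separate branches, and the loss over a dataset is the mean over its $N$ observations:

$$
\ell(p, y) = -\,y \log p \;-\; (1 - y)\log(1 - p),
\qquad
\text{LogLoss} = \frac{1}{N}\sum_{i=1}^{N} \ell(p_i, y_i)
$$

Confident wrong predictions are punished heavily: $\ell(0.99, 0) = -\log 0.01 \approx 4.6$, while $\ell(0.99, 1) = -\log 0.99 \approx 0.01$.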

In [47]:
from math import log

def computeLogLoss(p, y):
    """Calculates the value of log loss for a given probability and label.

    Note:
        log(0) is undefined, so when p is 0 we need to add a small value (epsilon) to it
        and when p is 1 we need to subtract a small value (epsilon) from it.

    Args:
        p (float): A probability between 0 and 1.
        y (int): A label.  Takes on the values 0 and 1.

    Returns:
        float: The log loss value.
    """

    epsilon = 10e-12
    if p == 0:
        p += epsilon
    elif p == 1:
        p -= epsilon
    if y == 1:
        return -log(p)
    else:
        return -log(1 - p)

Use the log loss function to evaluate some sample inputs.

In [49]:
print computeLogLoss(.5, 1)
print computeLogLoss(.5, 0)
print computeLogLoss(.99, 1)
print computeLogLoss(.99, 0)
print computeLogLoss(.01, 1)
print computeLogLoss(.01, 0)
print computeLogLoss(0, 1)
print computeLogLoss(1, 1)
print computeLogLoss(1, 0)

#### Baseline Log Loss

The baseline model always makes the same prediction regardless of the data point: the fraction of training points that are click-through events (i.e., where the label is one).

Compute this value (which is simply the mean of the training labels), and then use it to compute the training log loss for the baseline model. The log loss for multiple observations is the mean of the individual log loss values.

In [51]:
# Note that our dataset has a very high click-through rate by design
# In practice click-through rate can be one to two orders of magnitude lower
classOneFracTrain = OHETrainData.map(lambda lp: lp.label).mean()
print classOneFracTrain

logLossTrBase = OHETrainData.map(lambda lp: computeLogLoss(classOneFracTrain, lp.label)).mean()
print 'Baseline Train Logloss = {0:.3f}\n'.format(logLossTrBase)
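A plain-Python 3 sketch of the baseline on hypothetical toy labels. Here the illustrative `log_loss` clips `p` into `[eps, 1 - eps]` rather than adjusting only exactly 0 or 1 as `computeLogLoss` does, a slightly more general variant.

```python
import math

def log_loss(p, y, eps=1e-12):
    """Log loss for one prediction; p is clipped so log() never receives 0."""
    p = min(max(p, eps), 1 - eps)
    return -math.log(p) if y == 1 else -math.log(1 - p)

labels = [1, 0, 0, 0]  # hypothetical toy labels: 25% click-through rate
ctr = sum(labels) / float(len(labels))
baseline = sum(log_loss(ctr, y) for y in labels) / len(labels)
print(ctr, round(baseline, 4))  # 0.25 0.5623
```

Predicting the mean label is not arbitrary: among constant predictions it minimizes the mean training log loss, and that minimum equals the binary entropy of the click rate (about 0.562 nats at a 25% rate).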

#### Predict the probability

Generate predictions from the model.

In [53]:
from math import exp #  exp(-t) = e^-t

def getP(x, w, intercept):
    """Calculate the probability for an observation given a set of weights and intercept.

    Note:
        We'll bound our raw prediction between -20 and 20 to avoid overflow in exp().

    Args:
        x (SparseVector): A vector with values of 1.0 for features that exist in this
            observation and 0.0 otherwise.
        w (DenseVector): A vector of weights (betas) for the model.
        intercept (float): The model's intercept.

    Returns:
        float: A probability between 0 and 1.
    """
    rawPrediction = w.dot(x) + intercept

    # Bound the raw prediction value
    rawPrediction = min(rawPrediction, 20)
    rawPrediction = max(rawPrediction, -20)
    return 1/(1 + exp(-rawPrediction))
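The same computation in plain Python 3, with a dict `{index: value}` standing in for the `SparseVector` and a list standing in for the weight vector (both assumptions of this sketch; `get_p` is a hypothetical name). Only the nonzero features contribute to the dot product. The bound matters because `math.exp` raises `OverflowError` for very large arguments such as `math.exp(1000)`.

```python
import math

def get_p(features, weights, intercept):
    """Click probability for one sparse observation (features: {index: value})."""
    raw = intercept + sum(weights[i] * v for i, v in features.items())
    raw = max(min(raw, 20.0), -20.0)  # bound to [-20, 20] like getP
    return 1.0 / (1.0 + math.exp(-raw))

weights = [0.5, -1.0, 2.0, 0.0]
print(get_p({0: 1.0, 2: 1.0}, weights, intercept=-1.5))  # raw = 1.0, about 0.731
print(get_p({2: 1.0}, [0.0, 0.0, 1000.0, 0.0], 0.0))     # raw bounded to 20, just under 1.0
```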

In [54]:
trainingPredictions = OHETrainData.map(lambda lp: getP(lp.features, model0.weights, model0.intercept))

print trainingPredictions.take(5)

#### Evaluate the Model

In [56]:
def evaluateResults(model, data):
    """Calculates the log loss for the data given the model.

    Args:
        model (LogisticRegressionModel): A trained logistic regression model.
        data (RDD of LabeledPoint): Labels and features for each observation.

    Returns:
        float: Log loss for the data.
    """
    return (data
            # First get the predictions.
            .map(lambda lp: (lp.label, getP(lp.features, model.weights, model.intercept)))
            # Then use the predictions to compute the log loss
            .map(lambda (label, prediction): computeLogLoss(prediction, label))
            .mean())

In [57]:
logLossTrLR0 = evaluateResults(model0, OHETrainData)
print ('OHE Features Train Logloss:\n\tBaseline = {0:.3f}\n\tLogReg = {1:.3f}'
       .format(logLossTrBase, logLossTrLR0))

#### Validation Log Loss

Compute the validation log loss for both the baseline and logistic regression models.

In [59]:
logLossValBase = OHEValidationData.map(lambda lp: computeLogLoss(classOneFracTrain, lp.label)).mean()

logLossValLR0 = evaluateResults(model0, OHEValidationData)
print ('OHE Features Validation Logloss:\n\tBaseline = {0:.3f}\n\tLogReg = {1:.3f}'
       .format(logLossValBase, logLossValLR0))

#### ROC Curve

Show the trade-off between the false positive rate and true positive rate.

In [61]:
import matplotlib.pyplot as plt

def preparePlot(xticks, yticks, figsize=(10.5, 6), hideLabels=False, gridColor='#999999',
                gridWidth=1.0):
    """Template for generating the plot layout."""
    plt.close()
    fig, ax = plt.subplots(figsize=figsize, facecolor='white', edgecolor='white')
    ax.axes.tick_params(labelcolor='#999999', labelsize='10')
    for axis, ticks in [(ax.get_xaxis(), xticks), (ax.get_yaxis(), yticks)]:
        axis.set_ticks_position('none')
        axis.set_ticks(ticks)
        axis.label.set_color('#999999')
        if hideLabels: axis.set_ticklabels([])
    plt.grid(color=gridColor, linewidth=gridWidth, linestyle='-')
    for position in ['bottom', 'top', 'left', 'right']:
        ax.spines[position].set_visible(False)
    return fig, ax

In [62]:
labelsAndScores = OHEValidationData.map(lambda lp:
                                            (lp.label, getP(lp.features, model0.weights, model0.intercept)))
labelsAndWeights = labelsAndScores.collect()
labelsAndWeights.sort(key=lambda (k, v): v, reverse=True)
labelsByWeight = np.array([k for (k, v) in labelsAndWeights])

length = labelsByWeight.size
truePositives = labelsByWeight.cumsum()
numPositive = truePositives[-1]
falsePositives = np.arange(1.0, length + 1, 1.) - truePositives

truePositiveRate = truePositives / numPositive
falsePositiveRate = falsePositives / (length - numPositive)

# Generate layout and plot data
fig, ax = preparePlot(np.arange(0., 1.1, 0.1), np.arange(0., 1.1, 0.1))
ax.set_xlim(-.05, 1.05), ax.set_ylim(-.05, 1.05)
ax.set_ylabel('True Positive Rate (Sensitivity)')
ax.set_xlabel('False Positive Rate (1 - Specificity)')
plt.plot(falsePositiveRate, truePositiveRate, color='#8cbfd0', linestyle='-', linewidth=3.)
plt.plot((0., 1.), (0., 1.), linestyle='--', color='#d6ebf2', linewidth=2.)  # Baseline model
pass

In [63]:
display(fig)
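The ROC cell above sorts by score and takes cumulative sums of the labels. The hypothetical helpers below do the same in plain Python 3 and also compute the area under the curve (AUC) with the trapezoidal rule, a common one-number summary of the plot. They assume both classes are present and, like the cell above, do not group tied scores.

```python
def roc_points(labels_and_scores):
    """(FPR, TPR) points from sweeping the threshold from the highest score down."""
    ranked = sorted(labels_and_scores, key=lambda ls: ls[1], reverse=True)
    num_pos = sum(1 for label, _ in ranked if label == 1)
    num_neg = len(ranked) - num_pos   # assumed > 0, as is num_pos
    tp = fp = 0
    points = [(0.0, 0.0)]
    for label, _ in ranked:
        if label == 1:
            tp += 1
        else:
            fp += 1
        points.append((fp / float(num_neg), tp / float(num_pos)))
    return points

def auc(points):
    """Area under the ROC curve via the trapezoidal rule."""
    return sum((x1 - x0) * (y0 + y1) / 2.0
               for (x0, y0), (x1, y1) in zip(points, points[1:]))

# Hypothetical (label, predicted probability) pairs
scores = [(1, 0.9), (1, 0.8), (0, 0.7), (1, 0.6), (0, 0.2)]
print(auc(roc_points(scores)))  # 5/6, about 0.833
```

On these toy scores, 5 of the 6 (positive, negative) pairs are ranked with the positive higher, and the AUC equals that fraction.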

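### 5. Reduce Feature Dimension

The OHE dictionary creates one feature for every distinct (featureID, value) pair in the training data, so the feature vectors become very long. **Feature hashing** instead maps each pair to one of a fixed number of buckets (`numBucketsCTR` below), which fixes the dimension in advance and removes the need to build a dictionary; categories never seen in training still hash to some bucket. The trade-off is that different pairs can collide in the same bucket.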

In [65]:
from collections import defaultdict
import hashlib

def hashFunction(numBuckets, rawFeats, printMapping=False):
    """Calculate a feature dictionary for an observation's features based on hashing.

    Note:
        Use printMapping=True for debug purposes and to better understand how the hashing works.

    Args:
        numBuckets (int): Number of buckets to use as features.
        rawFeats (list of (int, str)): A list of features for an observation.  Represented as
            (featureID, value) tuples.
        printMapping (bool, optional): If true, the mappings of featureString to index will be
            printed.

    Returns:
        dict of int to float:  The keys will be integers which represent the buckets that the
            features have been hashed to.  The value for a given key will contain the count of the
            (featureID, value) tuples that have hashed to that key.
    """
    mapping = {}
    for ind, category in rawFeats:
        featureString = category + str(ind)
        mapping[featureString] = int(int(hashlib.md5(featureString).hexdigest(), 16) % numBuckets)
    if(printMapping): print mapping
    sparseFeatures = defaultdict(float)
    for bucket in mapping.values():
        sparseFeatures[bucket] += 1.0
    return dict(sparseFeatures)
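`hashFunction` relies on Python 2, where `hashlib.md5` accepts a `str` directly; in Python 3 the string must be encoded to bytes first. A hypothetical Python 3 variant of the same bucketing idea, `hash_features`, which counts bucket hits directly instead of building the intermediate `mapping`:

```python
import hashlib
from collections import defaultdict

def hash_features(num_buckets, raw_feats):
    """Count how many (featureID, value) tuples land in each hash bucket."""
    buckets = defaultdict(float)
    for feat_id, value in raw_feats:
        # Same "value + featureID" string as hashFunction, encoded for Python 3
        key = (value + str(feat_id)).encode('utf-8')
        bucket = int(hashlib.md5(key).hexdigest(), 16) % num_buckets
        buckets[bucket] += 1.0
    return dict(buckets)

feats = [(0, 'mouse'), (1, 'black'), (2, 'cat')]  # hypothetical observation
print(hash_features(4, feats))  # bucket -> count; exact buckets depend on md5
```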

#### Create hashed features

In [67]:
def parseHashPoint(point, numBuckets):
    """Create a LabeledPoint for this observation using hashing.

    Args:
        point (str): A comma separated string where the first value is the label and the rest are
            features.
        numBuckets (int): The number of buckets to hash to.

    Returns:
        LabeledPoint: A LabeledPoint with a label (0.0 or 1.0) and a SparseVector of hashed
            features.
    """
    fields = point.split(',')
    label = fields[0]
    features = parsePoint(point)
    return LabeledPoint(label, SparseVector(numBuckets, hashFunction(numBuckets, features)))

In [68]:
numBucketsCTR = 2 ** 15
hashTrainData = rawTrainData.map(lambda point: parseHashPoint(point, numBucketsCTR))
hashTrainData.cache()
hashValidationData = rawValidationData.map(lambda point: parseHashPoint(point, numBucketsCTR))
hashValidationData.cache()
hashTestData = rawTestData.map(lambda point: parseHashPoint(point, numBucketsCTR))
hashTestData.cache()

print hashTrainData.take(1)

#### Re-train the model

Run a **grid search** to find suitable hyperparameters for the hashed features, evaluating via log loss on the validation data.

In [70]:
numIters = 500
regType = 'l2'
includeIntercept = True

# Track the best model; bestLogLoss starts at a large sentinel so any trained model beats it
bestModel = None
bestLogLoss = 1e10

In [71]:
stepSizes = [1, 10]
regParams = [1e-6, 1e-3]
for stepSize in stepSizes:
    for regParam in regParams:
        model = (LogisticRegressionWithSGD
                 .train(hashTrainData, numIters, stepSize, regParam=regParam, regType=regType,
                        intercept=includeIntercept))
        logLossVa = evaluateResults(model, hashValidationData)
        print ('\tstepSize = {0:.1f}, regParam = {1:.0e}: logloss = {2:.3f}'
               .format(stepSize, regParam, logLossVa))
        if (logLossVa < bestLogLoss):
            bestModel = model
            bestLogLoss = logLossVa

print ('Hashed Features Validation Logloss:\n\tBaseline = {0:.3f}\n\tLogReg = {1:.3f}'
       .format(logLossValBase, bestLogLoss))
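The nested loops above can also be written with `itertools.product`, which scales to any number of hyperparameters. In the sketch below, `grid_search` and the toy score function are hypothetical; in the notebook the score would be `evaluateResults` on the validation data after training with the given parameters. Starting from `float('inf')` avoids relying on a sentinel like `1e10` being larger than any real loss.

```python
import itertools

def grid_search(param_grid, score_fn):
    """Return (best_params, best_score), minimizing score_fn over all combinations."""
    names = sorted(param_grid)
    best_params, best_score = None, float('inf')
    for combo in itertools.product(*(param_grid[n] for n in names)):
        params = dict(zip(names, combo))
        score = score_fn(params)
        if score < best_score:
            best_params, best_score = params, score
    return best_params, best_score

def toy_loss(params):
    """Made-up score whose minimum is at stepSize=10, regParam=1e-6."""
    return abs(params['stepSize'] - 10) + params['regParam']

grid = {'stepSize': [1, 10], 'regParam': [1e-6, 1e-3]}
print(grid_search(grid, toy_loss))  # ({'regParam': 1e-06, 'stepSize': 10}, 1e-06)
```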

#### Evaluate on the test set

In [73]:
# Log loss for the best model from the grid search above
logLossTest = evaluateResults(bestModel, hashTestData)

# Log loss for the baseline model
logLossTestBaseline = hashTestData.map(lambda lp: computeLogLoss(classOneFracTrain, lp.label)).mean()

print ('Hashed Features Test Log Loss:\n\tBaseline = {0:.3f}\n\tLogReg = {1:.3f}'
       .format(logLossTestBaseline, logLossTest))