##  Featurize categorical data using one-hot-encoding

In [2]:
# Set the Spark SQL option 'spark.sql.shuffle.partitions' to 6 for this session.
sqlContext.setConf('spark.sql.shuffle.partitions', '6') 

In [3]:
import numpy as np
from pyspark.mllib.linalg import SparseVector

In [4]:
def one_hot_encoding(raw_feats, ohe_dict_broadcast, num_ohe_feats):
    """Build the one-hot-encoded SparseVector for a single observation.

    Note:
        The indices handed to SparseVector are sorted in ascending order first.  Every
        feature in raw_feats must be a key of the dictionary; an unknown feature raises
        KeyError.

    Args:
        raw_feats (list of (int, str)): The (featureID, value) tuples of one observation.
        ohe_dict_broadcast (Broadcast of dict): Broadcast variable whose value maps
            (featureID, value) to a unique integer index.
        num_ohe_feats (int): The total number of unique OHE features, used as the vector size.

    Returns:
        SparseVector: A vector of length num_ohe_feats holding 1.0 at the index of every
            (featureID, value) pair that occurs in the observation.
    """
    ohe_dict = ohe_dict_broadcast.value
    indices = sorted(ohe_dict[feat] for feat in raw_feats)
    return SparseVector(num_ohe_feats, indices, np.ones(len(raw_feats)))

# Calculate the number of features in sample_ohe_dict_manual
# NOTE(review): sample_ohe_dict_manual is not defined in the visible part of the notebook;
# presumably it is created in an earlier cell -- confirm.
num_sample_ohe_feats = len(sample_ohe_dict_manual)
# Wrap the dictionary in a broadcast variable, the form one_hot_encoding expects.
sample_ohe_dict_manual_broadcast = sc.broadcast(sample_ohe_dict_manual)

In [5]:
from pyspark.sql.functions import udf
from pyspark.mllib.linalg import VectorUDT

def ohe_udf_generator(ohe_dict_broadcast):
    """Create a UDF that one-hot-encodes a column with the given dictionary.

    Note:
        The number of OHE features is computed once, when the UDF is generated, and is
        then passed to one_hot_encoding on every call.

    Args:
        ohe_dict_broadcast (Broadcast of dict): Broadcast variable whose value maps
            (featureID, value) to a unique integer index.

    Returns:
        UserDefinedFunction: A UDF returning a vector (VectorUDT) that can be used in a
            `DataFrame` `select` statement; it calls one_hot_encoding on the column value.
    """
    num_ohe_feats = len(ohe_dict_broadcast.value)
    encode = lambda raw_feats: one_hot_encoding(raw_feats, ohe_dict_broadcast, num_ohe_feats)
    return udf(encode, VectorUDT())

### Automated creation of an OHE dictionary

In [7]:
def create_one_hot_dict(input_df):
    """Creates a one-hot-encoder dictionary based on the input data.

    Args:
        input_df (DataFrame with 'features' column): A DataFrame where each row contains a list of
            (featureID, value) tuples.

    Returns:
        dict: A dictionary where the keys are (featureID, value) tuples and map to values that are
            unique integers (assigned with zipWithIndex over the distinct features).
    """
    # Imported here so the function works on its own; elsewhere in the notebook `explode`
    # is only imported by a cell that comes after this definition.
    from pyspark.sql.functions import explode

    # One row per distinct (featureID, value) pair found in the 'features' arrays.
    distinct_feats = input_df.select(explode(input_df.features)).distinct()
    return (distinct_feats
              .rdd
              .map(lambda row: tuple(row[0]))  # exploded struct -> plain (featureID, value) tuple
              .zipWithIndex()
              .collectAsMap()
            )

##  Parse CTR data and generate OHE features

In [9]:
def cleanup_old_downloads():
  """Recursively remove every entry directly under '/tmp' whose name matches 'criteo_*'."""
  from fnmatch import fnmatch
  stale_paths = [str(entry.path)
                 for entry in dbutils.fs.ls('/tmp')
                 if fnmatch(str(entry.name), 'criteo_*')]
  for stale_path in stale_paths:
    dbutils.fs.rm(stale_path, recurse=True)

def download_criteo(url):
  """Download the Criteo sample archive and store its data file as comma separated text.

  Previously downloaded copies are removed first (cleanup_old_downloads).  The archive
  member 'dac_sample.txt' is read, tabs are replaced by commas, and the result is copied
  to a freshly created, randomly named 'dbfs:/tmp/criteo_<id>' directory.

  Args:
    url (str): Download URL; it must end with 'dac_sample.tar.gz'.

  Returns:
    str: The DBFS path of the converted data file ('<dbfs_dir>/data.txt').

  Raises:
    Exception: If url does not end with 'dac_sample.tar.gz'.
  """
  from contextlib import closing
  from io import BytesIO
  import urllib2
  import tarfile
  import uuid
  import tempfile
  import random
  import string
  import os

  if not url.endswith('dac_sample.tar.gz'):
    raise Exception('Check your download URL. Are you downloading the sample dataset?')

  cleanup_old_downloads()

  # Derive a unique directory name from a random 64-character pseudo domain name.
  rng = random.SystemRandom()
  tlds = ('.org', '.net', '.com', '.info', '.biz')
  random_domain_name = (
    ''.join(rng.choice(string.letters + string.digits) for i in range(64)) +
    rng.choice(tlds)
  )
  random_id = str(uuid.uuid3(uuid.NAMESPACE_DNS, random_domain_name)).replace('-', '_')
  unique_id = str(uuid.uuid3(uuid.NAMESPACE_DNS, random_id)).replace('-', '_')
  dbfs_dir  = 'dbfs:/tmp/criteo_{0}'.format(unique_id)
  dbfs_path = '{0}/data.txt'.format(dbfs_dir)
  dbutils.fs.mkdirs(dbfs_dir)

  # Fetch the whole archive into memory; the connection is closed even if the read fails.
  req = urllib2.Request(url, headers={'User-Agent': 'Databricks'})
  with closing(urllib2.urlopen(req)) as url_handle:
    tmp = BytesIO(url_handle.read())

  # Convert the tab separated member to comma separated lines, then release the archive.
  with closing(tarfile.open(fileobj=tmp)) as tf:
    dac_sample = tf.extractfile('dac_sample.txt')
    dac_sample = '\n'.join([unicode(x.replace('\n', '').replace('\t', ',')) for x in dac_sample])

  # dbutils.fs.cp needs a file on local disk, so stage the text in a temporary file and
  # always delete it afterwards, even when the write or the copy fails.
  t = tempfile.NamedTemporaryFile(mode='wb', delete=False, prefix='dac', suffix='.txt')
  try:
    with t:
      t.write(dac_sample)
    dbutils.fs.cp('file://{0}'.format(t.name), dbfs_path)
  finally:
    os.unlink(t.name)

  return dbfs_path

In [10]:
# URL of the sample archive; download_criteo requires it to end with 'dac_sample.tar.gz'.
criteo_url = 'http://criteolabs.wpengine.com/wp-content/uploads/2015/04/dac_sample.tar.gz'

In [11]:
# Download the data only once: skip the download when a previous run of this cell already
# set downloaded_data_file to a non-None value.
if ('downloaded_data_file' not in locals()) or (downloaded_data_file is None):
  downloaded_data_file = download_criteo(criteo_url)

# Likewise, keep an already loaded raw_df; otherwise read the file as text and rename the
# resulting "value" column to "text".
if ('raw_df' in locals()) and (raw_df is not None):
  print "raw_df is already loaded. Nothing to do. (Set raw_df=None to reload it, then re-run this cell.)"
else:
  raw_df = sqlContext.read.text(downloaded_data_file).withColumnRenamed("value", "text")

# Printed in both cases, i.e. also when raw_df was already loaded.
print "raw_df initialized to read from {0}".format(downloaded_data_file)

In [12]:
  # Display the first rows of the raw text DataFrame.
  raw_df.show()

###  Loading and splitting the data

In [14]:
# Split proportions for training, validation and test data, and a fixed seed for the split.
weights = [.8, .1, .1]
seed = 42

raw_train_df, raw_validation_df, raw_test_df = raw_df.randomSplit(weights,seed)

# Cache each split and count its rows.
n_train = raw_train_df.cache().count()
n_val = raw_validation_df.cache().count()
n_test = raw_test_df.cache().count()
# The last value printed is the total across the three splits.
print n_train, n_val, n_test, n_train + n_val + n_test
raw_df.show(1,truncate=False)

###  Extract features

In [16]:
def parse_point(point):
    """Turn a comma separated record into a list of (featureID, value) tuples.

    Note:
        The first field (the label) is dropped.  featureIDs start at 0 and run up to the
        number of features - 1.

    Args:
        point (str): A comma separated string where the first value is the label and the rest
            are features.

    Returns:
        list: A list of (featureID, value) tuples; empty when the record holds only a label.
    """
    feature_values = point.split(',')[1:]
    return [(feature_id, value) for feature_id, value in enumerate(feature_values)]

# Show the parsed features of the first row of raw_df.
print parse_point(raw_df.select('text').first()[0])

In [17]:
from pyspark.sql.functions import udf, split
from pyspark.sql.types import ArrayType, StructType, StructField, LongType, StringType, DoubleType

# Wrap parse_point as a UDF whose declared return type is an array of structs with the
# fields '_1' (long) and '_2' (string).
parse_point_udf = udf(parse_point, ArrayType(StructType([StructField('_1', LongType()),
                                                         StructField('_2', StringType())])))

def parse_raw_df(raw_df):
    """Split rows of comma separated text into a label and a list of features.

    Args:
        raw_df (DataFrame with a 'text' column): DataFrame containing the raw comma separated data.

    Returns:
        DataFrame: A cached DataFrame with a 'label' column (the first field cast to double)
            and a 'feature' column (the output of parse_point_udf).
    """
    label_col = split(raw_df.text, ',').getItem(0).cast(DoubleType()).alias('label')
    feature_col = parse_point_udf(raw_df.text).alias('feature')
    return raw_df.select(label_col, feature_col).cache()
    
    

# Parse the training split into 'label' and 'feature' columns.
parsed_train_df = parse_raw_df(raw_train_df)

from pyspark.sql.functions import (explode, col)
# Explode the feature arrays into one row per (featureID, value) pair, keep the distinct
# pairs, and aggregate them per featureID.
# NOTE(review): .sum() is called without column names -- confirm which column it aggregates;
# a count of distinct values per feature would normally use .count().
num_categories = (parsed_train_df
                    .select(explode('feature').alias('feature'))
                    .distinct()
                    .select(col('feature').getField('_1').alias('featureNumber'))
                    .groupBy('featureNumber')
                    .sum()
                    .orderBy('featureNumber')
                    .collect())

# Second column of the third collected row.
print num_categories[2][1]

###  Create an OHE dictionary from the dataset

In [19]:
# create_one_hot_dict reads a 'features' column, so the 'feature' column is renamed first.
ctr_ohe_dict = create_one_hot_dict(parsed_train_df.select(parsed_train_df.feature.alias('features')))
num_ctr_ohe_feats = len(ctr_ohe_dict)
print num_ctr_ohe_feats
# Index assigned to featureID 0 with the empty string as value.
print ctr_ohe_dict[(0, '')]

###  Applying OHE to the dataset

In [21]:
# Broadcast the training OHE dictionary and build the encoding UDF from it.
ohe_dict_broadcast = sc.broadcast(ctr_ohe_dict)
ohe_dict_udf = ohe_udf_generator(ohe_dict_broadcast)
# Training data as 'label' plus one-hot-encoded 'features', cached for reuse.
ohe_train_df = (parsed_train_df
                  .select(parsed_train_df.label.alias('label'),ohe_dict_udf(parsed_train_df.feature).alias('features'))
                  .cache()
               )

print ohe_train_df.count()
print ohe_train_df.take(1)

### Visualization: Feature frequency

In [23]:
from pyspark.sql.types import ArrayType, IntegerType
from pyspark.sql.functions import log

# UDF returning the indices of a sparse vector as an array of ints.
get_indices = udf(lambda sv: map(int, sv.indices), ArrayType(IntegerType()))
# Count how often each OHE index occurs, place every index in the bucket int(log(count)),
# and count the indices per bucket.  The collected rows are ordered by bucket.
feature_counts = (ohe_train_df
                   .select(explode(get_indices('features')))
                   .groupBy('col')
                   .count()
                   .withColumn('bucket', log('count').cast('int'))
                   .groupBy('bucket')
                   .count()
                   .orderBy('bucket')
                   .collect())
feature_counts

In [24]:
import matplotlib.pyplot as plt

# Separate the collected rows into their first and second fields (bucket and count) ...
x, y = zip(*feature_counts)
# ... and put the counts on a natural-log scale for plotting.
x, y = x, np.log(y)

def prepare_plot(xticks, yticks, figsize=(10.5, 6), hide_labels=False, grid_color='#999999',
                 grid_width=1.0):
    """Template for generating the plot layout.

    Closes the current pyplot figure, creates a new white figure with one axes, applies the
    given ticks and a solid grid, and hides all four spines.

    Args:
        xticks: Tick positions for the x axis.
        yticks: Tick positions for the y axis.
        figsize (tuple): Figure size passed to plt.subplots.
        hide_labels (bool): If true, the tick labels of both axes are removed.
        grid_color (str): Color of the grid lines.
        grid_width (float): Line width of the grid lines.

    Returns:
        tuple: The created (fig, ax) pair.
    """
    plt.close()
    fig, ax = plt.subplots(figsize=figsize, facecolor='white', edgecolor='white')
    ax.axes.tick_params(labelcolor='#999999', labelsize='10')
    for axis, ticks in [(ax.get_xaxis(), xticks), (ax.get_yaxis(), yticks)]:
        axis.set_ticks_position('none')
        axis.set_ticks(ticks)
        axis.label.set_color('#999999')
        if hide_labels: axis.set_ticklabels([])
    plt.grid(color=grid_color, linewidth=grid_width, linestyle='-')
    # An explicit loop: map() must not be relied on for side effects, because a lazy map
    # would never call set_visible.
    for position in ['bottom', 'top', 'left', 'right']:
        ax.spines[position].set_visible(False)
    return fig, ax

# generate layout and plot data
fig, ax = prepare_plot(np.arange(0, 12, 1), np.arange(0, 14, 2))
ax.set_xlabel(r'$\log_e(bucketSize)$'), ax.set_ylabel(r'$\log_e(countInBucket)$')
plt.scatter(x, y, s=14**2, c='#d6ebf2', edgecolors='#8cbfd0', alpha=0.75)
# display is not defined in this file -- presumably supplied by the notebook environment.
display(fig)

###  Handling unseen features

In [26]:
def one_hot_encoding(raw_feats, ohe_dict_broadcast, num_ohe_feats):
    """Build the one-hot-encoded SparseVector for one observation, skipping unknown features.

    Note:
        Features that are not keys of the OHE dictionary are ignored, and the indices
        handed to SparseVector are sorted in ascending order.

    Args:
        raw_feats (list of (int, str)): The (featureID, value) tuples of one observation.
        ohe_dict_broadcast (Broadcast of dict): Broadcast variable whose value maps
            (featureID, value) to a unique integer index.
        num_ohe_feats (int): The total number of unique OHE features, used as the vector size.

    Returns:
        SparseVector: A vector of length num_ohe_feats holding 1.0 at the index of every
            known (featureID, value) pair that occurs in the observation.
    """
    ohe_dict = ohe_dict_broadcast.value
    indices = sorted(ohe_dict[feat] for feat in raw_feats if feat in ohe_dict)
    return SparseVector(num_ohe_feats, indices, np.ones(len(indices)))

# Generate the UDF again now that one_hot_encoding has been redefined above to skip
# features that are missing from the dictionary.
ohe_dict_missing_udf = ohe_udf_generator(ohe_dict_broadcast)
parsed_val=parse_raw_df(raw_validation_df)
# Validation data as 'label' plus one-hot-encoded 'features', cached for reuse.
ohe_validation_df = parsed_val.select(parsed_val.label.alias('label'),ohe_dict_missing_udf(parsed_val.feature).alias('features')).cache()

ohe_validation_df.count()
ohe_validation_df.show(1, truncate=False)

##  CTR prediction and logloss evaluation

### Logistic regression

In [29]:
# Hyperparameters for the first logistic regression model.
standardization = False
elastic_net_param = 0.0
reg_param = .01
max_iter = 20

from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(regParam=reg_param,standardization=standardization,elasticNetParam=elastic_net_param,maxIter=max_iter)

# Fit on the one-hot-encoded training data.
lr_model_basic = lr.fit(ohe_train_df)

print 'intercept: {0}'.format(lr_model_basic.intercept)
print 'length of coefficients: {0}'.format(len(lr_model_basic.coefficients))
# The five smallest coefficients.
sorted_coefficients = sorted(lr_model_basic.coefficients)[:5]

In [30]:
from pyspark.sql.functions import when, log, col
epsilon = 1e-16

def add_log_loss(df):
    """Computes and adds a 'log_loss' column to a DataFrame using 'p' and 'label' columns.

    Note:
        log(0) is undefined, so the probability is clamped before the loss is computed:
        a p of exactly 0 is replaced by epsilon and a p of exactly 1 by 1 - epsilon.  The
        loss is then -log(p) when label is 1 and -log(1 - p) when label is 0; any other
        label yields a null loss.

    Args:
        df (DataFrame with 'p' and 'label' columns): A DataFrame with a probability column
            'p' and a 'label' column that corresponds to y in the log loss formula.

    Returns:
        DataFrame: A DataFrame with the columns 'p' (unclamped), 'label' and 'log_loss'.
    """
    # Clamp first, so the label still decides which branch of the formula applies when
    # p is exactly 0 or 1.
    clamped_p = when(df.p == 0, epsilon).when(df.p == 1, 1 - epsilon).otherwise(df.p)
    log_loss = (when(df.label == 1, -log(clamped_p))
                .when(df.label == 0, -log(1 - clamped_p)))
    return df.select(df.p, df.label, log_loss.alias('log_loss'))

# NOTE(review): example_log_loss_df is not defined in the visible part of the notebook;
# presumably it comes from an earlier cell -- confirm.
add_log_loss(example_log_loss_df).show()

###  Baseline log loss

In [32]:

from pyspark.sql.functions import lit
# Mean of the training labels.
class_one_frac_train = ohe_train_df.groupBy().avg('label').first()[0]
print 'Training class one fraction = {0:.3f}'.format(class_one_frac_train)

# Baseline: use that constant fraction as the probability 'p' of every training row and
# average the resulting log loss.
log_loss_tr_base = add_log_loss(ohe_train_df.select(ohe_train_df.label.alias('label'),lit(class_one_frac_train).alias('p'))).groupBy().avg('log_loss').first()[0]
print 'Baseline Train Logloss = {0:.3f}\n'.format(log_loss_tr_base)

###  Predicted probability

In [34]:
from pyspark.sql.types import DoubleType
from math import exp #  exp(-t) = e^-t

def add_probability(df, model):
    """Return df with an extra probability column 'p' computed from the model.

    Args:
        df (DataFrame with a 'features' column): The observations to score.
        model: A fitted model providing `intercept` and `coefficients`.

    Returns:
        DataFrame: df with an additional double column 'p'.
    """
    intercept = model.intercept
    coefficients_broadcast = sc.broadcast(model.coefficients)

    def get_p(features):
        """Compute the probability for one observation.

        Note:
            The raw prediction is limited to the range [-20, 20] for numerical purposes
            before the logistic function is applied.

        Args:
            features: the features

        Returns:
            float: A probability between 0 and 1.
        """
        weights = coefficients_broadcast.value
        raw_prediction = intercept + weights.dot(features)
        if raw_prediction > 20:
            raw_prediction = 20
        elif raw_prediction < -20:
            raw_prediction = -20
        return (1 + exp(-raw_prediction)) ** (-1)

    get_p_udf = udf(get_p, DoubleType())
    return df.withColumn('p', get_p_udf('features'))

# Shortcut that scores a DataFrame with the basic logistic regression model.
add_probability_model_basic = lambda df: add_probability(df, lr_model_basic)
training_predictions = add_probability_model_basic(ohe_train_df).cache()

training_predictions.show(5)

###  Evaluating the model

In [36]:
def evaluate_results(df, model, baseline=None):
    """Calculates the log loss for the data given the model.

    Note:
        If baseline is given (not None), that constant is used as the probability of every
        row before the log loss is calculated.  Otherwise add_probability is used to add
        the model's probabilities to the DataFrame.

    Args:
        df (DataFrame with 'label' and 'features' columns): A DataFrame containing
            labels and features.
        model (LogisticRegressionModel): A trained logistic regression model. This
            can be None if baseline is set.
        baseline (float): A baseline probability to use for the log loss calculation.

    Returns:
        float: Log loss for the data (the average of the per-row 'log_loss' values).
    """
    # Compare against None explicitly: a baseline of 0.0 is a valid probability and must
    # not fall through to the model branch.
    if baseline is not None:
        with_probability_df = df.withColumn('p', lit(baseline))
    else:
        with_probability_df = add_probability(df, model)

    with_log_loss_df = add_log_loss(with_probability_df)
    log_loss = with_log_loss_df.groupBy().avg('log_loss').first()[0]
    return log_loss

# Training log loss of the basic model, reported next to the training baseline.
log_loss_train_model_basic = evaluate_results(ohe_train_df, lr_model_basic)
print ('OHE Features Train Logloss:\n\tBaseline = {0:.3f}\n\tLogReg = {1:.3f}'
       .format(log_loss_tr_base, log_loss_train_model_basic))

###  Validation log loss

Next, use the `evaluate_results` function to compute the validation log loss for both the baseline and the logistic regression model. Note that the baseline for the validation data should still be based on the label fraction from the training dataset.

In [38]:
# Baseline on the validation data, using the class-one fraction of the training data.
log_loss_val_base = evaluate_results(ohe_validation_df,lr_model_basic,baseline=class_one_frac_train)

# Validation log loss of the basic logistic regression model.
log_loss_val_l_r0 = evaluate_results(ohe_validation_df,lr_model_basic)
print ('OHE Features Validation Logloss:\n\tBaseline = {0:.3f}\n\tLogReg = {1:.3f}'
       .format(log_loss_val_base, log_loss_val_l_r0))

### Visualization: ROC curve

In [40]:
# Collect (label, p) pairs for the validation data and order them by p, highest first.
labels_and_scores = add_probability_model_basic(ohe_validation_df).select('label', 'p')
labels_and_weights = labels_and_scores.collect()
labels_and_weights.sort(key=lambda (k, v): v, reverse=True)
labels_by_weight = np.array([k for (k, v) in labels_and_weights])

# Running totals over the ordered labels: the cumulative sum of the labels is used as the
# true positive count, and position minus that count as the false positive count.
# (Assumes labels are 0/1 -- TODO confirm.)
length = labels_by_weight.size
true_positives = labels_by_weight.cumsum()
num_positive = true_positives[-1]
false_positives = np.arange(1.0, length + 1, 1.) - true_positives

true_positive_rate = true_positives / num_positive
false_positive_rate = false_positives / (length - num_positive)

# Generate layout and plot data
fig, ax = prepare_plot(np.arange(0., 1.1, 0.1), np.arange(0., 1.1, 0.1))
ax.set_xlim(-.05, 1.05), ax.set_ylim(-.05, 1.05)
ax.set_ylabel('True Positive Rate (Sensitivity)')
ax.set_xlabel('False Positive Rate (1 - Specificity)')
plt.plot(false_positive_rate, true_positive_rate, color='#8cbfd0', linestyle='-', linewidth=3.)
plt.plot((0., 1.), (0., 1.), linestyle='--', color='#d6ebf2', linewidth=2.)  # Baseline model
display(fig)

## Reducing feature dimension via feature hashing

In [42]:
from collections import defaultdict
import hashlib

def hash_function(raw_feats, num_buckets, print_mapping=False):
    """Calculate a feature dictionary for an observation's features based on hashing.

    Each (featureID, value) tuple is turned into the string 'value:featureID', hashed with
    md5, and assigned to the bucket `hash % num_buckets`.

    Note:
        Use print_mapping=True for debug purposes and to better understand how the hashing works.
        Identical (featureID, value) tuples produce the same feature string and are
        therefore counted only once.

    Args:
        raw_feats (list of (int, str)): A list of features for an observation.  Represented as
            (featureID, value) tuples.
        num_buckets (int): Number of buckets to use as features.
        print_mapping (bool, optional): If true, the mappings of featureString to index will be
            printed.

    Returns:
        dict of int to float:  The keys will be integers which represent the buckets that the
            features have been hashed to.  The value for a given key will contain the count of the
            distinct feature strings that have hashed to that key.
    """
    mapping = {}
    for ind, category in raw_feats:
        feature_string = category + ':' + str(ind)
        # md5 operates on bytes: encode text explicitly so that non-ASCII feature values
        # hash instead of raising.  ASCII values hash exactly as before.
        if isinstance(feature_string, bytes):
            hash_input = feature_string
        else:
            hash_input = feature_string.encode('utf-8')
        mapping[feature_string] = int(int(hashlib.md5(hash_input).hexdigest(), 16) % num_buckets)
    if print_mapping:
        print(mapping)

    # Count how many feature strings landed in each bucket.
    sparse_features = defaultdict(float)
    for bucket in mapping.values():
        sparse_features[bucket] += 1.0
    return dict(sparse_features)



###  Creating hashed features

In [44]:
# Inspect the first parsed training row ('label' and 'feature' columns).
parsed_train_df.first()

In [45]:
from pyspark.mllib.linalg import Vectors
# Number of hash buckets, i.e. the size of every hashed feature vector.
num_hash_buckets = 2 ** 15

# UDF that returns a vector of hashed features given an Array of tuples
tuples_to_hash_features_udf = udf(lambda x: Vectors.sparse(num_hash_buckets, hash_function(x, num_hash_buckets)), VectorUDT())

def add_hashed_features(df):
    """Return a cached DataFrame with labels and hashed features.

    Args:
        df (DataFrame with 'label' and 'feature' columns): A DataFrame whose 'feature'
            column contains the tuples to be hashed.

    Returns:
        DataFrame: A cached DataFrame with a 'label' column and a 'features' column that
            contains the output of tuples_to_hash_features_udf.
    """
    hashed_features = tuples_to_hash_features_udf(df.feature).alias('features')
    return df.select(df.label, hashed_features).cache()

# Hashed versions of the training, validation and test data.
hash_train_df = add_hashed_features(parsed_train_df)
hash_validation_df = add_hashed_features(parsed_val)
parsed_test_df=parse_raw_df(raw_test_df)
hash_test_df = add_hashed_features(parsed_test_df)

hash_train_df.show()

###  Sparsity

In [47]:
def vector_feature_sparsity(sparse_vector):
    """Compute the fraction of a vector's entries that are explicitly stored.

    Args:
        sparse_vector (SparseVector): The vector containing the features.

    Returns:
        float: The number of stored values divided by the length of the vector.
    """
    stored_count = len(sparse_vector.values)
    total_count = len(sparse_vector)
    return float(stored_count) / float(total_count)

# UDF wrapper so the sparsity can be computed on a DataFrame column.
feature_sparsity_udf = udf(vector_feature_sparsity, DoubleType())

# Quick check on a vector of size 5 with two stored values.
a_sparse_vector = Vectors.sparse(5, {0: 1.0, 3: 1.0})
a_sparse_vector_sparsity = vector_feature_sparsity(a_sparse_vector)
print 'This vector should have sparsity 2/5 or .4.'
print 'Sparsity = {0:.2f}.'.format(a_sparse_vector_sparsity)

In [48]:
# Same definition as in the previous cell, repeated so this cell can run on its own.
feature_sparsity_udf = udf(vector_feature_sparsity, DoubleType())

def get_sparsity(df):
    """Compute the mean per-row feature sparsity of a DataFrame.

    Args:
        df (DataFrame with 'features' column): A DataFrame with sparse features.

    Returns:
        float: The average of feature_sparsity_udf applied to every row's 'features'.
    """
    per_row_sparsity = df.select(feature_sparsity_udf(df.features).alias('avga'))
    return per_row_sparsity.groupBy().avg('avga').first()[0]

# Compare the average sparsity of the one-hot-encoded and the hashed training features.
average_sparsity_ohe = get_sparsity(ohe_train_df)
average_sparsity_hash = get_sparsity(hash_train_df)

print 'Average OHE Sparsity: {0:.7e}'.format(average_sparsity_ohe)
print 'Average Hash Sparsity: {0:.7e}'.format(average_sparsity_hash)

###  Logistic model with hashed features

In [50]:
# Hyperparameters for the model trained on hashed features.
standardization = False
elastic_net_param = 0.7
reg_param = .001
max_iter = 20

lr_hash = LogisticRegression(elasticNetParam=elastic_net_param,maxIter=max_iter,regParam=reg_param,standardization=standardization)

lr_model_hashed = lr_hash.fit(hash_train_df)
print 'intercept: {0}'.format(lr_model_hashed.intercept)
print len(lr_model_hashed.coefficients)

# Training log loss of the hashed model, reported next to the training baseline.
# NOTE(review): the message below says 'OHE Features' although the hashed training data is
# evaluated -- confirm the label is intended.
log_loss_train_model_hashed = evaluate_results(hash_train_df,lr_model_hashed)
print ('OHE Features Train Logloss:\n\tBaseline = {0:.3f}\n\thashed = {1:.3f}'
       .format(log_loss_tr_base, log_loss_train_model_hashed))

###  Evaluating on the test set

In [52]:
# Test log loss of the hashed model.
log_loss_test = evaluate_results(hash_test_df,lr_model_hashed)

# Log loss for the baseline model
# Here the baseline probability is the mean label of the test data itself.
class_one_frac_test = hash_test_df.groupBy().avg('label').first()[0]
print 'Class one fraction for test data: {0}'.format(class_one_frac_test)
# A baseline is passed, so evaluate_results does not call add_probability with the model.
log_loss_test_baseline = evaluate_results(hash_test_df,lr_model_basic,baseline=class_one_frac_test)

print ('Hashed Features Test Log Loss:\n\tBaseline = {0:.3f}\n\tLogReg = {1:.3f}'
       .format(log_loss_test_baseline, log_loss_test))

##  Cleanup

In [54]:
# Reset the guards checked by the download/load cell so a re-run downloads and loads
# again, then delete the downloaded 'criteo_*' entries under /tmp.
downloaded_data_file = None
raw_df = None
cleanup_old_downloads()