We followed in the footsteps of the Criteo competition winners, per the presentation below:
http://www.csie.ntu.edu.tw/~r01922136/kaggle-2014-criteo.pdf

## Start Spark Context 

In [None]:
import os
import sys

import glob

# Honor an existing SPARK_HOME and only fall back to the local Homebrew install when it is
# unset. Using setdefault (instead of overwriting the environment) also makes the emptiness
# check below meaningful, e.g. when SPARK_HOME was exported as an empty string.
spark_home = os.environ.setdefault('SPARK_HOME', '/usr/local/Cellar/apache-spark/1.5.0/libexec/')

if not spark_home:
    raise ValueError('SPARK_HOME enviroment variable is not set')
sys.path.insert(0, os.path.join(spark_home, 'python'))

# A user-supplied SPARK_HOME may bundle a different py4j version, so look the zip up
# instead of hard-coding it; fall back to the originally hard-coded version if none is found.
py4j_zips = glob.glob(os.path.join(spark_home, 'python', 'lib', 'py4j-*-src.zip'))
py4j_zip = py4j_zips[0] if py4j_zips else os.path.join(spark_home, 'python/lib/py4j-0.8.2.1-src.zip')
sys.path.insert(0, py4j_zip)

# Run PySpark's interactive shell bootstrap, which defines `sc` for the cells below.
execfile(os.path.join(spark_home, 'python/pyspark/shell.py'))

### First, we parse the raw data and create an RDD of (featureID, value) tuples for ONLY the categorical values (values that are not integers)
Per the Criteo winners' PowerPoint, we ignore the first 14 columns: 1 label and 13 integer features.

In [None]:
def parsePointForCategoricalOnly(point):
    """Split a tab-separated raw line and keep only its categorical columns.

    Columns 0-13 (the label and the 13 integer features) are skipped. Every
    column from index 14 onward is returned together with its column index.
    Empty strings (missing values) are kept unchanged.

    Args:
        point (str): One tab-separated line of raw data.

    Returns:
        list: (featureID, value) tuples, where featureID is the column index,
        e.g. [(14, "abc"), (15, "xyz"), ...]. The list is empty when the line
        has 14 or fewer columns.
    """
    columns = point.split("\t")
    return list(enumerate(columns[14:], 14))


In [None]:
# Smoke-test the categorical parser on the sample file (both RDDs are reused below).
rawData = sc.textFile('dac_sample.txt').cache()
parsedTrainFeat = rawData.map(parsePointForCategoricalOnly).cache()

print(parsedTrainFeat.take(1))


### Now we must create the one-hot-encoding dictionary

In [None]:


def createOneHotDict(inputData):
    """Creates a one-hot-encoder dictionary based on the input data.

    Args:
        inputData (RDD of lists of (int, str)): An RDD of observations where each observation is
            made up of a list of (featureID, value) tuples.

    Returns:
        dict: Maps every distinct (featureID, value) tuple to a unique integer in
            [0, number of distinct tuples). Indices follow ascending (featureID, value)
            order, so the same data always yields the same mapping,
            e.g. {(14, "a"): 0, (14, "b"): 1, (15, "x"): 2}.
    """
    return (inputData
            .flatMap(lambda feats: feats)
            .distinct()
            # sortByKey() would order by featureID only, leaving the relative order of values
            # within one feature (and hence their indices) dependent on how distinct()
            # partitioned the data. Sorting on the whole tuple makes the indices reproducible.
            .sortBy(lambda featValue: featValue)
            .zipWithIndex()
            .collectAsMap())



sampleOHEDictAuto = createOneHotDict(parsedTrainFeat)

# collectAsMap() returns a plain Python dict that already lives on the driver, so there is
# nothing to cache (a dict has no .cache() method; calling it raises AttributeError).
numCtrOHEFeats = len(sampleOHEDictAuto)


In [None]:
print(sampleOHEDictAuto)  # inspect the (featureID, value) -> index mapping

### Now, for each data point, we will create a LabeledPoint

In [None]:
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.linalg import SparseVector

def oneHotEncoding(rawFeats, OHEDict, numOHEFeats):
    """Build the one-hot SparseVector for a single observation.

    Any (featureID, value) tuple that has no entry in OHEDict is skipped.

    Args:
        rawFeats (list of (int, str)): The observation's (featureID, value) tuples.
        OHEDict (dict): Maps (featureID, value) to a unique integer index.
        numOHEFeats (int): Size of the resulting vector (number of distinct OHE features).

    Returns:
        SparseVector: Length numOHEFeats, with value 1 at the index of every known
            tuple in rawFeats, and indices in ascending order.
    """
    # Keep only known tuples and sort their indices, since SparseVector indices are
    # expected in ascending order.
    activeIndices = sorted(OHEDict[feat] for feat in rawFeats if feat in OHEDict)
    # Each tuple is assumed to occur at most once per observation, so every entry is 1.
    return SparseVector(numOHEFeats, activeIndices, [1] * len(activeIndices))


def parseOHEPoint(point, OHEDict, numOHEFeats, getRaw=False):
    """Obtain the label and feature vector for this raw observation.

    Note:
        The categorical part is encoded with `oneHotEncoding`; later portions of this
        notebook rely on that.

    Args:
        point (str): A tab-separated line. Column 0 is the label, columns 1-13 are the
            integer features and columns 14+ are the categorical features.
        OHEDict (dict of (int, str) to int): Mapping of (featureID, value) to unique integer.
        numOHEFeats (int): The number of unique features in the training dataset.
        getRaw (bool): If False, return a LabeledPoint with a SparseVector of the categorical
            features. If True, return a tab-delimited string of the label, the 13 integer
            features and one 0/1 column per OHE feature.

    Returns:
        LabeledPoint (getRaw=False), or str (getRaw=True) as described above.
    """
    values = point.split("\t")

    # Per the Criteo winners' presentation, only column 14 onward is treated as categorical.
    categorical = [(i, values[i]) for i in range(14, len(values))]
    features = oneHotEncoding(categorical, OHEDict, numOHEFeats)

    if not getRaw:
        return LabeledPoint(values[0], features)

    # Keep the label (column 0) and ALL 13 integer features (columns 1-13 inclusive).
    # range(0, 13) would silently drop column 13.
    rawDataArray = values[:14]
    # Expand the sparse one-hot vector into dense "0"/"1" columns.
    rawDataArray.extend([str(int(v)) for v in features.toArray().tolist()])
    return "\t".join(rawDataArray)


# featCounts (computed in a later cell) reads OHETrainData, so it must be defined here.
OHETrainData = rawData.map(lambda point: parseOHEPoint(point, sampleOHEDictAuto, numCtrOHEFeats))
OHETrainData.cache()
print(OHETrainData.take(1))


In [None]:
# Quick sanity check: densify a small SparseVector to see the 0/1 layout.
demoVector = SparseVector(20, [3, 4], [1, 1])
print(demoVector.toArray().tolist())

### Then we will count each feature's occurrences so we can prune features that don't occur often

In [None]:
#def bucketFeatByCount(featCount):
#    """Bucket the counts by powers of two."""
#    for i in range(11):
#        size = 2 ** i
#        if featCount <= size:
#            return size
#    return -1

# Number of observations containing each OHE index, ordered by index.
featCounts = (OHETrainData
              .flatMap(lambda labeled: labeled.features.indices)
              .map(lambda idx: (idx, 1))
              .reduceByKey(lambda left, right: left + right)
              .sortByKey())

print(featCounts.collect())
#featCountsBuckets = (featCounts
#                     .map(lambda x: (bucketFeatByCount(x[1]), 1))
#                     .filter(lambda (k, v): k != -1)
#                     .reduceByKey(lambda x, y: x + y)
#                     .collect())
#print featCountsBuckets

### Now we will prune the original dictionary

In [None]:
#limit = 10

#prunedOHCDictionary = sc.parallelize(sampleOHEDictAuto.items()).map(lambda x: (x[1],x[0])).join(featCounts).filter(lambda keyValue: keyValue[1][1] >= limit).sortByKey().map(lambda x:(x[1][0],x[0]))  #.collectAsMap() #take(10)

#print prunedOHCDictionary.take(10)

In [None]:
limit = 10

# Re-key the dictionary by index so it can be joined with featCounts, keep features seen
# at least `limit` times, then re-index the survivors densely in their original index order.
prunedOHCDictionary = (sc.parallelize(sampleOHEDictAuto.items())
                       .map(lambda featIdx: (featIdx[1], featIdx[0]))
                       .join(featCounts)
                       .filter(lambda keyValue: keyValue[1][1] >= limit)
                       .sortByKey()
                       .map(lambda keyValue: keyValue[1][0])
                       .zipWithIndex()
                       .collectAsMap())

# collectAsMap() returns a plain dict (it has no .take()), so print a sample of its items.
print(list(prunedOHCDictionary.items())[:10])

### We will re-encode the original data using the new dictionary
But we have not yet experimented with the mixture of integer and categorical features.

Perhaps each categorical feature would be encoded as 1 or 0.

In [None]:
# Re-encode every line with the pruned dictionary, emitting the raw tab-delimited form.
numPrunedFeats = len(prunedOHCDictionary)
OHETrainDataPruned = rawData.map(
    lambda point: parseOHEPoint(point, prunedOHCDictionary, numPrunedFeats, getRaw=True))
OHETrainDataPruned.cache()
print(OHETrainDataPruned.take(1))



## Putting everything together

In [None]:
%%writefile CriteoFirstPreprocessOHE.py
import sys
import ast
import json
from pyspark import SparkContext
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.linalg import SparseVector


#Compile a tuple of categorical feature and its value
def parsePointForCategoricalOnly(point):
    """Return the (column index, value) pairs of a raw line's categorical columns.

    The line is split on tabs. Columns 0-13 (label plus 13 integer features) are
    dropped, and every later column is paired with its index. Empty values are kept.

    Args:
        point (str): One tab-separated line of raw data.

    Returns:
        list: (featureID, value) tuples such as [(14, "abc"), (15, "xyz"), ...];
        empty when the line has 14 or fewer columns.
    """
    fields = point.split("\t")
    return [(idx, val) for idx, val in enumerate(fields) if idx >= 14]

#create one-hot-encoder dictionary given a raw input
def createOneHotDict(rawData):
    """Creates a one-hot-encoder dictionary directly from raw input lines.

    Args:
        rawData (RDD of str): Tab-separated raw lines. Each line is turned into
            (featureID, value) tuples by parsePointForCategoricalOnly.

    Returns:
        dict: Maps every distinct (featureID, value) tuple to a unique integer in
            [0, number of distinct tuples), assigned in ascending (featureID, value)
            order, e.g. {(14, "a"): 0, (14, "b"): 1, (15, "x"): 2}.
    """
    return (rawData
            .flatMap(parsePointForCategoricalOnly)  # every (featureID, value) tuple
            .distinct()
            # Sort on the whole tuple rather than only featureID (as sortByKey would), so
            # the index given to each tuple does not depend on partitioning.
            .sortBy(lambda featValue: featValue)
            .zipWithIndex()
            .collectAsMap())

def oneHotEncoding(rawFeats, OHEDict, numOHEFeats):
    """Turn one observation's (featureID, value) tuples into a one-hot SparseVector.

    Tuples missing from OHEDict are ignored.

    Args:
        rawFeats (list of (int, str)): The tuples of a single observation.
        OHEDict (dict): Maps (featureID, value) to a unique integer index.
        numOHEFeats (int): Length of the resulting vector.

    Returns:
        SparseVector: Length numOHEFeats, holding 1 at each matched index, with the
            indices in ascending order.
    """
    hits = [OHEDict[pair] for pair in rawFeats if pair in OHEDict]
    hits.sort()  # SparseVector indices are expected in ascending order
    ones = [1] * len(hits)  # assumes no duplicate tuples within one observation
    return SparseVector(numOHEFeats, hits, ones)

#for each line of data return either sparse vector or raw data, given a dictionary
def parseOHEPoint(point, OHEDict, getRaw=False):
    """Encode one raw tab-separated line with the given OHE dictionary.

    Note:
        The categorical columns are always encoded through `oneHotEncoding`.

    Args:
        point (str): Tab-separated line. Column 0 is the label, columns 1-13 are
            integer features and columns 14+ are categorical.
        OHEDict (dict of (int, str) to int): Mapping of (featureID, value) to unique
            integer. Its size determines the vector length.
        getRaw (bool): False -> return a LabeledPoint with a SparseVector of the
            categorical features. True -> return a tab-delimited string of columns 0-13
            followed by one "0"/"1" column per dictionary entry.

    Returns:
        LabeledPoint, or str when getRaw is True.
    """
    numOHEFeats = len(OHEDict)
    values = point.split("\t")

    # Per the Criteo winners' presentation, columns 14+ are the categorical features.
    categorical = [(col, values[col]) for col in range(14, len(values))]
    features = oneHotEncoding(categorical, OHEDict, numOHEFeats)

    if not getRaw:
        return LabeledPoint(values[0], features)

    # Label plus the 13 integer features, followed by the densified 0/1 OHE columns.
    rawDataArray = [values[col] for col in range(0, 14)]
    rawDataArray.extend([str(int(flag)) for flag in features.toArray().tolist()])
    return "\t".join(rawDataArray)

def create_LIBSVM(point):
    """Convert a tab-delimited row into one LIBSVM-format line.

    Column 0 is written as the label. Every following column i (1-based) becomes an
    "i:value" pair. The LIBSVM format separates fields with spaces (e.g. Spark's
    MLUtils.loadLibSVMFile splits each line on ' '), so tabs are not used in the output.
    Empty columns (missing values) are omitted, because "i:" with no value is not a
    valid LIBSVM token and absent indices are treated as missing/zero by sparse readers.

    Args:
        point (str): Tab-delimited row, e.g. the output of parseOHEPoint(..., getRaw=True).

    Returns:
        str: "label 1:v1 2:v2 ...".
    """
    values = point.split("\t")
    pairs = ["%d:%s" % (idx, val) for idx, val in enumerate(values[1:], 1) if val != ""]
    return " ".join([values[0]] + pairs)

def main_preprocessWithOHE(input_file, limit, output_file, topN=26):
    """Run the OHE preprocessing pipeline and save the result in LIBSVM format.

    Steps:
      1. Count, for every categorical (featureID, value) tuple, how many lines contain it.
      2. Keep tuples seen at least `limit` times, then the `topN` most frequent of those.
      3. Re-encode every line as label + integer features + 0/1 columns for the kept
         tuples, convert it to LIBSVM and save it.

    Args:
        input_file (str): Path passed to sc.textFile (tab-separated raw lines).
        limit (int or str): Minimum occurrence count for a tuple to be kept. Converted with
            int() because it may come straight from the command line.
        output_file (str): Output directory for saveAsTextFile.
        topN (int): Maximum number of categorical tuples to keep (default 26).

    Note:
        Uses the module-level SparkContext `sc` created in the __main__ block.
    """
    minCount = int(limit)

    rawData = sc.textFile(input_file)
    rawData.cache()  # read twice: once for counting, once for the final encoding

    # Counting (featureID, value) tuples directly gives the same counts as counting the
    # indices of a preliminary one-hot encoding (each featureID occurs once per line), but
    # avoids collecting a dictionary of every distinct value to the driver and joining on it.
    featCounts = (rawData
                  .flatMap(parsePointForCategoricalOnly)
                  .map(lambda feat: (feat, 1))
                  .reduceByKey(lambda a, b: a + b))

    # Most frequent first. Ties are broken by the tuple itself so the result is deterministic.
    # takeOrdered avoids sorting the whole RDD just to keep topN elements.
    topFeats = (featCounts
                .filter(lambda featCount: featCount[1] >= minCount)
                .takeOrdered(topN, key=lambda featCount: (-featCount[1], featCount[0])))
    prunedOHEDictionary = {feat: idx for idx, (feat, _) in enumerate(topFeats)}

    # Parse the raw data again and emit label, integer features and 0/1 OHE columns.
    finalTrainData = rawData.map(lambda point: parseOHEPoint(point, prunedOHEDictionary, True))
    finalTrainData.map(create_LIBSVM).saveAsTextFile(output_file)
    
if __name__ == "__main__":

    # Three required arguments:
    #  1. input file
    #  2. limit: minimum number of occurrences for a categorical feature to be kept
    #  3. output directory (the notebook removes it first; saveAsTextFile creates it)
    if len(sys.argv) < 4:
        sys.stderr.write("Usage: input_file limit_for_frequency_of_features output_file\n")
        sys.exit(-1)

    input_file, limit, output_file = sys.argv[1:4]
    try:
        limit = int(limit)
    except ValueError:
        sys.stderr.write("limit_for_frequency_of_features must be an integer, got %r\n" % limit)
        sys.exit(-1)

    # `sc` is module-global: main_preprocessWithOHE reads it directly.
    sc = SparkContext(appName="PreprocessOHE")
    try:
        main_preprocessWithOHE(input_file, limit, output_file)
    finally:
        sc.stop()


In [None]:
#run locally

#ensure folder with output name does not exist
!rm -r output

#submit spark job (note that page_rank.py has 4 arguments)
!time /usr/local/Cellar/apache-spark/1.5.0/libexec/bin/spark-submit CriteoFirstPreprocessOHE.py dac_sample.txt 10 output     

#output results:
!rm output/_SUCCESS
!echo '-----------------OUTPUT-----------------'
!cat output/*


In [None]:
# Concatenate the part files into output/output.txt, deleting the first line.
# NOTE(review): sed reads all matched files as one stream, so `1d` removes only the first
# line overall, not the first line of each part file; on a re-run the glob also matches
# output/output.txt itself. Confirm both are intended.
!sed 1d output/* > output/output.txt