# Final Project

W261 Machine Learning at Scale<br>
12 December 2018

Wei Wang;
Alice Lam;
John Tabbone;
Noah Randolph

### Question Formulation

Alice Lam: "enhancing CTR means improved monetization of your current traffic (eyeballs/views). The algorithm to predict CTR accurately is useful for the platform to show specific ads to specific people who would have the highest CTR"

### Algorithm Explanation

In [1]:
%%writefile toyDataset.py
#!/usr/bin/env python

import numpy as np
import csv

SEED = 2615
NUMERICCOLS = 2
ONEHOTCOLS = 2


# start Spark Session
from pyspark.sql import SparkSession
app_name = "loadAndEDA"
spark = SparkSession\
        .builder\
        .appName(app_name)\
        .getOrCreate()
sc = spark.sparkContext


def generateToyDataset(w=[8, -3, -1, 3, 8]):
    '''generate toy logistic regression dataset with numerical and 1-hot encoded features'''
    nrows=8
    np.random.seed(SEED)
    x1 = np.random.randint(0, 10, nrows)
    x2 = np.random.randint(0, 10, nrows)
    x3 = np.random.randint(0, 2, nrows) # simulate 1-hot
    x4 = np.ones(nrows, np.int8) - x3   # with x3 and x4
    noise = np.random.normal(5, 1, nrows)
    v = (w[0] + x1*w[1] + x2*w[2] + x3*w[3] + x4*w[4] + noise)
    y = (v>0) * 2 - 1 # y = 1 or -1
    df = spark.createDataFrame(zip(y.tolist(), x1.tolist(), x2.tolist(), x3.tolist(), x4.tolist()))
    oldColNames = df.columns
    newColNames = ['Label']+['I{}'.format(i) for i in range(0,2)]+['C{}'.format(i) for i in range(0,2)]
    for oldName, newName in zip(oldColNames, newColNames):
        df = df.withColumnRenamed(oldName, newName)
    return df


def logLoss(dataRDD, W):
    """
    Compute mean squared error.
    Args:
        dataRDD - each record is a tuple of (features_array, y)
        W       - (array) model coefficients with bias at index 0
    """
    augmentedData = dataRDD.map(lambda x: (np.append([1.0], x[0]), x[1]))
    loss = augmentedData.map(lambda p: (np.log(1 + np.exp(-p[1] * np.dot(W, p[0]))))) \
                        .reduce(lambda a, b: a + b)
    return loss


def GDUpdate(dataRDD, W, learningRate = 0.1):
    """
    Perform one OLS gradient descent step/update.
    Args:
        dataRDD - records are tuples of (features_array, y)
        W       - (array) model coefficients with bias at index 0
    Returns:
        new_model - (array) updated coefficients, bias at index 0
    """
    # add a bias 'feature' of 1 at index 0
    augmentedData = dataRDD.map(lambda x: (np.append([1.0], x[0]), x[1])).cache()
    
    grad = augmentedData.map(lambda p: (-p[1] * (1 - (1 / (1 + np.exp(-p[1] * np.dot(W, p[0]))))) * p[0])) \
                        .reduce(lambda a, b: a + b)
    new_model = W - learningRate * grad 
    return new_model


def dfToRDD(row):
    '''
    Converts dataframe row to rdd format.
        From: DataFrame['Label', 'I0', ..., 'C0', ...]
        To:   (features_array, y)
    '''    
    features_list = [row['I{}'.format(i)] for i in range(0, NUMERICCOLS)] + [row['C{}'.format(i)] for i in range(0, ONEHOTCOLS)]
    features_array = np.array(features_list)
    y = row['Label']
    return (features_array, y)


def normalize(dataRDD):
    """
    Scale and center data around the mean of each feature.
    """
    featureMeans = dataRDD.map(lambda x: x[0]).mean()
    featureStdev = np.sqrt(dataRDD.map(lambda x: x[0]).variance())
    normedRDD = dataRDD.map(lambda x: ((x[0] - featureMeans)/featureStdev, x[1]))
    return normedRDD


# create a toy dataset that includes 1-hot columns for development
df = generateToyDataset()   

# convert dataframe to RDD 
trainRDD = df.rdd.map(dfToRDD)




# normalize RDD
normedRDDcached = normalize(trainRDD).cache()
print(normedRDDcached.take(1))

# create initial weights to train
featureLen = len(normedRDDcached.take(1)[0][0])
wInitial = np.random.normal(size=featureLen+1) # add 1 for bias

# 1 iteration of gradient descent
w = GDUpdate(normedRDDcached, wInitial)

nSteps = 5
for idx in range(nSteps):
    print("----------")
    print(f"STEP: {idx+1}")
    w = GDUpdate(normedRDDcached, w)
    loss = logLoss(normedRDDcached, w)
    print(f"Loss: {loss}")
    print(f"Model: {[round(i,3) for i in w]}")

Overwriting toyDataset.py


In [2]:
!python toyDataset.py

2018-12-05 22:39:10 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2018-12-05 22:39:11 WARN  Utils:66 - Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
2018-12-05 22:39:11 WARN  Utils:66 - Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
[(array([-1.63525964,  0.62123652,  1.        , -1.        ]), 1)]
----------
STEP: 1
Loss: 7.7014879401802405
Model: [1.101, -0.162, -1.391, -0.468, -0.797]
----------
STEP: 2
Loss: 6.1460241873746195
Model: [0.865, -0.478, -1.282, -0.469, -0.795]
----------
STEP: 3
Loss: 5.006600355698076
Model: [0.66, -0.743, -1.182, -0.491, -0.774]
----------
STEP: 4
Loss: 4.183106733189442
Model: [0.485, -0.964, -1.092, -0.52, -0.744]
----------
STEP: 5
Loss: 3.591671886745936
Model: [0.337, -1.15, -1.016, -0.55, -

### EDA & Discussion of Challenges

In [33]:
%%writefile loadAndEDA.py
#!/usr/bin/env python

from pyspark.sql import types
from pyspark.sql.functions import udf, col, countDistinct, isnan, when, count, desc
import pandas as pd
from pyspark.mllib.stat import Statistics

MAINCLOUDPATH = 'gs://w261_final_project/train.txt'
MINICLOUDPATH = 'gs://w261_final_project/train_005.txt'
MINILOCALPATH = 'data/train_005.txt'

SEED = 2615


# start Spark Session
from pyspark.sql import SparkSession
app_name = "loadAndEDA"
spark = SparkSession\
        .builder\
        .appName(app_name)\
        .getOrCreate()
sc = spark.sparkContext


def loadData():
    '''load the data into a Spark dataframe'''
    # select path to data: MAINCLOUDPATH; MINICLOUDPATH; MINILOCALPATH
    df = spark.read.csv(path=MINILOCALPATH, sep='\t')
    # change column names
    oldColNames = df.columns
    newColNames = ['Label']+['I{}'.format(i) for i in range(0,13)]+['C{}'.format(i) for i in range(0,26)]
    for oldName, newName in zip(oldColNames, newColNames):
        df = df.withColumnRenamed(oldName, newName)
    # change int column types to int from string
    for col in df.columns[:14]:
        df = df.withColumn(col, df[col].cast('int'))
    return df


def splitIntoTestAndTrain(df):
    '''randomly splits 80/20 into training and testing dataframes'''
    splits = df.randomSplit([0.2, 0.8], seed=SEED)
    testDf = splits[0]
    trainDf = splits[1]
    return testDf, trainDf


def displayHead(df, n=5):
    '''returns head of the training dataset'''
    return df.head(n)


def getMedians(df, cols):
    '''returns approximate median values of the columns given, with null values ignored'''
    # 0.5 relative quantile probability and 0.05 relative precision error
    return df.approxQuantile(cols, [0.5], 0.05)

def getDescribe(df, cols):
    return df.select(cols).describe().show()

def getDistinctCount(df, cols):
    return df.agg(*(countDistinct(col(c)).alias(c) for c in cols)).show()

def checkNA(df, cols):
    return df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in cols]).show()

def getCorrMatrix(df, cols):
    df = df.select(cols)
    col_names = df.columns
    features = df.rdd.map(lambda row: row[0:])
    corr_mat=Statistics.corr(features, method="pearson")
    corr_df = pd.DataFrame(corr_mat)
    corr_df.index, corr_df.columns = col_names, col_names
    return corr_df
    
    
df = loadData().cache()
testDf, trainDf = splitIntoTestAndTrain(df)
#print("\nTEST DATASET ROW COUNTS: ", testDf.count())
#print("\nTRAIN DATASET ROW COUNTS: ", trainDf.count())
## print("HEAD\n", displayHead(trainDf))
#print("\nCOLUMN TYPES\n", df.dtypes)
#print("\nMEDIAN OF NUMERIC COLUMNS\n", getMedians(trainDf, trainDf.columns[1:14]))

#print("\nDESCRIPTIONS OF NUMERICAL COLUMNS 1-7\n")
#getDescribe(trainDf, trainDf.columns[1:8])
#print("\nDESCRIPTIONS OF NUMERICAL COLUMNS 8-14\n")
#getDescribe(trainDf, trainDf.columns[8:15])
#print("\nCOUNTS OF DISTINCT VALUE FOR CATEGORICAL VARIABLE COLUMNS")
#getDistinctCount(trainDf, trainDf.columns[15:])

#print("\nCOUNTS OF NAs FOR COLUMN 0 - 19")
#checkNA(trainDf, trainDf.columns[:20])
#print("\nCOUNTS OF NAs FOR COLUMN 20 - 39")
#checkNA(trainDf, trainDf.columns[20:])

##getCorrMatrix(trainDf, trainDf.columns[1:14]) # This doesn't work if there's NA in there



Overwriting loadAndEDA.py


In [34]:
!python loadAndEDA.py

2018-12-05 23:32:10 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2018-12-05 23:32:11 WARN  Utils:66 - Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
2018-12-05 23:32:11 WARN  Utils:66 - Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
2018-12-05 23:32:19 WARN  Utils:66 - Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
2018-12-05 23:32:23 WARN  BLAS:61 - Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
2018-12-05 23:32:23 WARN  BLAS:61 - Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS


In [74]:
!python submit_job_to_cluster.py --project_id=w261-222623 --zone=us-central1-b --cluster_name=testcluster --gcs_bucket=w261_final_project --key_file=$HOME/w261.json --create_new_cluster --pyspark_file=row_counts.py --instance_type=n1-standard-4 --worker_nodes=2

python: can't open file 'submit_job_to_cluster.py': [Errno 2] No such file or directory


#### Results from running EDA code above:
Main dataset:<br>
('TEST DATASET ROW COUNTS: ', 9164811)<br>
('TRAIN DATASET ROW COUNTS: ', 36675806)<br>

Toy dataset:<br>
('TEST DATASET ROW COUNTS: ', 4578)<br>
('TRAIN DATASET ROW COUNTS: ', 18379)<br>

### Algorithm Implementation

In [280]:
%%writefile algorithmImplementation.py
#!/usr/bin/env python

from pyspark.sql.types import StringType
from pyspark.sql.functions import udf, desc, isnan, when
import numpy as np
from operator import add
import copy


MAINCLOUDPATH = 'gs://w261_final_project/train.txt'
TOYCLOUDPATH = 'gs://w261_final_project/train_005.txt'
TOYLOCALPATH = 'data/train_005.txt'
NUMERICCOLS = 13
CATEGORICALCOLS = 26
NUMERICCOLNAMES = ['I{}'.format(i) for i in range(0,NUMERICCOLS)]
CATCOLNAMES = ['C{}'.format(i) for i in range(0,CATEGORICALCOLS)]
SEED = 2615


# start Spark Session
from pyspark.sql import SparkSession
app_name = "featureEngineering"
spark = SparkSession\
        .builder\
        .appName(app_name)\
        .getOrCreate()
sc = spark.sparkContext


def loadData():
    '''load the data into a Spark dataframe'''
    # select path to data: MAINCLOUDPATH; TOYCLOUDPATH; TOYLOCALPATH
    df = spark.read.csv(path=TOYLOCALPATH, sep='\t')
    # change column names
    oldColNames = df.columns
    newColNames = ['Label'] + NUMERICCOLNAMES + CATCOLNAMES
    for oldName, newName in zip(oldColNames, newColNames):
        df = df.withColumnRenamed(oldName, newName)
    # change int column types to int from string
    for col in df.columns[:14]:
        df = df.withColumn(col, df[col].cast('int'))
    return df


def splitIntoTestAndTrain(df):
    '''randomly splits 80/20 into training and testing dataframes'''
    splits = df.randomSplit([0.2, 0.8], seed=SEED)
    testDf = splits[0]
    trainDf = splits[1]
    return testDf, trainDf


def getMedians(df, cols):
    '''
    returns approximate median values of the columns given, with null values ignored
    '''
    # 0.5 relative quantile probability and 0.05 relative precision error
    return df.approxQuantile(cols, [0.5], 0.05)


def getMostFrequentCats(df, cols, n):
    '''
    returns a dict where the key is the column and value is an ordered list
    of the top n categories in that column in descending order
    '''
    freqCatDict = {col: None for col in df.columns[cols:]}
    for col in df.columns[cols:]:
        listOfRows = df.groupBy(col).count().sort('count', ascending=False).take(n)
        topCats = [row[col] for row in listOfRows]
        freqCatDict[col] = topCats[:n]
    return freqCatDict
    

def rareReplacer(df, dictOfMostFreqSets):
    '''
    Iterates through columns and replaces non-Frequent categories with 'rare' string.
    '''
    for colName in df.columns[NUMERICCOLS+1:]:
        bagOfCats = dictOfMostFreqSets[colName]
        df = df.withColumn(colName, udf(lambda x: 'rare' if x not in bagOfCats else x, StringType())(df[colName])).cache()
    return df

    
def dfToRDD(row):
    '''
    Converts dataframe row to rdd format.
        From: DataFrame['Label', 'I0', ..., 'C0', ...]
        To:   (features_array, y)
    '''    
    features_list = [row['I{}'.format(i)] for i in range(0, NUMERICCOLS)] + [row['C{}'.format(i)] for i in range(0, CATEGORICALCOLS)]
    features_array = np.array(features_list)
    y = row['Label']
    return (features_array, y)


def emitColumnAndCat(line):
    """
    Takes in a row from RDD and emits a record for each categorical column value along with a zero for one-hot encoding.
    The emitted values will become a reference dictionary for one-hot encoding in later steps.
        Input: (array([features], dtype='<U21'), 0) or (features, label)
        Output: ((categorical column, category), 0) or (complex key, value)
    The last zero in the output is for initializing one-hot encoding.
    """
    elements = line[0][NUMERICCOLS:]
    for catColName, element in zip(CATCOLNAMES, elements):
        yield ((catColName, element), 0)


def oneHotEncoder(line):
    """
    Takes in a row from RDD and emits row where categorical columns are replaced with 1-hot encoded columns.
        Input: (numerical and categorical features, label)
        Output: (numerical and one-hot encoded categorical features, label)
    """
    oneHotDict = copy.deepcopy(oneHotReference)
    elements = line[0][NUMERICCOLS:]
    for catColName, element in zip(CATCOLNAMES, elements):
        oneHotDict[(catColName, element)] = 1
    numericElements = list(line[0][:NUMERICCOLS])
    features = np.array(numericElements + [value for key, value in oneHotDict.items()], dtype=np.float)
    return (features, line[1])


def normalize(dataRDD):
    """
    Scale and center data around the mean of each feature.
    """
    featureMeans = dataRDD.map(lambda x: x[0]).mean()
    featureStdev = np.sqrt(dataRDD.map(lambda x: x[0]).variance())
    normedRDD = dataRDD.map(lambda x: ((x[0] - featureMeans)/featureStdev, x[1]))
    return normedRDD


def logLoss(dataRDD, W):
    """
    Compute mean squared error.
    Args:
        dataRDD - each record is a tuple of (features_array, y)
        W       - (array) model coefficients with bias at index 0
    """
    augmentedData = dataRDD.map(lambda x: (np.append([1.0], x[0]), x[1]))
    loss = augmentedData.map(lambda p: (np.log(1 + np.exp(-p[1] * np.dot(W, p[0]))))) \
                        .reduce(lambda a, b: a + b)
    return loss


def GDUpdate(dataRDD, W, learningRate = 0.1):
    """
    Perform one OLS gradient descent step/update.
    Args:
        dataRDD - records are tuples of (features_array, y)
        W       - (array) model coefficients with bias at index 0
    Returns:
        new_model - (array) updated coefficients, bias at index 0
    """
    # add a bias 'feature' of 1 at index 0
    augmentedData = dataRDD.map(lambda x: (np.append([1.0], x[0]), x[1])).cache()
    
    grad = augmentedData.map(lambda p: (-p[1] * (1 - (1 / (1 + np.exp(-p[1] * np.dot(W, p[0]))))) * p[0])) \
                        .reduce(lambda a, b: a + b)
    new_model = W - learningRate * grad 
    return new_model


# load data
df = loadData()
testDf, trainDf = splitIntoTestAndTrain(df)
testDf.cache()
trainDf.cache()

# get top n most frequent categories for each column (in training set only)
n = 10
mostFreqCatDict = getMostFrequentCats(trainDf, NUMERICCOLS+1, n)

# get dict of sets of most frequent categories in each column for fast lookups during filtering (in later code)
setsMostFreqCatDict = {key: set(value) for key, value in mostFreqCatDict.items()}

# get the top category from each column for imputation of missing values (in training set only)
fillNADictCat = {key: (value[0] if value[0] is not None else value[1]) for key, value in mostFreqCatDict.items()}

# get dict of median numeric values for imputation of missing values (in training set only)
fillNADictNum = {key: value for (key, value) in zip(trainDf.columns[1:NUMERICCOLS+1], 
                                                    [x[0] for x in getMedians(trainDf,
                                                                              trainDf.columns[1:NUMERICCOLS+1])])}

# impute missing values in training and test set
trainDf = trainDf.na.fill(fillNADictNum) \
                 .na.fill(fillNADictCat).cache()
testDf = testDf.na.fill(fillNADictNum) \
               .na.fill(fillNADictCat).cache()

# replace low-frequency categories with 'rare' string in training and test set
trainDf = rareReplacer(trainDf, setsMostFreqCatDict) # df gets cached in function
testDf = rareReplacer(testDf, setsMostFreqCatDict) # df gets cached in function

# convert dataframe to RDD 
trainRDD = trainDf.rdd.map(dfToRDD).cache()
testRDD = testDf.rdd.map(dfToRDD).cache()
        
# create and broadcast reference dictionary to be used in constructing 1 hot encoded RDD
oneHotReference = trainRDD.flatMap(emitColumnAndCat) \
                          .reduceByKeyLocally(add) # note: only the zero values are being added here (main goal is to output a dictionary)
sc.broadcast(oneHotReference)

# replace rows with new rows having categorical columns 1-hot encoded
trainRDD = trainRDD.map(oneHotEncoder).cache()
testRDD = testRDD.map(oneHotEncoder).cache()

# normalize RDD
normedTrainRDD = normalize(trainRDD).cache()
normedTestRDD = normalize(testRDD).cache() # use the mean and st. dev. from trainRDD

# create initial weights to train
featureLen = len(normedTrainRDD.take(1)[0][0])
wInitial = np.random.normal(size=featureLen+1) # add 1 for bias

# 1 iteration of gradient descent
w = GDUpdate(normedTrainRDD, wInitial)

nSteps = 10
for idx in range(nSteps):
    print("----------")
    print(f"STEP: {idx+1}")
    w = GDUpdate(normedTrainRDD, w)
    loss = logLoss(normedTrainRDD, w)
    print(f"Loss: {loss}")
    print(f"Model: {[round(i,3) for i in w]}")



Overwriting algorithmImplementation.py


In [281]:
!python algorithmImplementation.py

2018-12-08 02:44:29 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2018-12-08 02:44:41 WARN  Utils:66 - Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
----------
STEP: 1
Loss: inf
Model: [358.729, 21.922, 5.146, -1.764, -18.671, -41.75, -36.87, 10.065, -21.623, 0.408, 46.777, 51.258, 12.894, -45.742, -27.931, -10.78, -3.445, 3.091, 26.472, 13.583, -13.006, -5.371, 43.822, -25.15, -16.464, -13.975, -10.827, -17.257, -18.932, -15.589, -40.385, -18.487, -14.221, 1.164, -12.466, -5.398, -40.994, -0.752, -5.702, 12.001, 10.257, -35.896, -0.159, -5.749, -20.26, -23.03, 28.947, 26.019, 24.446, -8.47, 24.25, 10.691, -3.33, 2.283, -1.795, 9.354, -14.616, 12.23, 0.696, -31.1

In [21]:
!python submit_job_to_cluster.py --project_id=w261-222623 --zone=us-central1-b --cluster_name=featengcluster --gcs_bucket=w261_final_project --key_file=$HOME/w261.json --create_new_cluster --pyspark_file=featureEngineering.py --instance_type=n1-standard-4 --worker_nodes=2

Traceback (most recent call last):
  File "/anaconda3/lib/python3.5/site-packages/googleapiclient/discovery_cache/__init__.py", line 36, in autodetect
    from google.appengine.api import memcache
ImportError: No module named 'google.appengine'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/anaconda3/lib/python3.5/site-packages/googleapiclient/discovery_cache/file_cache.py", line 33, in <module>
    from oauth2client.contrib.locked_file import LockedFile
ImportError: No module named 'oauth2client.contrib.locked_file'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/anaconda3/lib/python3.5/site-packages/googleapiclient/discovery_cache/file_cache.py", line 37, in <module>
    from oauth2client.locked_file import LockedFile
ImportError: No module named 'oauth2client.locked_file'

During handling of the above exception, another exception occurred:

Traceback (

### Application of Course Concepts