In [1]:
# Imports 
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import time
import itertools

from pyspark.sql import SQLContext
from pyspark.sql import types
from pyspark.sql.functions import isnan, when, count, col, rand
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler, StandardScaler, VectorIndexer, Normalizer, SQLTransformer, RFormula
from pyspark.sql.functions import udf, avg, col, struct
from pyspark.sql.types import FloatType
from pyspark.ml.linalg import VectorUDT
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics

%matplotlib inline
plt.style.use('ggplot')

sc.setCheckpointDir("/data/")

# Feature Selection
In the absence of information about what each of these columns are, we can run test Logistic Regressions against each of them to get a sense of whether and how much they affect our predictions.

In [4]:
# Pull the balanced train/test data from the bucket
trainset = "gs://w261-final-hoky/data/cross_validation_data_set/balanced_set_1/train/"
testset = "gs://w261-final-hoky/data/cross_validation_data_set/balanced_set_1/test/"
train_df = spark.read.option("header", "false").parquet(trainset)
test_df = spark.read.option("header", "false").parquet(testset)
train_df = train_df.sample(.1, False, seed=261)
test_df = test_df.sample(.1, False, seed=261)
train_df.cache()
test_df.cache()

DataFrame[label: int, daypart: int, I1Scaled: double, I2Scaled: double, I3Scaled: double, I4Scaled: double, I5Scaled: double, I6Scaled: double, I7Scaled: double, I8Scaled: double, I9Scaled: double, I10Scaled: double, I11Scaled: double, I12Scaled: double, I13Scaled: double, I1BlankInd: int, I2BlankInd: int, I3BlankInd: int, I4BlankInd: int, I5BlankInd: int, I6BlankInd: int, I7BlankInd: int, I8BlankInd: int, I9BlankInd: int, I10BlankInd: int, I11BlankInd: int, I12BlankInd: int, I13BlankInd: int, C1: string, C2: string, C3: string, C4: string, C5: string, C6: string, C7: string, C8: string, C9: string, C10: string, C11: string, C12: string, C13: string, C14: string, C15: string, C16: string, C17: string, C18: string, C19: string, C20: string, C21: string, C22: string, C22BlankInd: int, C23: string, C24: string, C25: string, C26: string]

In [3]:
train_df.count()

187806

In [25]:
train_df.take(1)

[Row(label=0, daypart=2, I1Scaled=0.0, I2Scaled=-0.276854459264811, I3Scaled=0.0, I4Scaled=0.0, I5Scaled=-0.26700788528012814, I6Scaled=0.0, I7Scaled=0.0, I8Scaled=-0.7568641453123016, I9Scaled=0.0, I10Scaled=0.0, I11Scaled=0.0, I12Scaled=0.0, I13Scaled=0.0, I1BlankInd=1, I2BlankInd=0, I3BlankInd=1, I4BlankInd=1, I5BlankInd=0, I6BlankInd=1, I7BlankInd=1, I8BlankInd=0, I9BlankInd=1, I10BlankInd=1, I11lankInd=1, I12BlankInd=1, I13BlankInd=1, C1='05db9164', C2='bc6e3dc1', C3='67799c69', C4='d00d0f35', C5='25c83c98', C6='fe6b92e5', C7='b1ff5115', C8='0b153874', C9='a73ee510', C10='3b08e48b', C11='c708d1a1', C12='b9f28c33', C13='86dc4b63', C14='cfef1c29', C15='8b9021f6', C16='0f655650', C17='776ce399', C18='3a2028fd', C19='C19_no_value', C20='C20_no_value', C21='b426bc93', C22='C22_no_value', C22BlankInd=1, C23='32c7478e', C24='2e0a0035', C25='C25_no_value', C26='C26_no_value')]

Next we create a series of functions that will allow us to run regressions with various column inputs and report back scores for accuracy, log loss, etc.

In [10]:
def runRegression(num_feats, cat_feats, train, test):
    ''' Runs features through a pipeline into a Logistic Regression
    Args:
        num_feats - list of column names for numeric features
        cat_feats - list of column names for categorical features
        train - dataframe for training
        test - dataframe for testing
    Output:
        predictions - test dataframe with column of predictions from the logistic regression
    '''
    # Pipeline step 1: one hot encoding for the categorical variables
    stages = []
    for c in cat_feats:
        # cast each record in in categorical column c to an index
        stridx = StringIndexer(inputCol=c, outputCol = c + "idx").setHandleInvalid("keep")
        # one hot encode the indexed categorical column
        encoder = OneHotEncoderEstimator(inputCols=[stridx.getOutputCol()], outputCols=[c + "classVec"]).setDropLast(False)
        stages += [stridx, encoder]

    # Pipeline step 3: index the label column
    label_stridx = StringIndexer(inputCol="label", outputCol="label_transformed").setHandleInvalid('skip')
    stages += [label_stridx]

    # Pipeline step 4: put all features into one column as type of vector
    assembler_inputs = [c + "classVec" for c in cat_feats] + [n for n in num_feats]
    assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features")
    stages += [assembler]

    # Add the model to the pipeline
    lr = LogisticRegression(featuresCol='features', labelCol ='label_transformed', maxIter=4, regParam=.05)
    stages += [lr]

    # fit the pipeline to do the series of fit/transform defined in stages
    pipeline = Pipeline(stages=stages)

    # Train the model
    pipelineModel = pipeline.fit(train)

    # Make predictions
    predictions_df = pipelineModel.transform(test)
    
    #evaluator = MulticlassClassificationEvaluator(labelCol='label_transformed', metricName='accuracy')
    #accuracy = evaluator.evaluate(predictions_df)
       
    return predictions_df

def computeLogLoss(prob, label):
    ''' Calculates the log loss for a single observation
    Args:
        prob- float, a probability between 0 and 1
        label- integer, a label that is either 0 or 1
    Output:
        logloss- float, the log loss value for the single observation
    '''
    prob = prob[int(label)]
    # for the special case when prob=0 or 1, need a small value to avoid log(0)
    eps = 10e-14
    if prob == 0:
        prob += eps
    if prob == 1:
        prob -= eps
    try:
        logloss = -label * np.log(prob) - (1 - label) * np.log(1-prob)
    except Exception as e:
        logloss = prob
    return logloss

def returnMetrics(predictions_df):
    # Score predictions
    loglossUDF = udf(lambda x: float(computeLogLoss(x[0], x[1])), returnType=FloatType())
    newdf = predictions_df.withColumn('logloss', loglossUDF(struct('probability', 'label_transformed')))
    logloss = newdf.select(avg(col("logloss"))).collect()[0]['avg(logloss)']
    
    evaluator = MulticlassClassificationEvaluator(labelCol='label_transformed', metricName='accuracy')
    accuracy = evaluator.evaluate(predictions_df)
    return (accuracy, logloss)

def printMetrics(predictions_and_labels):
    metrics = MulticlassMetrics(predictions_and_labels)
    print('Precision of True ', metrics.precision(1))
    print('Precision of False', metrics.precision(0))
    print('Recall of True    ', metrics.recall(1))
    print('Recall of False   ', metrics.recall(0))
    #print('Weighted F-1 Score', metrics.weightedFMeasure())
    print('Average F-1 Score ', (metrics.fMeasure(label=0.0)+metrics.fMeasure(label=1.0))/2)
    print('Confusion Matrix\n', metrics.confusionMatrix().toArray())

With these functions in place, we can get a baseline of what kind of accuracy we would get if we fed ALL the data through the logistic regression.  We'd also learn how long such a task would take.

In [12]:
# Test a single regression run with ALL columns on a subset of 10% of data:
cat_feats = ['C1', 'C2', 'C3', 'C4', 'C5', 'C6', 'C7', 'C8', 'C9', 'C10', 'C11', 'C12', 'C13', 'C14', 'C15', 'C16', 'C17', 'C18', 'C19', 'C20', 'C21', 'C22', 'C23', 'C24', 'C25', 'C26']
num_feats = ['daypart', 'I1Scaled', 'I2Scaled', 'I3Scaled', 'I4Scaled', 'I5Scaled', 'I6Scaled', 'I7Scaled', 'I8Scaled', 'I9Scaled', 'I10Scaled', 'I11Scaled', 'I12Scaled']
num_feats += ['I1BlankInd', 'I2BlankInd', 'I3BlankInd', 'I4BlankInd', 'I5BlankInd', 'I6BlankInd', 'I7BlankInd', 'I8BlankInd', 'I9BlankInd', 'I10BlankInd', 'I11BlankInd', 'I12BlankInd']
start = time.time()
predictions_df = runRegression(num_feats, cat_feats, train_df, test_df)
score = returnMetrics(predictions_df)
print("Accuracy:", score[0])
print("Log Loss: ", score[1])
print(f"... Single regression run in {time.time() - start} seconds")

Accuracy: 0.6851745996536162
Log Loss:  0.9483030295010226
... Single regression run in 1567.0638630390167 seconds


That takes a very long time, and I can't even run it on the full dataset without breaking Spark.  Clearly we need to prune down the list of features, but which ones to use?  We have no idea what any of these columns are.  Using the data from the EDA, we can hand-pick a set of columns that look like they add value:  categorical columns that are not too homogenous, empty, or contain too many unique values, and numerical columns that don't correlate too strongly with each other.

In [20]:
# Test a single regression run with HANDPICKED columns on 100% of data:
cat_feats = ['daypart', "C1", "C5", "C6", "C8", "C9", "C10", "C14", "C17", "C19", "C20", "C22", "C23", "C25"]
num_feats = ['I1Scaled', 'I2Scaled', 'I3Scaled', 'I4Scaled', 'I5Scaled', 'I6Scaled', 'I7Scaled', 'I8Scaled', 'I9Scaled', 'I10Scaled', 'I11Scaled', 'I12Scaled']
num_feats += ['I1BlankInd', 'I2BlankInd', 'I3BlankInd', 'I4BlankInd', 'I5BlankInd', 'I6BlankInd', 'I7BlankInd', 'I8BlankInd', 'I9BlankInd', 'I10BlankInd', 'I11lankInd', 'I12BlankInd']
start = time.time()
predictions_df = runRegression(num_feats, cat_feats, train_df, test_df)
score = returnMetrics(predictions_df)
print("Accuracy:", score[0])
print("Log Loss: ", score[1])
print(f"... Single regression run in {time.time() - start} seconds")

Accuracy: 0.5786332029913465
Log Loss:  1.1673509932195787
... Single regression run in 110.77921342849731 seconds


This is much better, but can we improve it even further?  Why not build a set of functions to loop through all the features, and eliminate the ones that suppress accuracy one by one?

In [5]:
def catfilter(cats, nums, train_df, test_df, score_to_beat):
    removal_list = []
    winner_cats = cats
    current_score = score_to_beat
    score_list = []
    while True:
        print(f'Beginning round with cat features: {winner_cats}')
        for c in winner_cats:
            trial_cats = winner_cats.copy()
            trial_cats.remove(c)
            score = runRegression(nums, trial_cats, train_df, test_df)
            print(f'{c}: {score}')
            score_list.append(score) # Accuracy
        # Identify the index of the lowest score
        best = score_list.index(max(score_list))
        if score_list[best] > current_score:
            print(f'The best score of {score_list[best]} comes from removing {winner_cats[best]}.')
            current_score = score_list[best]
            removal_list.append(winner_cats[best])
            del winner_cats[best]
            score_list = []
        else:
            return (winner_cats, removal_list)
            break
            
def numfilter(cats, nums, df_train, df_test, score_to_beat):
    removal_list = []
    winner_nums = nums
    current_score = score_to_beat
    score_list = []
    while True:
        print(f'Beginning round with num features: {winner_nums}')
        for c in winner_nums:
            trial_nums = winner_nums.copy()
            trial_nums.remove(c)
            score = runRegression(trial_nums, cats, df_train, df_test)
            print(f'{c}: {score}')
            score_list.append(score)
        # Identify the index of the lowest score
        best = score_list.index(max(score_list))
        if score_list[best] > current_score:
            print(f'The best score of {score_list[best]} comes from removing {winner_nums[best]}.')
            current_score = score_list[best]
            removal_list.append(winner_nums[best])
            del winner_nums[best]
            score_list = []
        else:
            return (winner_nums, removal_list)
            break

In [13]:
num_feats = ['I1Scaled', 'I2Scaled', 'I3Scaled', 'I4Scaled', 'I5Scaled', 'I6Scaled', 'I7Scaled', 
             'I8Scaled', 'I9Scaled', 'I10Scaled', 'I11Scaled', 'I12Scaled', 'I13Scaled']
num_feats += ['I1BlankInd', 'I2BlankInd', 'I3BlankInd', 'I4BlankInd', 'I5BlankInd', 'I6BlankInd', 'I7BlankInd', 
              'I8BlankInd', 'I9BlankInd', 'I10BlankInd', 'I11lankInd', 'I12BlankInd', 'I13BlankInd', 'C22BlankInd']
cat_feats = ['C1', 'C2', 'C4', 'C5', 'C6', 'C7', 'C8', 'C10', 'C11', 'C12', 'C13', 
             'C14', 'C15', 'C17', 'C18', 'C19', 'C20', 'C22', 'C24', 'C26']
numfilter(cat_feats, num_feats, train_df, test_df, .3)

Beginning round with num features: ['I1Scaled', 'I2Scaled', 'I3Scaled', 'I4Scaled', 'I5Scaled', 'I6Scaled', 'I7Scaled', 'I8Scaled', 'I9Scaled', 'I10Scaled', 'I11Scaled', 'I12Scaled', 'I13Scaled', 'I1BlankInd', 'I2BlankInd', 'I3BlankInd', 'I4BlankInd', 'I5BlankInd', 'I6BlankInd', 'I7BlankInd', 'I8BlankInd', 'I9BlankInd', 'I10BlankInd', 'I11lankInd', 'I12BlankInd', 'I13BlankInd', 'C22BlankInd']
I1Scaled: 0.6380278982333967
I2Scaled: 0.6385192147480401
I3Scaled: 0.6530023711362228
I4Scaled: 0.6405272039818
I5Scaled: 0.6397581868284451
I6Scaled: 0.6397154636532587
I7Scaled: 0.6387114690363788
I8Scaled: 0.6534082413004935
I9Scaled: 0.6395018477773268
I10Scaled: 0.6613333902975669
I11Scaled: 0.6386473842735992
I12Scaled: 0.6527246704975114
I13Scaled: 0.6394377630145472
I1BlankInd: 0.6453121996026745
I2BlankInd: 0.6530023711362228
I3BlankInd: 0.6392882319013949
I4BlankInd: 0.6392668703138017
I5BlankInd: 0.6529596479610364
I6BlankInd: 0.6496913250592784
I7BlankInd: 0.6527673936726978
I8BlankIn

(['I1Scaled',
  'I2Scaled',
  'I3Scaled',
  'I4Scaled',
  'I5Scaled',
  'I6Scaled',
  'I7Scaled',
  'I8Scaled',
  'I11Scaled',
  'I12Scaled',
  'I13Scaled',
  'I1BlankInd',
  'I2BlankInd',
  'I4BlankInd',
  'I5BlankInd',
  'I7BlankInd',
  'I9BlankInd',
  'I10BlankInd',
  'I11lankInd',
  'I12BlankInd',
  'I13BlankInd',
  'C22BlankInd'],
 ['I10Scaled', 'I6BlankInd', 'I3BlankInd', 'I9Scaled', 'I8BlankInd'])

From this, we derive that we can trim an additional four columns from our handpicked set and still get essentially the same result.  If we start from the whole array of features and winnow down, we determine that the optimal combination of features are:

**Categorical:** ['daypart', 'C1', 'C2', 'C5', 'C6', 'C7', 'C8', 'C11', 'C13', 'C14', 'C15', 'C17', 'C18', 'C19', 'C20', 'C22', 'C26']

**Numeric:** ['I1Scaled', 'I2Scaled', 'I3Scaled', 'I4Scaled', 'I5Scaled', 'I6Scaled', 'I7Scaled', 'I8Scaled', 'I11Scaled', 'I12Scaled', 'I13Scaled', 'I1BlankInd', 'I3BlankInd', 'I5BlankInd', 'I6BlankInd', 'I7BlankInd', 'I9BlankInd', 'I10BlankInd', 'I11BlankInd', 'I12BlankInd', 'C22BlankInd']

So with an algorithmically-designed set of features, what sort of scores do we see?

In [8]:
# Test a single regression run with OPTIMIZED columns on 100% of the data:
cat_feats = ['daypart', 'C1', 'C2', 'C5', 'C6', 'C7', 'C8', 'C11', 'C13', 
             'C14', 'C15', 'C17', 'C18', 'C19', 'C20', 'C22', 'C26']
num_feats = ['I1Scaled', 'I2Scaled', 'I3Scaled', 'I4Scaled', 'I5Scaled', 'I6Scaled', 'I7Scaled', 'I8Scaled', 
             'I11Scaled', 'I12Scaled', 'I13Scaled', 'I1BlankInd', 'I3BlankInd', 'I5BlankInd', 'I6BlankInd', 
             'I7BlankInd', 'I9BlankInd', 'I10BlankInd', 'I11BlankInd', 'I12BlankInd', 'C22BlankInd']
start = time.time()
predictions_df = runRegression(num_feats, cat_feats, train_df, test_df)
score = returnMetrics(predictions_df)
print("Accuracy:", score[0])
print("Log Loss: ", score[1])
print(f"... Single regression run in {time.time() - start} seconds")

Accuracy: 0.6910073962852925
Log Loss:  0.8567510902837625
... Single regression run in 406.1963346004486 seconds


In [5]:
# Test a single regression run with OPTIMIZED columns on 100% of the data:
cat_feats = ['daypart', 'C1', 'C2', 'C5', 'C6', 'C7', 'C8', 'C11', 'C13', 
             'C14', 'C15', 'C17', 'C18', 'C19', 'C20', 'C22', 'C26']
num_feats = ['I1Scaled', 'I2Scaled', 'I3Scaled', 'I4Scaled', 'I5Scaled', 'I6Scaled', 'I7Scaled', 'I8Scaled', 
             'I11Scaled', 'I12Scaled', 'I13Scaled', 'I1BlankInd', 'I3BlankInd', 'I5BlankInd', 'I6BlankInd', 
             'I7BlankInd', 'I9BlankInd', 'I10BlankInd', 'I11BlankInd', 'I12BlankInd', 'C22BlankInd']
start = time.time()
predictions_df = runRegression(num_feats, cat_feats, train_df, test_df)
score = returnMetrics(predictions_df)
print("Accuracy:", score[0])
print("Log Loss: ", score[1])
print(f"... Single regression run in {time.time() - start} seconds")

Accuracy: 0.6855601115022973
Log Loss:  0.8717612380033435
... Single regression run in 177.25854587554932 seconds


In [9]:
results_rdd = predictions_df.select(["prediction", "label_transformed"]).rdd
metrics = MulticlassMetrics(results_rdd)
print('Precision of True ', metrics.precision(1))
print('Precision of False', metrics.precision(0))
print('Recall of True    ', metrics.recall(1))
print('Recall of False   ', metrics.recall(0))
print('Weighted F-1 Score', metrics.weightedFMeasure())
print('Average F-1 Score ', (metrics.fMeasure(label=0.0)+metrics.fMeasure(label=1.0))/2)
print('Confusion Matrix\n', metrics.confusionMatrix().toArray())

Precision of True  0.7016657830854656
Precision of False 0.671935608236229
Recall of True     0.6440407386222969
Recall of False    0.7269593196966214
Weighted F-1 Score 0.685011774438858
Average F-1 Score  0.684992394542645
Confusion Matrix
 [[170802.  64152.]
 [ 83392. 150882.]]


Better accuracy than the features we picked by hand, slightly better to what we saw with all columns on a 10% subset of the data, and ran on 100% of the data in 1/10th the time.

## Interactions
These are the highest-value features taken independently, but is there additional information that can be derived by looking at them in combination?  For example, one of the numericals might be number of times this user has visited this site, while one of the categoricals might be the referring URL, and perhaps heavy users of THIS site who just came from THAT site are more likely to click on a particular ad.  In order to examine that, we would need an interaction term.

The trick comes in determining which fields to interact.  Interacting everything on everything else would be an exponentiation of features, and most of those combinations would make no sense.  So first we can do an exploration to see whether we can find any instances of this that contribute to accuracy.

In [4]:
def runRegressionInter(num, cat, train, test, inter=None, interwith=None):
    ''' Runs features through a pipeline into a Logistic Regression
    Args:
        num_feats - list of column names for numeric features
        cat_feats - list of column names for categorical features
        train - dataframe for training
        test - dataframe for testing
    Output:
        predictions - test dataframe with column of predictions from the logistic regression
    '''
    # Pipeline step 1: one hot encoding for the categorical variables
    stages = []
    for c in cat:
        # cast each record in in categorical column c to an index
        stridx = StringIndexer(inputCol=c, outputCol = c + "idx").setHandleInvalid("keep")
        # one hot encode the indexed categorical column
        encoder = OneHotEncoderEstimator(inputCols=[stridx.getOutputCol()], outputCols=[c + "classVec"]).setDropLast(False)
        stages += [stridx, encoder]

    # Pipeline step 3: index the label column
    label_stridx = StringIndexer(inputCol="label", outputCol="label_transformed").setHandleInvalid('skip')
    stages += [label_stridx]

    # Pipeline step 4: add interactions, and store in a features vector as type of sparse vector

    # Get string to indicate all terms we want in our function
    assembler_inputs = [c + "classVec" for c in cat] + [n for n in num]
    formula_string='label ~ ' + (' + ').join(assembler_inputs)
    
    if inter is not None:
        inter = inter+"classVec" if inter[0]=="C" and inter!="C22Bin" else inter
        interwith = interwith+"classVec" if interwith[0]=="C" and interwith!="C22Bin" else interwith
        formula_string += ' + ' + inter + ':' + interwith
        
    #print(formula_string)
    
    # Create formula for the function
    formula = RFormula(formula=formula_string, featuresCol='features', labelCol='label')
    stages +=[formula]

    # Add the model to the pipeline
    lr = LogisticRegression(featuresCol='features', labelCol ='label_transformed', maxIter=4, regParam=.05, elasticNetParam=0, standardization=True)
    stages += [lr]

    # fit the pipeline to do the series of fit/transform defined in stages
    pipeline = Pipeline(stages=stages)

    # Train the model
    pipelineModel = pipeline.fit(train)

    # Make predictions
    predictions_df = pipelineModel.transform(test)

    evaluator = MulticlassClassificationEvaluator(labelCol='label_transformed', metricName='accuracy')
    accuracy = evaluator.evaluate(predictions_df)
    
    return accuracy

In [5]:
cat_feats = ['daypart', 'C1', 'C2', 'C4', 'C5', 'C6', 'C7', 'C8', 'C10', 'C11', 'C12', 'C13', 
             'C14', 'C15', 'C17', 'C18', 'C19', 'C20', 'C22', 'C24', 'C26']
num_feats = ['I1Scaled', 'I2Scaled', 'I3Scaled', 'I4Scaled', 'I5Scaled', 'I6Scaled', 'I7Scaled', 'I8Scaled', 
             'I11Scaled', 'I12Scaled', 'I13Scaled']
small_feats = ['daypart', 'C5', 'C6', 'C8', 'C14', 'C17', 'C20', 'C22'] + num_feats
num_feats += ['I1BlankInd', 'I3BlankInd', 'I5BlankInd', 'I6BlankInd', 'I7BlankInd', 'I9BlankInd', 
              'I10BlankInd', 'I11lankInd', 'I12BlankInd', 'C22BlankInd']
start = time.time()
baseline = runRegressionInter(num_feats, cat_feats, train_df, test_df)
print("Baseline with no interactions:", baseline)
for pair in itertools.combinations([i for i in range(len(small_feats))], 2):
    score = runRegressionInter(num_feats, cat_feats, train_df, test_df, small_feats[pair[0]], small_feats[pair[1]])
    if score-baseline > .001:
        print(f'Interacting {small_feats[pair[0]]} with {small_feats[pair[1]]}: {score}, net change: {score-baseline}')

Baseline with no interactions: 0.6610770512464487
Interacting C5 with C17: 0.6660115779804755, net change: 0.004934526734026834
Interacting C6 with C17: 0.6674000811740328, net change: 0.006323029927584156
Interacting C8 with C17: 0.6663533633819666, net change: 0.005276312135517935
Interacting C14 with C17: 0.6669087646593895, net change: 0.005831713412940864
Interacting C17 with C20: 0.6668874030717963, net change: 0.005810351825347615
Interacting C17 with C22: 0.6655416230534253, net change: 0.0044645718069766804


In [13]:
def runRegressionWithInteractions(num, cat, train, test, inters=[]):
    ''' Runs features through a pipeline into a Logistic Regression
    Args:
        num_feats - list of column names for numeric features
        cat_feats - list of column names for categorical features
        train - dataframe for training
        test - dataframe for testing
    Output:
        predictions - test dataframe with column of predictions from the logistic regression
    '''
    # Pipeline step 1: one hot encoding for the categorical variables
    stages = []
    for c in cat:
        # cast each record in in categorical column c to an index
        stridx = StringIndexer(inputCol=c, outputCol = c + "idx").setHandleInvalid("keep")
        # one hot encode the indexed categorical column
        encoder = OneHotEncoderEstimator(inputCols=[stridx.getOutputCol()], outputCols=[c + "classVec"]).setDropLast(False)
        stages += [stridx, encoder]

    # Pipeline step 3: index the label column
    label_stridx = StringIndexer(inputCol="label", outputCol="label_transformed").setHandleInvalid('skip')
    stages += [label_stridx]

    # Pipeline step 4: add interactions, and store in a features vector as type of sparse vector

    # Get string to indicate all terms we want in our function
    assembler_inputs = [c + "classVec" for c in cat] + [n for n in num]
    formula_string='label ~ ' + (' + ').join(assembler_inputs)
    
    if inters:
        for inter in inters:
            i1 = inter[0]+"classVec" if inter[0][0]=="C" and inter[0]!="C22Bin" else inter[0]
            i2 = inter[1]+"classVec" if inter[1][0]=="C" and inter[1]!="C22Bin" else inter[1]
            formula_string += ' + ' + i1 + ':' + i2
        
    #print(formula_string)
    
    # Create formula for the function
    formula = RFormula(formula=formula_string, featuresCol='features', labelCol='label')
    stages +=[formula]

    # Add the model to the pipeline
    lr = LogisticRegression(featuresCol='features', labelCol ='label_transformed', maxIter=4, regParam=.05, elasticNetParam=0, standardization=True)
    stages += [lr]

    # fit the pipeline to do the series of fit/transform defined in stages
    pipeline = Pipeline(stages=stages)

    # Train the model
    pipelineModel = pipeline.fit(train)

    # Make predictions
    predictions_df = pipelineModel.transform(test)
   
    return predictions_df

In [21]:
# Test a single regression run with OPTIMIZED columns and INTERACTIONS on 100% of the data:
cat_feats = ['daypart', 'C1', 'C2', 'C5', 'C6', 'C7', 'C8', 'C11', 'C13', 
             'C14', 'C15', 'C17', 'C18', 'C19', 'C20', 'C22', 'C26']
num_feats = ['I1Scaled', 'I2Scaled', 'I3Scaled', 'I4Scaled', 'I5Scaled', 'I6Scaled', 'I7Scaled', 'I8Scaled', 
             'I11Scaled', 'I12Scaled', 'I13Scaled', 'I1BlankInd', 'I3BlankInd', 'I5BlankInd', 'I6BlankInd', 
             'I7BlankInd', 'I9BlankInd', 'I10BlankInd', 'I11lankInd', 'I12BlankInd', 'C22BlankInd']
interactions = [('C6', 'C17')]
start = time.time()
predictions_df = runRegressionWithInteractions(num_feats, cat_feats, train_df, test_df, interactions)
score = returnMetrics(predictions_df)
print("Accuracy:", score[0])
print("Log Loss: ", score[1])
print(f"... Single regression run in {time.time() - start} seconds")

Accuracy: 0.6908240349089844
Log Loss:  0.8542615411319706
... Single regression run in 346.16791677474976 seconds


In [None]:
interactions = [('C17', 'I1Scaled'), ('C17', 'I11Scaled')]
for inter in range(len(interactions)):
    newlist = interactions.copy()
    del newlist[inter]
    predictions_df = runRegressionWithInteractions(num_feats, cat_feats, train_df, test_df, newlist)
    score = returnMetrics(predictions_df)
    print(f'Without {interactions[inter]}, score is: {score[0]}')

In [16]:
# Run Cross-Validated model evaluation with UNBALANCED training data on UNBALANCED test
cat_feats = ['daypart', 'C1', 'C2', 'C5', 'C6', 'C7', 'C8', 'C11', 'C13', 
             'C14', 'C15', 'C17', 'C18', 'C19', 'C20', 'C22', 'C26']
num_feats = ['I1Scaled', 'I2Scaled', 'I3Scaled', 'I4Scaled', 'I5Scaled', 'I6Scaled', 'I7Scaled', 'I8Scaled', 
             'I11Scaled', 'I12Scaled', 'I13Scaled', 'I1BlankInd', 'I3BlankInd', 'I5BlankInd', 'I6BlankInd', 
             'I7BlankInd', 'I9BlankInd', 'I10BlankInd', 'I11BlankInd', 'I12BlankInd', 'C22BlankInd']
interactions = [('C6', 'C17')]
results = []
start = time.time()
for CV_set in range(1, 6):
    trainset = "gs://w261-final-hoky/data/cross_validation_data_set/set_"+str(CV_set)+"/train/"
    testset = "gs://w261-final-hoky/data/cross_validation_data_set/set_"+str(CV_set)+"/test/"
    train_df = spark.read.option("header", "false").parquet(trainset)
    test_df = spark.read.option("header", "false").parquet(testset)
    train_df.cache()
    test_df.cache()
    
    predictions_df = runRegressionWithInteractions(num_feats, cat_feats, train_df, test_df, interactions)
    score = returnMetrics(predictions_df)
    print(f"Accuracy for set {CV_set}: {score[0]}")
    print(f"Log Loss for set {CV_set}: {score[1]}\n")
    
    results = results.union(predictions_df) if results else predictions_df

# Convert to RDD to print complete metrics
results_rdd = results.select(["prediction", "label_transformed"]).rdd
printMetrics(results_rdd)
print(f"... Total Cross-Validated run in {time.time() - start} seconds")

Accuracy for set 1: 0.7670514045618169
Log Loss for set 1: 1.6144608416904505

Accuracy for set 2: 0.7673359297507747
Log Loss for set 2: 1.614008899716377

Accuracy for set 3: 0.7673559240105644
Log Loss for set 3: 1.6150568149390079

Accuracy for set 4: 0.7671941681527592
Log Loss for set 4: 1.6143941711082783

Accuracy for set 5: 0.7670995347960388
Log Loss for set 5: 1.6140322826341726

Precision of True  0.6591472606579001
Precision of False 0.7757394945551667
Recall of True     0.1883584041300187
Recall of False    0.9664702416615917
Average F-1 Score  0.5768279967541731
Confusion Matrix
 [[32951915.  1143201.]
 [ 9526153.  2210743.]]
... Total Cross-Validated run in 2256.473196029663 seconds


In [5]:
# Run Cross-Validated model evaluation with BALANCED training data on BALANCED test
cat_feats = ['daypart', 'C1', 'C2', 'C5', 'C6', 'C7', 'C8', 'C11', 'C13', 
             'C14', 'C15', 'C17', 'C18', 'C19', 'C20', 'C22', 'C26']
num_feats = ['I1Scaled', 'I2Scaled', 'I3Scaled', 'I4Scaled', 'I5Scaled', 'I6Scaled', 'I7Scaled', 'I8Scaled', 
             'I11Scaled', 'I12Scaled', 'I13Scaled', 'I1BlankInd', 'I3BlankInd', 'I5BlankInd', 'I6BlankInd', 
             'I7BlankInd', 'I9BlankInd', 'I10BlankInd', 'I11BlankInd', 'I12BlankInd', 'C22BlankInd']
interactions = [('C6', 'C17')]
results = []
start = time.time()
for CV_set in range(1, 6):
    trainset = "gs://w261-final-hoky/data/cross_validation_data_set/balanced_set_"+str(CV_set)+"/train/"
    testset = "gs://w261-final-hoky/data/cross_validation_data_set/balanced_set_"+str(CV_set)+"/test/"
    train_df = spark.read.option("header", "false").parquet(trainset)
    test_df = spark.read.option("header", "false").parquet(testset)
    train_df.cache()
    test_df.cache()
    
    predictions_df = runRegressionWithInteractions(num_feats, cat_feats, train_df, test_df, interactions)
    score = returnMetrics(predictions_df)
    print(f"Accuracy for set {CV_set}: {score[0]}")
    print(f"Log Loss for set {CV_set}: {score[1]}\n")
    
    results = results.union(predictions_df) if results else predictions_df

# Convert to RDD to print complete metrics
results_rdd = results.select(["prediction", "label_transformed"]).rdd
printMetrics(results_rdd)
print(f"... Total Cross-Validated run in {time.time() - start} seconds")

Accuracy for set 1: 0.6910082479503615
Log Loss for set 1: 0.8536549623519875

Accuracy for set 2: 0.6909831978966476
Log Loss for set 2: 0.8538127788145157

Accuracy for set 3: 0.6904710851853004
Log Loss for set 3: 0.8550057711334526

Accuracy for set 4: 0.6906207365968081
Log Loss for set 4: 0.8522126511312598

Accuracy for set 5: 0.6905541717786448
Log Loss for set 5: 0.8530239266314799

Precision of True  0.7069914704921786
Precision of False 0.6768331871934024
Recall of True     0.6514409772396381
Recall of False    0.7300140514152975
F-1 Score          0.6907275143274678
Confusion Matrix
 [[8568099. 3168797.]
 [4091001. 7645895.]]
... Total Cross-Validated run in 1480.1901302337646 seconds


In [10]:
# Run Cross-Validated model evaluation with BALANCED training data on UNBALANCED test
cat_feats = ['daypart', 'C1', 'C2', 'C5', 'C6', 'C7', 'C8', 'C11', 'C13', 
             'C14', 'C15', 'C17', 'C18', 'C19', 'C20', 'C22', 'C26']
num_feats = ['I1Scaled', 'I2Scaled', 'I3Scaled', 'I4Scaled', 'I5Scaled', 'I6Scaled', 'I7Scaled', 'I8Scaled', 
             'I11Scaled', 'I12Scaled', 'I13Scaled', 'I1BlankInd', 'I3BlankInd', 'I5BlankInd', 'I6BlankInd', 
             'I7BlankInd', 'I9BlankInd', 'I10BlankInd', 'I11BlankInd', 'I12BlankInd', 'C22BlankInd']
interactions = [('C6', 'C17')]
results = []
start = time.time()
for CV_set in range(1, 6):
    trainset = "gs://w261-final-hoky/data/cross_validation_data_set/balanced_set_"+str(CV_set)+"/train/"
    testset = "gs://w261-final-hoky/data/cross_validation_data_set/set_"+str(CV_set)+"/test/"
    train_df = spark.read.option("header", "false").parquet(trainset)
    test_df = spark.read.option("header", "false").parquet(testset)
    train_df.cache()
    test_df.cache()
    
    predictions_df = runRegressionWithInteractions(num_feats, cat_feats, train_df, test_df, interactions)
    score = returnMetrics(predictions_df)
    print(f"Accuracy for set {CV_set}: {score[0]}")
    print(f"Log Loss for set {CV_set}: {score[1]}\n")
    
    results = results.union(predictions_df) if results else predictions_df

# Convert to RDD to print complete metrics
results_rdd = results.select(["prediction", "label_transformed"]).rdd
printMetrics(results_rdd)
print(f"... Total Cross-Validated run in {time.time() - start} seconds")

Accuracy for set 1: 0.7083156590806539
Log Loss for set 1: 0.9629934231484193

Accuracy for set 2: 0.7092321173412258
Log Loss for set 2: 0.9628230923929499

Accuracy for set 3: 0.7093607588686803
Log Loss for set 3: 0.9646067583893108

Accuracy for set 4: 0.7073435429573942
Log Loss for set 4: 0.9617774045314033

Accuracy for set 5: 0.7078682330126151
Log Loss for set 5: 0.9619883839616282

Precision of True  0.45192762249587815
Precision of False 0.8585094851192203
Recall of True     0.6514409772396381
Recall of False    0.7280399046009992
F-1 Score          0.7084240595852523
Confusion Matrix
 [[24822605.  9272511.]
 [ 4091001.  7645895.]]
... Total Cross-Validated run in 1587.3708455562592 seconds
