# What am I doing here?

- GBT model
- Set up to do three runs of the same model; these can later be swapped for three different models in a voting classifier
- Every step that checkpoints to a file is wrapped in an 'if False' block. If you need to
create the file, change that to True and run the cell. Then change it back to False to
take the faster path through the notebook, which just reads the file.

In [248]:
import pyspark
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [249]:
# Optional: record the versions of the imported libraries.
# Requires the `watermark` IPython extension (`pip install watermark`);
# comment out the two magics below if it is not installed.
%load_ext watermark
%watermark -iv

The watermark extension is already loaded. To reload it, use:
  %reload_ext watermark
pyspark 2.4.1



In [250]:
# Local-mode settings: comment these out to run on a cluster, and adjust the
# driver memory to the size of your laptop. They are registered on the shared
# SparkSession.builder, so this cell has to run before getOrCreate().
pyspark.sql.SparkSession.builder.config('spark.driver.memory', '8g')
# A small number of shuffle partitions suits a single machine. The key used
# to be misspelled 'spark.sql.shuffle.paritions', so the setting was never
# applied.
pyspark.sql.SparkSession.builder.config('spark.sql.shuffle.partitions', 5)

<pyspark.sql.session.SparkSession.Builder at 0x116dc5470>

In [251]:
# Create the session with the builder options set above, or reuse one that
# already exists.
spark = pyspark.sql.SparkSession.builder.getOrCreate()

# Global Variables 

In [252]:
# Single columns whose count tables are joined in as features.
unigrams = ['os', 'channel', 'app']

# Column pairs whose count tables are joined in as features.
bigrams = [
    ['device', 'os'],
    ['device', 'channel'],
    ['device', 'app'],
    ['channel', 'app'],
]

# Checkpoint 1 

Read the csv file, drop the attributed_time (because I didn't use it in the MVP),
and downsample the 0 class to 25% because I'm still on my laptop 

In [253]:
if False:
    # Rebuild path: parse the raw CSVs and persist them as parquet.
    # attributed_time is dropped, class 0 is downsampled to 25% and every
    # class-1 row is kept.
    df = (
        spark.read
        .csv('../data/train.csv', header=True, inferSchema=True)
        .drop('attributed_time')
        .sampleBy('is_attributed', fractions={0: .25, 1: 1.})
    )
    test = spark.read.csv('../data/test.csv', header=True, inferSchema=True)

    for frame, path in ((df, '../data/checkpoint1.parquet'),
                        (test, '../data/test_checkpoint1.parquet')):
        frame.write.parquet(path, mode='overwrite')
else:
    # Fast path: load the parquet checkpoints written by the branch above.
    df, test = (
        spark.read.parquet(path)
        for path in ('../data/checkpoint1.parquet',
                     '../data/test_checkpoint1.parquet')
    )

In [204]:
# Column names and types of the training checkpoint.
df.dtypes

[('ip', 'int'),
 ('app', 'int'),
 ('device', 'int'),
 ('os', 'int'),
 ('channel', 'int'),
 ('click_time', 'timestamp'),
 ('is_attributed', 'int')]

In [205]:
# Column names and types of the test checkpoint.
test.dtypes

[('click_id', 'int'),
 ('ip', 'int'),
 ('app', 'int'),
 ('device', 'int'),
 ('os', 'int'),
 ('channel', 'int'),
 ('click_time', 'timestamp')]

In [206]:
# Number of rows in the (downsampled) training checkpoint.
df.count()

46575441

In [207]:

# Number of rows in the test checkpoint.
test.count()

18790469

# Daily IP prevalence 
Because IP addresses get reassigned, this feature has to be engineered
separately on the train and test sets.
(See the link Elyse posted on the slack.)

In [254]:
# Tag every click with its day of year; this is the per-day key for the
# IP prevalence feature.
df, test = (
    sdf.withColumn('doy', F.dayofyear('click_time'))
    for sdf in (df, test)
)

In [255]:
# Number of clicks per (day, ip), computed separately for train and test.
df_ip_counts = df.select('doy', 'ip').groupBy('doy', 'ip').count()
test_ip_counts = test.select('doy', 'ip').groupBy('doy', 'ip').count()

In [256]:
# Largest per-IP click count seen on each day (one row per doy).
# An explicit agg() produces the 'day_max' column directly, instead of
# calling max() on every numeric column and then renaming 'max(count)' and
# dropping 'max(doy)'.
df_day_max = df_ip_counts.groupBy('doy').agg(
    F.max('count').alias('day_max'))
test_day_max = test_ip_counts.groupBy('doy').agg(
    F.max('count').alias('day_max'))

In [257]:
# Attach each day's maximum to every (day, ip) count row.
df_ip_counts = df_ip_counts.join(df_day_max, on='doy', how='left')
test_ip_counts = test_ip_counts.join(test_day_max, on='doy', how='left')

In [258]:
# Check the columns of the per-day IP count table after the join.
df_ip_counts.dtypes

[('doy', 'int'), ('ip', 'int'), ('count', 'bigint'), ('day_max', 'bigint')]

In [259]:
# ip_pct: an IP's click count for the day relative to the busiest IP that
# day (both operands cast to float before dividing).
ip_share = (F.col('count').cast(T.FloatType())
            / F.col('day_max').cast(T.FloatType()))

df_ip_counts = df_ip_counts.withColumn('ip_pct', ip_share)
test_ip_counts = test_ip_counts.withColumn('ip_pct', ip_share)

In [260]:
# Bring ip_pct back onto the click-level rows, matching on (day, ip).
ip_keys = ['doy', 'ip']
df = df.join(
    df_ip_counts.select('doy', 'ip', 'ip_pct'), on=ip_keys, how='left')
test = test.join(
    test_ip_counts.select('doy', 'ip', 'ip_pct'), on=ip_keys, how='left')

## Same class balancing as MVP
Still hacky - but I reordered it so that the join happens on a
smaller table.
And, now there are three versions to stack.

In [261]:
# Three oversampled copies of the positive class (sampling with replacement
# at 4x), one per seed, so each of the three runs gets its own sample.
positives = df.filter(df.is_attributed == 1)
class1_a, class1_b, class1_c = (
    positives.sample(withReplacement=True, fraction=4.0, seed=seed)
    for seed in (111, 222, 333)
)

In [262]:
# For each run: take an 11% sample of class 0 (only stratum 0 is listed in
# the fractions) and append that run's oversampled class-1 rows.
df_a = df.sampleBy('is_attributed', fractions={0: .11}, seed=111).union(class1_a)
df_b = df.sampleBy('is_attributed', fractions={0: .11}, seed=222).union(class1_b)
df_c = df.sampleBy('is_attributed', fractions={0: .11}, seed=333).union(class1_c)

## Counting 

The count tables (all except the IP one) were built from the full training set
rather than the subset. The results are loaded here.

In [263]:
def get_count_table(group):
    """Load the stored count table for a column or a group of columns.

    Parameters
    ----------
    group : str or sequence of str
        A single column name (for example ``'os'``) or the column names of
        an n-gram (for example ``['device', 'os']``).

    Returns
    -------
    The DataFrame read from ``../data/table_<name>.parquet``, where
    ``<name>`` is ``<group>_pct`` for a single column and the column names
    joined with ``_`` for a group.
    """
    # isinstance also accepts str subclasses; an exact type comparison would
    # send them to the join() branch and split the name into characters.
    if isinstance(group, str):
        column_name = group + '_pct'  # for example: os_pct
    else:
        column_name = "_".join(group)  # for example: device_os

    table_name = 'table_' + column_name
    return spark.read.parquet(f'../data/{table_name}.parquet')

In [264]:
def join_table(sdf, count_table, group):
    """Left-join ``count_table`` onto ``sdf`` using the columns in ``group``.

    Returns the joined frame; every row of ``sdf`` is kept.
    """
    return sdf.join(count_table, on=group, how='left')

In [265]:
# Join the stored count tables onto the three training frames and the test
# frame, then checkpoint everything to parquet so feature engineering does
# not have to be redone when all I want to do is tune hyperparameters.
if False:
    for group in unigrams + bigrams:
        # A unigram is a bare column name; wrap it so the join key is
        # always a list of columns.
        keys = [group] if isinstance(group, str) else group
        ct = get_count_table(group)
        df_a = join_table(df_a, ct, keys)
        df_b = join_table(df_b, ct, keys)
        df_c = join_table(df_c, ct, keys)
        test = join_table(test, ct, keys)

    for frame, path in ((df_a, '../data/dfa.parquet'),
                        (df_b, '../data/dfb.parquet'),
                        (df_c, '../data/dfc.parquet'),
                        (test, '../data/test_stack.parquet')):
        frame.write.parquet(path, mode='overwrite')
else:
    # Fast path: reload the feature-engineered frames from disk.
    df_a, df_b, df_c, test = (
        spark.read.parquet(path)
        for path in ('../data/dfa.parquet',
                     '../data/dfb.parquet',
                     '../data/dfc.parquet',
                     '../data/test_stack.parquet')
    )
    

In [266]:
# Rows with no match in a count table come out of the left joins as null;
# replace those with 0 in all three training frames.
df_a, df_b, df_c = (frame.fillna(0) for frame in (df_a, df_b, df_c))

In [267]:
# Same null -> 0 replacement for the test frame.
test = test.na.fill(0)

In [268]:
# Print the class balance of each of the three training frames.
for balanced in (df_a, df_b, df_c):
    balanced.groupBy('is_attributed').count().show()

+-------------+-------+
|is_attributed|  count|
+-------------+-------+
|            1|1829362|
|            0|5072930|
+-------------+-------+

+-------------+-------+
|is_attributed|  count|
+-------------+-------+
|            1|1825928|
|            0|5072188|
+-------------+-------+

+-------------+-------+
|is_attributed|  count|
+-------------+-------+
|            1|1827785|
|            0|5074657|
+-------------+-------+



In [269]:
# Row count of the test frame after the joins, for comparison with the
# count taken right after loading the checkpoint.
test.count()

18790469

# Last minute model tweak - add hour column

In [270]:
def add_hour(sdf):
    """Return ``sdf`` with an ``hour`` column: time of day as a fraction.

    The value is ``(hour + minute / 60) / 24`` taken from ``click_time``.
    """
    hours = F.hour('click_time').astype(T.FloatType())
    minutes = F.minute('click_time').astype(T.FloatType())
    day_fraction = (hours + (minutes / 60.)) / 24.
    return sdf.withColumn('hour', day_fraction)

# Add the hour feature to the test frame and all three training frames.
test, df_a, df_b, df_c = map(add_hour, (test, df_a, df_b, df_c))

# Create model data in format expected by Spark

In [271]:
# Feature columns, in order: unigram counts, bigram counts, then the IP
# prevalence and hour features.
input_cols = (
    [name + '_pct' for name in unigrams]
    + ['_'.join(pair) for pair in bigrams]
    + ['ip_pct', 'hour']
)

In [272]:
# Display the final list of feature columns.
input_cols

['os_pct',
 'channel_pct',
 'app_pct',
 'device_os',
 'device_channel',
 'device_app',
 'channel_app',
 'ip_pct',
 'hour']

In [273]:
# Packs the feature columns into the single 'features' vector column.
vec_assembler = VectorAssembler(
    inputCols=input_cols,
    outputCol='features',
)
# Scores predictions against the is_attributed label; every other evaluator
# setting is left at its default.
evaluator = BinaryClassificationEvaluator(
    labelCol='is_attributed',
)
    

In [274]:
# Assemble the features for each training frame and keep only the label and
# feature vector.
model_a, model_b, model_c = (
    vec_assembler.transform(frame).select('is_attributed', 'features')
    for frame in (df_a, df_b, df_c)
)

# GBT Classifier

In [281]:
gbtc = GBTClassifier(labelCol='is_attributed')

# Preparing for future hyperparameter tuning: each parameter has a single
# value for now, so the grid holds exactly one combination.
pg = (
    ParamGridBuilder()
    .addGrid(gbtc.maxDepth, [10])
    .addGrid(gbtc.subsamplingRate, [.8])
    .addGrid(gbtc.featureSubsetStrategy, ['6'])
    .addGrid(gbtc.maxBins, [64])
    .addGrid(gbtc.stepSize, [.2])
    .addGrid(gbtc.maxIter, [30])
    .build()
)

# 80/20 train/validation split over the grid above.
tvs = TrainValidationSplit(
    estimator=gbtc,
    estimatorParamMaps=pg,
    evaluator=evaluator,
    trainRatio=.8,
)

In [282]:
# Fit the train/validation split on sample A, then score the fitted model.
# NOTE(review): results_a is computed on model_a, the same data that was
# passed to fit(), so the value shown below is an in-sample score rather
# than a held-out one.
tvs_a = tvs.fit(model_a)
results_a = tvs_a.transform(model_a)
evaluator.evaluate(results_a)

0.9766076828232535

In [284]:
# Inspect the parameters of the best model from the sample-A fit.
tvs_a.bestModel.extractParamMap()

{Param(parent='GBTClassifier_eb9adceae1b5', name='cacheNodeIds', doc='If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees.'): False,
 Param(parent='GBTClassifier_eb9adceae1b5', name='checkpointInterval', doc='set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext'): 10,
 Param(parent='GBTClassifier_eb9adceae1b5', name='featureSubsetStrategy', doc='The number of features to consider for splits at each tree node. Supported options: auto, all, onethird, sqrt, log2, (0.0-1.0], [1-n].'): '5',
 Param(parent='GBTClassifier_eb9adceae1b5', name='featuresCol', doc='features column name'): 'features',
 Param(parent='GBTClassifier_eb9adceae1b5', name='labelCol', doc='label column name'): 'is_a

In [172]:
# Same fit-and-score step for sample B.
# NOTE(review): scored on model_b, the data it was fitted on.
tvs_b = tvs.fit(model_b)
results_b = tvs_b.transform(model_b)
evaluator.evaluate(results_b)

0.9702896606200668

In [173]:
# Same fit-and-score step for sample C.
# NOTE(review): scored on model_c, the data it was fitted on.
tvs_c = tvs.fit(model_c)
results_c = tvs_c.transform(model_c)
evaluator.evaluate(results_c)

0.9704599902977329

# Let's bring the test set in here

In [174]:
# Assemble the feature vector for the test rows with the same assembler used
# for training; all original columns (including click_id) are kept.
test_model = vec_assembler.transform(test)

In [175]:
# Score the test set with each of the three fitted models.
results_a, results_b, results_c = (
    fitted.transform(test_model) for fitted in (tvs_a, tvs_b, tvs_c)
)

In [176]:
def get_prediction(sdf):
    """Trim scored rows to click_id, prediction and probability.

    The prediction is cast to a short integer. As a side effect, the number
    of rows per predicted class is printed. Returns the trimmed frame.
    """
    predicted_class = F.col('prediction').astype(T.ShortType())
    trimmed = sdf.select('click_id', predicted_class, 'probability')
    trimmed.groupBy('prediction').count().show()
    return trimmed

In [177]:
# Trim each model's output; the per-class counts print in a, b, c order.
results_a, results_b, results_c = map(
    get_prediction, (results_a, results_b, results_c))

+----------+--------+
|prediction|   count|
+----------+--------+
|         1|  561267|
|         0|18229202|
+----------+--------+

+----------+--------+
|prediction|   count|
+----------+--------+
|         1|  555951|
|         0|18234518|
+----------+--------+

+----------+--------+
|prediction|   count|
+----------+--------+
|         1|  657822|
|         0|18132647|
+----------+--------+



# Extract probabilities 

In [179]:
# Schema of the flattened per-model predictions.
mySchema = (
    T.StructType()
    .add('click_id', T.IntegerType())
    .add('prediction', T.ShortType())
    .add('pclass1', T.FloatType())
)


def save_stuff(x):
    """Flatten one scored row, keeping element 1 of its probability vector.

    Returns a Row with click_id, prediction and pclass1 (a Python float).
    """
    class1_probability = float(x.probability[1])
    return T.Row(click_id=x.click_id,
                 prediction=x.prediction,
                 pclass1=class1_probability)


# Apply save_stuff to every row of each model's results.
vec_a, vec_b, vec_c = (
    scored.rdd.map(save_stuff).toDF(schema=mySchema)
    for scored in (results_a, results_b, results_c)
)

# Take the median of the three models as my final answer

In [180]:
# Keep only click_id and the class-1 probability from each model, with the
# probability column named after its model so the three can sit side by side.
vec_a = vec_a.select('click_id',
                     F.col('pclass1').alias('vec_a'))
vec_b = vec_b.select('click_id',
                     F.col('pclass1').alias('vec_b'))
vec_c = vec_c.select('click_id',
                     F.col('pclass1').alias('vec_c'))

# One row per click_id with all three probabilities.
joined = vec_a.join(vec_b, ['click_id']).join(vec_c, ['click_id'])

# The median of three values is the middle element of the sorted triple.
# Doing this with column functions keeps the work inside Spark, instead of
# sending every row through a Python RDD map that calls statistics.median.
# The result has the same two columns as before: click_id and a float
# is_attributed.
joined = joined.select(
    'click_id',
    F.sort_array(F.array('vec_a', 'vec_b', 'vec_c'))[1].alias('is_attributed'),
)

In [181]:
# Write the final predictions, replacing any previous output at this path.
# NOTE(review): no header option is passed here. If the consumer of this
# file expects a 'click_id,is_attributed' header line, confirm it is added
# somewhere downstream.
joined.write.mode('overwrite').csv('../data/vote_results.csv')

In [245]:
# Shut down the Spark session now that the results are written.
spark.stop()