#  Feature Merge

Loads all the engineered feature tables, merges them into the datasets, and saves new datasets 
that can go straight to a spark mlib model

In [1]:
import pyspark
import pyspark.sql.functions as F
import pyspark.sql.types as T

In [2]:
from itertools import combinations

In [3]:
%load_ext watermark
%watermark -iv

pyspark 2.4.3



In [4]:
# Comment these out to run on a cluster. Also, adjust memory to size of your laptop
pyspark.sql.SparkSession.builder.config('spark.driver.memory', '8g')
pyspark.sql.SparkSession.builder.config('spark.sql.shuffle.paritions', 5)

<pyspark.sql.session.SparkSession.Builder at 0x115488a58>

In [5]:
spark = pyspark.sql.SparkSession.builder.getOrCreate()

#  Code to load and merge the features

In [6]:
columns = [ 'device', 'os', 'channel', 'app' ]
bigrams = [ list(b) for b in combinations(columns,2)]

for ftype in [ 'df_', 'tgt_']:
    for c in columns:
        tmp = spark.read.parquet(f'../data/features/{ftype}{c}_pct.parquet')
        tmp.createOrReplaceTempView(f'{ftype}{c}_pct')
    
    for bigram in bigrams:
        b = '_'.join(bigram)
        tmp = spark.read.parquet(f'../data/features/{ftype}{b}.parquet')
        tmp.createOrReplaceTempView(f'{ftype}{b}')
        
tmp = spark.read.parquet('../data/features/ip_pct.parquet')
tmp.createOrReplaceTempView('ip_pct')

In [7]:
# The big-hairy SQL method is faster than merging one feature at a time because it gets
# optimized under the covers

In [8]:
big_hairy_sql = """
    SELECT {tname}.*, 
           df_device_pct.device_pct,
           df_os_pct.os_pct,
           df_channel_pct.channel_pct,
           df_app_pct.app_pct,
           tgt_device_pct.device_pct as tgt_device_pct,
           tgt_os_pct.os_pct as tgt_os_pct,
           tgt_channel_pct.channel_pct as tgt_channel_pct,
           tgt_app_pct.app_pct as tgt_app_pct,
           ip_pct.ip_pct,
           df_device_app.device_app, 
           df_device_channel.device_channel,
           df_device_os.device_os,
           df_os_app.os_app,
           df_os_channel.os_channel,
           df_channel_app.channel_app,
           tgt_device_app.device_app as tgt_device_app,
           tgt_device_channel.device_channel as tgt_device_channel,
           tgt_device_os.device_os as tgt_device_os,
           tgt_os_app.os_app as tgt_os_app,
           tgt_os_channel.os_channel as tgt_os_channel,
           tgt_channel_app.channel_app as tgt_channel_app
    FROM
           {tname}
    LEFT JOIN df_device_pct      ON {tname}.device = df_device_pct.device
    LEFT JOIN df_os_pct          ON {tname}.os = df_os_pct.os
    LEFT JOIN df_channel_pct     ON {tname}.channel = df_channel_pct.channel
    LEFT JOIN df_app_pct         ON {tname}.app = df_app_pct.app
    LEFT JOIN tgt_device_pct     ON {tname}.device = tgt_device_pct.device
    LEFT JOIN tgt_os_pct         ON {tname}.os = tgt_os_pct.os
    LEFT JOIN tgt_channel_pct    ON {tname}.channel = tgt_channel_pct.channel
    LEFT JOIN tgt_app_pct        ON {tname}.app = tgt_app_pct.app
    LEFT JOIN ip_pct             ON {tname}.ip = ip_pct.ip AND {tname}.doy = ip_pct.doy
    LEFT JOIN df_device_app      ON {tname}.device = df_device_app.device AND {tname}.app = df_device_app.app
    LEFT JOIN df_device_channel  ON {tname}.device = df_device_channel.device AND {tname}.channel = df_device_channel.channel
    LEFT JOIN df_device_os       ON {tname}.device = df_device_os.device AND {tname}.os = df_device_os.os
    LEFT JOIN df_os_app          ON {tname}.os = df_os_app.os AND {tname}.app = df_os_app.app
    LEFT JOIN df_os_channel      ON {tname}.os = df_os_channel.os AND {tname}.channel = df_os_channel.channel
    LEFT JOIN df_channel_app     ON {tname}.channel = df_channel_app.channel AND {tname}.app = df_channel_app.app
    LEFT JOIN tgt_device_app     ON {tname}.device = tgt_device_app.device AND {tname}.app = tgt_device_app.app
    LEFT JOIN tgt_device_channel ON {tname}.device = tgt_device_channel.device AND {tname}.channel = tgt_device_channel.channel
    LEFT JOIN tgt_device_os      ON {tname}.device = tgt_device_os.device AND {tname}.os = tgt_device_os.os
    LEFT JOIN tgt_os_app         ON {tname}.os = tgt_os_app.os AND {tname}.app = tgt_os_app.app
    LEFT JOIN tgt_os_channel     ON {tname}.os = tgt_os_channel.os AND {tname}.channel = tgt_os_channel.channel
    LEFT JOIN tgt_channel_app    ON {tname}.channel = tgt_channel_app.channel AND {tname}.app = tgt_channel_app.app
           
"""

In [9]:
def add_features(sdf, table_name='dataset'):
    sdf = sdf.withColumn('doy', F.dayofyear('click_time'))
    sdf.createOrReplaceTempView(table_name)
    merged = spark.sql(big_hairy_sql.format(tname=table_name))
    return merged.fillna(0) 

# Load, merge, and resave each of the data sets

In [49]:
fulldata = spark.read.parquet('../data/intermed/train.parquet')
feats = add_features(fulldata)
feats.write.parquet('../data/model/trainf.parquet')

In [50]:
class0 = spark.read.parquet('../data/intermed/train0_10pct.parquet')
feats = add_features(class0)
feats.write.parquet('../data/model/train0_10pctf.parquet')

In [51]:
class1 = spark.read.parquet('../data/intermed/train1.parquet')
feats = add_features(class1)
feats.write.parquet('../data/model/train1f.parquet')

In [52]:
mini = spark.read.parquet('../data/intermed/train_sample.parquet')
feats = add_features(mini)
feats.write.parquet('../data/model/train_samplef.parquet')

In [10]:
test = spark.read.parquet('../data/intermed/test.parquet')
feats = add_features(test)
feats.write.parquet('../data/model/testf.parquet')

In [53]:
# barrier so I don't hit return too many times and kill my spark session :-)
assert(0)

AssertionError: 

In [12]:
spark.stop()