# Data Loading and Feature Engineering

## Overview

This notebook downloads, prepares, and transforms the iPinYou dataset. It uses Dask for parallel data loading, although many other tools could load and transform the data just as well. We also engineer features from the dataset for downstream machine learning.

#### iPinYou

The iPinYou Global RTB (Real-Time Bidding) Bidding Algorithm Competition was organized by iPinYou (http://www.ipinyou.com) from April 1st, 2013 to December 31st, 2013.

The competition was divided into three seasons. For each season, a training dataset was released to the participants, while the testing dataset was held back by iPinYou. The complete testing dataset was randomly split into two parts: a leaderboard testing set used to score and rank the participating teams on the leaderboard, and a set reserved for the final offline evaluation.

We use the second season of the iPinYou data. Its training dataset contains processed iPinYou DSP bidding, impression, click, and conversion logs, of which we use the impression and click logs. Each impression record corresponds to an auction the bidder won, and the click logs identify which of those impressions were clicked. Our goal is to predict whether a user will click an ad.

Let's get started! First, let's update our Python libraries.

In [338]:
%pip install numpy --upgrade
%pip install dask

Note: you may need to restart the kernel to use updated packages.


In [339]:
# python library imports
import gc
import glob
import os
import sys

import boto3
import dask.dataframe as dd
import numpy as np
import pandas as pd
import pyarrow
import sagemaker
from sklearn.model_selection import train_test_split

In [341]:
region = boto3.Session().region_name
role = sagemaker.get_execution_role()
sagemaker_session = sagemaker.Session()
bucket_name = sagemaker_session.default_bucket()

prefix = 'ipinyou'
os.environ["AWS_REGION"] = region

print(f'Region : {region}')
print(f'IAM Role : {role}')
print(f'S3 Bucket : {bucket_name}')

# print the installed SageMaker Python SDK and Python versions
print(f'SageMaker Python SDK version : {sagemaker.__version__}')
print(f'Python version : {sys.version}')

Region : us-east-1
IAM Role : arn:aws:iam::431615879134:role/sagemaker-test-role
S3 Bucket : sagemaker-us-east-1-431615879134
SageMaker Python SDK version : 2.44.0
Python version : 3.7.10 (default, Jun  7 2021, 20:35:03) 
[GCC 7.5.0]


## Data Download

In [251]:
# copy the data (~3.4 GB) from S3 to local storage (EFS) in SageMaker Studio

# replace s3_path with the S3 location of the iPinYou season 2 dataset. Dataset source: https://figshare.com/articles/dataset/ipinyou_contest_dataset_season2/5732328/1
s3_path = '<enter s3 path>'
!aws s3 cp {s3_path} ./data/ipinyou.zip

download: s3://sengstacken-temp/ipinyou.contest.datasetseason2.zip to data/ipinyou.zip


In [254]:
# unzip dataset
!unzip -o ./data/ipinyou.zip -d ./data/

Archive:  ./data/ipinyou.zip
 extracting: ./data/ipinyou.contest.dataset-season2/algo.submission.demo.tar.bz2  
  inflating: ./data/ipinyou.contest.dataset-season2/city.cn.txt  
  inflating: ./data/ipinyou.contest.dataset-season2/city.en.txt  
  inflating: ./data/ipinyou.contest.dataset-season2/files.md5  
  inflating: ./data/ipinyou.contest.dataset-season2/known.data.bugs.txt  
  inflating: ./data/ipinyou.contest.dataset-season2/README  
  inflating: ./data/ipinyou.contest.dataset-season2/README.old  
  inflating: ./data/ipinyou.contest.dataset-season2/region.cn.txt  
  inflating: ./data/ipinyou.contest.dataset-season2/region.en.txt  
 extracting: ./data/ipinyou.contest.dataset-season2/testing2nd/leaderboard.test.data.20130613_15.txt.bz2  
 extracting: ./data/ipinyou.contest.dataset-season2/training2nd/bid.20130606.txt.bz2  
 extracting: ./data/ipinyou.contest.dataset-season2/training2nd/bid.20130607.txt.bz2  
 extracting: ./data/ipinyou.contest.dataset-season2/training2nd/bid.2013060

The dataset files are bzip2-compressed plain text (`*.txt.bz2`) or bzip2-compressed tar archives (`*.tar.bz2`). On Linux or macOS, `bunzip2` and `tar` decompress them into plain text:

           bunzip2 *.txt.bz2
           tar xvjf *.tar.bz2
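
If you'd rather decompress from Python instead of shelling out, the standard-library `bz2` module can do the same job. A minimal sketch; `decompress_bz2_files` is a hypothetical helper, and unlike `bzip2 -d` it keeps the `.bz2` originals and skips outputs that already exist:

```python
import bz2
import glob
import os
import shutil


def decompress_bz2_files(pattern, overwrite=False):
    """Decompress every *.bz2 file matching `pattern` next to the original.

    Keeps the .bz2 input and skips outputs that already exist unless
    `overwrite=True`. Returns the list of files written.
    """
    written = []
    for src in sorted(glob.glob(pattern)):
        if not src.endswith(".bz2"):
            continue
        dst = src[: -len(".bz2")]  # e.g. bid.20130606.txt.bz2 -> bid.20130606.txt
        if os.path.exists(dst) and not overwrite:
            continue
        # stream in chunks so multi-GB logs never have to fit in memory
        with bz2.open(src, "rb") as fin, open(dst, "wb") as fout:
            shutil.copyfileobj(fin, fout)
        written.append(dst)
    return written


# Usage (path from this notebook; adjust to your layout):
# decompress_bz2_files("./data/ipinyou.contest.dataset-season2/training2nd/*.txt.bz2")
```

For the `*.tar.bz2` archive, `tarfile.open(path, 'r:bz2')` followed by `extractall()` is the standard-library counterpart of `tar xvjf`.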

In [255]:
# decompress all .bz2 files into plain .txt files
!bzip2 -vd ./data/ipinyou.contest.dataset-season2/training2nd/*.bz2
!bzip2 -vd ./data/ipinyou.contest.dataset-season2/testing2nd/*.bz2

bzip2: Output file ./data/ipinyou.contest.dataset-season2/training2nd/bid.20130606.txt already exists.
bzip2: Output file ./data/ipinyou.contest.dataset-season2/training2nd/bid.20130607.txt already exists.
bzip2: Output file ./data/ipinyou.contest.dataset-season2/training2nd/bid.20130608.txt already exists.
bzip2: Output file ./data/ipinyou.contest.dataset-season2/training2nd/bid.20130609.txt already exists.
bzip2: Output file ./data/ipinyou.contest.dataset-season2/training2nd/bid.20130610.txt already exists.
bzip2: Output file ./data/ipinyou.contest.dataset-season2/training2nd/bid.20130611.txt already exists.
bzip2: Output file ./data/ipinyou.contest.dataset-season2/training2nd/bid.20130612.txt already exists.
bzip2: Output file ./data/ipinyou.contest.dataset-season2/training2nd/clk.20130606.txt already exists.
bzip2: Output file ./data/ipinyou.contest.dataset-season2/training2nd/clk.20130607.txt already exists.
bzip2: Output file ./data/ipinyou.contest.dataset-season2/training2nd/clk

In [None]:
# (optional) push the decompressed .txt files back to S3; replace the placeholder with your own S3 path
s3_txt_path = '<enter s3 path>'
!aws s3 cp ./data/ipinyou.contest.dataset-season2/ {s3_txt_path} --recursive

## Data Loading

Now that we've downloaded the data, let's use Dask and pandas to process it.

In [342]:
# define columns
ad_columns = ['BidID','Timestamp','Log Type','iPinYou ID','User-Agent','IP','Region','City','Ad Exchange','Domain','URL','Anonymous URL ID','Ad slot ID','Ad slot width','Ad slot height','Ad slot visibility','Ad slot format','Ad slot floor price','Creative ID','Bidding price','Paying price','Key page URL','Advertiser ID','User Tags']
ad_columns

['BidID',
 'Timestamp',
 'Log Type',
 'iPinYou ID',
 'User-Agent',
 'IP',
 'Region',
 'City',
 'Ad Exchange',
 'Domain',
 'URL',
 'Anonymous URL ID',
 'Ad slot ID',
 'Ad slot width',
 'Ad slot height',
 'Ad slot visibility',
 'Ad slot format',
 'Ad slot floor price',
 'Creative ID',
 'Bidding price',
 'Paying price',
 'Key page URL',
 'Advertiser ID',
 'User Tags']

Next, we label each impression using the click logs: an impression counts as clicked if its `BidID` appears in the click data. The result is a new label column called `clicks`.
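
The labeling step boils down to a semi-join on `BidID`. A toy sketch with made-up IDs (`label_clicks` is a hypothetical helper, not part of the notebook) shows the idea:

```python
import pandas as pd


def label_clicks(impressions: pd.DataFrame, clicks: pd.DataFrame, key: str = "BidID") -> pd.DataFrame:
    """Return a copy of `impressions` with an integer `clicks` column:
    1 if the impression's bid ID appears in the click log, else 0."""
    out = impressions.copy()
    clicked_ids = clicks[key].unique()
    out["clicks"] = out[key].isin(clicked_ids).astype(int)
    return out


# toy data with made-up IDs, not taken from iPinYou
imp = pd.DataFrame({"BidID": ["a", "b", "c", "d"], "Region": [1, 2, 3, 4]})
clk = pd.DataFrame({"BidID": ["b", "d", "d"]})  # "d" appears twice in the click log
labeled = label_clicks(imp, clk)
print(labeled["clicks"].tolist())  # [0, 1, 0, 1]
```

Using `isin` rather than a left `merge` keeps exactly one row per impression even if a bid ID appears several times in the click log; a merge would duplicate those impressions.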

In [343]:
# read impression data
ddf_imp = dd.read_csv('./data/ipinyou.contest.dataset-season2/training2nd/i*.txt',sep='\t',header=0,names=ad_columns)
# read click data
ddf_clicks = dd.read_csv('./data/ipinyou.contest.dataset-season2/training2nd/clk*.txt',sep='\t',header=0,names=ad_columns)

In [344]:
print(f'A total of {len(ddf_imp)} impressions in the impression training dataset')

A total of 12237142 impressions in the impression training dataset


In [345]:
# get the unique bid IDs that appear in the click logs
clickids = ddf_clicks['BidID'].unique().compute()
print(f'Total number of clicks in the dataset - {len(clickids)}')

Total number of clicks in the dataset - 8832


Because the dataset covers multiple advertisers, we restrict the analysis to a single advertiser, ID 3358. Alternatively, you could skip this downsampling and keep 'Advertiser ID' as a feature. According to the iPinYou dataset paper (https://arxiv.org/pdf/1407.7073.pdf), advertiser 3358 is in the software industry.

In [346]:
%%time
# Now, let's only get the impression data for a single Advertiser ID.  
df = ddf_imp[ddf_imp['Advertiser ID']==3358].compute()

CPU times: user 1min 23s, sys: 6.78 s, total: 1min 30s
Wall time: 44.9 s


In [347]:
# create new boolean column for click 
df['clicks']=df['BidID'].isin(clickids)

In [348]:
print(f"Total click through rate for ID 3358 is {(sum(df['clicks'].values))} / {len(df)} or {(sum(df['clicks'].values))/len(df):.6f}")

Total click through rate for ID 3358 is 1358 / 1742104 or 0.000780
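
To put this click-through rate in perspective, here is the arithmetic on the printed counts. The extreme class imbalance is why the train/test split later stratifies on `clicks`:

```python
clicks = 1358        # clicked impressions for advertiser 3358 (from the output above)
impressions = 1742104  # all impressions for advertiser 3358

ctr = clicks / impressions
negatives_per_positive = (impressions - clicks) / clicks

print(f"CTR = {ctr:.6f} ({ctr:.4%})")
print(f"about {negatives_per_positive:.0f} non-clicks per click")
```

That works out to roughly 0.078%, or about 1,282 non-clicks for every click.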


#### Save to intermediate parquet file

In [349]:
df.to_parquet('dftest.gzip',compression='gzip')

In [282]:
df = pd.read_parquet('dftest.gzip', engine='pyarrow')

## Feature Engineering

In [350]:
# drop unused columns
df.drop(columns=['User-Agent', 'BidID', 'Log Type', 'iPinYou ID', 'Domain','URL','Ad slot ID', 'Anonymous URL ID', 'Creative ID', 'Bidding price', 'Paying price', 'Key page URL', 'Ad slot floor price'],inplace=True)

In [351]:
# convert timestamp
df['Timestamp'] = pd.to_datetime(df['Timestamp'], format='%Y%m%d%H%M%S%f')
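
The format string `'%Y%m%d%H%M%S%f'` works because iPinYou timestamps are 17-digit values: a `yyyyMMddHHmmss` date-time followed by three millisecond digits, which `%f` reads as a fraction of a second. A quick check on two sample values, reconstructed from the parsed timestamps displayed in the next cell:

```python
import pandas as pd

# yyyyMMddHHmmss followed by 3 millisecond digits
raw = pd.Series(["20130606000104252", "20130606000104253"])
parsed = pd.to_datetime(raw, format="%Y%m%d%H%M%S%f")
print(parsed.tolist())
```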

In [352]:
df.head()

| | Timestamp | IP | Region | City | Ad Exchange | Ad slot width | Ad slot height | Ad slot visibility | Ad slot format | Advertiser ID | User Tags | clicks |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2013-06-06 00:01:04.252 | 222.220.35.* | 308 | 320 | 2 | 336 | 280 | 2 | 0 | 3358 | 13800,10024 | False |
| 1 | 2013-06-06 00:01:04.253 | 58.100.240.* | 94 | 95 | 1 | 950 | 90 | 0 | 1 | 3358 | 10059,13866,10063,10111 | False |
| 3 | 2013-06-06 00:01:04.329 | 58.67.157.* | 216 | 217 | 1 | 300 | 250 | 0 | 5 | 3358 | 13866,10111 | False |
| 5 | 2013-06-06 00:01:04.399 | 60.222.234.* | 15 | 23 | 2 | 300 | 250 | 2 | 0 | 3358 | NaN | False |
| 12 | 2013-06-06 00:01:04.463 | 118.182.244.* | 344 | 350 | 1 | 336 | 280 | 2 | 1 | 3358 | 10006,10110 | False |


Next, we generate calendar and time-of-day features from the timestamp. The helpers below are adapted from fastai's `make_date` and `add_datepart` functions.

In [353]:
import re

def make_date(df, date_field):
    "Make sure `df[date_field]` is of the right date type."
    field_dtype = df[date_field].dtype
    if isinstance(field_dtype, pd.core.dtypes.dtypes.DatetimeTZDtype):
        field_dtype = np.datetime64
    if not np.issubdtype(field_dtype, np.datetime64):
        df[date_field] = pd.to_datetime(df[date_field], infer_datetime_format=True)

def add_datepart(df, field_name, drop=True, time=False):
    "Helper function that adds columns relevant to a date in the column `field_name` of `df`."
    make_date(df, field_name)
    field = df[field_name]
    prefix = re.sub('[Dd]ate$', '', field_name)
    attr = ['Year', 'Month', 'Week', 'Day', 'Dayofweek', 'Dayofyear', 'Is_month_end', 'Is_month_start',
            'Is_quarter_end', 'Is_quarter_start', 'Is_year_end', 'Is_year_start']
    if time: attr = attr + ['Hour', 'Minute', 'Second']
    # `Series.dt.week` was deprecated in pandas 1.1 (removed in 2.0); prefer `dt.isocalendar().week` when available
    week = field.dt.isocalendar().week.astype(field.dt.day.dtype) if hasattr(field.dt, 'isocalendar') else field.dt.week
    for n in attr: df[prefix + n] = getattr(field.dt, n.lower()) if n != 'Week' else week
    mask = ~field.isna()
    df[prefix + 'Elapsed'] = np.where(mask,field.values.astype(np.int64) // 10 ** 9,np.nan)
    if drop: df.drop(field_name, axis=1, inplace=True)
    return df

In [354]:
df = add_datepart(df, 'Timestamp', time=True)
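
As a sanity check on what `add_datepart` produces, the same pandas `.dt` accessors can be applied by hand to the first timestamp in the data. This sketch recomputes a subset of the fields directly rather than calling `add_datepart` itself:

```python
import pandas as pd

ts = pd.Series(pd.to_datetime(["2013-06-06 00:01:04.252"]))

features = {
    "Year": int(ts.dt.year.iloc[0]),
    "Month": int(ts.dt.month.iloc[0]),
    "Week": int(ts.dt.isocalendar().week.iloc[0]),  # ISO week number
    "Day": int(ts.dt.day.iloc[0]),
    "Dayofweek": int(ts.dt.dayofweek.iloc[0]),      # Monday=0 ... Sunday=6
    "Dayofyear": int(ts.dt.dayofyear.iloc[0]),
    "Hour": int(ts.dt.hour.iloc[0]),
    "Minute": int(ts.dt.minute.iloc[0]),
    "Second": int(ts.dt.second.iloc[0]),
}
print(features)
```

These values agree with the first row of the cleaned data displayed further down (`TimestampWeek` 23, `TimestampDay` 6, `TimestampDayofweek` 3, i.e. Thursday).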

In [355]:
# create one-hot encoded features from the comma-separated user tags
user_tag_dummies = df['User Tags'].str.get_dummies(sep=',')

In [356]:
# add them back to the dataframe
df = pd.concat([df, user_tag_dummies], axis=1)
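
`Series.str.get_dummies(sep=',')` turns each comma-separated tag list into one 0/1 column per distinct tag. A toy example using tag lists from the sample rows above (the `None` stands in for a missing `User Tags` value):

```python
import pandas as pd

tags = pd.Series(["13800,10024", "10059,13866,10063,10111", None])
dummies = tags.str.get_dummies(sep=",")

print(dummies.columns.tolist())      # ['10024', '10059', '10063', '10111', '13800', '13866']
print(dummies.sum(axis=1).tolist())  # [2, 4, 0]
```

Rows with no tags get all zeros, and the new column names are strings such as `'10024'`, which is why `create_example` later looks them up as `example['10006']`.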

In [357]:
df.head()

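| | IP | Region | City | Ad Exchange | Ad slot width | Ad slot height | Ad slot visibility | Ad slot format | Advertiser ID | User Tags | ... | 13678 | 13776 | 13800 | 13866 | 13874 | 14273 | 16593 | 16617 | 16661 | 16706 |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 222.220.35.* | 308 | 320 | 2 | 336 | 280 | 2 | 0 | 3358 | 13800,10024 | ... | 0 | 0 | 1 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
| 1 | 58.100.240.* | 94 | 95 | 1 | 950 | 90 | 0 | 1 | 3358 | 10059,13866,10063,10111 | ... | 0 | 0 | 0 | 1 | 0 | 0 | 0 | 0 | 0 | 0 |
| 3 | 58.67.157.* | 216 | 217 | 1 | 300 | 250 | 0 | 5 | 3358 | 13866,10111 | ... | 0 | 0 | 0 | 1 | 0 | 0 | 0 | 0 | 0 | 0 |
| 5 | 60.222.234.* | 15 | 23 | 2 | 300 | 250 | 2 | 0 | 3358 | NaN | ... | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
| 12 | 118.182.244.* | 344 | 350 | 1 | 336 | 280 | 2 | 1 | 3358 | 10006,10110 | ... | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |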


In [358]:
# create 3 features from the IP address; the last octet is masked as '*' and is dropped
ip_octets = df['IP'].str.split(pat='.', expand=True)

In [359]:
df[['ip1','ip2','ip3']] = ip_octets.rename(columns={0:'ip1',1:'ip2',2:'ip3'}).drop(columns=[3])

In [360]:
df.drop(columns=['IP','User Tags'],inplace=True)

In [361]:
df['ip1'] = df['ip1'].astype('int')
df['ip2'] = df['ip2'].astype('int')
df['ip3'] = df['ip3'].astype('int')
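
The IP addresses in the logs have their last octet masked as `*`, so only the first three octets carry information. A small sketch of the split on two addresses from the sample rows:

```python
import pandas as pd

ips = pd.Series(["222.220.35.*", "58.100.240.*"])
octets = ips.str.split(pat=".", expand=True)  # 4 columns; the last holds the masked "*"
ip_feats = octets.iloc[:, :3].astype(int)
ip_feats.columns = ["ip1", "ip2", "ip3"]
print(ip_feats.values.tolist())  # [[222, 220, 35], [58, 100, 240]]
```

The `astype(int)` cast assumes every address has three numeric octets; a missing or malformed IP would make it raise.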

In [362]:
# one-hot encode Ad Exchange (adex), Ad slot visibility (advis), and Ad slot format (adfmt)
df = pd.get_dummies(df,prefix='adex',columns=['Ad Exchange'])
df = pd.get_dummies(df,prefix='advis',columns=['Ad slot visibility'])
df = pd.get_dummies(df,prefix='adfmt',columns=['Ad slot format'])
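
`pd.get_dummies` with `columns=` and `prefix=` replaces each listed column with one indicator column per observed value, named `<prefix>_<value>`. A toy sketch with values that occur in this data:

```python
import pandas as pd

toy = pd.DataFrame({"Ad Exchange": [1, 2, 3, 1], "Ad slot visibility": [0, 2, 255, 0]})
encoded = pd.get_dummies(
    toy,
    prefix=["adex", "advis"],
    columns=["Ad Exchange", "Ad slot visibility"],
)
print(encoded.columns.tolist())
# ['adex_1', 'adex_2', 'adex_3', 'advis_0', 'advis_2', 'advis_255']
```

The indicator dtype is `uint8` in older pandas and `bool` from pandas 2.0 on. The column set depends on which values occur, so encoding before the train/test split, as done here, keeps both splits' columns aligned.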

In [363]:
df.head()

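| | Region | City | Ad slot width | Ad slot height | Advertiser ID | clicks | TimestampYear | TimestampMonth | TimestampWeek | TimestampDay | ... | adex_1 | adex_2 | adex_3 | advis_0 | advis_1 | advis_2 | advis_255 | adfmt_0 | adfmt_1 | adfmt_5 |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 308 | 320 | 336 | 280 | 3358 | False | 2013 | 6 | 23 | 6 | ... | 0 | 1 | 0 | 0 | 0 | 1 | 0 | 1 | 0 | 0 |
| 1 | 94 | 95 | 950 | 90 | 3358 | False | 2013 | 6 | 23 | 6 | ... | 1 | 0 | 0 | 1 | 0 | 0 | 0 | 0 | 1 | 0 |
| 3 | 216 | 217 | 300 | 250 | 3358 | False | 2013 | 6 | 23 | 6 | ... | 1 | 0 | 0 | 1 | 0 | 0 | 0 | 0 | 0 | 1 |
| 5 | 15 | 23 | 300 | 250 | 3358 | False | 2013 | 6 | 23 | 6 | ... | 0 | 1 | 0 | 0 | 0 | 1 | 0 | 1 | 0 | 0 |
| 12 | 344 | 350 | 336 | 280 | 3358 | False | 2013 | 6 | 23 | 6 | ... | 1 | 0 | 0 | 0 | 0 | 1 | 0 | 0 | 1 | 0 |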


In [364]:
# TODO
# consider one hot encoding Region and City

In [365]:
df['clicks'] = df['clicks'].astype(int)  # bool -> 0/1 label

In [366]:
# drop Advertiser ID (constant 3358 after filtering) and the epoch-seconds TimestampElapsed column
df.drop(columns=['Advertiser ID', 'TimestampElapsed'],inplace=True)

In [367]:
# move clicks to the first column in the dataframe
clicks = df.pop('clicks')
df.insert(0,'clicks',clicks)

In [368]:
display(df)

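| | clicks | Region | City | Ad slot width | Ad slot height | TimestampYear | TimestampMonth | TimestampWeek | TimestampDay | TimestampDayofweek | ... | adex_1 | adex_2 | adex_3 | advis_0 | advis_1 | advis_2 | advis_255 | adfmt_0 | adfmt_1 | adfmt_5 |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 0 | 308 | 320 | 336 | 280 | 2013 | 6 | 23 | 6 | 3 | ... | 0 | 1 | 0 | 0 | 0 | 1 | 0 | 1 | 0 | 0 |
| 1 | 0 | 94 | 95 | 950 | 90 | 2013 | 6 | 23 | 6 | 3 | ... | 1 | 0 | 0 | 1 | 0 | 0 | 0 | 0 | 1 | 0 |
| 3 | 0 | 216 | 217 | 300 | 250 | 2013 | 6 | 23 | 6 | 3 | ... | 1 | 0 | 0 | 1 | 0 | 0 | 0 | 0 | 0 | 1 |
| 5 | 0 | 15 | 23 | 300 | 250 | 2013 | 6 | 23 | 6 | 3 | ... | 0 | 1 | 0 | 0 | 0 | 1 | 0 | 1 | 0 | 0 |
| 12 | 0 | 344 | 350 | 336 | 280 | 2013 | 6 | 23 | 6 | 3 | ... | 1 | 0 | 0 | 0 | 0 | 1 | 0 | 0 | 1 | 0 |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 184880 | 0 | 3 | 12 | 1000 | 90 | 2013 | 6 | 24 | 12 | 2 | ... | 0 | 0 | 1 | 1 | 0 | 0 | 0 | 1 | 0 | 0 |
| 184883 | 0 | 80 | 83 | 1000 | 90 | 2013 | 6 | 24 | 12 | 2 | ... | 0 | 0 | 1 | 1 | 0 | 0 | 0 | 1 | 0 | 0 |
| 184884 | 0 | 40 | 44 | 1000 | 90 | 2013 | 6 | 24 | 12 | 2 | ... | 0 | 0 | 1 | 1 | 0 | 0 | 0 | 1 | 0 | 0 |
| 10069 | 0 | 65 | 66 | 300 | 250 | 2013 | 6 | 24 | 12 | 2 | ... | 0 | 1 | 0 | 1 | 0 | 0 | 0 | 1 | 0 | 0 |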


#### Save to intermediate parquet file

In [319]:
df.to_parquet('dfclean.gzip',compression='gzip')

In [320]:
df = pd.read_parquet('dfclean.gzip', engine='pyarrow')

In [385]:
# drop rows with missing values (the count below shows that none were dropped)
df = df.dropna()

In [386]:
len(df)

1742104

#### Save train / test datasets, stratified on clicks

In [387]:
train, test = train_test_split(df,test_size=0.1, random_state = 4321, stratify=df['clicks'])
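
With only about 0.08% positives, an unstratified 10% split could give the test set noticeably more or fewer clicks than its share. A sketch on synthetic labels (generated to roughly match the notebook's CTR; not the real data) shows that `stratify=` keeps the click rate nearly identical across the splits:

```python
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split

rng = np.random.default_rng(0)
n = 100_000
# synthetic labels with a CTR close to the notebook's (~0.078%)
toy = pd.DataFrame({
    "clicks": (rng.random(n) < 0.00078).astype(int),
    "x": rng.random(n),
})

tr, te = train_test_split(toy, test_size=0.1, random_state=4321, stratify=toy["clicks"])
print(toy["clicks"].mean(), tr["clicks"].mean(), te["clicks"].mean())
```

Stratification allocates the positives to each split in proportion to its size (up to rounding), so both splits inherit the overall click rate.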

In [374]:
# cast any boolean columns (e.g. the Timestamp Is_* flags) to 0/1 integers before writing CSV
train = train*1

In [375]:
test = test*1

In [388]:
train.to_csv('train.csv',index=False,header=False, encoding='utf-8')

In [389]:
test.to_csv('test.csv',index=False,header=False, encoding='utf-8')

In [391]:
# sanity check: reload the test CSV, which was written without a header row
p = pd.read_csv('test.csv', header=None)
assert len(p) == len(test)

In [324]:
# collect python garbage
gc.collect()

118

## Convert to TFRecords

References:
* https://www.srijan.net/resources/blog/building-a-high-performance-data-pipeline-with-tensorflow
* https://keras.io/examples/keras_recipes/creating_tfrecords/


In [325]:
import tensorflow as tf

In [326]:
df = pd.read_parquet('dfclean.gzip', engine='pyarrow')

In [327]:
train, test = train_test_split(df,test_size=0.1, random_state = 4321, stratify=df['clicks'])

In [328]:
test

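| | clicks | Region | City | Ad slot width | Ad slot height | TimestampYear | TimestampMonth | TimestampWeek | TimestampDay | TimestampDayofweek | ... | adex_1 | adex_2 | adex_3 | advis_0 | advis_1 | advis_2 | advis_255 | adfmt_0 | adfmt_1 | adfmt_5 |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 171975 | 0 | 106 | 111 | 300 | 250 | 2013 | 6 | 23 | 8 | 5 | ... | 1 | 0 | 0 | 0 | 0 | 1 | 0 | 0 | 1 | 0 |
| 68713 | 0 | 146 | 150 | 1000 | 90 | 2013 | 6 | 24 | 10 | 0 | ... | 0 | 0 | 1 | 1 | 0 | 0 | 0 | 1 | 0 | 0 |
| 69655 | 0 | 298 | 301 | 950 | 90 | 2013 | 6 | 24 | 12 | 2 | ... | 1 | 0 | 0 | 0 | 0 | 0 | 1 | 0 | 1 | 0 |
| 106790 | 0 | 183 | 190 | 728 | 90 | 2013 | 6 | 24 | 10 | 0 | ... | 0 | 1 | 0 | 1 | 0 | 0 | 0 | 1 | 0 | 0 |
| 51126 | 0 | 276 | 277 | 728 | 90 | 2013 | 6 | 23 | 6 | 3 | ... | 0 | 1 | 0 | 0 | 0 | 1 | 0 | 1 | 0 | 0 |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 109160 | 0 | 80 | 86 | 1000 | 90 | 2013 | 6 | 24 | 12 | 2 | ... | 0 | 0 | 1 | 1 | 0 | 0 | 0 | 1 | 0 | 0 |
| 101613 | 0 | 80 | 85 | 728 | 90 | 2013 | 6 | 23 | 6 | 3 | ... | 0 | 1 | 0 | 0 | 0 | 1 | 0 | 1 | 0 | 0 |
| 96757 | 0 | 134 | 138 | 300 | 250 | 2013 | 6 | 23 | 9 | 6 | ... | 1 | 0 | 0 | 0 | 0 | 1 | 0 | 0 | 1 | 0 |
| 167038 | 0 | 146 | 148 | 300 | 250 | 2013 | 6 | 24 | 10 | 0 | ... | 1 | 0 | 0 | 1 | 0 | 0 | 0 | 0 | 1 | 0 |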


In [329]:
def _float_feature(value):
    """Returns a float_list from a float / double."""
    return tf.train.Feature(float_list=tf.train.FloatList(value=[value]))

def _int64_feature(value):
    """Returns an int64_list from a bool / enum / int / uint."""
    return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))

In [330]:
def create_example(example):
    
    feature = {
        'clicks': _int64_feature(example['clicks']),
        'region': _int64_feature(example['Region']),
        'city': _int64_feature(example['City']),
        'adslotwidth': _int64_feature(example['Ad slot width']),
        'adslotheight': _int64_feature(example['Ad slot height']),
        'timestampyear': _int64_feature(example['TimestampYear']),
        'timestampmonth': _int64_feature(example['TimestampMonth']),
        'timestampweek': _int64_feature(example['TimestampWeek']),
        'timestampday': _int64_feature(example['TimestampDay']),
        'timestampdayofweek': _int64_feature(example['TimestampDayofweek']),
        'timestampdayofyear': _int64_feature(example['TimestampDayofyear']),
        'timestampis_month_end': _int64_feature(example['TimestampIs_month_end']),
        'timestampis_month_start': _int64_feature(example['TimestampIs_month_start']),
        'timestampis_quarter_end': _int64_feature(example['TimestampIs_quarter_end']),
        'timestampis_quarter_start': _int64_feature(example['TimestampIs_quarter_start']),
        'timestampis_year_end': _int64_feature(example['TimestampIs_year_end']),
        'timestampis_year_start': _int64_feature(example['TimestampIs_year_start']),
        'timestamphour': _int64_feature(example['TimestampHour']),
        'timestampminute': _int64_feature(example['TimestampMinute']),
        'timestampsecond': _int64_feature(example['TimestampSecond']),
        '10006': _int64_feature(example['10006']),
        '10024': _int64_feature(example['10024']),
        '10031': _int64_feature(example['10031']),
        '10048': _int64_feature(example['10048']),
        '10052': _int64_feature(example['10052']),
        '10057': _int64_feature(example['10057']),
        '10059': _int64_feature(example['10059']),
        '10063': _int64_feature(example['10063']),
        '10067': _int64_feature(example['10067']),
        '10074': _int64_feature(example['10074']),
        '10075': _int64_feature(example['10075']),
        '10076': _int64_feature(example['10076']),
        '10077': _int64_feature(example['10077']),
        '10079': _int64_feature(example['10079']),
        '10083': _int64_feature(example['10083']),
        '10093': _int64_feature(example['10093']),
        '10102': _int64_feature(example['10102']),
        '10110': _int64_feature(example['10110']),
        '10111': _int64_feature(example['10111']),
        '10684': _int64_feature(example['10684']),
        '11092': _int64_feature(example['11092']),
        '11278': _int64_feature(example['11278']),
        '11379': _int64_feature(example['11379']),
        '11423': _int64_feature(example['11423']),
        '11512': _int64_feature(example['11512']),
        '11576': _int64_feature(example['11576']),
        '11632': _int64_feature(example['11632']),
        '11680': _int64_feature(example['11680']),
        '11724': _int64_feature(example['11724']),
        '11944': _int64_feature(example['11944']),
        '13042': _int64_feature(example['13042']),
        '13403': _int64_feature(example['13403']),
        '13496': _int64_feature(example['13496']),
        '13678': _int64_feature(example['13678']),
        '13776': _int64_feature(example['13776']),
        '13800': _int64_feature(example['13800']),
        '13866': _int64_feature(example['13866']),
        '13874': _int64_feature(example['13874']),
        '14273': _int64_feature(example['14273']),
        '16593': _int64_feature(example['16593']),
        '16617': _int64_feature(example['16617']),
        '16661': _int64_feature(example['16661']),
        '16706': _int64_feature(example['16706']),
        'ip1': _int64_feature(example['ip1']),
        'ip2': _int64_feature(example['ip2']),
        'ip3': _int64_feature(example['ip3']),
        'adex_1': _int64_feature(example['adex_1']),
        'adex_2': _int64_feature(example['adex_2']),
        'adex_3': _int64_feature(example['adex_3']),
        'advis_0': _int64_feature(example['advis_0']),
        'advis_1': _int64_feature(example['advis_1']),
        'advis_2': _int64_feature(example['advis_2']),
        'advis_255': _int64_feature(example['advis_255']),
        'adfmt_0': _int64_feature(example['adfmt_0']),
        'adfmt_1': _int64_feature(example['adfmt_1']),
        'adfmt_5': _int64_feature(example['adfmt_5']),
    }
    
    return tf.train.Example(features=tf.train.Features(feature=feature))
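
The keys in `create_example` (and in `decoder` below) follow one mechanical rule: lowercase the DataFrame column name and delete its spaces. Deriving both from `df.columns` would keep the writer and the reader from drifting apart. A minimal, TensorFlow-free sketch of that mapping; `feature_key` and `feature_keys` are hypothetical helpers, not what the notebook ran:

```python
def feature_key(column: str) -> str:
    """Map a DataFrame column name to the TFRecord feature key used in this notebook,
    e.g. 'Ad slot width' -> 'adslotwidth', 'TimestampIs_month_end' -> 'timestampis_month_end'."""
    return column.lower().replace(" ", "")


def feature_keys(columns):
    """Build a {column: feature_key} dict, refusing two columns that collapse to the same key."""
    mapping = {col: feature_key(col) for col in columns}
    if len(set(mapping.values())) != len(mapping):
        raise ValueError("two columns map to the same feature key")
    return mapping


# e.g. mapping = feature_keys(train.columns)
```

With such a mapping, the writer's dict could be built as `{key: _int64_feature(int(row[col])) for col, key in mapping.items()}` and the reader's spec as `{key: tf.io.FixedLenFeature([], tf.int64) for key in mapping.values()}`. This assumes every column is an integer feature, as it is in this notebook.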


#### Convert Training Data to TFRecords

In [335]:
%%time

num_tfrecords = 30
split_df = np.array_split(train, num_tfrecords)
options = tf.io.TFRecordOptions(compression_type='ZLIB')
os.makedirs('./data/tfrecords/train', exist_ok=True)  # make sure the output directory exists

for i, temp_df in enumerate(split_df):
    print(f'Writing TF Record {i}')
    with tf.io.TFRecordWriter(f'./data/tfrecords/train/train_{i}-{num_tfrecords}.tfrec', options=options) as writer:
        for _, row in temp_df.iterrows():
            example = create_example(row)
            writer.write(example.SerializeToString())

print(f'A total of {num_tfrecords} TFRecord files were created.')
print(f'Each file contains {len(split_df[0])} records')

Writing TF Record 0
Writing TF Record 1
Writing TF Record 2
Writing TF Record 3
Writing TF Record 4
Writing TF Record 5
Writing TF Record 6
Writing TF Record 7
Writing TF Record 8
Writing TF Record 9
Writing TF Record 10
Writing TF Record 11
Writing TF Record 12
Writing TF Record 13
Writing TF Record 14
Writing TF Record 15
Writing TF Record 16
Writing TF Record 17
Writing TF Record 18
Writing TF Record 19
Writing TF Record 20
Writing TF Record 21
Writing TF Record 22
Writing TF Record 23
Writing TF Record 24
Writing TF Record 25
Writing TF Record 26
Writing TF Record 27
Writing TF Record 28
Writing TF Record 29
A total of 30 TFRecord files were created.
Each file contains 52264 records
CPU times: user 27min 40s, sys: 1.52 s, total: 27min 42s
Wall time: 27min 41s
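
Much of the ~28 minutes above is likely spent in `DataFrame.iterrows()`, which builds a pandas Series for every row. One possible speedup, sketched here but not benchmarked against the run above, is to convert the frame to a single int64 NumPy array once and iterate over that; `iter_int_rows` is a hypothetical helper:

```python
import numpy as np
import pandas as pd


def iter_int_rows(df: pd.DataFrame):
    """Yield each row as a {column: int} dict without building a pandas Series per row.

    The whole frame is converted to one int64 array up front (bools become 0/1),
    so this assumes every column is numeric or boolean and free of NaN.
    """
    columns = list(df.columns)
    values = df.to_numpy(dtype=np.int64)
    for row in values:
        yield dict(zip(columns, row.tolist()))  # .tolist() gives plain Python ints
```

Because `create_example` only indexes its argument by column name, each yielded dict could be passed to it in place of the `iterrows()` row.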


In [337]:
!ls ./data/tfrecords/train -lh

total 264M
-rw-r--r-- 1 root root 8.8M Aug  9 20:04 train_0-30.tfrec
-rw-r--r-- 1 root root 8.8M Aug  9 20:05 train_1-30.tfrec
-rw-r--r-- 1 root root 8.8M Aug  9 20:14 train_10-30.tfrec
-rw-r--r-- 1 root root 8.8M Aug  9 20:14 train_11-30.tfrec
-rw-r--r-- 1 root root 8.8M Aug  9 20:15 train_12-30.tfrec
-rw-r--r-- 1 root root 8.8M Aug  9 20:16 train_13-30.tfrec
-rw-r--r-- 1 root root 8.8M Aug  9 20:17 train_14-30.tfrec
-rw-r--r-- 1 root root 8.8M Aug  9 20:18 train_15-30.tfrec
-rw-r--r-- 1 root root 8.8M Aug  9 20:19 train_16-30.tfrec
-rw-r--r-- 1 root root 8.8M Aug  9 20:20 train_17-30.tfrec
-rw-r--r-- 1 root root 8.8M Aug  9 20:21 train_18-30.tfrec
-rw-r--r-- 1 root root 8.8M Aug  9 20:22 train_19-30.tfrec
-rw-r--r-- 1 root root 8.8M Aug  9 20:06 train_2-30.tfrec
-rw-r--r-- 1 root root 8.8M Aug  9 20:23 train_20-30.tfrec
-rw-r--r-- 1 root root 8.8M Aug  9 20:24 train_21-30.tfrec
-rw-r--r-- 1 root root 8.8M Aug  9 20:25 train_22-30.tfrec
-rw-r--r-- 1 root root 8.8M Aug  9 20:25 train_2

In [334]:
%%time

num_tfrecords = 3
split_df = np.array_split(test, num_tfrecords)
options = tf.io.TFRecordOptions(compression_type='ZLIB')
os.makedirs('./data/tfrecords/test', exist_ok=True)  # make sure the output directory exists

for i, temp_df in enumerate(split_df):
    print(f'Writing TF Record {i}')
    with tf.io.TFRecordWriter(f'./data/tfrecords/test/test_{i}-{num_tfrecords}.tfrec', options=options) as writer:
        for _, row in temp_df.iterrows():
            example = create_example(row)
            writer.write(example.SerializeToString())

print(f'A total of {num_tfrecords} TFRecord files were created.')
print(f'Each file contains {len(split_df[0])} records')

Writing TF Record 0
Writing TF Record 1
Writing TF Record 2
A total of 3 TFRecord files were created.
Each file contains 58071 records
CPU times: user 3min 3s, sys: 206 ms, total: 3min 3s
Wall time: 3min 3s


In [336]:
!ls ./data/tfrecords/test -lh

total 30M
-rw-r--r-- 1 root root 9.8M Aug  9 20:01 test_0-3.tfrec
-rw-r--r-- 1 root root 9.8M Aug  9 20:02 test_1-3.tfrec
-rw-r--r-- 1 root root 9.8M Aug  9 20:03 test_2-3.tfrec


In [227]:
def decoder(example):
    feature_description = {
        'clicks': tf.io.FixedLenFeature([], tf.int64),
        'region': tf.io.FixedLenFeature([], tf.int64),
        'city': tf.io.FixedLenFeature([], tf.int64),
        'adslotwidth': tf.io.FixedLenFeature([], tf.int64),
        'adslotheight': tf.io.FixedLenFeature([], tf.int64),
        'timestampyear': tf.io.FixedLenFeature([], tf.int64),
        'timestampmonth': tf.io.FixedLenFeature([], tf.int64),
        'timestampweek': tf.io.FixedLenFeature([], tf.int64),
        'timestampday': tf.io.FixedLenFeature([], tf.int64),
        'timestampdayofweek': tf.io.FixedLenFeature([], tf.int64),
        'timestampdayofyear': tf.io.FixedLenFeature([], tf.int64),
        'timestampis_month_end': tf.io.FixedLenFeature([], tf.int64),
        'timestampis_month_start': tf.io.FixedLenFeature([], tf.int64),
        'timestampis_quarter_end': tf.io.FixedLenFeature([], tf.int64),
        'timestampis_quarter_start': tf.io.FixedLenFeature([], tf.int64),
        'timestampis_year_end': tf.io.FixedLenFeature([], tf.int64),
        'timestampis_year_start': tf.io.FixedLenFeature([], tf.int64),
        'timestamphour': tf.io.FixedLenFeature([], tf.int64),
        'timestampminute': tf.io.FixedLenFeature([], tf.int64),
        'timestampsecond': tf.io.FixedLenFeature([], tf.int64),
        '10006': tf.io.FixedLenFeature([], tf.int64),
        '10024': tf.io.FixedLenFeature([], tf.int64),
        '10031': tf.io.FixedLenFeature([], tf.int64),
        '10048': tf.io.FixedLenFeature([], tf.int64),
        '10052': tf.io.FixedLenFeature([], tf.int64),
        '10057': tf.io.FixedLenFeature([], tf.int64),
        '10059': tf.io.FixedLenFeature([], tf.int64),
        '10063': tf.io.FixedLenFeature([], tf.int64),
        '10067': tf.io.FixedLenFeature([], tf.int64),
        '10074': tf.io.FixedLenFeature([], tf.int64),
        '10075': tf.io.FixedLenFeature([], tf.int64),
        '10076': tf.io.FixedLenFeature([], tf.int64),
        '10077': tf.io.FixedLenFeature([], tf.int64),
        '10079': tf.io.FixedLenFeature([], tf.int64),
        '10083': tf.io.FixedLenFeature([], tf.int64),
        '10093': tf.io.FixedLenFeature([], tf.int64),
        '10102': tf.io.FixedLenFeature([], tf.int64),
        '10110': tf.io.FixedLenFeature([], tf.int64),
        '10111': tf.io.FixedLenFeature([], tf.int64),
        '10684': tf.io.FixedLenFeature([], tf.int64),
        '11092': tf.io.FixedLenFeature([], tf.int64),
        '11278': tf.io.FixedLenFeature([], tf.int64),
        '11379': tf.io.FixedLenFeature([], tf.int64),
        '11423': tf.io.FixedLenFeature([], tf.int64),
        '11512': tf.io.FixedLenFeature([], tf.int64),
        '11576': tf.io.FixedLenFeature([], tf.int64),
        '11632': tf.io.FixedLenFeature([], tf.int64),
        '11680': tf.io.FixedLenFeature([], tf.int64),
        '11724': tf.io.FixedLenFeature([], tf.int64),
        '11944': tf.io.FixedLenFeature([], tf.int64),
        '13042': tf.io.FixedLenFeature([], tf.int64),
        '13403': tf.io.FixedLenFeature([], tf.int64),
        '13496': tf.io.FixedLenFeature([], tf.int64),
        '13678': tf.io.FixedLenFeature([], tf.int64),
        '13776': tf.io.FixedLenFeature([], tf.int64),
        '13800': tf.io.FixedLenFeature([], tf.int64),
        '13866': tf.io.FixedLenFeature([], tf.int64),
        '13874': tf.io.FixedLenFeature([], tf.int64),
        '14273': tf.io.FixedLenFeature([], tf.int64),
        '16593': tf.io.FixedLenFeature([], tf.int64),
        '16617': tf.io.FixedLenFeature([], tf.int64),
        '16661': tf.io.FixedLenFeature([], tf.int64),
        '16706': tf.io.FixedLenFeature([], tf.int64),
        'ip1': tf.io.FixedLenFeature([], tf.int64),
        'ip2': tf.io.FixedLenFeature([], tf.int64),
        'ip3': tf.io.FixedLenFeature([], tf.int64),
        'adex_1': tf.io.FixedLenFeature([], tf.int64),
        'adex_2': tf.io.FixedLenFeature([], tf.int64),
        'adex_3': tf.io.FixedLenFeature([], tf.int64),
        'advis_0': tf.io.FixedLenFeature([], tf.int64),
        'advis_1': tf.io.FixedLenFeature([], tf.int64),
        'advis_2': tf.io.FixedLenFeature([], tf.int64),
        'advis_255': tf.io.FixedLenFeature([], tf.int64),
        'adfmt_0': tf.io.FixedLenFeature([], tf.int64),
        'adfmt_1': tf.io.FixedLenFeature([], tf.int64),
        'adfmt_5': tf.io.FixedLenFeature([], tf.int64),
    }
    example = tf.io.parse_single_example(example, feature_description)
    return example

In [238]:
def prep(features):
    """Split off the `clicks` label and stack the remaining features into a single vector."""
    label = features.pop('clicks')
    return tf.stack([features[i] for i in features]), label

In [106]:
def load_data(data_dir, batch_size, seed=None):
    """Read the ZLIB-compressed TFRecords in `data_dir` into a shuffled, batched (features, label) dataset."""
    AUTOTUNE = tf.data.experimental.AUTOTUNE
    filenames = tf.io.gfile.glob(f'{data_dir}/*.tfrec')
    dataset = (
        tf.data.TFRecordDataset(filenames, num_parallel_reads=AUTOTUNE, compression_type='ZLIB')
        .map(decoder, num_parallel_calls=AUTOTUNE)
        .map(prep, num_parallel_calls=AUTOTUNE)
        .shuffle(batch_size * 10, seed=seed)
        .batch(batch_size)
        .prefetch(AUTOTUNE)
    )

    print('Completed loading and preprocessing data.')
    return dataset

In [60]:
def get_dataset(filenames, batch_size):
    AUTOTUNE = tf.data.experimental.AUTOTUNE
    ignore_order = tf.data.Options()
    ignore_order.experimental_deterministic = False  # allow non-deterministic order for faster reads
    dataset = (
        # the TFRecords above were written with ZLIB compression, so the reader must match
        tf.data.TFRecordDataset(filenames, num_parallel_reads=AUTOTUNE, compression_type='ZLIB')
        .with_options(ignore_order)
        .map(decoder, num_parallel_calls=AUTOTUNE)
        .shuffle(batch_size * 10)
        .batch(batch_size)
        .prefetch(AUTOTUNE)
    )

    return dataset

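## Convert to Parquet

The cells below use Amazon Athena to run the DDL statements in `train_data.ddl`, `parquet_train.ddl`, `test_data.ddl`, and `parquet_test.ddl` (not shown here): the first of each pair creates a table over the raw data, and the second converts it to Parquet.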

In [400]:
import boto3

#### TRAIN DATA

In [408]:
%%time
ath = boto3.client('athena')
# create the table with raw data
with open('train_data.ddl') as ddl:
    ath.start_query_execution(
        QueryString=ddl.read(),
        ResultConfiguration={'OutputLocation': f's3://{bucket_name}/ipenyou-xgboost/data/queries/train/'})

CPU times: user 16.5 ms, sys: 0 ns, total: 16.5 ms
Wall time: 131 ms
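
Note that `start_query_execution` only submits a query and returns immediately (hence the ~100 ms wall times), so the Parquet conversion can be submitted before the table it reads from exists. A minimal sketch that waits for each query to finish, using the boto3 Athena client's `start_query_execution` and `get_query_execution`; `run_athena_query` is a hypothetical helper:

```python
import time


def run_athena_query(client, query, output_location, poll_seconds=2.0, timeout_seconds=600.0):
    """Start an Athena query and block until it finishes.

    `client` is a boto3 Athena client, e.g. boto3.client('athena').
    Returns the query execution ID on success; raises RuntimeError if the
    query fails or is cancelled, TimeoutError if it runs too long.
    """
    response = client.start_query_execution(
        QueryString=query,
        ResultConfiguration={"OutputLocation": output_location},
    )
    query_id = response["QueryExecutionId"]
    deadline = time.monotonic() + timeout_seconds
    while True:
        status = client.get_query_execution(QueryExecutionId=query_id)["QueryExecution"]["Status"]
        state = status["State"]
        if state == "SUCCEEDED":
            return query_id
        if state in ("FAILED", "CANCELLED"):
            reason = status.get("StateChangeReason", "")
            raise RuntimeError(f"Athena query {query_id} {state}: {reason}")
        if time.monotonic() > deadline:
            raise TimeoutError(f"Athena query {query_id} still {state} after {timeout_seconds}s")
        time.sleep(poll_seconds)


# Usage, mirroring the cells here:
# with open('train_data.ddl') as ddl:
#     run_athena_query(ath, ddl.read(), f's3://{bucket_name}/ipenyou-xgboost/data/queries/train/')
```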


In [409]:
%%time
# convert to parquet
with open('parquet_train.ddl') as ddl:
    ath.start_query_execution(
        QueryString=ddl.read(),
        ResultConfiguration={'OutputLocation': f's3://{bucket_name}/ipenyou-xgboost/data/queries/train/'})

CPU times: user 3.83 ms, sys: 0 ns, total: 3.83 ms
Wall time: 90.5 ms


#### TEST DATA

In [406]:
%%time
ath = boto3.client('athena')
# create the table with raw data
with open('test_data.ddl') as ddl:
    ath.start_query_execution(
        QueryString=ddl.read(),
        ResultConfiguration={'OutputLocation': f's3://{bucket_name}/ipinyou-tf/queries/'})

CPU times: user 16.6 ms, sys: 0 ns, total: 16.6 ms
Wall time: 136 ms


In [407]:
%%time
# convert to parquet
with open('parquet_test.ddl') as ddl:
    ath.start_query_execution(
        QueryString=ddl.read(),
        ResultConfiguration={'OutputLocation': f's3://{bucket_name}/ipinyou-tf/queries/'})

CPU times: user 3.89 ms, sys: 0 ns, total: 3.89 ms
Wall time: 102 ms
