In [None]:
#use dask to read the file
!python -m pip install "dask[complete]"

In [2]:
import dask.dataframe as dd

In [4]:
import pandas as pd
import os
from os import listdir
from numpy import loadtxt
import matplotlib.pyplot as plt
import torch
import pickle
import json
from sklearn.metrics import log_loss, roc_auc_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, MinMaxScaler

from drive.MyDrive.RCSYS_finalproject.DeepCTRTorch.deepctr_torch.inputs import SparseFeat, DenseFeat, get_feature_names
from drive.MyDrive.RCSYS_finalproject.DeepCTRTorch.deepctr_torch.models import *
from drive.MyDrive.RCSYS_finalproject.DeepCTRTorch.deepctr_torch.models.SExdeepfm import *
from drive.MyDrive.RCSYS_finalproject.DeepCTRTorch.deepctr_torch.inputs import VarLenSparseFeat

## **1. Download criteo dataset, unzip the file and load it to dask dataframe**

In [9]:
#download criteo
!wget http://go.criteo.net/criteo-research-kaggle-display-advertising-challenge-dataset.tar.gz

--2022-02-11 08:40:30--  http://go.criteo.net/criteo-research-kaggle-display-advertising-challenge-dataset.tar.gz
Resolving go.criteo.net (go.criteo.net)... 178.250.0.152
Connecting to go.criteo.net (go.criteo.net)|178.250.0.152|:80... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://criteostorage.blob.core.windows.net/criteo-research-datasets/kaggle-display-advertising-challenge-dataset.tar.gz [following]
--2022-02-11 08:40:31--  https://criteostorage.blob.core.windows.net/criteo-research-datasets/kaggle-display-advertising-challenge-dataset.tar.gz
Resolving criteostorage.blob.core.windows.net (criteostorage.blob.core.windows.net)... 20.209.1.1
Connecting to criteostorage.blob.core.windows.net (criteostorage.blob.core.windows.net)|20.209.1.1|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 4576820670 (4.3G) [application/x-gzip]
Saving to: ‘criteo-research-kaggle-display-advertising-challenge-dataset.tar.gz.1’


2022-02-11 08:47:37

In [11]:
!tar -xf /content/criteo-research-kaggle-display-advertising-challenge-dataset.tar.gz

tar: Ignoring unknown extended header keyword 'SCHILY.dev'
tar: Ignoring unknown extended header keyword 'SCHILY.ino'
tar: Ignoring unknown extended header keyword 'SCHILY.nlink'
tar: Ignoring unknown extended header keyword 'LIBARCHIVE.creationtime'
tar: Ignoring unknown extended header keyword 'SCHILY.dev'
tar: Ignoring unknown extended header keyword 'SCHILY.ino'
tar: Ignoring unknown extended header keyword 'SCHILY.nlink'
tar: Ignoring unknown extended header keyword 'SCHILY.dev'
tar: Ignoring unknown extended header keyword 'SCHILY.ino'
tar: Ignoring unknown extended header keyword 'SCHILY.nlink'


In [12]:
column_names = ['label','interger1','interger2','interger3','interger4','interger5','interger6',
                'interger7','interger8','interger9','interger10','interger11','interger12','interger13',
                'categorical1','categorical2','categorical3','categorical4','categorical5','categorical6',
                'categorical7','categorical8','categorical9','categorical10','categorical11','categorical12',
                'categorical13','categorical14','categorical15','categorical16','categorical17','categorical18',
                'categorical19','categorical20','categorical21','categorical22','categorical23','categorical24',
                'categorical25','categorical26']

In [16]:
#load the dataset into dask dataframe
#df = dd.read_csv('/content/train.txt', sep='\t', names=column_names)
df = dd.read_csv('/content/test.txt', sep='\t', names=column_names)

In [17]:
df.info

<bound method DataFrame.info of Dask DataFrame Structure:
                  label interger1 interger2 interger3 interger4 interger5 interger6 interger7 interger8 interger9 interger10 interger11 interger12 interger13 categorical1 categorical2 categorical3 categorical4 categorical5 categorical6 categorical7 categorical8 categorical9 categorical10 categorical11 categorical12 categorical13 categorical14 categorical15 categorical16 categorical17 categorical18 categorical19 categorical20 categorical21 categorical22 categorical23 categorical24 categorical25 categorical26
npartitions=23                                                                                                                                                                                                                                                                                                                                                                                                                               

In [18]:
df.shape[0].compute()

6042135

In [None]:
## Replace Nulls by mean to improve logit
df['interger1'] = df.interger1.mask(df.interger1.isnull(), df.interger1.mean())
df['interger2'] = df.interger2.mask(df.interger2.isnull(), df.interger2.mean())
df['interger3'] = df.interger3.mask(df.interger3.isnull(), df.interger3.mean())
df['interger4'] = df.interger4.mask(df.interger4.isnull(), df.interger4.mean())
df['interger5'] = df.interger5.mask(df.interger5.isnull(), df.interger5.mean())
df['interger6'] = df.interger6.mask(df.interger6.isnull(), df.interger6.mean())
df['interger7'] = df.interger7.mask(df.interger7.isnull(), df.interger7.mean())
df['interger8'] = df.interger8.mask(df.interger8.isnull(), df.interger8.mean())
df['interger9'] = df.interger9.mask(df.interger9.isnull(), df.interger9.mean())
df['interger10'] = df.interger10.mask(df.interger10.isnull(), df.interger10.mean())
df['interger11'] = df.interger11.mask(df.interger11.isnull(), df.interger11.mean())
df['interger12'] = df.interger12.mask(df.interger12.isnull(), df.interger12.mean())
df['interger13'] = df.interger13.mask(df.interger13.isnull(), df.interger13.mean())

## **2. since Dask divide the dataset into many partition we need to merge them in steps due to limit RAM resources**

In [None]:
#save all the partitions of dask dataframe to pickles files
for i in range(0,175):
  print(i)
  partition = df.partitions[i]
  partition_pd = partition.compute()
  partition_pd.to_pickle('/content/partition_'+str(i)+'.pickle')

In [None]:
#merge each 10 files to one file
for i in range(0,169,10):
  df_list = []
  print(i)
  for j in range(i,i+10):
    df_list.append(pd.read_pickle('/content/partition_'+ str(j)+'.pickle'))
  pd.concat(df_list).to_pickle('/content/concat'+str(i)+'.pickle')

df_list = []
for i in range(170,175):
  df_list.append(pd.read_pickle('/content/drive/MyDrive/RCSYS_finalproject/criteo_original/partition_'+ str(i)+'.pickle'))

pd.concat(df_list).to_pickle('/content/concat_170'+'.pickle')



In [None]:
from google.colab import  drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
#again merge each two merged files into one file
df1 = pd.read_pickle('/content/concat160.pickle')
df2 = pd.read_pickle('/content/concat_170.pickle')

pd.concat([df1,df2]).to_pickle('/content/drive/MyDrive/RCSYS_finalproject/criteo_original/data9.pickle')

In [None]:
#Finally, we have 9 pickle files with the dataset - data1, data2, ...data9.
import pandas as pd

df1 = pd.read_pickle('/content/drive/MyDrive/RCSYS_finalproject/criteo_original/data7.pickle')
df2 = pd.read_pickle('/content/drive/MyDrive/RCSYS_finalproject/criteo_original/data8.pickle')
df3 = pd.read_pickle('/content/drive/MyDrive/RCSYS_finalproject/criteo_original/data9.pickle')

In [None]:
#concat every three data files
pd.concat([df1,df2,df3]).to_pickle('/content/drive/MyDrive/RCSYS_finalproject/criteo_original/data7_9.pickle')

## **3. Now, we have data4_6.pickle file which contain 15M samples and we need to prepare it for training**

In [None]:
import pandas as pd

df1 = pd.read_pickle('/content/drive/MyDrive/RCSYS_finalproject/criteo_original/data4_6.pickle')

In [None]:
#sparse_features = ['C' + str(i) for i in range(1, 27)]
sparse_features = ['categorical' + str(i) for i in range(1, 27)]

#dense_features = ['I' + str(i) for i in range(1, 14)]
dense_features = ['interger' + str(i) for i in range(1, 14)]

df1[sparse_features] = df1[sparse_features].fillna('-1', )
df1[dense_features] = df1[dense_features].fillna(0, )

In [None]:
df1

In [None]:
data2 = pd.read_csv('/content/drive/MyDrive/RCSYS_finalproject/DeepCTRTorch/examples/criteo_sample.txt')
data2

Unnamed: 0,label,I1,I2,I3,I4,I5,I6,I7,I8,I9,I10,I11,I12,I13,C1,C2,C3,C4,C5,C6,C7,C8,C9,C10,C11,C12,C13,C14,C15,C16,C17,C18,C19,C20,C21,C22,C23,C24,C25,C26
0,0,,3,260.0,,17668.0,,,33.0,,,,0.0,,05db9164,08d6d899,9143c832,f56b7dd5,25c83c98,7e0ccccf,df5c2d18,0b153874,a73ee510,8f48ce11,a7b606c4,ae1bb660,eae197fd,b28479f6,bfef54b3,bad5ee18,e5ba7672,87c6f83c,,,0429f84b,,3a171ecb,c0d61a5c,,
1,0,,-1,19.0,35.0,30251.0,247.0,1.0,35.0,160.0,,1.0,,35.0,68fd1e64,04e09220,95e13fd4,a1e6a194,25c83c98,fe6b92e5,f819e175,062b5529,a73ee510,ab9456b4,6153cf57,8882c6cd,769a1844,b28479f6,69f825dd,23056e4f,d4bb7bd8,6fc84bfb,,,5155d8a3,,be7c41b4,ded4aac9,,
2,0,0.0,0,2.0,12.0,2013.0,164.0,6.0,35.0,523.0,0.0,3.0,,18.0,05db9164,38a947a1,3f55fb72,5de245c7,30903e74,7e0ccccf,b72ec13d,1f89b562,a73ee510,acce978c,3547565f,a5b0521a,12880350,b28479f6,c12fc269,95a8919c,e5ba7672,675c9258,,,2e01979f,,bcdee96c,6d5d1302,,
3,0,,13,1.0,4.0,16836.0,200.0,5.0,4.0,29.0,,2.0,,4.0,05db9164,8084ee93,02cf9876,c18be181,25c83c98,,e14874c9,0b153874,7cc72ec2,2462946f,636405ac,8fe001f4,31b42deb,07d13a8f,422c8577,36103458,e5ba7672,52e44668,,,e587c466,,32c7478e,3b183c5c,,
4,0,0.0,0,104.0,27.0,1990.0,142.0,4.0,32.0,37.0,0.0,1.0,,27.0,05db9164,207b2d81,5d076085,862b5ba0,25c83c98,fbad5c96,17c22666,0b153874,a73ee510,534fc986,feb49a68,f24b551c,8978af5c,64c94865,32ec6582,b6d021e8,e5ba7672,25c88e42,21ddcdc9,b1252a9d,0e8585d2,,32c7478e,0d4a6d1a,001f3601,92c878de
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
195,0,,0,113.0,3.0,3036.0,575.0,2.0,3.0,214.0,,1.0,,3.0,05db9164,0468d672,628b07b0,b63c0277,25c83c98,7e0ccccf,0d339a25,c8ddd494,a73ee510,1722d4c8,7d756b25,0c87b3e9,6f833c7a,1adce6ef,4f3b3616,48af915a,07c540c4,9880032b,21ddcdc9,5840adea,34cc61bb,c9d4222a,32c7478e,e5ed7da2,ea9a246c,984e0db0
196,1,0.0,1,1.0,1.0,1607.0,12.0,1.0,12.0,15.0,0.0,1.0,,12.0,be589b51,aa8fcc21,4255f8fd,7501d94a,25c83c98,fe6b92e5,0492c809,1f89b562,a73ee510,13ba96b0,ba0f9e8a,887a0c20,4e4dd817,07d13a8f,a4f91020,022714ba,1e88c74f,3972b4ed,,,d1aa4512,,32c7478e,9257f75f,,
197,1,1.0,0,6.0,3.0,0.0,0.0,19.0,3.0,3.0,1.0,9.0,0.0,0.0,05db9164,09e68b86,db151f8b,f1b645fc,25c83c98,,b87f4a4a,0b153874,a73ee510,e70742b0,319687c9,af6ad6b6,62036f49,f862f261,1dca7862,05a97a3c,3486227d,5aed7436,54591762,a458ea53,4a2c3526,,32c7478e,1793a828,e8b83407,1a02cbe1
198,0,0.0,22,6.0,22.0,203.0,153.0,80.0,18.0,508.0,0.0,11.0,0.0,22.0,05db9164,e5fb1af3,7e1ad1fe,46ec0a38,43b19349,7e0ccccf,24c48926,0b153874,a73ee510,afa26c81,9f0003f4,651d80c6,5afd9e51,07d13a8f,b5de5956,72401022,3486227d,13145934,55dd3565,5840adea,bf647035,,32c7478e,1481ceb4,e8b83407,988b0775


In [None]:
#change the column names to be equal to column names used by deepctr_torch API
data = df1
data.columns = data2.columns

In [None]:
sparse_features = ['C' + str(i) for i in range(1, 27)]
dense_features = ['I' + str(i) for i in range(1, 14)]

target = ['label']

#1.Label Encoding for sparse features,and do simple Transformation for dense features
for feat in sparse_features:
    lbe = LabelEncoder()
    data[feat] = lbe.fit_transform(data[feat])
mms = MinMaxScaler(feature_range=(0, 1))
data[dense_features] = mms.fit_transform(data[dense_features])

# 2.count #unique features for each sparse field,and record dense feature field name
fixlen_feature_columns = [SparseFeat(feat, data[feat].nunique())
                          for feat in sparse_features] + [DenseFeat(feat, 1, )
                                                          for feat in dense_features]

dnn_feature_columns = fixlen_feature_columns
linear_feature_columns = fixlen_feature_columns

feature_names = get_feature_names(
    linear_feature_columns + dnn_feature_columns)

train, test = train_test_split(data, test_size = 0.2)

## **4.Save our dataset into our drive.**

In [None]:
import pickle

# with open('/content/drive/MyDrive/RCSYS_finalproject/criteo_original/dnn_feature_columns_criteo.p', 'wb') as fp:
#     pickle.dump(dnn_feature_columns, fp, protocol=pickle.HIGHEST_PROTOCOL)

with open('/content/drive/MyDrive/RCSYS_finalproject/criteo_original/dnn_feature_columns_criteo.p', 'rb') as fp:
    dnn_feature_columns = pickle.load(fp)

# with open('/content/drive/MyDrive/RCSYS_finalproject/criteo_original/linear_feature_columns_criteo.p', 'wb') as fp:
#     pickle.dump(linear_feature_columns, fp, protocol=pickle.HIGHEST_PROTOCOL)

with open('/content/drive/MyDrive/RCSYS_finalproject/criteo_original/linear_feature_columns_criteo.p', 'rb') as fp:
    linear_feature_columns = pickle.load(fp)

# with open('/content/drive/MyDrive/RCSYS_finalproject/criteo_original/feature_names_criteo.p', 'wb') as fp:
#     pickle.dump(feature_names, fp, protocol=pickle.HIGHEST_PROTOCOL)

with open('/content/drive/MyDrive/RCSYS_finalproject/criteo_original/feature_names_criteo.p', 'rb') as fp:
    feature_names = pickle.load(fp)

In [None]:
train.to_pickle('/content/drive/MyDrive/RCSYS_finalproject/criteo_original/criteo_train.pkl')
test.to_pickle('/content/drive/MyDrive/RCSYS_finalproject/criteo_original/criteo_test.pkl')

In [None]:
train = pd.read_pickle('/content/drive/MyDrive/RCSYS_finalproject/criteo_original/criteo_train.pkl')
test = pd.read_pickle('/content/drive/MyDrive/RCSYS_finalproject/criteo_original/criteo_test.pkl')