Copyright (c) 2020, NVIDIA CORPORATION.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
    http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

In [1]:
import os, time
os.environ["CUDA_VISIBLE_DEVICES"]="0,1,2,3"
start = time.time()
very_start = time.time()

In [2]:
#import pandas as pd, 
import numpy as np
from datetime import datetime
import matplotlib.pyplot as plt
#pd.set_option('display.max_columns', 500)
#pd.set_option('display.max_rows', 500)
import cudf, cupy, time, rmm
cudf.__version__

'0.14.0'

In [3]:
import dask as dask, dask_cudf
from dask.distributed import Client, wait
from dask_cuda import LocalCUDACluster
import subprocess

In [4]:
cmd = "hostname --all-ip-addresses"
process = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE)
output, error = process.communicate()
IPADDR = str(output.decode()).split()[0]
print(IPADDR)
# this is not the correct ip

169.254.8.236


In [5]:
cluster = LocalCUDACluster(ip='10.2.61.36',protocol="ucx", 
                           rmm_pool_size="26GB",
                           enable_tcp_over_ucx=True, enable_nvlink=True)
#cluster = LocalCUDACluster()
client = Client(cluster)
client

0,1
Client  Scheduler: ucx://10.2.61.36:49857  Dashboard: http://10.2.61.36:8787/status,Cluster  Workers: 4  Cores: 4  Memory: 270.39 GB


In [6]:
#%%time
#client.run(cudf.set_allocator, pool=True, initial_pool_size=int(2**35)-int(2**32))
#client.run(cupy.cuda.set_allocator, rmm.rmm_cupy_allocator)

# Load Train

In [7]:
%%time
path = '/raid/data/recsys/train_split'
train = dask_cudf.read_parquet(f'{path}/train-preproc-fold-*.parquet')#,dtypes=dtypes)

CPU times: user 846 ms, sys: 302 ms, total: 1.15 s
Wall time: 1.14 s


In [8]:
%%time
# DROP UNUSED COLUMNS
cols_drop = ['links','hashtags0', 'hashtags1', 'fold', 'a_account_creation',
 'b_account_creation']
train = train.drop(cols_drop,axis=1)

CPU times: user 120 ms, sys: 19.9 ms, total: 140 ms
Wall time: 127 ms


In [9]:
%%time
train, = dask.persist(train)
print(type(train), train.shape)

<class 'dask_cudf.core.DataFrame'> (Delayed('int-b218d7b2-71f1-46d1-bdb6-8a5543cdbe4d'), 23)
CPU times: user 9.54 ms, sys: 0 ns, total: 9.54 ms
Wall time: 8.87 ms


In [10]:
%%time
train = train.repartition(npartitions=4)
train, = dask.persist(train)
train.head()

CPU times: user 131 ms, sys: 29.7 ms, total: 161 ms
Wall time: 1.22 s


Unnamed: 0,tweet_id,media,domains,tweet_type,language,timestamp,a_user_id,a_follower_count,a_following_count,a_is_verified,...,b_is_verified,b_follows_a,reply,retweet,retweet_comment,like,engage_time,len_domains,len_hashtags,len_links
92560312,0,7,0,1,54,2020-02-09 09:26:50,0,14326,408,False,...,False,True,0,0,0,0,1970-01-01 00:00:00,0,0,0
55533709,10,0,0,1,54,2020-02-09 18:41:35,10,237126,1193,True,...,False,False,0,0,0,0,1970-01-01 00:00:00,0,0,0
37019981,20,5,0,1,3,2020-02-09 01:13:28,19,23079,1803,False,...,False,False,0,0,0,0,1970-01-01 00:00:00,0,0,0
74047136,30,0,5,2,11,2020-02-07 12:15:20,29,769176,190,False,...,False,False,0,0,0,1,2020-02-07 12:36:47,0,0,0
74047139,40,0,0,2,6,2020-02-08 14:14:39,35,73952,13,False,...,False,False,0,0,0,1,2020-02-09 13:33:47,0,0,0


In [11]:
for i in range(4):
    print(i, len(train.partitions[i]))

0 29629913
1 44309248
2 29638018
3 44498059


In [12]:
label_names = ['reply', 'retweet', 'retweet_comment', 'like']
for col in train.columns:
    if col in label_names:
        train[col] = train[col].astype('float32')
    elif train[col].dtype=='int64':
        train[col] = train[col].astype('int32')
    elif train[col].dtype=='int16':
        train[col] = train[col].astype('int8')

In [13]:
%%time
train = train.reset_index(drop=True)

CPU times: user 11 ms, sys: 0 ns, total: 11 ms
Wall time: 9.75 ms


In [14]:
%%time
train, = dask.persist(train)
print(train.shape)

(Delayed('int-3c3a6355-9e7d-4fea-b49e-546ebee424b4'), 23)
CPU times: user 51.4 ms, sys: 0 ns, total: 51.4 ms
Wall time: 50.2 ms


In [15]:
%%time 
# TIME FEATURES
# RAPIDS does this 5x faster than Pandas CPU
# If we didn't need to copy CPU to GPU to CPU, then 1300x faster!
def split_time(df):
    #gf = cudf.from_pandas(df[['timestamp']])
    df['dt_dow']  = df['timestamp'].dt.weekday#.to_array() 
    df['dt_hour'] = df['timestamp'].dt.hour#.to_array()
    df['dt_minute'] = df['timestamp'].dt.minute#.to_array()
    df['dt_second'] = df['timestamp'].dt.second#.to_array()
    return df

train = split_time(train)

CPU times: user 79.4 ms, sys: 14.9 ms, total: 94.3 ms
Wall time: 86.5 ms


In [16]:
train.head()[['engage_time','timestamp']]

Unnamed: 0,engage_time,timestamp
0,1970-01-01 00:00:00,2020-02-09 09:26:50
1,1970-01-01 00:00:00,2020-02-09 18:41:35
2,1970-01-01 00:00:00,2020-02-09 01:13:28
3,2020-02-07 12:36:47,2020-02-07 12:15:20
4,2020-02-09 13:33:47,2020-02-08 14:14:39


In [17]:
train.timestamp.dtype

dtype('<M8[ns]')

In [18]:
%%time
# ELAPSED TIME
for col in ['engage_time','timestamp']:
    train[col] = train[col].astype('int64')/1e3

CPU times: user 23.8 ms, sys: 0 ns, total: 23.8 ms
Wall time: 22.6 ms


In [19]:
%%time
train, = dask.persist(train)
print(type(train), train.shape)

<class 'dask_cudf.core.DataFrame'> (Delayed('int-622cd132-88a6-4826-a17c-876bb9ca991d'), 27)
CPU times: user 26.5 ms, sys: 3.79 ms, total: 30.3 ms
Wall time: 29.5 ms


In [20]:
train.head()[['engage_time','timestamp']]

Unnamed: 0,engage_time,timestamp
0,0.0,1581240000.0
1,0.0,1581274000.0
2,0.0,1581211000.0
3,1581079000.0,1581078000.0
4,1581255000.0,1581171000.0


In [21]:
def set_nan(ds):
    mask = ds == 0
    ds.loc[mask] = np.nan
    return ds.nans_to_nulls()
train['engage_time'] = train['engage_time'].map_partitions(set_nan)

In [22]:
train['elapsed_time'] = train['engage_time'] - train['timestamp']
train['elapsed_time'] = train.elapsed_time.astype('float64')

In [23]:
print(train['elapsed_time'].min().compute(),train['elapsed_time'].max().compute())
print(train['elapsed_time'].mean().compute())

2.0 603956.0
15581.699535245267


In [24]:
%%time
train, = dask.persist(train)
print(type(train), train.shape)

<class 'dask_cudf.core.DataFrame'> (Delayed('int-65b25a28-50b1-4d9a-91f4-af47d48594a7'), 28)
CPU times: user 12.6 ms, sys: 101 µs, total: 12.7 ms
Wall time: 12.3 ms


In [25]:
train.head()

Unnamed: 0,tweet_id,media,domains,tweet_type,language,timestamp,a_user_id,a_follower_count,a_following_count,a_is_verified,...,like,engage_time,len_domains,len_hashtags,len_links,dt_dow,dt_hour,dt_minute,dt_second,elapsed_time
0,0,7,0,1,54,1581240000.0,0,14326,408,False,...,0.0,,0,0,0,6,9,26,50,
1,10,0,0,1,54,1581274000.0,10,237126,1193,True,...,0.0,,0,0,0,6,18,41,35,
2,20,5,0,1,3,1581211000.0,19,23079,1803,False,...,0.0,,0,0,0,6,1,13,28,
3,30,0,5,2,11,1581078000.0,29,769176,190,False,...,1.0,1581079007.0,0,0,0,4,12,15,20,1287.0
4,40,0,0,2,6,1581171000.0,35,73952,13,False,...,1.0,1581255227.0,0,0,0,5,14,14,39,83948.0


# Feature Engineering 

In [26]:
%%time
# TRAIN FIRST 5 DAYS. VALIDATE LAST 2 DAYS
VALID_DOW = [1, 2]# order is [3, 4, 5, 6, 0, 1, 2]
valid = train[train['dt_dow'].isin(VALID_DOW)].reset_index(drop=True)
train = train[~train['dt_dow'].isin(VALID_DOW)].reset_index(drop=True)

CPU times: user 47 ms, sys: 300 µs, total: 47.3 ms
Wall time: 45.1 ms


In [27]:
%%time
train,valid = dask.persist(train,valid)
print(type(train), train.shape, valid.shape)

<class 'dask_cudf.core.DataFrame'> (Delayed('int-83f2d59d-2f43-4a7e-8928-f40dc58f0cd9'), 28) (Delayed('int-ed8484d9-f286-4236-8e24-677b5c9baffe'), 28)
CPU times: user 11.3 ms, sys: 0 ns, total: 11.3 ms
Wall time: 10.3 ms


In [28]:
%%time
train = train.sort_values('timestamp').reset_index(drop=True)
valid = valid.sort_values('timestamp').reset_index(drop=True)
train = train.drop(['dt_dow','timestamp'],axis=1)
valid = valid.drop(['dt_dow','timestamp'],axis=1)
train,valid = dask.persist(train,valid)
train.head()

CPU times: user 2.19 s, sys: 3.49 s, total: 5.68 s
Wall time: 2.09 s


Unnamed: 0,tweet_id,media,domains,tweet_type,language,a_user_id,a_follower_count,a_following_count,a_is_verified,b_user_id,...,retweet_comment,like,engage_time,len_domains,len_hashtags,len_links,dt_hour,dt_minute,dt_second,elapsed_time
0,1173270,0,4,2,54,672136,5341,840,False,2407520,...,0.0,0.0,,0,0,0,0,0,0,
1,164980,5,6625,2,3,112668,22984,6033,True,15890909,...,0.0,0.0,,0,0,0,0,0,0,
2,3240270,9,0,2,11,64906,256871,8,True,8602066,...,0.0,0.0,,0,2,0,0,0,0,
3,2468740,5,4,2,54,1278002,16289,376,False,14428135,...,0.0,0.0,,0,0,0,0,0,0,
4,3653210,0,0,2,11,98180,9841,6,True,16369737,...,0.0,0.0,,0,5,0,0,0,0,


In [29]:
for i in range(4):
    print(i,len(train.partitions[i]))

0 26885321
1 26832264
2 26822864
3 26696073


In [30]:
for i in range(4):
    print(i,len(valid.partitions[i]))

0 10236930
1 10267563
2 10191750
3 10142473


### Target Encode

In [31]:
class MTE_one_shot:
    
    def __init__(self, folds, smooth, seed=42, mode='gpu'):
        self.folds = folds
        self.seed = seed
        self.smooth = smooth
        if mode=='gpu':
            self.np = cupy
            self.df = cudf
        else:
            self.np = np
            self.df = pd
        self.mode = mode
        
    def fit_transform(self, train, x_col, y_col, y_mean=None, out_col = None, out_dtype=None):
        
        self.y_col = y_col
        self.np.random.seed(self.seed)
        
        if 'fold' not in train.columns:
            fsize = len(train)//self.folds
            if isinstance(train,dask_cudf.core.DataFrame):
                #train['fold'] = train.map_partitions(lambda cudf_df: cudf_df.index%self.folds)
                train['fold'] = 1
                train['fold'] = train['fold'].cumsum()
                train['fold'] = train['fold']//fsize
                train['fold'] = train['fold']%self.folds
            else:
                #train['fold'] = self.np.random.randint(0,self.folds,len(train))
                train['fold'] = (train.index.values//fsize)%self.folds
        
        if out_col is None:
            tag = x_col if isinstance(x_col,str) else '_'.join(x_col)
            out_col = f'TE_{tag}_{self.y_col}'
        
        if y_mean is None:
            y_mean = train[y_col].mean()#.compute().astype('float32')
        self.mean = y_mean
        
        cols = ['fold',x_col] if isinstance(x_col,str) else ['fold']+x_col
        
        agg_each_fold = train.groupby(cols).agg({y_col:['count','sum']}).reset_index()
        agg_each_fold.columns = cols + ['count_y','sum_y']
        
        agg_all = agg_each_fold.groupby(x_col).agg({'count_y':'sum','sum_y':'sum'}).reset_index()
        cols = [x_col] if isinstance(x_col,str) else x_col
        agg_all.columns = cols + ['count_y_all','sum_y_all']
        
        agg_each_fold = agg_each_fold.merge(agg_all,on=x_col,how='left')
        agg_each_fold['count_y_all'] = agg_each_fold['count_y_all'] - agg_each_fold['count_y']
        agg_each_fold['sum_y_all'] = agg_each_fold['sum_y_all'] - agg_each_fold['sum_y']
        agg_each_fold[out_col] = (agg_each_fold['sum_y_all']+self.smooth*self.mean)/(agg_each_fold['count_y_all']+self.smooth)
        agg_each_fold = agg_each_fold.drop(['count_y_all','count_y','sum_y_all','sum_y'],axis=1)
        
        agg_all[out_col] = (agg_all['sum_y_all']+self.smooth*self.mean)/(agg_all['count_y_all']+self.smooth)
        agg_all = agg_all.drop(['count_y_all','sum_y_all'],axis=1)
        self.agg_all = agg_all
        
        train.columns
        cols = ['fold',x_col] if isinstance(x_col,str) else ['fold']+x_col
        train = train.merge(agg_each_fold,on=cols,how='left')
        del agg_each_fold
        #self.agg_each_fold = agg_each_fold
        if self.mode=='gpu':
            if isinstance(train,dask_cudf.core.DataFrame):
                train[out_col] = train.map_partitions(lambda cudf_df: cudf_df[out_col].nans_to_nulls())
            else:
                train[out_col] = train[out_col].nans_to_nulls()
        train[out_col] = train[out_col].fillna(self.mean)
        
        if out_dtype is not None:
            train[out_col] = train[out_col].astype(out_dtype)
        return train
    
    def transform(self, test, x_col, out_col = None, out_dtype=None):
        if out_col is None:
            tag = x_col if isinstance(x_col,str) else '_'.join(x_col)
            out_col = f'TE_{tag}_{self.y_col}'
        test = test.merge(self.agg_all,on=x_col,how='left')
        test[out_col] = test[out_col].fillna(self.mean)
        if out_dtype is not None:
            test[out_col] = test[out_col].astype(out_dtype)
        return test
 

In [32]:
%%time
# cuDF TE ENCODING IS SUPER FAST!!
idx = 0; cols = []
start = time.time()
for t in ['reply', 'retweet', 'retweet_comment', 'like']:
    start = time.time()
    for c in ['media', 'tweet_type', 'language', 'a_user_id', 'b_user_id']:
        out_col = f'TE_{c}_{t}'
        encoder = MTE_one_shot(folds=5,smooth=20)
        train = encoder.fit_transform(train, c, t, out_col=out_col, out_dtype='float32')
        valid = encoder.transform(valid, c, out_col=out_col, out_dtype='float32')
        cols.append(out_col)
        del encoder
        train,valid = dask.persist(train,valid)
        train.head()

CPU times: user 12.2 s, sys: 900 ms, total: 13.1 s
Wall time: 39.4 s


### Multiple Column Target Encode

In [33]:
%%time
# cuDF TE ENCODING IS SUPER FAST!!
idx = 0; cols=[]
c = ['domains','language','b_follows_a','tweet_type','media','a_is_verified']
for t in ['reply', 'retweet', 'retweet_comment', 'like']:
    out_col = f'TE_multi_{t}'
    encoder = MTE_one_shot(folds=5,smooth=20)
    train = encoder.fit_transform(train, c, t, out_col=out_col, out_dtype='float32')
    valid = encoder.transform(valid, c, out_col=out_col, out_dtype='float32')
    cols.append(out_col)
    del encoder

CPU times: user 2 s, sys: 38.6 ms, total: 2.04 s
Wall time: 1.98 s


In [34]:
%%time
train,valid = dask.persist(train,valid)
train.head()

CPU times: user 1.12 s, sys: 99.4 ms, total: 1.22 s
Wall time: 2.82 s


Unnamed: 0,tweet_id,media,domains,tweet_type,language,a_user_id,a_follower_count,a_following_count,a_is_verified,b_user_id,...,TE_b_user_id_retweet_comment,TE_media_like,TE_tweet_type_like,TE_language_like,TE_a_user_id_like,TE_b_user_id_like,TE_multi_reply,TE_multi_retweet,TE_multi_retweet_comment,TE_multi_like
0,323067,5,161,2,54,1579,34843521,245,True,23840898,...,0.007008,0.494331,0.532514,0.449791,0.049679,0.453557,0.002379,0.01318,0.000506,0.079467
1,323067,5,161,2,54,1579,34843521,245,True,24016535,...,0.006703,0.494331,0.532514,0.449791,0.049679,0.477315,0.002379,0.01318,0.000506,0.079467
2,323067,5,161,2,54,1579,34843521,245,True,23892268,...,0.007341,0.494331,0.532514,0.449791,0.049679,0.427535,0.002379,0.01318,0.000506,0.079467
3,323067,5,161,2,54,1579,34843521,245,True,24082276,...,0.006424,0.494331,0.532514,0.449791,0.049679,0.41576,0.002379,0.01318,0.000506,0.079467
4,323067,5,161,2,54,1579,34919640,243,True,23879187,...,0.007708,0.494331,0.532514,0.449791,0.049679,0.448912,0.002379,0.01318,0.000506,0.079467


### Elapsed Time Target Encode

In [35]:
%%time
# cuDF TE ENCODING IS SUPER FAST!!
idx = 0; cols = []
for c in ['media', 'tweet_type', 'language']:#, 'a_user_id', 'b_user_id']:
    for t in ['elapsed_time']:
        out_col = f'TE_{c}_{t}'
        encoder = MTE_one_shot(folds=5,smooth=20)
        train = encoder.fit_transform(train, c, t, out_col=out_col)
        out_dtype='float32' #if 'user_id' in c else None
        valid = encoder.transform(valid, c, out_col=out_col, out_dtype=out_dtype)
        cols.append(out_col)
        #del encoder

CPU times: user 970 ms, sys: 28.5 ms, total: 999 ms
Wall time: 967 ms


In [36]:
%%time
train,valid = dask.persist(train,valid)
train.head()

CPU times: user 656 ms, sys: 73.7 ms, total: 729 ms
Wall time: 1.68 s


Unnamed: 0,tweet_id,media,domains,tweet_type,language,a_user_id,a_follower_count,a_following_count,a_is_verified,b_user_id,...,TE_language_like,TE_a_user_id_like,TE_b_user_id_like,TE_multi_reply,TE_multi_retweet,TE_multi_retweet_comment,TE_multi_like,TE_media_elapsed_time,TE_tweet_type_elapsed_time,TE_language_elapsed_time
0,223305,0,411,2,4,1452,3444937,26,True,27109478,...,0.449679,0.633323,0.453557,0.009299,0.113452,0.003633,0.502925,15701.587078,18450.83857,15133.940503
1,223305,0,411,2,4,1452,3454699,26,True,27279981,...,0.449679,0.633323,0.51913,0.009299,0.113452,0.003633,0.502925,15701.587078,18450.83857,15133.940503
2,223305,0,411,2,4,1452,3454699,26,True,4609600,...,0.449679,0.633323,0.537625,0.009299,0.113452,0.003633,0.502925,15701.587078,18450.83857,15133.940503
3,223305,0,411,2,4,1452,3454699,26,True,27215638,...,0.449679,0.633323,0.475155,0.009299,0.113452,0.003633,0.502925,15701.587078,18450.83857,15133.940503
4,223305,0,411,2,4,1452,3454699,26,True,27119370,...,0.449679,0.633323,0.427535,0.009299,0.113452,0.003633,0.502925,15701.587078,18450.83857,15133.940503


In [37]:
valid.head()

Unnamed: 0,tweet_id,media,domains,tweet_type,language,a_user_id,a_follower_count,a_following_count,a_is_verified,b_user_id,...,TE_language_like,TE_a_user_id_like,TE_b_user_id_like,TE_multi_reply,TE_multi_retweet,TE_multi_retweet_comment,TE_multi_like,TE_media_elapsed_time,TE_tweet_type_elapsed_time,TE_language_elapsed_time
0,61737,5,0,2,50,2824,31861000,80,True,23342181,...,0.423782,0.414891,0.41576,0.016941,0.036669,0.004081,0.472134,18000.140625,19014.623047,17206.958984
1,61737,5,0,2,50,2824,31852634,80,True,22730399,...,0.423782,0.414891,0.390358,0.016941,0.036669,0.004081,0.472134,18000.140625,19014.623047,17206.958984
2,61737,5,0,2,50,2824,31861000,80,True,4306434,...,0.423782,0.414891,0.579943,0.016941,0.036669,0.004081,0.472134,18000.140625,19014.623047,17206.958984
3,61737,5,0,2,50,2824,31861000,80,True,23160229,...,0.423782,0.414891,0.427535,0.016941,0.036669,0.004081,0.472134,18000.140625,19014.623047,17206.958984
4,61737,5,0,2,50,2824,31852634,80,True,23401988,...,0.423782,0.414891,0.39913,0.016941,0.036669,0.004081,0.472134,18000.140625,19014.623047,17206.958984


### Count Encode

In [38]:
class FrequencyEncoder:
    
    def __init__(self, seed=42, mode='gpu'):
        self.seed = seed
        if mode=='gpu':
            self.np = cupy
            self.df = cudf
        else:
            self.np = np
            self.df = pd
        self.mode = mode
        
    def fit_transform(self, train, x_col, c_col=None, out_col = None):
        self.np.random.seed(self.seed)
        if c_col is None or c_col not in train.columns:
            c_col = 'dummy'
            train[c_col] = 1
            drop = True
        else:
            drop = False
            
        if out_col is None:
            tag = x_col if isinstance(x_col,str) else '_'.join(x_col)
            out_col = f'CE_{tag}_norm'
            
        cols = [x_col] if isinstance(x_col,str) else x_col
        agg_all = train.groupby(cols).agg({c_col:'count'}).reset_index()
        if drop:
            train = train.drop(c_col,axis=1)
        agg_all.columns = cols + [out_col]
        agg_all[out_col] = agg_all[out_col].astype('int32')
        agg_all[out_col] = agg_all[out_col]*1.0/len(train)
        agg_all[out_col] = agg_all[out_col].astype('float32')
    
        train = train.merge(agg_all,on=cols,how='left')
        del agg_all
        #print(train.columns)
        if self.mode=='gpu':
            if isinstance(train,dask_cudf.core.DataFrame):
                train[out_col] = train.map_partitions(lambda cudf_df: cudf_df[out_col].nans_to_nulls())
            else:
                train[out_col] = train[out_col].nans_to_nulls()
        return train
    
    def transform(self, test, x_col, c_col=None, out_col = None):
        return self.fit_transform(test, x_col, c_col, out_col)
 

In [39]:
class CountEncoder:
    
    def __init__(self, seed=42, mode='gpu'):
        self.seed = seed
        if mode=='gpu':
            self.np = cupy
            self.df = cudf
        else:
            self.np = np
            self.df = pd
        self.mode = mode
        
    def fit_transform(self, train, test, x_col, out_col = None):
        self.np.random.seed(self.seed)
        
        common_cols = [i for i in train.columns if i in test.columns and i!=x_col]

        if len(common_cols):
            c_col = common_cols[0]
            drop = False
        else:
            c_col = 'dummy'
            train[c_col] = 1
            test[c_col]=1
            drop = True
            
        if out_col is None:
            tag = x_col if isinstance(x_col,str) else '_'.join(x_col)
            out_col = f'CE_{tag}_norm'
            
        cols = [x_col] if isinstance(x_col,str) else x_col
        agg_all = train.groupby(cols).agg({c_col:'count'}).reset_index()
        agg_all.columns = cols + [out_col]
        
        agg_test = test.groupby(cols).agg({c_col:'count'}).reset_index()
        agg_test.columns = cols + [out_col+'_test']
        agg_all = agg_all.merge(agg_test,on=cols,how='left')
        agg_all[out_col+'_test'] = agg_all[out_col+'_test'].fillna(0)
        agg_all[out_col] = agg_all[out_col] + agg_all[out_col+'_test']
        agg_all = agg_all.drop(out_col+'_test', axis=1)
        del agg_test
            
        if drop:
            train = train.drop(c_col,axis=1)
            test = test.drop(c_col,axis=1)
        train = train.merge(agg_all,on=cols,how='left')
        test = test.merge(agg_all,on=cols,how='left')
        del agg_all
        return train,test

In [40]:
%%time
# cuDF CE ENCODING IS SUPER FAST!!
idx = 0; cols = []
for c in ['media', 'tweet_type', 'language', 'a_user_id', 'b_user_id']:
    encoder = CountEncoder()
    out_col = f'CE_{c}'
    train,valid = encoder.fit_transform(train, valid, c, out_col=out_col)
    print
    del encoder
    train,valid = dask.persist(train,valid)
    train.head()

CPU times: user 1.37 s, sys: 90.4 ms, total: 1.46 s
Wall time: 4.02 s


In [41]:
%%time
# cuDF CE ENCODING IS SUPER FAST!!
idx = 0; cols = []
for c in ['media', 'tweet_type', 'language', 'a_user_id', 'b_user_id']:
    encoder = FrequencyEncoder()
    out_col = f'CE_{c}_norm'
    train = encoder.fit_transform(train, c, c_col='tweet_id', out_col=out_col)
    valid = encoder.transform(valid, c, c_col='tweet_id', out_col=out_col)
    cols.append(out_col)
    del encoder
    train,valid = dask.persist(train,valid)
    train.head()

CPU times: user 2.09 s, sys: 129 ms, total: 2.22 s
Wall time: 3.53 s


### Difference Encode (Lag Features)

In [42]:
def diff_encode_cudf_v1(train,col,tar,sft=1):
    train[col+'_sft'] = train[col].shift(sft)
    train[tar+'_sft'] = train[tar].shift(sft)
    out_col = f'DE_{col}_{tar}_{sft}'
    train[out_col] = train[tar]-train[tar+'_sft']
    mask = '__MASK__'
    train[mask] = train[col] == train[col+'_sft']
    train = train.drop([col+'_sft',tar+'_sft'],axis=1)
    train[out_col] = train[out_col]*train[mask]
    train = train.drop(mask,axis=1)
    return train

In [43]:
%%time

# cuDF DE ENCODING IS FAST!!
idx = 0; cols = []; sc = 'timestamp'
for c in ['b_user_id']:
    for t in ['b_follower_count','b_following_count','language']:
        for s in [1,-1]:
            start = time.time()
            train = diff_encode_cudf_v1(train, col=c, tar=t, sft=s)
            valid = diff_encode_cudf_v1(valid, col=c, tar=t, sft=s)
            train,valid = dask.persist(train,valid)
            train.head()
            end = time.time(); idx += 1
            print('DE',c,t,s,'%.1f seconds'%(end-start))

DE b_user_id b_follower_count 1 0.5 seconds
DE b_user_id b_follower_count -1 0.6 seconds
DE b_user_id b_following_count 1 0.5 seconds
DE b_user_id b_following_count -1 0.5 seconds
DE b_user_id language 1 0.5 seconds
DE b_user_id language -1 0.6 seconds
CPU times: user 2.76 s, sys: 153 ms, total: 2.91 s
Wall time: 3.3 s


### Diff Language

In [44]:
train_lang = train[['a_user_id', 'language', 'tweet_id']].drop_duplicates()
valid_lang = valid[['a_user_id', 'language', 'tweet_id']].drop_duplicates()
train_lang_count = train_lang.groupby(['a_user_id', 'language']).agg({'tweet_id':'count'}).reset_index()
valid_lang_count = valid_lang.groupby(['a_user_id', 'language']).agg({'tweet_id':'count'}).reset_index()
train_lang_count,valid_lang_count = dask.persist(train_lang_count,valid_lang_count)
train_lang_count.head()
del train_lang,valid_lang

In [45]:
%%time
train_lang_count = train_lang_count.merge(valid_lang_count,on=['a_user_id', 'language'],how='left')
train_lang_count['tweet_id_y'] = train_lang_count['tweet_id_y'].fillna(0)
train_lang_count['tweet_id_x'] = train_lang_count['tweet_id_x'] + train_lang_count['tweet_id_y']
train_lang_count = train_lang_count.drop('tweet_id_y',axis=1)
train_lang_count.columns = ['a_user_id', 'top_language', 'language_count']
train_lang_count, = dask.persist(train_lang_count)
train_lang_count.head()

CPU times: user 54.1 ms, sys: 11.8 ms, total: 66 ms
Wall time: 115 ms


Unnamed: 0,a_user_id,top_language,language_count
0,23104,3,7
1,23106,3,215
2,23106,54,3
3,23109,54,5
4,23110,54,29


In [46]:
%%time
train_lang_count = train_lang_count.sort_values(['a_user_id', 'language_count'])
train_lang_count['a_user_shifted'] = train_lang_count['a_user_id'].shift(1)
train_lang_count = train_lang_count[train_lang_count['a_user_id']!=train_lang_count['a_user_shifted']]
train_lang_count = train_lang_count.drop(['a_user_shifted','language_count'],axis=1)
train_lang_count.columns = ['a_user_id','top_language']
train_lang_count, = dask.persist(train_lang_count)
train_lang_count.head()

CPU times: user 56.9 ms, sys: 0 ns, total: 56.9 ms
Wall time: 112 ms


Unnamed: 0,a_user_id,top_language
71476,0,4
71478,1,47
71463,2,21
71466,3,25
71479,4,54


In [47]:
def diff_language(df,df_lang_count):
    df = df.merge(df_lang_count,how='left', left_on='b_user_id', right_on='a_user_id')
    df['nan_language'] = df['top_language'].isnull()
    df['same_language'] = df['language'] == df['top_language']
    df['diff_language'] = df['language'] != df['top_language']
    df['same_language'] = df['same_language']*(1-df['nan_language'])
    df['diff_language'] = df['diff_language']*(1-df['nan_language'])
    df = df.drop('top_language',axis=1)
    return df

In [48]:
%%time
train = diff_language(train,train_lang_count)
valid = diff_language(valid,train_lang_count)
train,valid = dask.persist(train,valid)
train.head()

CPU times: user 505 ms, sys: 17.5 ms, total: 523 ms
Wall time: 712 ms


Unnamed: 0,tweet_id,media,domains,tweet_type,language,a_user_id_x,a_follower_count,a_following_count,a_is_verified,b_user_id,...,DE_b_user_id_b_follower_count_1,DE_b_user_id_b_follower_count_-1,DE_b_user_id_b_following_count_1,DE_b_user_id_b_following_count_-1,DE_b_user_id_language_1,DE_b_user_id_language_-1,a_user_id_y,nan_language,same_language,diff_language
0,43130,7,0,2,47,8826,3864033,100,True,170675,...,0,0,0,0,0,0,170675.0,False,0,1
1,43130,7,0,2,47,8826,3864033,100,True,6007085,...,0,0,0,0,0,0,6007085.0,False,0,1
2,43130,7,0,2,47,8826,3864033,100,True,5058499,...,0,0,0,0,0,0,5058499.0,False,0,1
3,43130,7,0,2,47,8826,3864033,100,True,6473473,...,0,0,0,0,0,0,6473473.0,False,0,1
4,43130,7,0,2,47,8826,3822152,100,True,16270149,...,0,0,0,0,0,0,,True,0,0


## Rate feature

In [49]:
%%time
# follow rate feature
train['a_ff_rate'] = (train['a_following_count'] / train['a_follower_count']).astype('float32')
train['b_ff_rate'] = (train['b_follower_count']  / train['b_following_count']).astype('float32')
valid['a_ff_rate']  = (valid['a_following_count'] / valid['a_follower_count']).astype('float32')
valid['b_ff_rate']  = (valid['b_follower_count']  / valid['b_following_count']).astype('float32')

CPU times: user 70.4 ms, sys: 201 µs, total: 70.6 ms
Wall time: 67.6 ms


In [50]:
train,valid = dask.persist(train,valid)

In [51]:
train.head()
valid.head()
print()




# Summarize Features

In [52]:
%%time

label_names = ['reply', 'retweet', 'retweet_comment', 'like']
DONT_USE = ['timestamp','a_account_creation','b_account_creation','engage_time',
            'fold','b_user_id','a_user_id', 'dt_dow',
            'a_account_creation', 'b_account_creation', 'elapsed_time',
             'links','domains','hashtags0','hashtags1']
DONT_USE += label_names
features = [c for c in train.columns if c not in DONT_USE]

RMV = [c for c in DONT_USE if c in train.columns and c not in label_names]
RMV = list(set(RMV))

CPU times: user 3.8 ms, sys: 0 ns, total: 3.8 ms
Wall time: 3.74 ms


In [53]:
%%time

train = train.drop(RMV,axis=1)
RMV = [c for c in RMV if c in valid.columns]
valid = valid.drop(RMV,axis=1)    
wait(train)
wait(valid)

CPU times: user 30.5 ms, sys: 161 µs, total: 30.7 ms
Wall time: 29.4 ms


DoneAndNotDoneFutures(done={<Future: finished, type: cudf.DataFrame, key: ('assign-0b47b881e1e4877e24ac587cc7236c0f', 1)>, <Future: finished, type: cudf.DataFrame, key: ('assign-0b47b881e1e4877e24ac587cc7236c0f', 0)>, <Future: finished, type: cudf.DataFrame, key: ('assign-0b47b881e1e4877e24ac587cc7236c0f', 2)>, <Future: finished, type: cudf.DataFrame, key: ('assign-0b47b881e1e4877e24ac587cc7236c0f', 3)>}, not_done=set())

# Train Model Validate
We will train on random 10% of first 5 days and validation on last 2 days

In [54]:
%%time

SAMPLE_RATIO = 0.1
SEED = 1

if SAMPLE_RATIO < 1.0:
    train['sample'] = train['tweet_id'].map_partitions(lambda cudf_df: cudf_df.hash_encode(stop=10))
    print(len(train))
    
    train = train[train['sample']<10*SAMPLE_RATIO]
    train, = dask.persist(train)
    train.head()
    print(len(train))


Y_train = train[label_names]
Y_train, = dask.persist(Y_train)
Y_train.head()    
    
train = train.drop(['sample','tweet_id']+label_names,axis=1)
train, = dask.persist(train)
train.head()


features = [c for c in train.columns if c not in DONT_USE]
print('Using %i features:'%(len(features)),train.shape[1])
np.asarray(features)

107236522
10716577
Using 66 features: 66
CPU times: user 344 ms, sys: 31.4 ms, total: 375 ms
Wall time: 939 ms


array(['media', 'tweet_type', 'language', 'a_user_id_x',
       'a_follower_count', 'a_following_count', 'a_is_verified',
       'b_follower_count', 'b_following_count', 'b_is_verified',
       'b_follows_a', 'len_domains', 'len_hashtags', 'len_links',
       'dt_hour', 'dt_minute', 'dt_second', 'TE_media_reply',
       'TE_tweet_type_reply', 'TE_language_reply', 'TE_a_user_id_reply',
       'TE_b_user_id_reply', 'TE_media_retweet', 'TE_tweet_type_retweet',
       'TE_language_retweet', 'TE_a_user_id_retweet',
       'TE_b_user_id_retweet', 'TE_media_retweet_comment',
       'TE_tweet_type_retweet_comment', 'TE_language_retweet_comment',
       'TE_a_user_id_retweet_comment', 'TE_b_user_id_retweet_comment',
       'TE_media_like', 'TE_tweet_type_like', 'TE_language_like',
       'TE_a_user_id_like', 'TE_b_user_id_like', 'TE_multi_reply',
       'TE_multi_retweet', 'TE_multi_retweet_comment', 'TE_multi_like',
       'TE_media_elapsed_time', 'TE_tweet_type_elapsed_time',
       'TE_langu

In [55]:
SAMPLE_RATIO = 0.35 # VAL SET NOW SIZE OF TEST SET
SEED = 1
if SAMPLE_RATIO < 1.0:
    print(len(valid))
    valid['sample'] = valid['tweet_id'].map_partitions(lambda cudf_df: cudf_df.hash_encode(stop=10))
    
    valid = valid[valid['sample']<10*SAMPLE_RATIO]
    valid, = dask.persist(valid)
    valid.head()
    print(len(valid))
    
Y_valid = valid[label_names]
Y_valid, = dask.persist(Y_valid)
Y_valid.head()

valid = valid.drop(['sample','tweet_id']+label_names,axis=1)
valid, = dask.persist(valid)
valid.head()

40838716
16288926


Unnamed: 0,media,tweet_type,language,a_user_id_x,a_follower_count,a_following_count,a_is_verified,b_follower_count,b_following_count,b_is_verified,...,DE_b_user_id_b_following_count_1,DE_b_user_id_b_following_count_-1,DE_b_user_id_language_1,DE_b_user_id_language_-1,a_user_id_y,nan_language,same_language,diff_language,a_ff_rate,b_ff_rate
0,0,2,54,44114,6312811,649,True,572,466,False,...,0,0,0,0,6716746.0,False,1,0,0.000103,1.227468
2,0,2,54,44114,6311325,647,True,482,341,False,...,0,0,0,0,11486449.0,False,1,0,0.000103,1.41349
3,0,2,54,44114,6311325,647,True,102,546,False,...,0,0,0,0,,True,0,0,0.000103,0.186813
4,0,2,54,44114,6311325,647,True,14,195,False,...,0,0,0,0,,True,0,0,0.000103,0.071795
5,0,2,54,44114,6311325,647,True,94,118,False,...,0,0,0,0,,True,0,0,0.000103,0.79661


In [56]:
import xgboost as xgb
print('XGB Version',xgb.__version__)

xgb_parms = { 
    'max_depth':8, 
    'learning_rate':0.1, 
    'subsample':0.8,
    'colsample_bytree':0.3, 
    'eval_metric':'logloss',
    'objective':'binary:logistic',
    'tree_method':'gpu_hist',
    'predictor' : 'gpu_predictor'
}


XGB Version 1.1.0


In [57]:
if train.columns.duplicated().sum()>0:
    raise Exception(f'duplicated!: { train.columns[train.columns.duplicated()] }')
print('no dup :) ')
print(f'X_train.shape {train.shape}')
print(f'X_valid.shape {valid.shape}')

no dup :) 
X_train.shape (Delayed('int-0859ff55-4b53-43b8-8964-b2b0c9db145a'), 66)
X_valid.shape (Delayed('int-86e3de93-21b6-4a8c-b14c-98c419f1f5c9'), 66)


In [58]:
%%time

for col in train.columns:
    if train[col].dtype=='bool':
        train[col] = train[col].astype('int8')
        valid[col] = valid[col].astype('int8')
train,valid = dask.persist(train,valid)
train.head()
valid.head()

CPU times: user 286 ms, sys: 20.5 ms, total: 306 ms
Wall time: 360 ms


Unnamed: 0,media,tweet_type,language,a_user_id_x,a_follower_count,a_following_count,a_is_verified,b_follower_count,b_following_count,b_is_verified,...,DE_b_user_id_b_following_count_1,DE_b_user_id_b_following_count_-1,DE_b_user_id_language_1,DE_b_user_id_language_-1,a_user_id_y,nan_language,same_language,diff_language,a_ff_rate,b_ff_rate
0,0,2,54,44114,6312811,649,1,572,466,0,...,0,0,0,0,6716746.0,0,1,0,0.000103,1.227468
2,0,2,54,44114,6311325,647,1,482,341,0,...,0,0,0,0,11486449.0,0,1,0,0.000103,1.41349
3,0,2,54,44114,6311325,647,1,102,546,0,...,0,0,0,0,,1,0,0,0.000103,0.186813
4,0,2,54,44114,6311325,647,1,14,195,0,...,0,0,0,0,,1,0,0,0.000103,0.071795
5,0,2,54,44114,6311325,647,1,94,118,0,...,0,0,0,0,,1,0,0,0.000103,0.79661


In [59]:
%%time
# TRAIN AND VALIDATE

NROUND = 300
VERBOSE_EVAL = 50
#ESR = 50
    
oof = np.zeros((len(valid),len(label_names)))
preds = []
for i in range(4):

    name = label_names[i]
    print('#'*25);print('###',name);print('#'*25)
       
    start = time.time(); print('Creating DMatrix...')
    dtrain = xgb.dask.DaskDMatrix(client,data=train,label=Y_train.iloc[:, i])
    dvalid = xgb.dask.DaskDMatrix(client,data=valid,label=Y_valid.iloc[:, i])
    print('Took %.1f seconds'%(time.time()-start))
             
    start = time.time(); print('Training...')
    model = xgb.dask.train(client, xgb_parms, 
                           dtrain=dtrain,
                           #evals=[(dtrain,'train'),(dvalid,'valid')],
                           num_boost_round=NROUND,
                           #early_stopping_rounds=ESR,
                           verbose_eval=VERBOSE_EVAL) 
    print('Took %.1f seconds'%(time.time()-start))
        
    start = time.time(); print('Predicting...')
    #Y_valid[f'pred_{name}'] = xgb.dask.predict(client,model,valid)
    #oof[:, i] += xgb.dask.predict(client,model,dvalid).compute()
    preds.append(xgb.dask.predict(client,model,valid))
    print('Took %.1f seconds'%(time.time()-start))
        
    del model, dtrain, dvalid

#########################
### reply
#########################
Creating DMatrix...
Took 0.1 seconds
Training...
Took 14.4 seconds
Predicting...


  [<function predict.<locals>.mapped_predict at 0x7f ... titions>, True]
Consider scattering large objects ahead of time
with client.scatter to reduce scheduler burden and 
keep data on workers

    future = client.submit(func, big_data)    # bad

    big_future = client.scatter(big_data)     # good
    future = client.submit(func, big_future)  # good
  % (format_bytes(len(b)), s)


Took 0.6 seconds
#########################
### retweet
#########################
Creating DMatrix...
Took 0.1 seconds
Training...
Took 14.2 seconds
Predicting...
Took 0.6 seconds
#########################
### retweet_comment
#########################
Creating DMatrix...
Took 0.1 seconds
Training...
Took 13.4 seconds
Predicting...
Took 0.5 seconds
#########################
### like
#########################
Creating DMatrix...
Took 0.1 seconds
Training...
Took 14.4 seconds
Predicting...
Took 0.6 seconds
CPU times: user 4.21 s, sys: 752 ms, total: 4.97 s
Wall time: 58.9 s


In [60]:
yvalid = Y_valid[label_names].values.compute()
#yvalid = cupy.asnumpy(yvalid)

oof = cupy.array([i.values.compute() for i in preds]).T
#oof = cupy.asnumpy(oof)

# Compute Validation Metrics

In [61]:
from sklearn.metrics import auc

def precision_recall_curve(y_true,y_pred):
    y_true = y_true.astype('float32')
    ids = cupy.argsort(-y_pred) 
    y_true = y_true[ids]
    y_pred = y_pred[ids]
    y_pred = cupy.flip(y_pred,axis=0)

    acc_one = cupy.cumsum(y_true)
    sum_one = cupy.sum(y_true)
    
    precision = cupy.flip(acc_one/cupy.cumsum(cupy.ones(len(y_true))),axis=0)
    precision[:-1] = precision[1:]
    precision[-1] = 1.

    recall = cupy.flip(acc_one/sum_one,axis=0)
    recall[:-1] = recall[1:]
    recall[-1] = 0
    n = (recall==1).sum()
    
    return precision[n-1:],recall[n-1:],y_pred[n:]

def compute_prauc(pred, gt):
    prec, recall, thresh = precision_recall_curve(gt, pred)
    recall, prec = cupy.asnumpy(recall), cupy.asnumpy(prec)
    prauc = auc(recall, prec)
    return prauc

def log_loss(y_true,y_pred,eps=1e-15, normalize=True, sample_weight=None):
    y_true = y_true.astype('int32')
    y_pred = cupy.clip(y_pred, eps, 1 - eps)
    if y_pred.ndim == 1:
        y_pred = cupy.expand_dims(y_pred, axis=1)
    if y_pred.shape[1] == 1:
        y_pred = cupy.hstack([1 - y_pred, y_pred])

    y_pred /= cupy.sum(y_pred, axis=1, keepdims=True)
    loss = -cupy.log(y_pred)[cupy.arange(y_pred.shape[0]), y_true]
    return _weighted_sum(loss, sample_weight, normalize).item()

def _weighted_sum(sample_score, sample_weight, normalize):
    if normalize:
        return cupy.average(sample_score, weights=sample_weight)
    elif sample_weight is not None:
        return cupy.dot(sample_score, sample_weight)
    else:
        return sample_score.sum()

# FAST METRIC FROM GIBA
def compute_rce_fast(pred, gt):
    cross_entropy = log_loss(gt, pred)
    yt = np.mean(gt)     
    strawman_cross_entropy = -(yt*np.log(yt) + (1 - yt)*np.log(1 - yt))
    return (1.0 - cross_entropy/strawman_cross_entropy)*100.0

In [62]:
%%time
txt = ''
for i in range(4):
    prauc = compute_prauc(oof[:,i], yvalid[:, i])#.item()
    rce   = compute_rce_fast(oof[:,i], yvalid[:, i]).item()
    txt_ = f"{label_names[i]:20} PRAUC:{prauc:.5f} RCE:{rce:.5f}"
    print(txt_)
    txt += txt_ + '\n'

reply                PRAUC:0.17495 RCE:18.83670
retweet              PRAUC:0.53806 RCE:29.56952
retweet_comment      PRAUC:0.05584 RCE:11.69343
like                 PRAUC:0.77888 RCE:26.49255
CPU times: user 1.07 s, sys: 424 ms, total: 1.5 s
Wall time: 1.43 s


In [63]:
print('This notebook took %.1f minutes'%((time.time()-very_start)/60.))

This notebook took 2.3 minutes
