Copyright (c) 2020, NVIDIA CORPORATION.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
    http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

#### ~~Tweet_id is encoded using approximate hashing, which caused hash collisions. Exact encoding of tweet_id is blocked by a bug in cudf. All other columns are using exact encoding.~~ 
**Fixed:** tweet_id is now encoded exactly as well (drop_duplicates + cumsum), like all other columns.

In [1]:
import os
import time

# Restrict CUDA to the first GPU; this must happen before cudf/dask_cuda are imported below.
os.environ["CUDA_VISIBLE_DEVICES"] = "0"

# Wall-clock reference points; very_start feeds the total runtime printed in the last cell.
start = time.time()
very_start = time.time()

In [2]:
# Notebook imports. pandas is commented out: the frames in this notebook are cudf / dask_cudf.
#import pandas as pd, 
import numpy as np
from datetime import datetime
import matplotlib.pyplot as plt
#pd.set_option('display.max_columns', 500)
#pd.set_option('display.max_rows', 500)
import cudf, cupy, time, rmm
# Display the cudf version this notebook was run with.
cudf.__version__

'0.16.0'

In [3]:
import dask as dask, dask_cudf
from dask.distributed import Client, wait
from dask_cuda import LocalCUDACluster
import subprocess

In [4]:
# Alternative UCX/NVLink cluster configuration, kept for reference (not executed):
# cluster = LocalCUDACluster(ip='10.2.61.36', protocol="ucx",
#                            rmm_pool_size="14GB",
#                            enable_tcp_over_ucx=True, enable_nvlink=True)
cluster = LocalCUDACluster()
client = Client(cluster)

# Switch every worker's cudf allocator to CUDA managed memory
# (presumably so the ~100M-row frames can oversubscribe GPU memory).
client.run(cudf.set_allocator, "managed")

client

0,1
Client  Scheduler: tcp://127.0.0.1:35007  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 1  Cores: 1  Memory: 109.84 GB


In [5]:
%%time

NP = 16
#path = './'#'/home/pinto/rapid_data/recsys'

df = dask_cudf.read_csv('/home/pinto/training.tsv', sep='\x01', header=None)#, dtype=DTYPES)
df = df.repartition(npartitions=NP)
df, = dask.persist(df)
_ = wait(df)
print('number of rows:',len(df))

number of rows: 106254462
CPU times: user 12.4 s, sys: 2.34 s, total: 14.8 s
Wall time: 10min 57s


In [6]:
%%time
features = [
    'text_tokens',    ###############
    'hashtags',       #Tweet Features
    'tweet_id',       #
    'media',          #
    'links',          #
    'domains',        #
    'tweet_type',     #
    'language',       #
    'timestamp',      ###############
    'a_user_id',              ###########################
    'a_follower_count',       #Engaged With User Features
    'a_following_count',      #
    'a_is_verified',          #
    'a_account_creation',     ###########################
    'b_user_id',              #######################
    'b_follower_count',       #Engaging User Features
    'b_following_count',      #
    'b_is_verified',          #
    'b_account_creation',     #######################
    'b_follows_a',    #################### Engagement Features
    'reply',          #Target Reply
    'retweet',        #Target Retweet    
    'retweet_comment',#Target Retweet with comment
    'like',           #Target Like
                      ####################
]
df.columns = features

df = df.drop('text_tokens', axis=1)
df, = dask.persist(df)
_ = wait(df)
df.head()

CPU times: user 6 s, sys: 624 ms, total: 6.62 s
Wall time: 6min 41s


Unnamed: 0,hashtags,tweet_id,media,links,domains,tweet_type,language,timestamp,a_user_id,a_follower_count,...,b_user_id,b_follower_count,b_following_count,b_is_verified,b_account_creation,b_follows_a,reply,retweet,retweet_comment,like
0,A3D61C03DBCE2F920350379414E0048B\t867E9AB76EFF...,29083D10151BD72E2459635EA38AC8B3,Photo,,,TopLevel,22C448FF81263D4BAF2A176145EE9EAD,1581472519,DA64544DF9826E9F284F4FF6CE624F7A,11005,...,00000523D726C61D483CC9AF99C69FF7,4,6,False,1568940527,True,,,,
1,,D717A0A251D46C3382D55CA36DDC7F33,,,,Quote,ECED8A16BE2A5E8871FD55F4842F16B1,1581124265,2D17D33032F1DFFAA740384DFE68C7BB,425,...,00000860E80C67D8C46CE57C64DE9444,225,185,False,1541013180,True,,,,1581174423.0
2,,2B880E288129B67CF644D6123C923EBF,,,,TopLevel,9BF3403E0EB7EA8A256DA9019C0B0716,1581166895,092FBF2F7989B4B817428B4AD44DCD37,298336,...,00000DEF82BE9EB5CFD07FB7DB94317B,4,72,False,1573996260,False,,,,1581201675.0
3,,AE9DD310CF14AFD4F1A95902521FF132,,,,TopLevel,22C448FF81263D4BAF2A176145EE9EAD,1581375781,054BA689B31AEFD429EE2DED113F92DA,1115,...,00000E0C9B364891CDE89ECFC54771DE,780,440,False,1432084055,True,,,,1581437254.0
4,,E5A28F213A16FF06D0CEE628F9A01E16,,,,Retweet,D3164C7FBCF2565DDF915B1B3AEFB1DC,1580996019,B28168B03223509298F983523265035C,63473,...,00000FAA76D55EEF9C4693D28473437B,180,32,False,1490688761,False,,,,


In [7]:
# dtypes as inferred by read_csv, before any downcasting.
df.dtypes

hashtags               object
tweet_id               object
media                  object
links                  object
domains                object
tweet_type             object
language               object
timestamp               int64
a_user_id              object
a_follower_count        int64
a_following_count       int64
a_is_verified            bool
a_account_creation      int64
b_user_id              object
b_follower_count        int64
b_following_count       int64
b_is_verified            bool
b_account_creation      int64
b_follows_a              bool
reply                 float64
retweet               float64
retweet_comment       float64
like                  float64
dtype: object

In [8]:
%%time
df['id']   = 1
df['id']   = df['id'].cumsum()
df['id'] = df['id'].astype('int32')

df['reply']   = df['reply'].fillna(0)
df['retweet'] = df['retweet'].fillna(0)
df['retweet_comment'] = df['retweet_comment'].fillna(0)
df['like']    = df['like'].fillna(0)

df['reply']   = df['reply'].astype('int32')
df['retweet'] = df['retweet'].astype('int32')
df['retweet_comment'] = df['retweet_comment'].astype('int32')
df['like']    = df['like'].astype('int32')
df, = dask.persist(df)
_ = wait(df)

CPU times: user 2.83 s, sys: 280 ms, total: 3.11 s
Wall time: 1min 46s


In [9]:
%%time

df['timestamp']         = df['timestamp'].astype( np.int32 )
df['a_follower_count']  = df['a_follower_count'].astype( np.int32 )
df['a_following_count'] = df['a_following_count'].astype( np.int32 )
df['a_account_creation']= df['a_account_creation'].astype( np.int32 )
df['b_follower_count']  = df['b_follower_count'].astype( np.int32 )
df['b_following_count'] = df['b_following_count'].astype( np.int32 )
df['b_account_creation']= df['b_account_creation'].astype( np.int32 )

df, = dask.persist(df)
_ = wait(df)
df.head()

CPU times: user 2.11 s, sys: 360 ms, total: 2.47 s
Wall time: 1min 38s


Unnamed: 0,hashtags,tweet_id,media,links,domains,tweet_type,language,timestamp,a_user_id,a_follower_count,...,b_follower_count,b_following_count,b_is_verified,b_account_creation,b_follows_a,reply,retweet,retweet_comment,like,id
0,A3D61C03DBCE2F920350379414E0048B\t867E9AB76EFF...,29083D10151BD72E2459635EA38AC8B3,Photo,,,TopLevel,22C448FF81263D4BAF2A176145EE9EAD,1581472519,DA64544DF9826E9F284F4FF6CE624F7A,11005,...,4,6,False,1568940527,True,0,0,0,0,1
1,,D717A0A251D46C3382D55CA36DDC7F33,,,,Quote,ECED8A16BE2A5E8871FD55F4842F16B1,1581124265,2D17D33032F1DFFAA740384DFE68C7BB,425,...,225,185,False,1541013180,True,0,0,0,1581174423,2
2,,2B880E288129B67CF644D6123C923EBF,,,,TopLevel,9BF3403E0EB7EA8A256DA9019C0B0716,1581166895,092FBF2F7989B4B817428B4AD44DCD37,298336,...,4,72,False,1573996260,False,0,0,0,1581201675,3
3,,AE9DD310CF14AFD4F1A95902521FF132,,,,TopLevel,22C448FF81263D4BAF2A176145EE9EAD,1581375781,054BA689B31AEFD429EE2DED113F92DA,1115,...,780,440,False,1432084055,True,0,0,0,1581437254,4
4,,E5A28F213A16FF06D0CEE628F9A01E16,,,,Retweet,D3164C7FBCF2565DDF915B1B3AEFB1DC,1580996019,B28168B03223509298F983523265035C,63473,...,180,32,False,1490688761,False,0,0,0,0,5


In [10]:
# Confirm the downcasts: integer feature, target and id columns are now int32.
df.dtypes

hashtags              object
tweet_id              object
media                 object
links                 object
domains               object
tweet_type            object
language              object
timestamp              int32
a_user_id             object
a_follower_count       int32
a_following_count      int32
a_is_verified           bool
a_account_creation     int32
b_user_id             object
b_follower_count       int32
b_following_count      int32
b_is_verified           bool
b_account_creation     int32
b_follows_a             bool
reply                  int32
retweet                int32
retweet_comment        int32
like                   int32
id                     int32
dtype: object

In [11]:
%%time
dv = dask_cudf.read_csv('/home/pinto/val.tsv', sep='\x01', header=None)
dv = dv.repartition(npartitions=NP)
dv, = dask.persist(dv)
_ = wait(dv)
print('number of rows:',len(dv))

number of rows: 9760684
CPU times: user 1 s, sys: 140 ms, total: 1.14 s
Wall time: 49.5 s


In [12]:
%%time
print(dv.head())
features = [
    'text_tokens',    ###############
    'hashtags',       #Tweet Features
    'tweet_id',       #
    'media',          #
    'links',          #
    'domains',        #
    'tweet_type',     #
    'language',       #
    'timestamp',      ###############
    'a_user_id',              ###########################
    'a_follower_count',       #Engaged With User Features
    'a_following_count',      #
    'a_is_verified',          #
    'a_account_creation',     ###########################
    'b_user_id',              #######################
    'b_follower_count',       #Engaging User Features
    'b_following_count',      #
    'b_is_verified',          #
    'b_account_creation',     #######################
    'b_follows_a',    #################### Engagement Features
    'reply',          #Target Reply
    'retweet',        #Target Retweet    
    'retweet_comment',#Target Retweet with comment
    'like',           #Target Like
                      ####################
]
dv.columns = features
dv = dv.drop('text_tokens', axis=1)
dv, = dask.persist(dv)
_ = wait(dv)

                                                   0     1  \
0  101\t10117\t140\t119\t142\t119\t152\t119\t1010...  <NA>   
1  101\t10105\t10817\t10124\t59232\t18121\t15629\...  <NA>   
2  101\t48561\t10116\t67737\t18554\t36371\t10989\...  <NA>   
3  101\t100055\t69940\t10414\t159\t11305\t11166\t...  <NA>   
4  101\t62154\t32221\t71843\t10143\t10237\t15507\...  <NA>   

                                  2      3                                 4  \
0  373C0F43762B7CEC1D75728BE8A33891   <NA>  A2CE3A1941BA410A1C31496C355EFCD7   
1  773A92D9E4824D06105C02BD044BB20A   <NA>                              <NA>   
2  218A6C27871801759F7380D7C41694A6   <NA>  5C683B5A29B308CADD0D7EFA7C9C32D3   
3  AB817EBA68064A0C8CBF4A6C059D92DC  Photo  E925556EE312213AD98C4D9F131D7A8D   
4  349120C1E2801857530393F16D4653A5   <NA>                              <NA>   

                                  5         6  \
0  E14AF8A8D257BB47587843FE7D08382B  TopLevel   
1                              <NA>     Quote   

In [13]:
%%time

dv['reply']           = 0
dv['retweet']         = 0
dv['retweet_comment'] = 0
dv['like']            = 0

dv['id']   = 1
dv['id']   = dv['id'].cumsum()
dv['id'] = dv['id'] + len(df)
dv['id'] = dv['id'].astype('int32')

dv['reply']           = dv['reply'].astype( np.int32 )
dv['retweet']         = dv['retweet'].astype( np.int32 )
dv['retweet_comment'] = dv['retweet_comment'].astype( np.int32 )
dv['like']            = dv['like'].astype( np.int32 )

dv['timestamp']         = dv['timestamp'].astype( np.int32 )
dv['a_follower_count']  = dv['a_follower_count'].astype( np.int32 )
dv['a_following_count'] = dv['a_following_count'].astype( np.int32 )
dv['a_account_creation']= dv['a_account_creation'].astype( np.int32 )
dv['b_follower_count']  = dv['b_follower_count'].astype( np.int32 )
dv['b_following_count'] = dv['b_following_count'].astype( np.int32 )
dv['b_account_creation']= dv['b_account_creation'].astype( np.int32 )
dv, = dask.persist(dv)
_ = wait(dv)

CPU times: user 1.5 s, sys: 112 ms, total: 1.62 s
Wall time: 42 s


In [14]:
%%time
dt = dask_cudf.read_csv(f'/home/pinto/test.tsv', sep='\x01', header=None)
dt = dt.repartition(npartitions=NP)
dt, = dask.persist(dt)
_ = wait(dt)

CPU times: user 940 ms, sys: 160 ms, total: 1.1 s
Wall time: 49.9 s


In [15]:
%%time
print(dt.head())
features = [
    'text_tokens',    ###############
    'hashtags',       #Tweet Features
    'tweet_id',       #
    'media',          #
    'links',          #
    'domains',        #
    'tweet_type',     #
    'language',       #
    'timestamp',      ###############
    'a_user_id',              ###########################
    'a_follower_count',       #Engaged With User Features
    'a_following_count',      #
    'a_is_verified',          #
    'a_account_creation',     ###########################
    'b_user_id',              #######################
    'b_follower_count',       #Engaging User Features
    'b_following_count',      #
    'b_is_verified',          #
    'b_account_creation',     #######################
    'b_follows_a',    #################### Engagement Features
    'reply',          #Target Reply
    'retweet',        #Target Retweet    
    'retweet_comment',#Target Retweet with comment
    'like',           #Target Like
                      ####################
]
dt.columns = features
dt = dt.drop('text_tokens', axis=1)
dt, = dask.persist(dt)
_ = wait(dt)
print('number of rows:',len(dt))

                                                   0  \
0  101\t3100\t5477\t3028\t4348\t1924\t111806\t186...   
1  101\t56898\t137\t36110\t10400\t168\t64062\t131...   
2  101\t10117\t23672\t12610\t86532\t11205\t90138\...   
3  101\t13690\t14372\t119\t119\t19281\t25444\t984...   
4  101\t56898\t137\t171\t64791\t168\t11499\t10330...   

                                                   1  \
0                                               <NA>   
1  024FE90EC2C01B3CDC46A5A90D66B020\t1B78BDD9C7FF...   
2                                               <NA>   
3                                               <NA>   
4  9533703A3BB1CBA49AC0547C4A9F8043\t4C57C1D9063D...   

                                  2      3     4     5         6  \
0  04746004AA1F5498834CE7A4C6343D1A   <NA>  <NA>  <NA>  TopLevel   
1  B5C4CBE185831F3E5A58A4D81118D4C7   <NA>  <NA>  <NA>   Retweet   
2  8747BA89E3245FE4097C9E4E0AE40862  Video  <NA>  <NA>  TopLevel   
3  1CD2139E249349CBD5A0576D163C5EF8  Photo  <NA>  <NA>

In [16]:
%%time
dt['reply']           = 0
dt['retweet']         = 0
dt['retweet_comment'] = 0
dt['like']            = 0

dt['id']   = 1
dt['id']   = dt['id'].cumsum()
dt['id']   = dt['id']+len(df)+len(dv)
dt['id']   = dt['id'].astype('int32')

dt['reply']           = dt['reply'].astype( np.int32 )
dt['retweet']         = dt['retweet'].astype( np.int32 )
dt['retweet_comment'] = dt['retweet_comment'].astype( np.int32 )
dt['like']            = dt['like'].astype( np.int32 )

dt['timestamp']         = dt['timestamp'].astype( np.int32 )
dt['a_follower_count']  = dt['a_follower_count'].astype( np.int32 )
dt['a_following_count'] = dt['a_following_count'].astype( np.int32 )
dt['a_account_creation']= dt['a_account_creation'].astype( np.int32 )
dt['b_follower_count']  = dt['b_follower_count'].astype( np.int32 )
dt['b_following_count'] = dt['b_following_count'].astype( np.int32 )
dt['b_account_creation']= dt['b_account_creation'].astype( np.int32 )

dt, = dask.persist(dt)
_ = wait(dt)
print(df.shape,dv.shape,dt.shape)
dt.head()

(Delayed('int-9ddd6ef8-7e76-497c-9621-832987bac313'), 24) (Delayed('int-9fb6bc9d-dcf4-4064-b52d-4eba96857a8c'), 24) (Delayed('int-8130f1bb-0c8c-40ea-b182-bd835ac37f37'), 24)
CPU times: user 1.7 s, sys: 200 ms, total: 1.9 s
Wall time: 46.3 s


Unnamed: 0,hashtags,tweet_id,media,links,domains,tweet_type,language,timestamp,a_user_id,a_follower_count,...,b_follower_count,b_following_count,b_is_verified,b_account_creation,b_follows_a,reply,retweet,retweet_comment,like,id
0,,04746004AA1F5498834CE7A4C6343D1A,,,,TopLevel,22C448FF81263D4BAF2A176145EE9EAD,1581759640,6720CC7830F94CB7465CA283300DB010,119,...,111,673,False,1478011810,True,0,0,0,0,116015147
1,024FE90EC2C01B3CDC46A5A90D66B020\t1B78BDD9C7FF...,B5C4CBE185831F3E5A58A4D81118D4C7,,,,Retweet,22C448FF81263D4BAF2A176145EE9EAD,1581668217,7DDC67265CFB6E0B4820E0BD0E33A8D3,189,...,111,673,False,1478011810,True,0,0,0,0,116015148
2,,8747BA89E3245FE4097C9E4E0AE40862,Video,,,TopLevel,D3164C7FBCF2565DDF915B1B3AEFB1DC,1581876000,1C0B9850DE2A34BA8B8DF413F5C13233,405907,...,15,123,False,1385502405,False,0,0,0,0,116015149
3,,1CD2139E249349CBD5A0576D163C5EF8,Photo,,,TopLevel,D3164C7FBCF2565DDF915B1B3AEFB1DC,1581646803,94FBB99CE36D7A152DABA57CEF62F75E,111841,...,141,955,False,1335110299,False,0,0,0,0,116015150
4,9533703A3BB1CBA49AC0547C4A9F8043\t4C57C1D9063D...,BDFA89609AC3EDDE250DCECD964B5890,Photo,,,Retweet,22C448FF81263D4BAF2A176145EE9EAD,1581623428,311BB1E051070C2639BC632ECDE69C43,3577,...,139,953,False,1335110299,False,0,0,0,0,116015151


In [17]:
# Row counts of the train, validation and test splits.
train_size, test0_size, test1_size = len(df), len(dv), len(dt)
print(train_size, test0_size, test1_size)

106254462 9760684 9765321


In [18]:
%%time
df = dask_cudf.concat( [df,dv,dt] )
df, = dask.persist(df)
wait(df)
del dv, dt

CPU times: user 836 ms, sys: 48 ms, total: 884 ms
Wall time: 34.4 s


In [19]:
# Peek at the raw (still hashed) language values.
df['language'].head()

0    22C448FF81263D4BAF2A176145EE9EAD
1    ECED8A16BE2A5E8871FD55F4842F16B1
2    9BF3403E0EB7EA8A256DA9019C0B0716
3    22C448FF81263D4BAF2A176145EE9EAD
4    D3164C7FBCF2565DDF915B1B3AEFB1DC
Name: language, dtype: object

In [20]:
# Column / dtype summary of the combined train+val+test frame.
df.info()

<class 'dask_cudf.core.DataFrame'>
Columns: 24 entries, hashtags to id
dtypes: object(9), bool(3), int32(12)

In [21]:
# Partition count and total row count of the combined frame.
print(df.npartitions,len(df))

48 125780467


In [22]:
"""
%%time
df['tweet_id'] = df['tweet_id'].map_partitions(lambda cudf:cudf.hash_encode(stop=1_000_000_000))
df['tweet_id'] = df['tweet_id'].astype( np.int32 )
#df['tweet_id'] = df['tweet_id'].map_partitions(lambda cudf:cudf.hash_values()%1_000_000_000)
df, = dask.persist(df)
_ = wait(df)
df.head()
"""

"\n%%time\ndf['tweet_id'] = df['tweet_id'].map_partitions(lambda cudf:cudf.hash_encode(stop=1_000_000_000))\ndf['tweet_id'] = df['tweet_id'].astype( np.int32 )\n#df['tweet_id'] = df['tweet_id'].map_partitions(lambda cudf:cudf.hash_values()%1_000_000_000)\ndf, = dask.persist(df)\n_ = wait(df)\ndf.head()\n"

In [23]:
%%time
# Replace missing media with '' so split_join below works on plain strings.
df['media'] = df['media'].fillna( '' )
def split_join(ds, sep):
    """Keep the first two ``sep``-separated items of each string, joined by '_'.

    Missing items become '', so every row gets exactly one '_'. For example,
    with a tab separator: 'Photo<TAB>Video' -> 'Photo_Video', 'GIF' -> 'GIF_',
    '' -> '_'. Items after the second one are ignored.

    The split is requested with ``expand=True`` so the result is a frame with
    one column per position. If no string contains ``sep``, that frame only has
    column 0, so column 1 is read only when it exists; an empty input (no
    columns at all) maps to an empty result.

    Args:
        ds: Series of strings (cudf or pandas) without nulls at positions that
            matter; nulls inside the split pieces are replaced by ''.
        sep: separator to split on.

    Returns:
        Series of joined strings aligned with ``ds``.
    """
    parts = ds.str.split(sep, expand=True)
    if 0 not in parts.columns:
        # Nothing to split (e.g. an empty partition).
        return ds.fillna('') + '_'
    first = parts[0].fillna('')
    # Column 1 is absent when no string in this partition contains `sep`.
    second = parts[1].fillna('') if 1 in parts.columns else ''
    return first + '_' + second

# Collapse the media list to "<first>_<second>" per row, partition by partition.
# meta must be a (name, dtype) tuple for a Series result: the previous ('O') was
# just the string 'O' (parentheses alone do not make a tuple), which dask reads
# as scalar metadata.
df['media'] = df['media'].map_partitions(lambda x: split_join(x, '\t'), meta=('media', 'O'))

df, = dask.persist(df)
_ = wait(df)

CPU times: user 1.2 s, sys: 124 ms, total: 1.33 s
Wall time: 50.8 s


In [24]:
def factorize_small_cardinality(df, col):
    """Replace column ``col`` of ``df`` with integer codes.

    The distinct values of ``df[col]`` are computed to the client (assumed to
    be few, hence "small cardinality"), numbered by their position in that
    result (``reset_index`` turns the index into the code column), and
    left-merged back onto ``df``. The original column is then dropped and the
    code column is renamed to ``col``.

    Args:
        df: dask(-cudf) DataFrame containing ``col``.
        col: name of the column to encode.

    Returns:
        ``(encoded_df, head)``: the persisted encoded frame, and ``head()`` of
        the merged frame taken before ``col`` was dropped (it shows both the
        original values and their codes).
    """
    code_col = f'{col}_encode'

    # value -> code lookup table built on the client
    lookup = (
        df[col].unique().compute()
        .to_frame()
        .reset_index()
        .rename(columns={'index': code_col})
    )

    df = df.merge(lookup, on=col, how='left')
    (df,) = dask.persist(df)
    wait(df)
    preview = df.head()
    del lookup

    df = df.drop(col, axis=1)
    (df,) = dask.persist(df)
    wait(df)
    df.columns = [col if name == code_col else name for name in df.columns]
    return df, preview

In [25]:
%%time
for col in ['language','tweet_type','media']:
    df,_ = factorize_small_cardinality(df,col)
    df[col] = df[col].astype('int8')

CPU times: user 8.59 s, sys: 1.36 s, total: 9.95 s
Wall time: 6min 32s


In [26]:
# language / tweet_type / media are now int8 codes (appended at the end by the merges).
df.head()

Unnamed: 0,hashtags,tweet_id,links,domains,timestamp,a_user_id,a_follower_count,a_following_count,a_is_verified,a_account_creation,...,b_account_creation,b_follows_a,reply,retweet,retweet_comment,like,id,language,tweet_type,media
0,37B3F45323C0CCD6B6CE796F9A68E124,CA6AA78C0AC04D479474B710C312D036,86FEFBC0AC1A08E6C3BEA98E848A97CE,509137044637B3E19DF7A4EC7A119BBA,1581167076,80479D9D1E0E28C6215157C6D5C2C20B,96739,297,False,1355952612,...,1422964994,False,0,0,0,0,10081,54,2,12
1,E78674D323461112D0DCF8010AF5AED3\t90737D9DA900...,D3C01D1AD3E6C932A6D08983893EB76C,11F5E00A38937E0E24BE7201F76851EC,C0F5328C1D36CD4B34D1808012E18D46,1580989938,B0495577F2754B0D221ADE4D67F3B684,382,411,False,1537091895,...,1555698595,True,0,0,0,0,10082,11,2,12
2,18F6AC96A6EA62716A47DE9FE6241534\t7CFE679B7755...,4C4DABE2920149B9D9CB34DB50159026,,,1581038292,93BF527AA88D3CD3743457108C622B82,13996,388,False,1513796755,...,1574558351,False,0,0,0,0,10083,59,2,12
3,,7C4F1639637486BA1D7B36C10244789C,,,1581084791,FF5A24BD9886D71DDD7919946A882C63,66737,2087,False,1299341178,...,1551442966,False,0,0,0,0,10084,11,2,12
4,,85E58F119D7FE02C9FF795537E1821FC,CC06F03EAF56559C2F8093C37169EDA0,D7BDB227039444E6FA817EB72C26CD30,1581466892,E4FE7F4D0DA8EA3C6DD49C685B7EA030,408628,660,False,1451278401,...,1426944244,False,0,1581505553,0,1581505553,10085,54,2,8


In [27]:
%%time
tweet = df[['tweet_id']]
tweet = tweet.drop_duplicates(split_out=16)
tweet['tweet_encode'] = 1
tweet['tweet_encode'] = tweet['tweet_encode'].cumsum()
tweet, = dask.persist(tweet)
_ = wait(tweet)
tweet.head()

CPU times: user 1.92 s, sys: 196 ms, total: 2.11 s
Wall time: 52.8 s


Unnamed: 0,tweet_id,tweet_encode
975391,0000012429A02D1B5C871FBA53A0C4DD,1
4511485,0000043B5500353E778A6B78498EE7CD,2
5773967,000004B4208284C156C06BCFAB500ACC,3
5075595,00000BF2119CD5F74998D3D407F15DB8,4
6259239,000016B0820BCC23039DCFD5A02EBA61,5


In [28]:
%%time
df = df.merge(tweet,on='tweet_id',how='left')
df = df.drop('tweet_id',axis=1)
df.columns = [i if i!='tweet_encode' else 'tweet_id' for i in df.columns]
df, = dask.persist(df)
wait(df)
del tweet
df.head()

CPU times: user 8.36 s, sys: 1.05 s, total: 9.41 s
Wall time: 6min 2s


Unnamed: 0,hashtags,links,domains,timestamp,a_user_id,a_follower_count,a_following_count,a_is_verified,a_account_creation,b_user_id,...,b_follows_a,reply,retweet,retweet_comment,like,id,language,tweet_type,media,tweet_id
0,B592E67CFDD7B3CA93EBDA75BF793889,,,1581492458,3D46BE75D48CD0D549AC23351E1AAC9F,437,359,False,1517054466,134AFF5444E4578F1095D420EC733AB5,...,True,0,0,0,1581523161,381496,11,2,4,1816422
1,91589A956DCCB3FD9BB8EC120D3A59B4,A1CD0D75F875FDC59CF559613F6D7DD9,55DE40D8922065020FA388C82283797F,1581358453,06F5FE82DD3C1415761756636BEAD4AC,216517,58,True,1237147176,06DA9A4574AAEEF352E2B3255E4E8414,...,False,0,0,0,0,135195,59,2,12,1586775
2,,,,1581238990,0E6693445A986F0121C72C5110216449,776,967,False,1359794025,019322461D2EFF54F08A0951D4FC50C1,...,True,0,0,0,0,31037,22,2,12,477892
3,,,,1581245796,A65B63E1480392A8ED89A9BB9146D856,538480,161,False,1316441204,0A795B0F00E2AED79F420D0997EB2BF6,...,False,0,0,0,1581248985,206522,38,2,12,1927568
4,70A6CF176129B0FB258ABD68A0146048\tE053DD16CE35...,,,1581029785,8CC882EA51AAC05E586EC06C4A4EED09,617,508,False,1390451780,1CC51F1B465A6F3A07B68A1D56518711,...,False,0,0,0,0,568919,54,1,4,121397


In [29]:
# Disabled: optional repartition into 1024 partitions, left commented out.
#%%time
#df = df.repartition(npartitions=1024)
#df, = dask.persist(df)
#_ = wait(df)

In [30]:
%%time
user_a = df[['a_user_id']].drop_duplicates(split_out=16)
user_a, = dask.persist(user_a)
_ = wait(user_a)
user_b = df[['b_user_id']].drop_duplicates(split_out=16)
user_b, = dask.persist(user_b)
wait(user_b)
print(len(user_a),len(user_b),len(df))

user_a.columns = ['user_id']
user_b.columns = ['user_id']
user_b['dummy'] = 1
user_a = user_a.merge(user_b,on='user_id',how='outer')
user_a = user_a.drop('dummy',axis=1)
user_a, = dask.persist(user_a)
wait(user_a)
print(len(user_a),len(user_b),len(df))
del user_b

user_a['user_encode'] = 1
user_a['user_encode'] = user_a['user_encode'].cumsum()
user_a, = dask.persist(user_a)
_ = wait(user_a)

12074272 23311202 125780467
28332400 23311202 125780467
CPU times: user 3.62 s, sys: 388 ms, total: 4.01 s
Wall time: 1min 50s


In [31]:
%%time
df = df.merge(user_a,left_on='a_user_id',right_on='user_id',how='left')
df = df.drop(['a_user_id','user_id'],axis=1)
df.columns = [i if i!='user_encode' else 'a_user_id' for i in df.columns]
df, = dask.persist(df)
_ = wait(df)

CPU times: user 4.82 s, sys: 600 ms, total: 5.42 s
Wall time: 3min 29s


In [32]:
%%time
df = df.merge(user_a,left_on='b_user_id',right_on='user_id',how='left')
df = df.drop(['b_user_id','user_id'],axis=1)
df.columns = [i if i!='user_encode' else 'b_user_id' for i in df.columns]
df, = dask.persist(df)
wait(df)
del user_a
df.head()

CPU times: user 4.44 s, sys: 624 ms, total: 5.07 s
Wall time: 2min 58s


Unnamed: 0,hashtags,links,domains,timestamp,a_follower_count,a_following_count,a_is_verified,a_account_creation,b_follower_count,b_following_count,...,retweet,retweet_comment,like,id,language,tweet_type,media,tweet_id,a_user_id,b_user_id
0,,8957DD51710FC1C3DE81C337B459C1C9,E0EEE18B1940D8AE41CEA5B15DC421D3,1581192974,3214807,0,True,1272597682,510,393,...,0,0,0,28448498,3,2,12,34312466,1717222,615098
1,,,,1581508730,108597,551,True,1379713973,60,275,...,0,0,1581520847,64296468,54,2,12,38483937,1220979,709874
2,,,,1581201226,603,1340,False,1373559819,34,79,...,0,0,0,65627264,59,0,12,34241830,1284185,995240
3,,,,1581464495,69236,882,False,1394066296,23,143,...,0,0,1581473414,66579081,54,2,12,28207885,1337565,147284
4,,,,1581195600,1911234,4327,True,1372717142,176,594,...,0,0,0,45322295,54,2,8,11378919,1486122,984386


In [33]:
%%time
df = df.repartition(npartitions=NP)
df, = dask.persist(df)
_ = wait(df)

CPU times: user 148 ms, sys: 32 ms, total: 180 ms
Wall time: 6.76 s


In [34]:
%%time
# Write the fully encoded frame as parquet without the dask index
# (the "step1_output" path suggests a later step reads it — confirm).
df.to_parquet('./dask_input/step1_output',write_index=False)

CPU times: user 1.05 s, sys: 204 ms, total: 1.25 s
Wall time: 57.3 s


In [35]:
# Total wall-clock runtime since the first cell.
elapsed_minutes = (time.time() - very_start) / 60.
print(f'This notebook took {elapsed_minutes:.1f} minutes')

This notebook took 49.8 minutes
