In [1]:
# fix jupyter notebook python path
import os,sys,inspect

current_dir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))
parent_dir = os.path.dirname(current_dir)
root_dir = os.path.dirname(parent_dir)
sys.path.insert(0, root_dir) 

print(f"{current_dir=}, {parent_dir=}, {root_dir}")

current_dir='/home/luca/Recommender Systems/recsys-challenge-2021-twitter/Utils/Preprocessing', parent_dir='/home/luca/Recommender Systems/recsys-challenge-2021-twitter/Utils', /home/luca/Recommender Systems/recsys-challenge-2021-twitter


In [2]:
import dask
import dask.dataframe as dd
import RootPath
import pandas as pd
import numpy as np
import gc
import itertools
import sys
import gzip
import pickle
from typing import List

### Launch client to monitor status
Very nice visualization of resource management at the dashboard!q

In [3]:
#from dask.distributed import Client,wait,LocalCluster

#import psutil
#dict(psutil.virtual_memory()._asdict())

# i have only 8GB, so i try to leave some free space
# each worker seem to need like 3 GB, to load one of the CSVs (1.4GB), keep in memory a dict ~500MB, and be able to still do things
cluster = LocalCluster(
   n_workers=1, threads_per_worker=8, memory_limit="6GB"
)
client = Client(cluster) # use default n_threads and mem
client

In [4]:
client.cluster

In [5]:
#some extra functions that may be useful to investigate memory in workers
# def collect():
#     import gc
#     gc.collect()
#
# def investigate_df(type_to_investigate: type = pd.DataFrame):
#     return [sys.getsizeof(obj) for obj in gc.get_objects() if isinstance(obj, type_to_investigate)]
#
# def investigate():
#     return sum([sys.getsizeof(obj) for obj in gc.get_objects()])


### Load data with correct labels and type

In [6]:
# all_features = [
#     "raw_feature_tweet_text_token",
#     "raw_feature_tweet_hashtags",
#     "raw_feature_tweet_id",
#     "raw_feature_tweet_media",
#     "raw_feature_tweet_links",
#     "raw_feature_tweet_domains",
#     "raw_feature_tweet_type",
#     "raw_feature_tweet_language",
#     "raw_feature_tweet_timestamp",
#     "raw_feature_creator_id",
#     "raw_feature_creator_follower_count",
#     "raw_feature_creator_following_count",
#     "raw_feature_creator_is_verified",
#     "raw_feature_creator_creation_timestamp",
#     "raw_feature_engager_id",
#     "raw_feature_engager_follower_count",
#     "raw_feature_engager_following_count",
#     "raw_feature_engager_is_verified",
#     "raw_feature_engager_creation_timestamp",
#     "raw_feature_engagement_creator_follows_engager"
#     ]

all_features_dtype = {
    "raw_feature_tweet_text_token": pd.StringDtype(),
    "raw_feature_tweet_hashtags": pd.StringDtype(),
    "raw_feature_tweet_id": pd.StringDtype(),
    "raw_feature_tweet_media": pd.StringDtype(),
    "raw_feature_tweet_links": pd.StringDtype(),
    "raw_feature_tweet_domains": pd.StringDtype(),
    "raw_feature_tweet_type": pd.StringDtype(),
    "raw_feature_tweet_language": pd.StringDtype(),
    "tweet_timestamp": pd.Int64Dtype(),
    "raw_feature_creator_id": pd.StringDtype(),
    "creator_follower_count": pd.UInt32Dtype(),
    "creator_following_count": pd.UInt32Dtype(),
    "creator_is_verified": pd.BooleanDtype(),
    "creator_creation_timestamp": pd.Int64Dtype(),
    "raw_feature_engager_id": pd.StringDtype(),
    "engager_follower_count": pd.UInt32Dtype(),
    "engager_following_count": pd.UInt32Dtype(),
    "engager_is_verified": pd.BooleanDtype(),
    "engager_creation_timestamp": pd.Int64Dtype(),
    "engagement_creator_follows_engager": pd.BooleanDtype(),
    "engagement_reply_timestamp": pd.Int64Dtype(),
    "engagement_retweet_timestamp": pd.Int64Dtype(),
    "engagement_comment_timestamp": pd.Int64Dtype(),
    "engagement_like_timestamp": pd.Int64Dtype()
}
# all_labels = [
#     "engagement_reply_timestamp",
#     "engagement_retweet_timestamp",
#     "engagement_comment_timestamp",
#     "engagement_like_timestamp"
# ]
# all_labels_dtype = {
#     "engagement_reply_timestamp": pd.Int32Dtype(),
#     "engagement_retweet_timestamp": pd.Int32Dtype(),
#     "engagement_comment_timestamp": pd.Int32Dtype(),
#     "engagement_like_timestamp": pd.Int32Dtype()
# }

# mapped_features_dtype = {
#     "decoded_tweet_text_token": pd.StringDtype(),
#     "mapped_tweet_hashtags": 'O',
#     "mapped_tweet_id": pd.UInt32Dtype(),
#     "number_of_photo": pd.UInt8Dtype(),
#     "number_of_gif": pd.UInt8Dtype(),
#     "number_of_video": pd.UInt8Dtype(),
#     "mapped_tweet_links": 'O',
#     "mapped_tweet_domains": 'O',
#     "mapped_tweet_type": pd.UInt8Dtype(),
#     "mapped_tweet_language": pd.UInt8Dtype(),
#     "tweet_timestamp": pd.UInt32Dtype(),
#     "mapped_creator_id": pd.UInt32Dtype() ,
#     "creator_follower_count": pd.UInt8Dtype(),
#     "creator_following_count": pd.UInt8Dtype(),
#     "creator_is_verified": pd.BooleanDtype(),
#     "creator_creation_timestamp": pd.UInt32Dtype(),
#     "mapped_engager_id": pd.UInt32Dtype(),
#     "engager_follower_count": pd.UInt8Dtype(),
#     "engager_following_count": pd.UInt8Dtype(),
#     "engager_is_verified": pd.BooleanDtype(),
#     "engager_creation_timestamp": pd.UInt32Dtype(),
#     "engagement_creator_follows_engager": pd.BooleanDtype(),
#     "engagement_reply_timestamp": pd.UInt32Dtype(),
#     "engagement_retweet_timestamp": pd.UInt32Dtype(),
#     "engagement_comment_timestamp": pd.UInt32Dtype(),
#     "engagement_like_timestamp": pd.UInt32Dtype()
# }


In [7]:
original_dataset_path = f"{RootPath.get_dataset_path()}/part-*"
#original_dataset_path = f"{RootPath.get_dataset_path()}/part-00000"
dataset_path = f"{RootPath.get_dataset_path()}/Temp/full_dataset"
output_path = f"{RootPath.get_dataset_path()}/Preprocessed/dataset.tsv"
temp_output_path = f"{RootPath.get_dataset_path()}/Preprocessed/Temp/"
dict_path = f"{RootPath.get_dataset_path()}/Preprocessed/Dictionary/"

In [41]:
def create_dict_feature(series: dask.dataframe.Series, out_type: type) -> (dict,pd.DataFrame):
    feature_name = series.name
    feature_name_encode = feature_name + "_encode"

    mapping = series.drop_duplicates().to_frame() #create a dataframe from a series
    mapping[feature_name_encode] = 1
    mapping[feature_name_encode] = mapping[feature_name_encode].cumsum()
    mapping[feature_name_encode] = mapping[feature_name_encode].astype(out_type)
    mapping, = dask.compute(mapping)

    # define mapping dicts
    direct_dict = dict(zip(mapping[feature_name], mapping[feature_name_encode]))

    return direct_dict, mapping

def create_dict_feature_to_split(series: dask.dataframe.Series, sep: str, out_type: type) -> (dict,pd.DataFrame):
    feature_name = series.name
    feature_name_encode = feature_name + "_encode"

    #map partition internal function goes from series to dataframe
    mapping = series\
        .map_partitions(lambda s: pd.DataFrame([hashtag for line in s.dropna() for hashtag in line.split(sep)], columns=[feature_name]),
                       meta={feature_name:pd.StringDtype()})\
        .drop_duplicates()
    mapping[feature_name_encode] = 1
    mapping[feature_name_encode] = mapping[feature_name_encode].cumsum()
    mapping[feature_name_encode] = mapping[feature_name_encode].astype(out_type)
    mapping, = dask.compute(mapping)

    gc.collect()

    # define mapping dicts
    direct_dict = dict(zip(mapping[feature_name], mapping[feature_name_encode]))
    # manage nans
    #direct_dict[pd.NA] = None
    #inverse_dict[None] = pd.NA

    gc.collect()

    return direct_dict, mapping


def map_column_single_value(series: dask.dataframe.Series, dictionary: dict, name_out:str, out_type: type) -> dask.dataframe.Series:
    return series\
        .apply(lambda x: dictionary[x], # if x is not pd.NA else None, #Nans to be managed outside, manual entry in dict if you like
               meta=pd.Series(dtype=out_type, name=name_out))


def map_column_array(series: dask.dataframe.Series, dictionary: dict, sep: str, name_out:str, out_type: type, nan_symbol) -> dask.dataframe.Series:
    return series\
        .apply(lambda x: np.array([dictionary[y] for y in x.split(sep)], dtype=out_type)
                                    if x is not nan_symbol else np.array([]),
               meta=pd.Series(dtype='O', name=name_out))

### Create intermediate parquet full dataset

In [9]:
%%time
# Read data
df = dd.read_csv(original_dataset_path,
                 sep='\x01',
                 names=all_features_dtype.keys(),
                 dtype=all_features_dtype,
                 )

CPU times: user 9.28 ms, sys: 9.58 ms, total: 18.9 ms
Wall time: 24.7 ms


In [10]:
%%time
# Write to parquet
df.to_parquet(dataset_path, write_index=False, compression="snappy", engine="pyarrow", overwrite="True")

CPU times: user 2min 4s, sys: 10.4 s, total: 2min 15s
Wall time: 1min 33s


In [11]:
df = dd.read_parquet(dataset_path,
                     engine='pyarrow')
df

Unnamed: 0_level_0,raw_feature_tweet_text_token,raw_feature_tweet_hashtags,raw_feature_tweet_id,raw_feature_tweet_media,raw_feature_tweet_links,raw_feature_tweet_domains,raw_feature_tweet_type,raw_feature_tweet_language,tweet_timestamp,raw_feature_creator_id,creator_follower_count,creator_following_count,creator_is_verified,creator_creation_timestamp,raw_feature_engager_id,engager_follower_count,engager_following_count,engager_is_verified,engager_creation_timestamp,engagement_creator_follows_engager,engagement_reply_timestamp,engagement_retweet_timestamp,engagement_comment_timestamp,engagement_like_timestamp
npartitions=48,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1,Unnamed: 22_level_1,Unnamed: 23_level_1,Unnamed: 24_level_1
,string,string,string,string,string,string,string,string,Int64,string,UInt32,UInt32,boolean,Int64,string,UInt32,UInt32,boolean,Int64,boolean,Int64,Int64,Int64,Int64
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...


In [12]:
df.npartitions

48

In [13]:
df["raw_feature_tweet_domains"].head(10)

0                                <NA>
1    9EFF000CDB18B710CDDB43EE1D8C300B
2    D56FA7843AF6F2BC53A2E192B542EA58
3                                <NA>
4                                <NA>
5                                <NA>
6                                <NA>
7                                <NA>
8                                <NA>
9    F595B7DE8992A3D8C7948B4E81419D78
Name: raw_feature_tweet_domains, dtype: string

In [14]:
del df
gc.collect()


36

### Map creator_id, engager_id

In [15]:
%%time
# Load dataset
df = dd.read_parquet(dataset_path,
                     columns= [
                         "raw_feature_creator_id",
                         "raw_feature_engager_id"
                     ],
                     engine='pyarrow')

CPU times: user 12.3 ms, sys: 0 ns, total: 12.3 ms
Wall time: 11.3 ms


In [16]:
%%time
# Create Dict
dir_user_id, _ = create_dict_feature(df["raw_feature_creator_id"].append(df["raw_feature_engager_id"]).rename("raw_user_id"), np.uint32)

CPU times: user 23 s, sys: 1.42 s, total: 24.4 s
Wall time: 22.1 s


In [17]:
# Map the feature
out_creator_id = map_column_single_value(df["raw_feature_creator_id"],
                                         dir_user_id,
                                         "mapped_creator_id",
                                         np.uint32)\
    .to_frame()

out_engager_id = map_column_single_value(df["raw_feature_engager_id"],
                                         dir_user_id,
                                         "mapped_engager_id",
                                         np.uint32)\
    .to_frame()

In [18]:
%%time
# Write the output dataset
out_creator_id.to_parquet(temp_output_path+"mapped_creator_id", write_index=False, compression="snappy", engine="pyarrow", overwrite="True")
out_engager_id.to_parquet(temp_output_path+"mapped_engager_id", write_index=False, compression="snappy", engine="pyarrow", overwrite="True")

CPU times: user 19.3 s, sys: 1.59 s, total: 20.9 s
Wall time: 13.6 s


In [19]:
%%time
# Write the dicts
with gzip.GzipFile(dict_path + "mapped_user_id" + "_dict", 'wb') as file:
    pickle.dump(dir_user_id, file, protocol=pickle.HIGHEST_PROTOCOL)

CPU times: user 34.4 s, sys: 244 ms, total: 34.6 s
Wall time: 34.8 s


In [23]:
out_engager_id.min().compute()

mapped_engager_id    16
dtype: int64

In [24]:
out_engager_id.max().compute()

mapped_engager_id    5780065
dtype: int64

In [25]:
# Clean variables
del dir_user_id, out_creator_id, out_engager_id, df
gc.collect()

15

### Map media

In [26]:
%%time
# Load dataset
df = dd.read_parquet(dataset_path,
                     columns= [
                             "raw_feature_tweet_media"
                     ],
                     engine='pyarrow')


CPU times: user 14.7 ms, sys: 340 µs, total: 15 ms
Wall time: 13.9 ms


In [27]:
%%time
# Create Dict
media_dict = {
    "Photo":0,
    "GIF": 1,
    "Video": 2
}

columns_types = {
    "number_of_photo": np.uint8,
    "number_of_gif": np.uint8,
    "number_of_video": np.uint8,
}

# Map the feature

#Function mapping each list of splitted strings into the 3 counting columns
def count_media_types(vec: List[str]) -> List[int]:
    ret = [0 for _ in range(3)]
    for x in vec:
        if x != "":
            ret[media_dict[x]] +=1
    return ret

#Function responsible of mapping count_media_types and collecting result in a coherent pd.Dataframe
def to_map_on_media_col(media_col: pd.Series) -> pd.DataFrame:
    return pd.DataFrame(media_col.map(count_media_types).to_list(), columns=columns_types.keys())

#Function mapping each big raw string into the list of splitted strings and calling next funcs
out_media = df['raw_feature_tweet_media']\
    .fillna("")\
    .map_partitions(lambda s: to_map_on_media_col(s.str.split("\t")),
                    meta=columns_types)


CPU times: user 4.1 ms, sys: 0 ns, total: 4.1 ms
Wall time: 3.55 ms


In [28]:
%%time
# Write the output dataset
out_media.to_parquet(temp_output_path+"counted_media", write_index=False, compression="snappy", engine="pyarrow", overwrite="True")

CPU times: user 14.4 s, sys: 384 ms, total: 14.7 s
Wall time: 13.7 s


In [29]:
out_media.head(10)

Unnamed: 0,number_of_photo,number_of_gif,number_of_video
0,0,0,0
1,0,0,0
2,0,0,0
3,0,0,0
4,1,0,0
5,0,0,0
6,0,0,0
7,0,0,0
8,0,0,0
9,0,0,0


In [30]:
# Clean variables
del out_media, media_dict, columns_types, df
gc.collect()

33

### Map Links

In [31]:
%%time
# Load dataset
df = dd.read_parquet(dataset_path,
                     columns= [
                             "raw_feature_tweet_links"
                     ],
                     engine='pyarrow')

CPU times: user 11.2 ms, sys: 0 ns, total: 11.2 ms
Wall time: 10.6 ms


In [39]:
df

Unnamed: 0_level_0,raw_feature_tweet_links
npartitions=48,Unnamed: 1_level_1
,string
,...
...,...
,...
,...


In [32]:
%%time
# Create Dict
dict_links_id, mapping = create_dict_feature_to_split(df["raw_feature_tweet_links"], '\t', np.uint32)

CPU times: user 2.69 s, sys: 64.1 ms, total: 2.75 s
Wall time: 1.94 s


In [42]:
# Map the feature
out_links_id = map_column_array(df["raw_feature_tweet_links"].fillna(""), dict_links_id, '\t', "mapped_tweet_links", np.uint32, "") \
    .to_frame()

In [43]:
%%time
# Write the output dataset
out_links_id.to_parquet(temp_output_path+"links", write_index=False, compression="snappy", engine="pyarrow", overwrite="True")

CPU times: user 8.2 s, sys: 213 ms, total: 8.41 s
Wall time: 7.68 s


In [44]:
out_links_id

Unnamed: 0_level_0,mapped_tweet_links
npartitions=48,Unnamed: 1_level_1
,object
,...
...,...
,...
,...


In [45]:
out_links_id.head()

Unnamed: 0,mapped_tweet_links
0,[]
1,[1]
2,[2]
3,[]
4,[]


In [46]:
%%time
# Write the dicts
with gzip.GzipFile(dict_path + "mapped_tweet_links" + "_dict", 'wb') as file:
    pickle.dump(dict_links_id, file, protocol=pickle.HIGHEST_PROTOCOL)

CPU times: user 2.76 s, sys: 8.29 ms, total: 2.77 s
Wall time: 2.8 s


In [47]:
# Clean variables
del dict_links_id, out_links_id, mapping, df
gc.collect()

10799

### Map Domains

In [48]:
%%time
# Load dataset
df = dd.read_parquet(dataset_path,
                     columns= [
                             "raw_feature_tweet_domains"
                     ],
                     engine='pyarrow')

CPU times: user 11.7 ms, sys: 106 µs, total: 11.8 ms
Wall time: 11.1 ms


In [49]:
%%time
# Create Dict
dict_domains_id, mapping = create_dict_feature_to_split(df["raw_feature_tweet_domains"], '\t', np.uint32)

CPU times: user 1.93 s, sys: 84.8 ms, total: 2.01 s
Wall time: 1.47 s


In [50]:
# Map the feature
out_domains_id = map_column_array(df["raw_feature_tweet_domains"].fillna(""), dict_domains_id, '\t', "mapped_domains", np.uint32, "") \
    .to_frame()

In [51]:
%%time
# Write the output dataset
out_domains_id.to_parquet(temp_output_path+"domains", write_index=False, compression="snappy", engine="pyarrow", overwrite=True)

CPU times: user 10.4 s, sys: 81.4 ms, total: 10.5 s
Wall time: 9.72 s


In [52]:
out_domains_id

Unnamed: 0_level_0,mapped_domains
npartitions=48,Unnamed: 1_level_1
,object
,...
...,...
,...
,...


In [53]:
out_domains_id.head()

Unnamed: 0,mapped_domains
0,[]
1,[1]
2,[2]
3,[]
4,[]


In [54]:
%%time
# Write the dicts
with gzip.GzipFile(dict_path + "raw_feature_tweet_domains" + "_dict", 'wb') as file:
    pickle.dump(dict_domains_id, file, protocol=pickle.HIGHEST_PROTOCOL)

CPU times: user 224 ms, sys: 76 µs, total: 224 ms
Wall time: 222 ms


In [55]:
# Clean variables
del dict_domains_id, out_domains_id, mapping, df
gc.collect()

99

### Map Hashtags

In [56]:
%%time
# Load dataset
df = dd.read_parquet(dataset_path,
                     columns= [
                             "raw_feature_tweet_hashtags"
                     ],
                     engine='pyarrow')

CPU times: user 1.62 ms, sys: 10.2 ms, total: 11.8 ms
Wall time: 11 ms


In [57]:
%%time
# Create Dict
dict_hashtags_id, mapping = create_dict_feature_to_split(df["raw_feature_tweet_hashtags"], '\t', np.uint32)

CPU times: user 4.66 s, sys: 123 ms, total: 4.78 s
Wall time: 3.24 s


In [60]:
# Map the feature
out_hashtags_id = map_column_array(df["raw_feature_tweet_hashtags"].fillna(""), dict_hashtags_id, '\t', "mapped_tweet_hashtags", np.uint32, "") \
    .to_frame()

In [61]:
%%time
# Write the output dataset
out_hashtags_id.to_parquet(temp_output_path+"hashtags", write_index=False, compression="snappy", engine="pyarrow", overwrite=True)

CPU times: user 10 s, sys: 279 ms, total: 10.3 s
Wall time: 9.2 s


In [63]:
out_hashtags_id

Unnamed: 0_level_0,mapped_tweet_hashtags
npartitions=48,Unnamed: 1_level_1
,object
,...
...,...
,...
,...


In [68]:
out_hashtags_id.head(40)

Unnamed: 0,mapped_tweet_hashtags
0,[]
1,[]
2,[]
3,[]
4,[]
5,[]
6,[]
7,[]
8,[]
9,[]


In [67]:
df.head(40)

Unnamed: 0,raw_feature_tweet_hashtags
0,
1,
2,
3,
4,
5,
6,
7,
8,
9,


In [62]:
%%time
# Write the dicts
with gzip.GzipFile(dict_path + "raw_feature_tweet_hashtags" + "_dict", 'wb') as file:
    pickle.dump(dict_hashtags_id, file, protocol=pickle.HIGHEST_PROTOCOL)

CPU times: user 3.58 s, sys: 7.71 ms, total: 3.59 s
Wall time: 3.61 s


In [69]:
# Clean variables
del dict_hashtags_id, out_hashtags_id, mapping, df
gc.collect()

23

### title

In [None]:
%%time
# Load dataset
df = dd.read_parquet(dataset_path,
                     columns= [
                             "raw_feature_tweet_media"
                     ],
                     engine='pyarrow')

In [None]:
%%time
# Create Dict

In [None]:
# Map the feature

In [None]:
# Write the output dataset

In [None]:
# Write the dicts


In [None]:
# Clean variables

In [None]:
assert(False)

In [70]:
# Read not mapped features from original dataset
df = dd.read_parquet(dataset_path,
                     columns= [
                         "tweet_timestamp",
                         "creator_follower_count",
                         "creator_following_count",
                         "creator_is_verified",
                         "creator_creation_timestamp",
                         "engager_follower_count",
                         "engager_following_count",
                         "engager_is_verified",
                         "engager_creation_timestamp",
                         "engagement_creator_follows_engager",
                         "engagement_reply_timestamp",
                         "engagement_retweet_timestamp",
                         "engagement_comment_timestamp",
                         "engagement_like_timestamp"
                     ],
                     engine='pyarrow')

In [71]:
# Prepare to load the datasets created previously
df_list = []

columns_dict = {
    "mapped_creator_id": ["mapped_creator_id"],
    "counted_media": [
        "number_of_photo",
        "number_of_gif",
        "number_of_video"
    ],
    "links": ["mapped_tweet_links"],
    "domains": ["mapped_domains"],
    "hashtags": ["mapped_hashtags"]
}

for name, cols in columns_dict.items():
    df_list.append(dd.read_parquet(temp_output_path + name))

In [72]:
for i in range(1, len(df_list)):
    cur_df = df_list[i]
    for col in cur_df.columns:
        df[col] = cur_df[col]

df

Unnamed: 0_level_0,tweet_timestamp,creator_follower_count,creator_following_count,creator_is_verified,creator_creation_timestamp,engager_follower_count,engager_following_count,engager_is_verified,engager_creation_timestamp,engagement_creator_follows_engager,engagement_reply_timestamp,engagement_retweet_timestamp,engagement_comment_timestamp,engagement_like_timestamp,number_of_photo,number_of_gif,number_of_video,mapped_tweet_links,mapped_domains,mapped_tweet_hashtags
npartitions=48,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1
,Int64,UInt32,UInt32,boolean,Int64,UInt32,UInt32,boolean,Int64,boolean,Int64,Int64,Int64,Int64,int64,int64,int64,object,object,object
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...


In [73]:
%%time
df.head(10)

CPU times: user 182 ms, sys: 1.29 ms, total: 184 ms
Wall time: 202 ms


Unnamed: 0,tweet_timestamp,creator_follower_count,creator_following_count,creator_is_verified,creator_creation_timestamp,engager_follower_count,engager_following_count,engager_is_verified,engager_creation_timestamp,engagement_creator_follows_engager,engagement_reply_timestamp,engagement_retweet_timestamp,engagement_comment_timestamp,engagement_like_timestamp,number_of_photo,number_of_gif,number_of_video,mapped_tweet_links,mapped_domains,mapped_tweet_hashtags
0,1613237034,2473,662,False,1261859734,169,339,False,1520886748,False,,,,,0,0,0,[],[],[]
1,1613748600,4418640,228,True,1266804490,393,1190,False,1237570695,False,,,,,0,0,0,[1.0],[1.0],[]
2,1613386238,219715,3685,True,1202617218,629,1473,False,1263176351,False,,,,1613388292.0,0,0,0,[2.0],[2.0],[]
3,1613708640,2388283,13511,True,1251645191,123,200,False,1268276559,False,,,,,0,0,0,[],[],[]
4,1612586018,414,720,False,1578273274,134,379,False,1483862063,True,,,,1612587384.0,1,0,0,[],[],[]
5,1613047251,673,561,False,1318013852,2322,996,False,1385942383,True,,,,,0,0,0,[],[],[]
6,1613383429,9879,95,False,1551982041,61,1008,False,1573893335,False,,,,,0,0,0,[],[],[]
7,1614132656,209681,141,False,1516334262,35,106,False,1581377629,False,,,,1614134640.0,0,0,0,[],[],[]
8,1614179138,901,782,False,1401492344,9661,8573,False,1373060291,True,,,,,0,0,0,[],[],[]
9,1614172373,881,583,False,1254934130,355,650,False,1589197420,False,,,,1614175309.0,0,0,0,[3.0],[3.0],[]


In [None]:
%%time
df.to_parquet(output_path)

In [None]:
dask.config.

In [None]:
assert(False)

In [None]:
df = df.drop("raw_feature_tweet_text_token", axis=1)
del all_features_dtype["raw_feature_tweet_text_token"]
del all

In [None]:
#for k,t in {**all_features_dtype, **all_labels_dtype}.items():
#    if t == 'int32':
#        df[k] = df[k].fillna(0).astype(np.int32)
#    elif t == 'bool':
#        df[k] = df[k].astype(bool)
#    elif t == 'string':
#        df[k] = df[k].fillna("").astype(str)
#    else:
#        raise NotImplementedError()

In [None]:
df.dtypes

In [None]:
df.head(50)

Add ID column

In [None]:
df['id']   = df.index.astype(np.uint32)
df.head()

In [None]:
gc.collect()

### They thought me a lesson...
What was learnt from the dask API:
* `map_partition` applies a function on each partition overall, so it is most indicated for functions that must be applied at a ~dataframe level
* `apply` applies a function elementwise. If you see repo source, apply = map_partition + apply on each df
* `reset_index` is a partition-wise operation, so indices will be repeated on different partitions :)
* `compute` synchronous return of a result, no data persisted on cluster, full result returned. *"This turns a lazy Dask collection into its in-memory equivalent. For example a Dask array turns into a NumPy array and a Dask dataframe turns into a Pandas dataframe. The entire dataset must fit into memory before calling this operation."*
* `persist` asynchronous return of result, result persisted on cluster, full result available on request (ie. available a read away)
* `meta`: meta is the worst nightamre in dask. Required for many operations such as map_partitions and apply, you will never get them right at the first shot.
    * SO let's try to be precise from this [gold]: https://docs.dask.org/en/latest/dataframe-design.html#metadata
    * IF you apply a map_partition/apply, and expect to produce a dataframe THEN:
        * ONE ROW DATAFRAME EXAMPLE`ddf.map_partitions(foo, meta=pd.DataFrame({'a': 1, 'b': 2}))` since a whole dataframe (many columns possible) is expected to be produced from foo from working on each partition
        * DICT `{'a':int, 'b':int}` to be shorter, order must match Dataframe
    * ELIF you apply a map_partition/apply, and expect to produce a series THEN:
        * EMPTY SERIES `meta=pd.Series(dtype='int', name='the_custom_name_for_your_input')`
        * SINGLE TUPLE `('the_custom_name_for_your_input', int)`
    * ELSE you expect a single scalar:
        * SINGLE DTYPE `meta=int`
    * note that each dtype can be substituted with a string 'f8'~float8, 'O'~generic object ...
* `gc.collect()` are crucial to avoid OOMs :)
    * 2 types of OOM experienced:
        * dask backend crashes and computation halts
        * frozen computer, blue screens of death
    * PyCharm is heavy on RAM, close browser tabs/using lightweight browser can help \[true experience,sad\]
more info at https://distributed.dask.org/en/latest/memory.html, https://docs.dask.org/en/latest/dataframe-best-practices.html

Empirical concepts seen by visual inspection of graph computations happening
* `drop duplicates` is the unavoidable, long bottleneck, that however can benefit by an incredible amount of workers in parallel
* `cumsum` trick for computing unique increasing array is super fast
* translation to dictionary happens rather fast

### let's prepare some functions to use

In [None]:
def create_dict_feature(series: dask.dataframe.Series) -> (dict,dict):
    feature_name = series.name
    feature_name_encode = feature_name + "_encode"

    mapping = series.drop_duplicates().to_frame() #create a dataframe from a series
    mapping[feature_name_encode] = 1
    mapping[feature_name_encode] = mapping[feature_name_encode].cumsum()
    mapping, = dask.compute(mapping)
    #_ = wait(mapping)

    gc.collect()

    # define mapping dicts
    direct_dict = dict(zip(mapping[feature_name], mapping[feature_name_encode]))
    inverse_dict = dict(zip(mapping[feature_name_encode], mapping[feature_name]))
    # manage nans
    #direct_dict[pd.NA] = None
    #inverse_dict[None] = pd.NA

    del mapping
    gc.collect()
    return direct_dict, inverse_dict

def create_dict_feature_to_split(series: dask.dataframe.Series, sep: str) -> (dict,dict):
    feature_name = series.name
    feature_name_encode = feature_name + "_encode"

    #map partition internal function goes from series to dataframe
    mapping = series\
        .map_partitions(lambda s: pd.DataFrame([hashtag for line in s.dropna() for hashtag in line.split(sep)], columns=[feature_name]),
                       meta={feature_name:pd.StringDtype()})\
        .drop_duplicates()
    mapping[feature_name_encode] = 1
    mapping[feature_name_encode] = mapping[feature_name_encode].cumsum()
    mapping, = dask.compute(mapping)
    #_ = wait(mapping)

    gc.collect()

    # define mapping dicts
    direct_dict = dict(zip(mapping[feature_name], mapping[feature_name_encode]))
    inverse_dict = dict(zip(mapping[feature_name_encode], mapping[feature_name]))
    # manage nans
    #direct_dict[pd.NA] = None
    #inverse_dict[None] = pd.NA

    del mapping
    gc.collect()

    return direct_dict, inverse_dict


def map_column_single_value(series: dask.dataframe.Series, dictionary: dict, out_type: type = np.uint32) -> dask.dataframe.Series:
    feature_name = series.name
    feature_name_mapped = "mapped_" + feature_name

    return series\
        .apply(lambda x: dictionary[x],
               meta=pd.Series(dtype=out_type, name=feature_name_mapped))


def map_column_array(series: dask.dataframe.Series, dictionary: dict, sep: str, out_type: type = np.uint32) -> dask.dataframe.Series:
    feature_name = series.name
    feature_name_mapped = "mapped_" + feature_name

    return df[feature_name]\
        .apply(lambda x: np.array([dictionary[y] for y in x.split(sep)], dtype=out_type)
                                    if x is not pd.NA else None,
           meta=pd.Series(dtype='O', name=feature_name_mapped))

In [None]:
%%time
dir1, inv1 = create_dict_feature(df["raw_feature_tweet_id"])

In [None]:
%%time
dir2, inv2 = create_dict_feature_to_split(df["raw_feature_tweet_hashtags"], '\t')

In [None]:
#printing size of dictionaries in Megabytes
print('dir1 size:',sys.getsizeof(dir1)/(10**6))
print('inv1 size:',sys.getsizeof(inv1)/(10**6))
print('dir2 size:',sys.getsizeof(dir2)/(10**6))
print('inv2 size:',sys.getsizeof(inv2)/(10**6))

In [None]:
%%time
dir_language, inv_language = create_dict_feature(df["raw_feature_tweet_language"])

In [None]:
#printing size of dictionaries in Megabytes
print('dir_language size:',sys.getsizeof(dir_language)/(10**6))
print('inv_language size:',sys.getsizeof(inv_language)/(10**6))

In [None]:
%%time
dir_links, inv_links =  create_dict_feature_to_split(df["raw_feature_tweet_links"], '\t')

In [None]:
#printing size of dictionaries in Megabytes
print('dir_links size:',sys.getsizeof(dir_links)/(10**6))
print('inv_links size:',sys.getsizeof(inv_links)/(10**6))

In [None]:
%%time
dir_domains, inv_domains = create_dict_feature_to_split(df["raw_feature_tweet_domains"], '\t')

In [None]:
print('dir_domains size:',sys.getsizeof(dir_domains)/(10**6))
print('inv_domains size:',sys.getsizeof(inv_domains)/(10**6))

In [None]:
%%time
dir_creator_id, inv_creator_id =  create_dict_feature(df["raw_feature_creator_id"])

In [None]:
print('dir_creator_id size:',sys.getsizeof(dir_creator_id)/(10**6))
print('inv_creator_id size:',sys.getsizeof(inv_creator_id)/(10**6))

In [None]:
%%time
dir_engager_id, inv_engager_id = create_dict_feature(df["raw_feature_engager_id"])

In [None]:
print('dir_engager_id size:',sys.getsizeof(dir_engager_id)/(10**6))
print('inv_engager_id size:',sys.getsizeof(inv_engager_id)/(10**6))

In [None]:
gc.collect()

In [None]:
for x in itertools.islice(dir_engager_id.items(), 5):
    print(x)
for x in itertools.islice(dir_creator_id.items(), 5):
    print(x)

In [None]:
df["mapped_tweet_id"] = map_column_single_value(df["raw_feature_tweet_id"], dir1)

In [None]:
df["mapped_hashtags"] = map_column_array(df["raw_feature_tweet_hashtags"], dir2, '\t')

In [None]:
gc.collect()

In [None]:
df["mapped_language"] = map_column_single_value(df["raw_feature_tweet_language"], dir_language)

In [None]:
df["mapped_links"] = map_column_array(df["raw_feature_tweet_links"], dir_links, '\t')

In [None]:
gc.collect()

NB: Mapping the creator id and engager id together makes the .head() function stall. I have no idea why. If run individually, the .head() works fine

In [None]:
df["mapped_creator_id"] =  map_column_single_value(df["raw_feature_creator_id"], dir_creator_id)

In [None]:
df["mapped_engager_id"] =  map_column_single_value(df["raw_feature_engager_id"], dir_engager_id)

In [None]:
# Delle volte da il seguente errore, delle altre si pianta e basta. Fondamentalmente sembra che il problema sia la "computazione" effettiva sulle due features
# Runnate singolarmete funzionano, se runno sia il mapper_creator che il mapped_engager, si pianta.
# distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
# distributed.nanny - WARNING - Restarting worker
df["mapped_engager_id"].head(10) 

In [None]:
df

In [None]:
df[[
    "raw_feature_tweet_id", "raw_feature_tweet_hashtags", "raw_feature_tweet_language", "raw_feature_tweet_links",
   # "mapped_tweet_id", 
   # "mapped_hashtags", 
    "mapped_engager_id",
    "mapped_creator_id"]].head(20)

In [None]:
assert(False)

# Experiments

Generate Media columns

In [None]:
#df["raw_feature_tweet_media"].unique().compute()

media_dict = {
    "Photo":0,
    "GIF": 1,
    "Video": 2
}

def count_media_types(vec):
    ret = [0 for _ in range(3)]
    for x in vec:
        if x != "":
            ret[media_dict[x]] +=1
    return ret

def to_map_on_media_col(media_col: pd.Series) -> pd.DataFrame:
    return pd.DataFrame(media_col.map(count_media_types).to_list(), columns=media_dict.keys())

In [None]:
df['raw_feature_tweet_media'].fillna("").head(20)

In [None]:
temp = df['raw_feature_tweet_media']\
    .fillna("")\
    .map_partitions(lambda s: to_map_on_media_col(s.str.split("\t")),
                    meta={k:'uint8' for k in media_dict.keys()})
temp.head(20)

In [None]:
df['generated_feature_n_photo'] = temp['Photo']
df['generated_feature_n_gif'] = temp['GIF']
df['generated_feature_n_video'] = temp['Video']

df.head(5)

In [None]:
gc.collect()

In [None]:
df

### Dict mapping on column of single values

In [None]:
%%time
# dataset to dataset
mapping = df['raw_feature_tweet_id']\
    .drop_duplicates().to_frame() #create a dataframe from a series
mapping['tweet_encode'] = 1
mapping['tweet_encode'] = mapping['tweet_encode'].cumsum()
mapping, = dask.compute(mapping)
_ = wait(mapping)

In [None]:
#gc.collect()

In [None]:
%%time
# define mapping dicts
direct_dict = dict(zip(mapping["raw_feature_tweet_id"], mapping["tweet_encode"]))
# save RAM
# inverse_dict = dict(zip(mapping["tweet_encode"], mapping["raw_feature_tweet_id"]))
# # manage nans #NO MORE NEEDED
# direct_dict[pd.NA] = None
# inverse_dict[None] = pd.NA

In [None]:
# print results
print(f"{mapping.shape=}, {len(direct_dict)=}")
print("\ndirect_dict contains:")
for x in itertools.islice(direct_dict.items(), 5):
    print(x)
# print("\ninverse_dict contains:")
# for x in itertools.islice(inverse_dict.items(), 5):
#     print(x)

In [None]:
sys.getsizeof(mapping) / 1024**2
#nice 500MB mapping :)

In [None]:
sys.getsizeof(direct_dict) / 1024**2

In [None]:
#del mapping

In [None]:
#gc.collect()

In [None]:
client.restart()

In [None]:
%%time
#time needed to compute graph, not to execute computations

df['mapped_tweet_id'] = df['raw_feature_tweet_id']\
     .apply(lambda x: direct_dict[x],
            meta=pd.Series(dtype='uint32', name='mapped_tweet_id'))

#df['mapped_tweet_id'] = df['raw_feature_tweet_id']\
#    .map_partitions(lambda x: x.map(direct_dict),
#           meta=pd.Series(dtype=pd.UInt32Dtype(), name='mapped_tweet_id'))

In [None]:
%%time
#only head now, don't waste time mapping on whole dataset
df[['raw_feature_tweet_id', 'mapped_tweet_id']].head(10)

### Dict mapping on columns containing a list of values

In [None]:
#doing it here creates connectivity exceptions
client.restart()

In [None]:
df

In [None]:
#print(df.npartitions)
#df = df.repartition(partition_size='100MB')
#print(df.npartitions)

In [None]:
%%time
#map partition internal function goes from series to dataframe
mapping = df['raw_feature_tweet_hashtags']\
    .fillna("")\
    .map_partitions(lambda s: pd.DataFrame([hashtag for line in s for hashtag in line.split('\t')], columns=['hashtag']),
                   meta={'hashtag':pd.StringDtype()})\
    .drop_duplicates(split_out = 16)

mapping['hashtag_encode'] = 1
mapping['hashtag_encode'] = mapping['hashtag_encode'].cumsum()
mapping, = dask.compute(mapping)
_ = wait(mapping)

In [None]:
gc.collect()

In [None]:
%%time
# define mapping dicts
direct_dict = dict(zip(mapping["hashtag"], mapping["hashtag_encode"]))
# inverse_dict = dict(zip(mapping["hashtag_encode"], mapping["hashtag"]))
# # manage nans
# direct_dict[pd.NA] = None
# inverse_dict[None] = pd.NA

In [None]:
# print results
print(f"{mapping.shape=}, {len(direct_dict)=}")
print("\ndirect_dict contains:")
for x in itertools.islice(direct_dict.items(), 5):
    print(x)
# print("\ninverse_dict contains:")
# for x in itertools.islice(inverse_dict.items(), 5):
#     print(x)

In [None]:
#del mapping
gc.collect()

In [None]:
df['mapped_hashtag'] = df['raw_feature_tweet_hashtags']\
    .fillna("")\
    .apply(lambda x: np.array([direct_dict[y] for y in x.split('\t')], dtype=np.int32)
                                    if x != "" else None,
           meta=pd.Series(dtype='O', name='mapped_hashtag'))

In [None]:
%%time
#only head now, don't waste time mapping on whole dataset
df[['raw_feature_tweet_hashtags', 'mapped_hashtag']].head(40)

In [None]:
gc.collect()

In [None]:
#client.restart()

# here 'official' things stop

In [None]:
assert(False) #do not execute automatically after :)

In [None]:
mapping = df[['raw_feature_tweet_hashtags']]\
    .apply(lambda x: x.split('\t', expand=True).stack().rename("test"),
           axis=1,
           meta={'test':'O'})\
    .drop_duplicates(split_out=16)
mapping['hashtag_encode'] = 1
mapping['hashtag_encode'] = mapping['hashtag_encode'].cumsum()
mapping,shape = dask.compute(mapping, mapping.shape)
_ = wait(mapping)
_ = wait(shape)

In [None]:
%%time
mapping = df['raw_feature_tweet_hashtags'].map_partitions(lambda s: pd.DataFrame(s.dropna().str.split('\t', expand=True).stack().rename('test')), meta={'test':'O'}).drop_duplicates(split_out=16)
mapping , count = dd.compute(mapping, mapping.shape)
_ = wait(mapping)
_ = wait(count)

In [None]:
df['mapped_tweet_hashtags'] = df['raw_feature_tweet_hashtags']\
    .map_partitions(lambda s: s.map(lambda x: np.array([direct_dict[y] for y in x.split('\t')], dtype=np.int32)
                                    if x is not pd.NA else None),
                    meta={'raw_feature_tweet_hashtags': 'O'})['raw_feature_tweet_hashtags']
df[['raw_feature_tweet_hashtags', 'mapped_tweet_hashtags']].head(20)

In [None]:
def create_dict_feature_to_split(series, sep):
    series = series.dropna()\
        .str.split(sep, expand=True)\
        .stack().reset_index(drop=True, level=1)


    d.map_partitions(
    lambda df: df.drop('var2', axis=1).join(
        df.var2.str.split(',', expand=True).stack().reset_index(drop=True, level=1).rename('var2')))

    
data = pd.DataFrame([y for x in data.dropna() for y in x.split('\t')])
data = data[data.columns[0]]
dictionary = pd.DataFrame(data.unique()).to_dict()[0]

In [None]:
def map_column_single_value(series, dictionary):
    mapped_series = series.map(dictionary).astype(np.int32)
    return mapped_series


def map_column_array(series, dictionary):
    mapped_series = series.map(
        lambda x: np.array([dictionary[y] for y in x.split('\t')], dtype=np.int32) if x is not pd.NA else None)
    return mapped_series


In [12]:
import pandas as pd
import numpy as np
import string
d = {c:i for i,c in enumerate(string.ascii_lowercase)}
test_df = pd.DataFrame(["a\tb\tc", "a\td\te", "c\tb", "a\td"], columns=["test"])
test_df

Unnamed: 0,test
0,a\tb\tc
1,a\td\te
2,c\tb
3,a\td


In [19]:
test_df['test'].apply(lambda x: np.array(list(map(d, x.split('\t')))))

TypeError: 'dict' object is not callable

In [None]:
test_df['test'].apply(lambda x: x.split('\t', expand=True)).stack()

In [None]:
test_df['test'].str.split('\t')

In [None]:
test_df['test'].str.split('\t',expand=True)

In [None]:
test_df['test'].str.split('\t',expand=True).stack()

In [None]:
import itertools
d = {"1":"uno", 2:"due", 3:"lezzo"}
for x in itertools.islice(d.items(), 2):
    print(x)

In [None]:
chr(2)

In [None]:
df = pd.DataFrame({'a':[[1,2,3], [4,5]]})
df['a']

In [None]:
df['a'].dtype


In [None]:
df['a'].astype(pd.arrays.IntegerArray())

In [None]:
import dask.dataframe as dd
import pandas as pd

df = pd.DataFrame({'a':[1,2,3,4,5,6,7,8], 'b':[8,7,6,5,4,3,2,1], 'c':[1,3,5,7,2,4,6,8]})
df = dd.from_pandas(df)
df

In [None]:
df