In [1]:
# Version tag of the covisitation matrices to load; it is part of their file names (`_v{VER}_`).
VER = 6

import pandas as pd, numpy as np
from tqdm.notebook import tqdm
# Enable tqdm progress bars for pandas apply-style calls.
tqdm.pandas()

# NOTE(review): sys, pickle, glob, Counter and itertools are not used in the cells shown here.
import os, sys, pickle, glob, gc
from collections import Counter
# cudf is only used to report the RAPIDS version in the cells shown here.
import cudf, itertools
print('We will use RAPIDS version',cudf.__version__)

pd.set_option('display.max_rows', 500)
pd.set_option('display.max_columns', 500)

from pandarallel import pandarallel

# 4 worker processes, sharing data through a memory file system.
# NOTE(review): no pandarallel `parallel_*` call appears in the cells shown here.
pandarallel.initialize(nb_workers=4, progress_bar=True, use_memory_fs=True)

# polars is used for the large candidate joins; pandas is used for feature aggregation.
import polars as pl

# ParquetFile streams candidate files in row batches (see the merge cell).
from pyarrow.parquet import ParquetFile
import pyarrow as pa 

We will use RAPIDS version 22.10.00a+392.g1558403753
INFO: Pandarallel will run on 4 workers.
INFO: Pandarallel will use Memory file system to transfer data between the main process and workers.


In [2]:
def reduce_memory(df):
    """Downcast wide numeric columns of ``df`` to 32 bits where the values fit.

    Only plain NumPy signed-integer and floating columns wider than 32 bits are
    considered:

    * int64 -> int32 when every value lies within the int32 range (inclusive);
    * float64 -> float32 when every non-NaN value lies within the float32 range.

    Every other column (object, bool, unsigned, datetime, category, nullable
    extension dtypes, and already-narrow numerics) is left untouched. The old
    string-prefix check sent bool/unsigned/complex columns to the float branch,
    upcast int8/int16/float16, and raised on datetime columns.

    The frame is modified in place and also returned for chaining.
    """
    int32_info = np.iinfo(np.int32)
    float32_info = np.finfo(np.float32)
    for col in df.columns:
        series = df[col]
        col_type = series.dtype
        # Extension dtypes (category, nullable Int64, ...) are not np.dtype
        # instances. Columns of 4 bytes or fewer can't be shrunk by this function.
        if not isinstance(col_type, np.dtype) or col_type.itemsize <= 4:
            continue
        if col_type.kind == "i":
            cmin, cmax = series.min(), series.max()
            # An empty column gives NaN for min/max, so both comparisons are False and the column is left as-is.
            if cmin >= int32_info.min and cmax <= int32_info.max:
                df[col] = series.astype(np.int32)
        elif col_type.kind == "f":
            cmin, cmax = series.min(), series.max()
            # An all-NaN column or a column with +/-inf fails the check and stays float64.
            if cmin >= float32_info.min and cmax <= float32_info.max:
                df[col] = series.astype(np.float32)
    return df

# Feature Extraction

In [3]:
# Which split to build features for:
#   "local"  -> train.parquet history + val.parquet sessions (offline validation)
#   "kaggle" -> all_train.parquet history + test.parquet sessions (submission)
GENERATE_FOR = "kaggle"
# Fail fast: any other value used to surface much later as a NameError on train_df / val_df.
assert GENERATE_FOR in ("local", "kaggle"), f"GENERATE_FOR must be 'local' or 'kaggle', got {GENERATE_FOR!r}"

In [4]:
# Target interaction types. The keys drive the per-type candidate and label loop.
# The codes presumably mirror the 0/1/2 `type` encoding of the raw interactions.
type_labels = dict(clicks=0, carts=1, orders=2)

In [5]:
# Candidates per session. The value is embedded in the covisitation and candidate file
# names, and the candidate_rank feature assumes exactly this many rows per session.
CANDIDATE_COUNT: int = 100

In [6]:
DISK_PIECES = 4


def _read_covisitation(matrix_name, n_pieces):
    """Concatenate the on-disk pieces of one top-N covisitation matrix into a polars frame."""
    pieces = [
        pd.read_parquet(
            f'../raw_data/{GENERATE_FOR}_covisitation/{GENERATE_FOR}_top_{CANDIDATE_COUNT}_{matrix_name}_v{VER}_{piece}.pqt'
        )
        for piece in range(n_pieces)
    ]
    return pl.from_pandas(pd.concat(pieces, ignore_index=True))


clicks_cov_df = _read_covisitation("clicks", DISK_PIECES)
carts_orders_cov_df = _read_covisitation("carts_orders", DISK_PIECES)
# The buy2buy matrix is read from a single piece only (suffix `_0`).
buy2buy_cov_df = _read_covisitation("buy2buy", 1)

In [7]:
def get_covisitation_features(input_cand_df,
                              input_user_int_df,
                              input_covisit_df,
                              covisit_name="clicks"):
    """Summarise covisitation weights between each candidate and its session's history.

    Every candidate ``(session, aid)`` is paired with every ``aid`` the same session
    interacted with in ``input_user_int_df``. Each pair is then looked up in
    ``input_covisit_df`` on ``(aid_x=candidate, aid_y=history aid)`` and its ``wgt``
    column is aggregated per candidate.

    Args:
        input_cand_df: polars frame with at least ``session`` and ``aid`` columns.
        input_user_int_df: polars frame of session interactions with ``session`` and ``aid``.
        input_covisit_df: polars covisitation table with ``aid_x``, ``aid_y`` and ``wgt``.
        covisit_name: prefix of the generated feature columns.

    Returns:
        A polars frame sorted by ``session`` with ``session`` and ``aid`` (both cast to
        Int32) plus ``{covisit_name}_covisit_{max,min,std,sum,mean,count}``. Pairs absent
        from the covisitation table count as weight 0.
    """
    prefix = covisit_name + '_covisit_'

    history = input_user_int_df.rename({"aid": "aid_y"})[["session", "aid_y"]]
    candidates_w_covisit = (
        input_cand_df[["session", "aid"]]
        .rename({"aid": "aid_x"})
        .join(history, how="left", on="session")
        # Leave a missing history aid as null here. Filling it with 0 before the
        # covisit join (as before) is wrong because 0 is a real aid: a session
        # without history would then pick up the weights of the pair (aid_x, 0).
        .join(input_covisit_df, how="left", on=["aid_x", "aid_y"])
        # Unmatched pairs and sessions without history get weight 0.
        .fill_null(0.)
    )

    candidates_w_covisit_gby = (
        candidates_w_covisit
        .groupby(["session", "aid_x"])
        .agg(
            [
                pl.col('wgt').max().alias(prefix + "max"),
                pl.col('wgt').min().alias(prefix + "min"),
                pl.col('wgt').std().alias(prefix + "std"),
                pl.col('wgt').sum().alias(prefix + "sum"),
                pl.col('wgt').mean().alias(prefix + "mean"),
                pl.col('wgt').count().alias(prefix + "count"),
            ]
        )
    ).sort("session", reverse=False)

    candidates_w_covisit_gby = candidates_w_covisit_gby.rename({"aid_x": "aid"})

    # Cast the join keys to Int32 to match the candidate frame they are joined back onto.
    candidates_w_covisit_gby = candidates_w_covisit_gby.with_column(pl.col("aid").cast(pl.Int32))
    candidates_w_covisit_gby = candidates_w_covisit_gby.with_column(pl.col("session").cast(pl.Int32))
    return candidates_w_covisit_gby

## Generating Features

In [8]:
def generate_datetime_features(input_df, tz_offset_hours=2):
    """Add calendar features derived from the ``ts`` column, in place, and return the frame.

    ``ts`` is read as Unix seconds and shifted by ``tz_offset_hours`` before
    conversion. The default of 2 reproduces the offset this notebook always used;
    presumably it is the dataset's local time zone (confirm).

    Adds the columns:
        datetime   -- shifted timestamp
        hour       -- hour of day, 0-23
        dayofweek  -- Monday=0 ... Sunday=6
        is_weekend -- 1 for Saturday/Sunday, else 0
    """
    input_df["datetime"] = pd.to_datetime(input_df.ts + tz_offset_hours * 60 * 60, unit='s')
    input_df["hour"] = input_df["datetime"].dt.hour
    input_df["dayofweek"] = input_df["datetime"].dt.dayofweek
    input_df["is_weekend"] = (input_df["dayofweek"] > 4).astype(int)
    return input_df

def datetime_aggregator(input_df,
                        group_cols=[],
                        wanted_cols=[]):
    """Summarise the hour / day-of-week / weekend columns per group.

    Produces, in this order, the mean and std of ``hour``, the mean and std of
    ``dayofweek``, and the mean of ``is_weekend``. Each column is named
    ``<group_cols joined by '_'>_<feature>_<stat>``.
    ``wanted_cols`` is accepted for signature parity with the other aggregators and is ignored.
    """
    prefix = '_'.join(group_cols)
    stat_plan = [
        ('hour', 'mean'), ('hour', 'std'),
        ('dayofweek', 'mean'), ('dayofweek', 'std'),
        ('is_weekend', 'mean'),
    ]
    named_stats = {f"{prefix}_{feature}_{stat}": (feature, stat) for feature, stat in stat_plan}
    return input_df.groupby(group_cols).agg(**named_stats)

def type_distribution_aggregator(input_df, 
                                 group_cols=[]):
    """Share of each interaction ``type`` within every group.

    Returns one column per observed type value, named
    ``<group_cols joined by '_'>_type<value>_mean``. A type that never occurs in a
    group is NaN rather than 0.
    """
    prefix = '_'.join(group_cols)
    shares = (
        input_df
        .groupby(group_cols)['type']
        .value_counts(normalize=True)
        .unstack('type')
    )
    shares.columns = [f"{prefix}_type{type_value}_mean" for type_value in shares.columns]
    return shares

def existence_amount_aggregator(input_df,
                                 group_cols=[],
                                wanted_cols=[]):
    """Per group, flag whether each wanted column has at least one / more than one value.

    For every column ``c`` in ``wanted_cols`` two int columns are returned, in order:

    * ``<group>_<c>_existed``          -- non-null count of ``c`` > 0
    * ``<group>_<c>_existed_multiple`` -- non-null count of ``c`` > 1

    Here ``<group>`` is ``group_cols`` joined by ``'_'``. The intermediate counts
    are not returned.

    Note: every group has at least one row, so ``*_existed`` is 1 unless all of
    that group's values of ``c`` are null.
    """
    prefix = '_'.join(group_cols)
    counts = input_df.groupby(group_cols).agg({col: ["count"] for col in wanted_cols})

    flags = {}
    for col, stat in counts.columns:
        # Build the names from their parts. Applying str.replace("count", ...) to the
        # full name mangled any group or wanted column whose name contains "count"
        # (e.g. "account" -> "acexisted").
        base = f"{prefix}_{col}_"
        col_counts = counts[(col, stat)]
        flags[base + "existed"] = (col_counts > 0).astype(int)
        flags[base + "existed_multiple"] = (col_counts > 1).astype(int)

    return pd.DataFrame(flags, index=counts.index)

def nunique_aggregator(input_df,
                       group_cols=[],
                       wanted_cols=[]):
    """Number of distinct values of each wanted column per group.

    Columns are named ``<group_cols joined by '_'>_<col>_nunique``, in ``wanted_cols`` order.
    """
    prefix = '_'.join(group_cols)
    distinct_counts = input_df.groupby(group_cols).agg({col: "nunique" for col in wanted_cols})
    distinct_counts.columns = [f"{prefix}_{col}_nunique" for col in distinct_counts.columns]
    return distinct_counts

def is_last_aid_of_the_session(input_df,
                               group_cols=["session", "aid"],
                               wanted_cols=[]
                              ):
    """Flag the (session, aid) pairs whose aid was the session's final interaction.

    A row counts as its session's last interaction when the next row belongs to a
    different session (the very last row always counts). ``input_df`` is therefore
    assumed to be ordered by session and then by time; TODO confirm upstream sorting.
    ``wanted_cols`` is unused.

    Returns a frame indexed by ``group_cols`` with one int column,
    ``is_aid_interacted_last_in_session``: 1 if any of the pair's rows is last.
    """
    keys = input_df[group_cols].copy()
    session_ids = keys["session"]
    keys["is_aid_interacted_last"] = (session_ids.shift(-1) != session_ids).astype("int64")
    last_flag = keys.groupby(group_cols)["is_aid_interacted_last"].max()
    return last_flag.to_frame(name="is_aid_interacted_last_in_session")

def session_len(input_df,
                group_cols=["session"],
                wanted_cols=[],
                return_min_max=False
               ):
    """Duration of each group (by default each session) in ``ts`` units.

    Returns a frame indexed by ``group_cols`` with ``session_len = max(ts) - min(ts)``.
    With ``return_min_max=True`` the frame also holds ``session_start`` and
    ``session_end``, which come before ``session_len``. ``wanted_cols`` is unused.
    """
    bounds = input_df.groupby(group_cols)["ts"].agg(["min", "max"])
    bounds.columns = ["session_start", "session_end"]
    bounds["session_len"] = bounds["session_end"] - bounds["session_start"]
    return bounds if return_min_max else bounds[["session_len"]]

def aid_session_ts_offsets(input_df,
                group_cols=["session", "aid"],
                wanted_cols=[]):
    """Time from an aid's latest interaction to its session's end, and from the session's start.

    Returns a frame indexed by ``group_cols`` with two columns, in ``ts`` units:

    * ``aid_ts_session_end_offset``   -- session end minus the aid's latest ts
    * ``aid_ts_session_start_offset`` -- the aid's latest ts minus session start

    Session start and end come from ``session_len`` (min/max of ``ts`` per session).
    ``wanted_cols`` is unused.
    """
    session_bounds = session_len(input_df, return_min_max=True).reset_index()

    # Use max rather than "last". Session start/end are order-independent (min/max),
    # so the aid's latest timestamp must be too. "last" silently depends on the row
    # order of input_df; the two agree whenever the rows are sorted by ts.
    offsets = (
        input_df.groupby(group_cols)["ts"].max()
        .rename("session_aid_last_ts")
        .reset_index()
        .merge(session_bounds, how="left", on="session")
    )
    offsets["aid_ts_session_end_offset"] = offsets["session_end"] - offsets["session_aid_last_ts"]
    offsets["aid_ts_session_start_offset"] = offsets["session_aid_last_ts"] - offsets["session_start"]

    return offsets[group_cols + ["aid_ts_session_end_offset", "aid_ts_session_start_offset"]].set_index(group_cols)

def type_based_aggregator(input_df,
                          group_cols=[],
                          wanted_cols=[],
                          aggregators=[],
                          type_ids=range(3)):
    """Run every aggregator separately on each interaction type and join the results column-wise.

    For each ``type_id`` in ``type_ids`` (default 0, 1, 2), the rows with
    ``type == type_id`` are passed to every aggregator as
    ``aggregator(rows, group_cols=..., wanted_cols=...)``. The aggregator's columns
    are prefixed with ``type<type_id>_``. All resulting frames are concatenated
    along ``axis=1``.

    Raises:
        ValueError: if ``aggregators`` is empty.
    """
    if not aggregators:
        raise ValueError("type_based_aggregator needs at least one aggregator")

    type_dfs = []
    for type_id in type_ids:
        type_rows = input_df[input_df.type == type_id].reset_index(drop=True)
        for aggregator in aggregators:
            aggregator_df = aggregator(type_rows,
                                       group_cols=group_cols,
                                       wanted_cols=wanted_cols)
            aggregator_df.columns = ["type" + str(type_id) + "_" + col for col in aggregator_df.columns]
            # Append inside the aggregator loop. It previously sat one level out,
            # so only the last aggregator's output per type was kept.
            type_dfs.append(aggregator_df)

    return pd.concat(type_dfs, axis=1)

In [9]:
if GENERATE_FOR == "local":
    train_df = pd.read_parquet("./splitted_raw_data/train.parquet")
    val_df = pd.read_parquet("./splitted_raw_data/val.parquet")
elif GENERATE_FOR == "kaggle":
    train_df = pd.read_parquet("./splitted_raw_data/all_train.parquet")
    val_df = pd.read_parquet("./splitted_raw_data/test.parquet")
else:
    # Without this, an unexpected value only surfaced later as a NameError on train_df.
    raise ValueError(f"Unknown GENERATE_FOR={GENERATE_FOR!r}; expected 'local' or 'kaggle'")

train_df = generate_datetime_features(train_df)
val_df = generate_datetime_features(val_df)

# Item statistics use every interaction (history + target sessions). Session and
# session-aid statistics use only the target sessions (val_df).
item_df = pd.concat([train_df, val_df], ignore_index=True)
user_df = val_df
user_item_int_df = val_df

print("Data is read!")

# DataFrame.to_parquet does not create missing directories.
os.makedirs('./all_features', exist_ok=True)

############ Item features: one row per aid

item_features = pd.concat([
    existence_amount_aggregator(item_df,
                                group_cols=["aid"],
                                wanted_cols=["session", "aid"]),
    nunique_aggregator(item_df,
                       group_cols=["aid"],
                       wanted_cols=["session"]),
    datetime_aggregator(item_df,
                        group_cols=["aid"]),
    type_distribution_aggregator(item_df,
                                 group_cols=["aid"]),
], axis=1)

item_features = reduce_memory(item_features)

item_features.to_parquet(f'./all_features/{GENERATE_FOR}_item_features.pqt')

print("Item features are created!")

############ User (session) features: one row per session

user_features = pd.concat([
    existence_amount_aggregator(user_df,
                                group_cols=["session"],
                                wanted_cols=["session", "aid"]),
    session_len(user_df),
    datetime_aggregator(user_df,
                        group_cols=["session"]),
    type_distribution_aggregator(user_df,
                                 group_cols=["session"]),
], axis=1)

user_features = reduce_memory(user_features)

user_features.to_parquet(f'./all_features/{GENERATE_FOR}_user_features.pqt')

print("User features are created!")

############ User-item interaction features: one row per (session, aid)

# NOTE(review): is_last_aid_of_the_session relies on val_df being ordered by session
# and then by time; confirm the split files are stored that way.
user_item_int_features = pd.concat([
    existence_amount_aggregator(user_item_int_df,
                                group_cols=["session", "aid"],
                                wanted_cols=["aid"]),
    is_last_aid_of_the_session(user_item_int_df),
    aid_session_ts_offsets(user_item_int_df),
    datetime_aggregator(user_item_int_df,
                        group_cols=['session', 'aid']),
    type_distribution_aggregator(user_item_int_df,
                                 group_cols=['session', 'aid']),
], axis=1)

user_item_int_features = reduce_memory(user_item_int_features)

user_item_int_features.to_parquet(f'./all_features/{GENERATE_FOR}_user_item_int_features.pqt')

print("User-Item Interaction features are created!")

Data is read!
Item features are created!
User features are created!
User-Item Interaction features are created!


In [10]:
# Release the pandas feature frames; the merge step re-reads them from parquet with polars.
del item_features
del user_features
del user_item_int_features
gc.collect()

20

In [12]:
# Drop every remaining reference to the pandas inputs. user_df and user_item_int_df
# alias val_df, so unless they are deleted too, the pandas val_df stays in memory
# after the next cell rebinds val_df to a polars frame.
del item_df, train_df, user_df, user_item_int_df
gc.collect()

## Merging Features w/ Candidates

In [22]:
item_features = pl.read_parquet(f'./all_features/{GENERATE_FOR}_item_features.pqt')
user_features = pl.read_parquet(f'./all_features/{GENERATE_FOR}_user_features.pqt')
user_item_int_features = pl.read_parquet(f'./all_features/{GENERATE_FOR}_user_item_int_features.pqt')

if GENERATE_FOR == "local":
    val_df = pl.read_parquet("./splitted_raw_data/val.parquet")
elif GENERATE_FOR == "kaggle":
    val_df = pl.read_parquet("./splitted_raw_data/test.parquet")
else:
    raise ValueError(f"Unknown GENERATE_FOR={GENERATE_FOR!r}; expected 'local' or 'kaggle'")

# Join keys must share dtypes. These casts are loop-invariant, so they are done once
# here instead of once per batch (the covisit casts were redone for every batch of every type).
val_df = val_df.with_column(pl.col("aid").cast(pl.Int32))
val_df = val_df.with_column(pl.col("session").cast(pl.Int32))

covisit_tables = [
    [cov_df.with_column(pl.col("aid_x").cast(pl.Int32)).with_column(pl.col("aid_y").cast(pl.Int32)), cov_name]
    for cov_df, cov_name in [(clicks_cov_df, "clicks"),
                             (carts_orders_cov_df, "carts_orders"),
                             (buy2buy_cov_df, "buy2buy")]
]

# The ground truth is the same for every type and batch, so read it once.
# NOTE(review): in "kaggle" mode this still reads the local validation labels, which
# presumably do not cover the test sessions (every label becomes 0); confirm intent.
val_labels = pd.read_parquet('./splitted_raw_data/val_labels.parquet')

BATCH_ROWS = 10_000_000
os.makedirs('./candidated_features', exist_ok=True)

for type_str in tqdm(list(type_labels.keys())):

    # Positive labels for this type: one row per (session, ground-truth aid), label 1.
    tar = val_labels.loc[val_labels['type'] == type_str]
    aids = tar.ground_truth.explode().rename('aid')
    tar = tar[['session']]
    tar = tar.merge(aids, left_index=True, right_index=True, how='left')
    # The exploded aid column is object dtype, and empty lists explode to NaN (those
    # rows cannot match a candidate). Drop them and use int64 keys to match the
    # candidate frame's Int64 session/aid.
    tar = tar.dropna(subset=['aid']).astype({'session': 'int64', 'aid': 'int64'})
    tar['label'] = 1
    tar = pl.from_pandas(tar)

    pf = ParquetFile(f"./candidate_data/{GENERATE_FOR}_{CANDIDATE_COUNT}candidates_{type_str}.parquet")

    for batch_i, batch in tqdm(enumerate(pf.iter_batches(batch_size=BATCH_ROWS))):
        candidate_df = pl.from_pandas(batch.to_pandas())
        candidate_df = candidate_df.with_column(pl.col("aid").cast(pl.Int32))
        candidate_df = candidate_df.with_column(pl.col("session").cast(pl.Int32))

        for covisit in covisit_tables:
            candidate_df = candidate_df.join(
                get_covisitation_features(input_cand_df=candidate_df,
                                          input_user_int_df=val_df,
                                          input_covisit_df=covisit[0],
                                          covisit_name=covisit[1]),
                how="left",
                on=["session", "aid"])

        candidate_df = candidate_df.with_column(pl.col("aid").cast(pl.Int64))
        candidate_df = candidate_df.with_column(pl.col("session").cast(pl.Int64))

        # Rank within session. This assumes each session contributes exactly
        # CANDIDATE_COUNT consecutive rows in rank order, and that no batch splits a session.
        n_rows = len(candidate_df)
        if n_rows % CANDIDATE_COUNT:
            raise ValueError(
                f"{type_str} batch {batch_i}: {n_rows} rows is not a multiple of "
                f"CANDIDATE_COUNT={CANDIDATE_COUNT}; candidate_rank would be misaligned")
        rank_repeater = np.tile(np.arange(1, CANDIDATE_COUNT + 1), n_rows // CANDIDATE_COUNT)
        candidate_df = candidate_df.with_column(pl.Series(name="candidate_rank", values=rank_repeater))
        del rank_repeater

        candidate_df = candidate_df.join(item_features, on='aid', how='left').fill_null(-1)
        candidate_df = candidate_df.join(user_features, on='session', how='left').fill_null(-1)
        candidate_df = candidate_df.join(user_item_int_features,
                                          on=['session', 'aid'],
                                          how='left').fill_null(-1)
        # Candidates without a matching ground-truth row get label 0.
        candidate_df = candidate_df.join(tar, on=['session', 'aid'], how='left').fill_null(0)

        candidate_df.write_parquet(f'./candidated_features/{GENERATE_FOR}_{type_str}_all_data_{CANDIDATE_COUNT}candidates_p{batch_i}.pqt')

        del candidate_df
        gc.collect()

    del tar, aids
    gc.collect()

  0%|          | 0/3 [00:00<?, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

In [None]:
# dff = pd.read_parquet(f'./candidated_features/{GENERATE_FOR}_clicks_all_data.pqt')

In [None]:
# dff

In [None]:
# `candidate_df` is deleted at the end of every batch in the merge loop, so it does not
# exist at this point. Peek at the first written part instead.
pl.read_parquet(f'./candidated_features/{GENERATE_FOR}_clicks_all_data_{CANDIDATE_COUNT}candidates_p0.pqt', n_rows=5)

In [None]:
# Spot-check get_covisitation_features on the first few candidate sessions.
# `candidate_df` no longer exists after the merge loop, so read a small batch from disk.
_sample_batch = next(
    ParquetFile(f"./candidate_data/{GENERATE_FOR}_{CANDIDATE_COUNT}candidates_clicks.parquet")
    .iter_batches(batch_size=CANDIDATE_COUNT * 10)
)
sample_candidates = (
    pl.from_pandas(_sample_batch.to_pandas())
    .with_column(pl.col("aid").cast(pl.Int32))
    .with_column(pl.col("session").cast(pl.Int32))
)
get_covisitation_features(input_cand_df=sample_candidates,
                          input_user_int_df=val_df,
                          input_covisit_df=covisit[0],
                          covisit_name=covisit[1])