In [1]:
from typing import Tuple
import numpy as np
import pandas as pd
import polars as pl
from omegaconf import DictConfig, OmegaConf
import hydra
from hydra import initialize, initialize_config_module, initialize_config_dir, compose
import pathlib
from annoy import AnnoyIndex
from gensim.models import Word2Vec
from typing import Dict, List, Tuple

In [2]:
initialize(config_path="../conf", job_name="features_app")
config = compose(config_name="config", overrides=[
                                                    "data_path=../data/", 
                                                    "validation_path=../data/local_validation/", 
                                                    "ranker_path=../data/ranker/", 
                                                    "artifact_path=../artifacts/",
                                                    "model_path=../models/",
                                                    ]
                                                )
print(OmegaConf.to_yaml(config))

name: ''
tags:
- covisit
- word2vec
description: ' '
data_path: ../data/
validation_path: ../data/local_validation/
ranker_path: ../data/ranker/
artifact_path: ../artifacts/
debug: false
local_validation: true
word2vec: true
train_file: train.parquet
test_file: test.parquet
test_labels_file: test_labels.parquet
submission_path: submissions/
submission_file: submission.csv
model_path: ../models/
curr_model_path: models/word2vec.model
type_labels:
  clicks: 0
  carts: 1
  orders: 2
type_weight:
  0: 0.5
  1: 9
  2: 0.5
version: 1
chunk_size: 200000
random_state: 42
fraction: 0.02
n_top: 15
n_top_clicks: 20
n_samples: 50
time_diff: 7 * 24 * 60 * 60
one_week: 7 * 24 * 60 * 60
vector_size: 100
window: 5
negative: 10
workers: 12
epochs: 2
min_count: 5
n_trees: 100
n_recent: 10



The version_base parameter is not specified.
Please specify a compatability version level, or None.
Will assume defaults for version 1.1
  initialize(config_path="../conf", job_name="features_app")


In [3]:
def load_data() -> Tuple[pl.DataFrame, pl.DataFrame]:
    """Load data from parquet files."""
    if config.local_validation:
        train = pl.read_parquet(config.validation_path + config.train_file)
        # test = pl.read_parquet(config.validation_path + config.test_file)
        test = pl.read_parquet(config.ranker_path + config.test_file) # for ranker model
    else:
        train = pl.read_parquet(config.data_path + config.train_file)
        test = pl.read_parquet(config.data_path + config.test_file)
    return train, test


def load_model():
    """Load word2vec model."""
    if not config.word2vec:
        return None
    print("Loading word2vec model...")
    model = Word2Vec.load(f"{config.model_path}word2vec_windowl_{config.window}.model")
    print(
        f"Model loaded from path: {config.model_path}word2vec_windowl_{config.window}.model"
    )
    return model


def build_index(model, n_trees=100) -> Tuple[AnnoyIndex, Dict[str, int]]:
    """Build index for word2vec model."""
    if config.word2vec:
        print("Building index for word2vec model...")
        aid2idx = {aid: i for i, aid in enumerate(model.wv.index_to_key)}
        index = AnnoyIndex(model.wv.vector_size, metric="euclidean")
        for idx in aid2idx.values():
            index.add_item(idx, model.wv.vectors[idx])
        index.build(n_trees=n_trees)
        return index, aid2idx
    else:
        return None, None


def load_combined_covisitation(type: str = "clicks") -> pd.DataFrame:
    top_20 = pd.read_pickle(
        f"{config.data_path}top_20_{type}_v{config.version}.pkl"
    )
    print(f"Size of top_20_{type}:", len(top_20))
    return top_20

In [4]:
train, test = load_data()

In [5]:
print("Shape of train and test data:", train.shape, test.shape)

Shape of train and test data: (163955180, 4) (2127742, 4)


In [None]:
covisit_clicks = load_combined_covisitation(type="clicks")


Size of top_20_clicks: 1813303


In [None]:
model = load_model()
index, aid2idx = build_index(model)

Loading word2vec model...
Model loaded from path: ../models/word2vec_windowl_5.model
Building index for word2vec model...


In [None]:
key_to_index = model.wv.key_to_index
vectors = model.wv.vectors

# reduce dimensionality of vectors
from sklearn.decomposition import PCA
pca = PCA(n_components=10)
pca.fit(vectors)
vectors = pca.transform(vectors)

### USER-ITEM-FEATURES

In [6]:
def add_mean_word2vec_feature(aids):
    """Add mean word2vec feature to dataframe."""
    if config.word2vec and len(aids) == 0 or not config.word2vec:
        return np.zeros(config.vector_size)
    else:
        return list(
            np.mean(
            [vectors[key_to_index[aid]] for aid in aids if aid in key_to_index], axis=0
        ))


def add_word2vec_count_feature(aids):
    pass


def add_avg_cosine_similarity(aids):
    pass


# how many items (that user has already clicked) have recommended this item with their co-visitation matrix
def apply_covisit_clicks_feature(aids):
    covisit_recomms = [0] * len(aids)
    window_start = 0
    for idx, window_end in enumerate(range(len(aids))):
        user_sequence = aids[window_start:window_end]
        curr_aid = aids[window_end]
        unique_aids = list(set(user_sequence))
        covisit_count = sum(curr_aid in covisit_clicks[aid] for aid in unique_aids if aid in covisit_clicks)
        covisit_recomms[idx] = covisit_count
    assert len(covisit_recomms) == len(aids)
    return covisit_recomms


def explode_word2vec_to_columns(df):
    df = df.to_pandas()
    columns = [f'word2vec_feature_{i}' for i in range(10)]
    word2vec_features = df['word2vec_feature'].apply(pd.Series)
    word2vec_features.columns = columns
    word2vec_features, df = pl.DataFrame(word2vec_features), pl.DataFrame(df)
    df = pl.concat([df, word2vec_features], how='horizontal')
    df = df.drop(columns=['word2vec_feature'])
    return df


In [7]:
# logspace averaged mean of the feature vecture
def add_word2vec_feature(df):
    return df.select(
        [
            pl.col("*"),
            pl.col(["aid"])
            .apply(add_mean_word2vec_feature)
            .list()
            .over("session")
            .alias("word2vec_feature"),
        ]
    )


def add_covisit_clicks_feature(df):
    return df.select(
        [
            pl.col("*"),
            pl.col(["aid"])
            .apply(apply_covisit_clicks_feature)
            .over("session")
            .alias("covisit_clicks_feature"),
        ]
    )


# number of clicks in a session at each timestamp
def add_click_num_chrono(df):
    return df.select([
        pl.col('*'),
        pl.when(pl.col('type') == 0)
          .then(pl.col('aid').cumcount().over('session'))
          .otherwise(pl.lit(None)).alias('click_num_chrono')
          .forward_fill()
    ])


# number of clicks in a session at each timestamp in reverse chronological order
def add_click_num_reverse_chrono(df):
    return df.select([
        pl.col('*'),
        pl.when(pl.col('type') == 0)
          .then(pl.col('aid').cumcount().reverse().forward_fill().over('session'))
          .otherwise(pl.lit(None)).alias('click_num_reverse_chrono')
          .forward_fill()
    ])


def add_counter(event_types):
    cumcount, counter = [], 0
    for event in event_types:
        if event == 1:
            counter += 1
        cumcount.append(counter)
    return cumcount


# number of carts in a session at each timestamp
def add_cart_num_chrono(df):
    return df.select([
        pl.col('*'),
        pl.when(pl.col('type') == 1)
          .then(pl.col(['type']).apply(add_counter).forward_fill().over('session'))
          .otherwise(pl.lit(0)).alias('cart_num_chrono')
    ])


# number of carts in a session at each timestamp
def add_order_num_chrono(df):
    return df.select([
        pl.col('*'),
        pl.when(pl.col('type') == 2)
          .then(pl.col(['type']).apply(add_counter).forward_fill().over('session'))
          .otherwise(pl.lit(0)).alias('order_num_chrono')
    ])


# doesn't seem correct - please cross-check
def add_aid_interacted_with_count(df):
    temp_df = df.unique(subset=['session', 'aid'])
    temp_df = temp_df.select([
            pl.col('*'),
            pl.col('session').cumcount().over('session').alias('aid_interacted_with_unique')
        ])
    df = df.join(temp_df, on=['session', 'aid'], how='left')
    return df.select(pl.exclude("^.*right$"))


def add_sec_since_last_action(df):
    return df.select(
        [
            pl.col("*"),
            pl.col("ts")
            .diff()
            .over("session")
            .alias("seconds_since_last_action"),
        ]
    ).fill_null(-1)


def add_sec_since_first_action(df):
    return df.select(
        [
            pl.col("*"),
            (pl.col("ts") - pl.col("ts").min())
            .over("session")
            .alias("seconds_since_first_action"),
        ]
    ).fill_null(-1)


def add_sec_to_session_end(df):
    return df.select([
        pl.col('*'),
        (pl.col('ts').max() - pl.col('ts'))
        .over('session')
        .alias('seconds_to_session_end')
    ]).fill_null(-1)


def add_action_num_reverse_chrono_unique(df):
    temp_df = df.unique(subset=['session', 'aid'])
    temp_df = temp_df.select([
            pl.col('*'),
            pl.col('session').cumcount().reverse().over('session').alias('action_num_reverse_chrono_unique')
        ])
    df = df.join(temp_df, on=['session', 'aid'], how='left')
    return df.select(pl.exclude("^.*right$"))


def add_action_num_reverse_chrono(df):
    return df.select([
        pl.col('*'),
        pl.col('session').cumcount().reverse().over('session').alias('action_num_reverse_chrono')
    ])


def add_session_length(df):
    return df.select([
        pl.col('*'),
        pl.col('session').count().over('session').alias('session_length')
    ])


def add_type_weighted_log_recency_score(df):
    type_weights = {0:0.5, 1:9, 2:0.5}
    type_weighted_log_recency_score = pl.Series(df['log_recency_score'] / df['type'].apply(lambda x: type_weights[x]))
    return df.with_column(type_weighted_log_recency_score.alias('type_weighted_log_recency_score'))


def add_log_recency_score(df):
    linear_interpolation = 0.1 + ((1-0.1) / (df['session_length']-1)) * (df['session_length']-df['action_num_reverse_chrono']-1)
    return df.with_columns(pl.Series(2**linear_interpolation - 1).alias('log_recency_score')).fill_nan(1)


def apply(df, pipeline):
    for f in pipeline:
        df = f(df)
    return df

In [None]:
# add features
pipeline = [
    add_covisit_clicks_feature,
    add_click_num_chrono,
    add_click_num_reverse_chrono,
    add_cart_num_chrono,
    add_order_num_chrono,
    add_aid_interacted_with_count,
    add_sec_since_last_action,
    add_sec_since_first_action,
    add_sec_to_session_end,
    add_action_num_reverse_chrono_unique,
    add_action_num_reverse_chrono,
    add_session_length,
    add_log_recency_score,
    add_type_weighted_log_recency_score,
    add_word2vec_feature,
    explode_word2vec_to_columns
]

In [88]:
def add_datetime(df: pl.DataFrame) -> pl.DataFrame:
    MILLISECONDS_IN_SECOND = 1000

    df = df.with_columns(
        [
            (pl.col("ts").cast(pl.Int64) * MILLISECONDS_IN_SECOND)
            .cast(pl.Datetime)
            .dt.with_time_unit("ms")
            .alias("datetime")
        ]
    )
    return df


def add_day_of_week_pandas(df):
    df = df.to_pandas()
    df['day_of_week'] = df['datetime'].dt.dayofweek
    return pl.DataFrame(df)


def add_hour_of_the_day_pandas(df):
    df = df.to_pandas()
    df['hour_of_the_day'] = df['datetime'].dt.hour
    return pl.DataFrame(df)


def add_is_day_pandas(df):
    df = df.to_pandas()
    df['is_day'] = df['hour_of_the_day'].apply(lambda x: 1 if x >= 6 and x <= 18 else 0)
    return pl.DataFrame(df)


test = add_datetime(test)
test = add_day_of_week_pandas(test)
test = add_hour_of_the_day_pandas(test)
test = add_is_day_pandas(test)

In [81]:
test.head()

session,aid,ts,type,datetime,day_of_week,hour_of_the_day,is_day
i32,i32,i32,u8,datetime[ns],i64,i64,i64
11098530,264500,1661119200,0,2022-08-21 22:00:00,6,22,0
11098530,264500,1661119288,0,2022-08-21 22:01:28,6,22,0
11098530,409236,1661119369,0,2022-08-21 22:02:49,6,22,0
11098530,409236,1661119441,0,2022-08-21 22:04:01,6,22,0
11098530,409236,1661120165,0,2022-08-21 22:16:05,6,22,0


### USER FEATURES

In [98]:

real_session_gap = 2 * 60 * 60 

def add_avg_hour_user_clicks(df):
    result = df.groupby('session').agg(
        [
            pl.col('datetime')
            .filter(pl.col('type') == 0)
            .dt.hour()
            .mean()
            .alias('avg_hour_user_clicks')
        ]
    ).fill_null(-1)
    return df.join(result, on='session', how='left')


def add_avg_hour_user_carts(df):
    result = df.groupby('session').agg(
        [
            pl.col('datetime')
            .filter(pl.col('type') == 1)
            .dt.hour()
            .mean()
            .alias('avg_hour_user_carts')
        ]
    ).fill_null(-1)
    return df.join(result, on='session', how='left')


def add_avg_hour_user_orders(df):
    result = df.groupby('session').agg(
        [
            pl.col('datetime')
            .filter(pl.col('type') == 2)
            .dt.hour()
            .mean()
            .alias('avg_hour_user_orders')
        ]
    ).fill_null(-1)
    return df.join(result, on='session', how='left')


def add_avg_hour_user_interactions(df):
    result = df.groupby('session').agg(
        [
            pl.col('datetime')
            .dt.hour()
            .mean()
            .alias('avg_hour_user_interactions')
        ]
    ).fill_null(-1)
    return df.join(result, on='session', how='left')


def add_last_day_of_week_user_activity(df):
    result = df.groupby('session').agg(
        [
            pl.col('day_of_week')
            .max()
            .alias('last_day_of_week_user_activity')
        ]
    ).fill_null(-1)
    return df.join(result, on='session', how='left')


def add_first_day_of_week_user_activity(df):
    result = df.groupby('session').agg(
        [
            pl.col('day_of_week')
            .min()
            .alias('first_day_of_week_user_activity')
        ]
    ).fill_null(-1)
    return df.join(result, on='session', how='left')


def add_avg_day_of_week_user_activity(df):
    result = df.groupby('session').agg(
        [
            pl.col('day_of_week')
            .mean()
            .alias('avg_day_of_week_user_activity')
        ]
    ).fill_null(-1)
    return df.join(result, on='session', how='left')


# number of real sessions based on time gap activity
def add_num_real_sessions(df):
    result = df.groupby('session').agg(
        [
            pl.col('ts')
            .diff()
            .filter(pl.col('datetime').diff() > real_session_gap)
            .count()
            .alias('num_real_sessions')
        ]
    ).fill_null(-1)
    return df.join(result, on='session', how='left')


def add_avg_hour_in_each_real_session(df):
    result = df.groupby('session').agg(
        [
            pl.col('datetime')
            .filter(pl.col('datetime').diff() > real_session_gap)
            .dt.hour()
            .mean()
            .alias('avg_hour_in_each_real_session')
        ]
    ).fill_null(-1)
    return df.join(result, on='session', how='left')


def add_avg_items_in_each_real_session(df):
    result = df.groupby('session').agg(
        [
            pl.col('aid')
            .filter(pl.col('datetime').diff() > real_session_gap)
            .count()
            # .mean()
            .alias('avg_items_in_each_real_session')
        ]
    ).fill_null(-1)
    return df.join(result, on='session', how='left')


def add_avg_time_between_clicks(df):
    result = df.groupby('session').agg(
        [
            pl.col('ts')
            .filter(pl.col('type') == 0)
            .diff()
            .mean()
            .alias('avg_time_between_clicks')
        ]
    ).fill_null(-1)
    return df.join(result, on='session', how='left')


def add_num_of_day_events(df):
    result = df.groupby('session').agg(
        [
            pl.col('is_day')
            .filter(pl.col('is_day') == 1)
            .count()
            .alias('num_day_events')
        ]
    ).fill_null(-1)
    return df.join(result, on='session', how='left')


def add_num_of_night_events(df):
    result = df.groupby('session').agg(
        [
            pl.col('is_day')
            .filter(pl.col('is_day') == 0)
            .count()
            .alias('num_night_events')
        ]
    ).fill_null(-1)
    return df.join(result, on='session', how='left')


def add_num_of_weekend_events(df):
    result = df.groupby('session').agg(
        [
            pl.col('day_of_week')
            .filter(pl.col('day_of_week') > 4)
            .count()
            .alias('num_weekend_events')
        ]
    ).fill_null(-1)
    return df.join(result, on='session', how='left')


def add_num_of_weekday_events(df):
    result = df.groupby('session').agg(
        [
            pl.col('day_of_week')
            .filter(pl.col('day_of_week') < 5)
            .count()
            .alias('num_weekday_events')
        ]
    ).fill_null(-1)
    return df.join(result, on='session', how='left')



def add_if_weekday_user(df):
    # if major fraction of events are weekday events
    return df.with_column(
        pl.when(
            pl.col('num_weekday_events') > pl.col('num_weekend_events')
        )
        .then(1)
        .otherwise(0)
        .alias('if_weekday_user')
    )


def add_if_weekend_user(df):
    # if major fraction of events are weekend events
    return df.with_column(
        pl.when(
            pl.col('num_weekday_events') < pl.col('num_weekend_events')
        )
        .then(1)
        .otherwise(0)
        .alias('if_weekend_user')
    )


def add_first_hour_of_user_activity(df):
    result = df.groupby('session').agg(
        [
            pl.col('datetime')
            .dt.hour()
            .min()
            .alias('first_hour_of_user_activity')
        ]
    ).fill_null(-1)
    return df.join(result, on='session', how='left')


def add_last_hour_of_user_activity(df):
    result = df.groupby('session').agg(
        [
            pl.col('datetime')
            .dt.hour()
            .max()
            .alias('last_hour_of_user_activity')
        ]
    ).fill_null(-1)
    return df.join(result, on='session', how='left')


def add_first_day_of_week_user_activity(df):
    result = df.groupby('session').agg(
        [
            pl.col('day_of_week')
            .min()
            .alias('first_day_of_week_user_activity')
        ]
    ).fill_null(-1)
    return df.join(result, on='session', how='left')


def add_last_day_of_week_user_activity(df):
    result = df.groupby('session').agg(
        [
            pl.col('day_of_week')
            .max()
            .alias('last_day_of_week_user_activity')
        ]
    ).fill_null(-1)
    return df.join(result, on='session', how='left')
        

def add_click_ratio(df):
    pass


def add_cart_ratio(df):
    pass


def add_order_ratio(df):
    pass


In [104]:
pipeline = [
    add_avg_day_of_week_user_activity,
    add_first_day_of_week_user_activity,
    add_last_day_of_week_user_activity,
    add_first_day_of_week_user_activity,
    add_last_day_of_week_user_activity,
    add_avg_hour_user_interactions,
    add_first_hour_of_user_activity,
    add_last_hour_of_user_activity,
    add_avg_hour_in_each_real_session,
    add_avg_items_in_each_real_session,
    add_avg_time_between_clicks,
    add_num_real_sessions,
    add_num_of_day_events,
    add_num_of_night_events,
    add_num_of_weekend_events,
    add_num_of_weekday_events,
    add_if_weekday_user,
    add_if_weekend_user,
]

In [109]:
user_features = apply(test, pipeline)
user_features = user_features.unique(subset=['session'])
user_features = user_features.drop(['datetime', 'ts', 'day_of_week', 'is_day', 'hour_of_the_day', 'type', 'aid'])
user_features

session,avg_day_of_week_user_activity,first_day_of_week_user_activity,last_day_of_week_user_activity,first_day_of_week_user_activity_right,last_day_of_week_user_activity_right,avg_hour_user_interactions,first_hour_of_user_activity,last_hour_of_user_activity,avg_hour_in_each_real_session,avg_items_in_each_real_session,avg_time_between_clicks,num_real_sessions,num_day_events,num_night_events,num_weekend_events,num_weekday_events,if_weekday_user,if_weekend_user
i32,f64,i64,i64,i64,i64,f64,i64,i64,f64,i64,f64,i64,i64,i64,i64,i64,i32,i32
11098530,6.0,6,6,6,6,22.0,22,22,22.0,0,241.25,5,0,6,6,0,0,1
11098531,6.0,6,6,6,6,22.0,22,22,22.0,11,18.052632,20,0,24,24,0,0,1
11098535,5.4,0,6,0,6,21.1,13,22,21.0,6,6784.0,9,1,9,9,1,0,1
11098539,6.0,6,6,6,6,22.0,22,22,22.0,2,54.5,2,0,3,3,0,0,1
11098542,6.0,6,6,6,6,22.0,22,22,22.0,19,50.928571,21,0,22,22,0,0,1
11098551,6.0,6,6,6,6,22.0,22,22,-1.0,1,-1.0,0,0,1,1,0,0,1
11098552,6.0,6,6,6,6,22.0,22,22,22.0,0,-1.0,0,0,1,1,0,0,1
11098557,6.0,6,6,6,6,22.0,22,22,22.0,12,40.272727,11,0,12,12,0,0,1
11098558,6.0,6,6,6,6,22.0,22,22,22.0,7,73.9,11,0,12,12,0,0,1
11098559,6.0,6,6,6,6,22.0,22,22,22.0,3,57.5,2,0,3,3,0,0,1


In [None]:
# user_features.groupby('session').agg([
#     pl.col('type_weighted_log_recency_score').mean().alias('type_weighted_log_recency_score_mean'),
#     pl.col('type_weighted_log_recency_score').sum().alias('type_weighted_log_recency_score_sum'),
#     pl.col('type_weighted_log_recency_score').max().alias('type_weighted_log_recency_score_max'),
#     pl.col('type_weighted_log_recency_score').min().alias('type_weighted_log_recency_score_min'),
#     pl.col('type_weighted_log_recency_score').std().alias('type_weighted_log_recency_score_std'),
#     pl.col('type_weighted_log_recency_score').var().alias('type_weighted_log_recency_score_var'),
#     pl.col('type_weighted_log_recency_score').median().alias('type_weighted_log_recency_score_median'),
#     pl.col('type_weighted_log_recency_score').quantile(0.25).alias('type_weighted_log_recency_score_q1'),
#     pl.col('type_weighted_log_recency_score').quantile(0.75).alias('type_weighted_log_recency_score_q3'),
#     pl.col('type_weighted_log_recency_score').kurtosis().alias('type_weighted_log_recency_score_kurtosis'),
#     pl.col('type_weighted_log_recency_score').count().alias('type_weighted_log_recency_score_count'),
# ]).head(10)

session,type_weighted_log_recency_score_mean,type_weighted_log_recency_score_sum,type_weighted_log_recency_score_max,type_weighted_log_recency_score_min,type_weighted_log_recency_score_std,type_weighted_log_recency_score_var,type_weighted_log_recency_score_median,type_weighted_log_recency_score_q1,type_weighted_log_recency_score_q3,type_weighted_log_recency_score_kurtosis,type_weighted_log_recency_score_count
i32,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,u32
11098564,2.0,2.0,2.0,2.0,0.0,0.0,2.0,2.0,2.0,,1
11098676,0.962832,15.405305,2.0,0.018304,0.615807,0.379218,0.928804,0.531513,1.530812,-1.117153,16
11098669,0.995082,5.970491,2.0,0.143547,0.695332,0.483487,0.933871,0.42839,1.530812,-1.218815,6
11098607,2.0,2.0,2.0,2.0,0.0,0.0,2.0,2.0,2.0,,1
11098551,2.0,2.0,2.0,2.0,0.0,0.0,2.0,2.0,2.0,,1
11098639,2.0,2.0,2.0,2.0,0.0,0.0,2.0,2.0,2.0,,1
11098606,2.0,2.0,2.0,2.0,0.0,0.0,2.0,2.0,2.0,,1
11098739,0.995082,5.970491,2.0,0.143547,0.695332,0.483487,0.933871,0.42839,1.530812,-1.218815,6
11098615,0.535671,2.142683,1.24901,0.111111,0.533399,0.284515,0.391281,0.143547,1.24901,-1.269535,4
11098595,0.978408,38.157907,2.0,0.143547,0.555597,0.308688,0.928171,0.484849,1.450588,-1.148419,39
