# cand_trial
1w版co_visit追加

In [21]:
import os
import sys
import traceback
import gc
import random
import pickle
import pathlib
import subprocess
from dataclasses import dataclass
from dotenv import load_dotenv
# Actually call load_dotenv(): the bare name is a no-op expression, so the
# variables from .env (UTILS_PATH, INPUT_DIR, ...) were never loaded.
load_dotenv()
# Only extend sys.path when UTILS_PATH is configured; appending None would
# put a useless (and confusing) entry on the import path.
_utils_path = os.getenv('UTILS_PATH')
if _utils_path:
    sys.path.append(_utils_path)
from tqdm import tqdm
import multiprocessing
import inspect

import pandas as pd
import polars as pl
import numpy as np
import itertools
import cudf
import lightgbm as lgb
import matplotlib.pyplot as plt
import seaborn as sns
import line_notify
import my_logger
from noglobal import noglobal

# 設定

In [22]:
@dataclass
class Cfg:
    # NOTE: none of the attributes below carry a type annotation, so
    # @dataclass does not treat them as fields; they are plain class
    # attributes shared by every instance.

    # Run identification / reproducibility.
    loglevel = "INFO"
    exp_name = "cand_trial008"
    seed = 42

    # Candidate / sampling sizes (not referenced by the code visible in this
    # part of the notebook -- presumably used by later cells; TODO confirm).
    k = 20
    cand_n = 15
    negative_sample = 1
    train_chunk_n_dict = {
        "clicks": 2,
        "carts": 1,
        "orders": 1,
    }
    test_chunk_n = 2

    # Event type <-> integer id lookup tables.
    type2id = {
        "clicks": 0,
        "carts": 1,
        "orders": 2,
    }
    id2type = {
        0: "clicks",
        1: "carts",
        2: "orders",
    }

    # Data split selection.
    train_week = "week3"
    valid_week = "week4"
    valid_session_n = 100_000

    # Directories are read from the environment when the class body runs.
    input_dir = os.getenv("INPUT_DIR")
    output_dir = os.getenv("OUTPUT_DIR")
    prep_dir = os.getenv("PREP_DIR")

    # Per-target model hyper-parameters (presumably for LightGBM, which the
    # notebook imports -- no training code is visible here; TODO confirm).
    clicks_params = {
        "objective": "binary",
        "boosting": "gbdt",
        "learning_rate": 0.1,
        "metric": "binary_logloss",
        "seed": 42,
        "feature_pre_filter": False,
        "lambda_l1": 5.485903737168179,
        "lambda_l2": 0.005594683492536064,
        "num_leaves": 79,
        "feature_fraction": 0.552,
        "bagging_fraction": 0.9295272232672004,
        "bagging_freq": 2,
        "min_child_samples": 10,
    }
    carts_params = {
        "objective": "binary",
        "boosting": "gbdt",
        "learning_rate": 0.1,
        "metric": "binary_logloss",
        "seed": 42,
        "feature_pre_filter": False,
        "lambda_l1": 8.709050252544463,
        "lambda_l2": 0.06935262036337767,
        "num_leaves": 252,
        "feature_fraction": 0.4,
        "bagging_fraction": 1.0,
        "bagging_freq": 0,
        "min_child_samples": 5,
    }
    orders_params = {
        "objective": "binary",
        "boosting": "gbdt",
        "learning_rate": 0.1,
        "metric": "binary_logloss",
        "seed": 42,
        "feature_pre_filter": False,
        "lambda_l1": 9.356310279757256,
        "lambda_l2": 1.3120983078968551e-08,
        "num_leaves": 174,
        "feature_fraction": 0.5,
        "bagging_fraction": 1.0,
        "bagging_freq": 0,
        "min_child_samples": 20,
    }

# Instantiate the configuration and prepare the experiment's output tree:
# <output_dir>/<exp_name>/ and its "cache" sub-directory.
cfg = Cfg()
_exp_dir = os.path.join(cfg.output_dir, cfg.exp_name)
for _directory in (_exp_dir, os.path.join(_exp_dir, "cache")):
    os.makedirs(_directory, exist_ok=True)

# Seed Python's global RNG so later random sampling is repeatable.
random.seed(cfg.seed)

logger = my_logger.init_logger(cfg.exp_name)

In [23]:
class Cache:
    """Bookkeeping for files stored in ``<output_dir>/<exp_name>/cache``.

    The class only tracks which files exist in the cache directory; reading
    and writing the cached objects is left to the caller.
    """

    @noglobal(excepts=["cfg", "logger"])
    def __init__(self):
        """Resolve the cache directory from ``cfg`` and list its contents."""
        self.cache_dir = os.path.join(cfg.output_dir, cfg.exp_name, "cache")
        self.cache_dir_path = pathlib.Path(self.cache_dir)
        self.caches = list(self.cache_dir_path.glob("*"))

    def update(self):
        """Re-scan the cache directory and refresh ``self.caches``."""
        self.caches = list(self.cache_dir_path.glob("*"))

    @noglobal(excepts=["cfg", "logger"])
    def get_abspath(self, filename):
        """Return ``filename`` joined onto the cache directory.

        Despite the name, the result is only absolute when ``cfg.output_dir``
        is itself absolute; no normalisation is applied.
        """
        return os.path.join(self.cache_dir, filename)

    @noglobal(excepts=["cfg", "logger"])
    def exists(self, path):
        """Return True if ``path`` is an entry of the cache directory.

        The directory listing is refreshed on every call. The comparison is
        done on ``pathlib.Path`` objects rather than raw strings: ``glob``
        yields pathlib-normalised paths (e.g. a leading ``./`` or a doubled
        separator is dropped), so a plain string comparison against the
        ``os.path.join`` result of ``get_abspath`` could never match for such
        directories and the cache would always be reported as missing.
        """
        self.update()
        target = pathlib.Path(path)
        return any(target == cached for cached in self.caches)


cache = Cache()

In [24]:
def reduce_mem_usage(df):
    """Downcast the columns of a pandas DataFrame to reduce memory usage.

    The frame is modified in place and also returned.

    * ``object`` columns are converted to ``category``.
    * Signed / unsigned integer columns are converted to the narrowest
      integer type of the same signedness that holds their min and max
      (bounds are inclusive, so e.g. 127 still fits ``int8``).
    * Float columns wider than 32 bits are converted to ``float32`` when
      their values fit; ``float16`` is never used as a target because of its
      low precision. Columns are never widened.
    * Every other dtype (bool, datetime, categorical, extension dtypes, ...)
      is left untouched instead of being forced through a float conversion.

    Memory usage before and after is printed.
    """
    start_mem = df.memory_usage().sum() / 1024**2
    print('Memory usage of dataframe is {:.2f} MB'.format(start_mem))

    int_candidates = {
        "i": (np.int8, np.int16, np.int32, np.int64),
        "u": (np.uint8, np.uint16, np.uint32, np.uint64),
    }

    for col in df.columns:
        col_type = df[col].dtype

        if col_type == object:
            df[col] = df[col].astype('category')
            continue

        # Only plain numpy integer / float columns are downcast.
        if not isinstance(col_type, np.dtype) or col_type.kind not in "iuf":
            continue

        c_min = df[col].min()
        c_max = df[col].max()

        if col_type.kind in "iu":
            # min()/max() of an empty column is NaN: nothing to decide on.
            if pd.isna(c_min) or pd.isna(c_max):
                continue
            lo, hi = int(c_min), int(c_max)
            for candidate in int_candidates[col_type.kind]:
                limits = np.iinfo(candidate)
                if limits.min <= lo and hi <= limits.max:
                    df[col] = df[col].astype(candidate)
                    break
        else:
            limits = np.finfo(np.float32)
            # NaN min/max (all-NaN column) fails both comparisons, so such a
            # column keeps its dtype.
            fits_float32 = limits.min <= c_min and c_max <= limits.max
            if col_type.itemsize > 4 and fits_float32:
                df[col] = df[col].astype(np.float32)

    end_mem = df.memory_usage().sum() / 1024**2
    print('Memory usage after optimization is: {:.2f} MB'.format(end_mem))
    # Guard against a zero-sized frame to avoid a division by zero.
    decreased = 100 * (start_mem - end_mem) / start_mem if start_mem else 0.0
    print('Decreased by {:.1f}%'.format(decreased))

    return df

In [25]:
@noglobal(excepts=["cfg", "logger"])
def evaluate(clicks_labels, carts_labels, orders_labels, 
             clicks_preds, carts_preds, orders_preds, k=20):
    """Compute recall@k per event type and the weighted total score.

    All six arguments are sequences aligned by position: element ``i`` of
    every sequence belongs to the same session.

    Args:
        clicks_labels: one scalar label per session; NaN / None means the
            session has no clicks label and is skipped.
        carts_labels, orders_labels: one array-like (numpy array, list or
            tuple) of labels per session; any other value (e.g. NaN) means
            "no label" and the session is skipped for that type.
        clicks_preds, carts_preds, orders_preds: one list of predicted aids
            per session; any non-list value is treated as "no prediction".
        k: only the first ``k`` predictions of each list are used.

    Returns:
        dict with the raw hit / label counts, and the recalls, weighted
        recalls (weights 0.10 / 0.30 / 0.60) and total score formatted as
        strings with six decimals. A type without any label gets recall 0
        instead of raising ZeroDivisionError.

    Raises:
        ValueError: if any sequence is shorter than ``clicks_labels``.
    """
    session_n = len(clicks_labels)
    others = (carts_labels, orders_labels, clicks_preds, carts_preds, orders_preds)
    if any(len(seq) < session_n for seq in others):
        raise ValueError("labels and predictions must cover the same sessions")

    num_clicks = 0
    num_carts = 0
    num_orders = 0
    hit_clicks = 0
    hit_carts = 0
    hit_orders = 0

    rows = zip(clicks_labels, carts_labels, orders_labels,
               clicks_preds, carts_preds, orders_preds)
    for clicks_label, carts_label, orders_label, clicks_pred, carts_pred, orders_pred in rows:
        # A non-list prediction (e.g. NaN for a session without candidates)
        # counts as an empty prediction.
        clicks_pred = clicks_pred[:k] if isinstance(clicks_pred, list) else []
        carts_pred = carts_pred[:k] if isinstance(carts_pred, list) else []
        orders_pred = orders_pred[:k] if isinstance(orders_pred, list) else []

        if clicks_label is not None and not np.isnan(clicks_label):
            num_clicks += 1
            hit_clicks += int(clicks_label in clicks_pred)

        # The denominator is capped at k: at most k labels can be hit.
        if isinstance(carts_label, (np.ndarray, list, tuple)):
            num_carts += min(len(carts_label), k)
            hit_carts += len(set(carts_pred) & set(carts_label))

        if isinstance(orders_label, (np.ndarray, list, tuple)):
            num_orders += min(len(orders_label), k)
            hit_orders += len(set(orders_pred) & set(orders_label))

    recall_clicks = hit_clicks / num_clicks if num_clicks else 0.0
    recall_carts = hit_carts / num_carts if num_carts else 0.0
    recall_orders = hit_orders / num_orders if num_orders else 0.0
    w_recall_clicks = recall_clicks * 0.10
    w_recall_carts = recall_carts * 0.30
    w_recall_orders = recall_orders * 0.60
    score = w_recall_clicks + w_recall_carts + w_recall_orders

    results = {
        "num_clicks": num_clicks,
        "hit_clicks": hit_clicks,
        "num_carts": num_carts,
        "hit_carts": hit_carts,
        "num_orders": num_orders,
        "hit_orders": hit_orders,
        "recall_clicks": format(recall_clicks, ".6f"),
        "recall_carts": format(recall_carts, ".6f"),
        "recall_orders": format(recall_orders, ".6f"),
        "w_recall_clicks": format(w_recall_clicks, ".6f"),
        "w_recall_carts": format(w_recall_carts, ".6f"),
        "w_recall_orders": format(w_recall_orders, ".6f"),
        "score": format(score, ".6f"),
    }

    return results

In [26]:
class Candidate:
    """Accumulates (session, aid) candidate pairs from several strategies.

    The session log is held as a cudf DataFrame; it is read through the
    columns ``session``, ``aid``, ``ts`` and ``type``. Each ``add`` call
    appends pairs to ``self.sessions`` / ``self.aids`` and
    ``get_dataframe`` returns the de-duplicated result.
    """

    @noglobal(excepts=["cfg", "logger"])
    def __init__(self, pl_df):
        """Store ``pl_df`` (converted via pandas to cudf) and reset the accumulators."""
        self.df = cudf.from_pandas(pl_df.to_pandas())
        # Sessions that receive the global "total_frequent" candidates. This
        # attribute was read by _total_frequent but never defined, so that
        # strategy always failed with AttributeError.
        self.target_sessions = self.df["session"].unique().to_arrow().to_pylist()
        self.sessions = []
        self.aids = []

    @noglobal(excepts=["cfg", "logger"])
    def add(self, strategy, name, usetypes=('clicks', 'carts', 'orders'), trans_map=None, k=20):
        """Generate candidates with one strategy and append them.

        Args:
            strategy: "session_frequent", "session_latest" or
                "total_frequent". Any other value applies no selection, i.e.
                every (session, aid) row of the filtered log is used.
            name: label used only in the log message.
            usetypes: values of the ``type`` column to keep before selecting.
            trans_map: optional dict aid -> aid; when given, every selected
                aid is replaced through it and aids without an entry are
                dropped.
            k: maximum number of aids kept per session (for
                "total_frequent": number of globally frequent aids).
        """
        logger.info(f"[add_candidate] {name} : start")
        candidate_df = self.df[self.df["type"].isin(list(usetypes))].copy()
        
        if strategy == "session_frequent":
            candidate_df = self._session_frequent(candidate_df, k)
        elif strategy == "session_latest":
            candidate_df = self._session_latest(candidate_df, k)
        elif strategy == "total_frequent":
            candidate_df = self._total_frequent(candidate_df, k)
        
        if trans_map is not None:
            candidate_df["aid"] = candidate_df["aid"].map(trans_map)
            # Drop unmapped aids first, then cast, so the cast never has to
            # deal with missing values.
            candidate_df = candidate_df.dropna(subset=["aid"])
            candidate_df["aid"] = candidate_df["aid"].astype("int32")
        
        self.sessions.extend(candidate_df["session"].to_arrow().to_pylist())
        self.aids.extend(candidate_df["aid"].to_arrow().to_pylist())

    @noglobal(excepts=["cfg", "logger"])
    def get_dataframe(self):
        """Return all accumulated pairs as a de-duplicated polars DataFrame (Int32 columns)."""
        cand_df = pl.DataFrame([
            pl.Series("session", self.sessions, pl.Int32),
            pl.Series("aid", self.aids, pl.Int32)
            ]).unique()
        return cand_df

    @noglobal(excepts=["cfg", "logger"])
    def _session_frequent(self, candidate_df, k):
        """Per session, keep the ``k`` aids with the most rows (ties: smaller aid first)."""
        candidate_df = candidate_df.groupby(["session", "aid"])["ts"].count().reset_index()
        candidate_df.columns = ["session", "aid", "aid_count"]
        candidate_df = candidate_df.sort_values(["session", "aid_count", "aid"], ascending=[True, False, True])
        candidate_df = candidate_df[candidate_df.groupby("session")["aid_count"].cumcount() < k]
        return candidate_df[["session", "aid"]].copy()

    @noglobal(excepts=["cfg", "logger"])
    def _session_latest(self, candidate_df, k):
        """Per session, keep the ``k`` most recently seen distinct aids."""
        order_cols = ["session", "ts"]
        # Newest first, so drop_duplicates keeps the latest row of each aid.
        latest_df = candidate_df.sort_values(order_cols, ascending=[True, False])
        latest_df = latest_df.drop_duplicates(subset=["session", "aid"])
        # Re-sort before ranking: the per-session rank is taken on the final
        # ordering rather than on whatever order drop_duplicates returned.
        latest_df = latest_df.sort_values(order_cols, ascending=[True, False])
        latest_df = latest_df[latest_df.groupby("session")["ts"].cumcount() < k]
        return latest_df[["session", "aid"]].copy()

    @noglobal(excepts=["cfg", "logger"])
    def _total_frequent(self, candidate_df, k):
        """Pair every session in ``self.target_sessions`` with the ``k`` most frequent aids overall."""
        candidate_df = candidate_df.groupby("aid")["ts"].count().reset_index()
        candidate_df.columns = ["aid", "aid_count"]
        ranked = candidate_df.sort_values("aid_count", ascending=False)
        topk_freq_aids = ranked["aid"].head(k).to_arrow().to_pylist()
        pairs = list(itertools.product(self.target_sessions, topk_freq_aids))
        candidate_df = cudf.DataFrame({
            "session": [session for session, _ in pairs],
            "aid": [aid for _, aid in pairs],
        })
        return candidate_df



In [27]:
@noglobal(excepts=["cfg", "logger"])
def make_cand_df(sessions, trans_maps=None):
    """Build the candidate (session, aid) table for the given session log.

    Args:
        sessions: session log passed to ``Candidate``.
        trans_maps: dict of aid -> aid mapping dicts keyed by
            ``"<prefix><rank>"`` (e.g. ``"pair0"``, ``"time_wt_pair_1w4"``).
            When None, only the two session-history strategies are used; the
            original code crashed with a TypeError on this default.

    Returns:
        De-duplicated polars DataFrame returned by ``Candidate.get_dataframe``.
    """
    cand = Candidate(sessions)
    cand.add(strategy="session_frequent", name="session_frequent", k=15)
    cand.add(strategy="session_latest", name="session_latest", k=15)

    if trans_maps is None:
        return cand.get_dataframe()

    # Key prefixes of the mapping families and how many ranks of each are used.
    full_prefixes = (
        "time_wt_pair",
        "word2vec_pair",
        "pair",
        "type_wt_pair",
        "clicks2carts_pair",
        "clicks2orders_pair",
    )
    one_week_prefixes = (
        "time_wt_pair_1w",
        "pair_1w",
        "type_wt_pair_1w",
        "clicks2carts_pair_1w",
        "clicks2orders_pair_1w",
    )

    for prefixes, rank_n in ((full_prefixes, 20), (one_week_prefixes, 5)):
        for rank in range(rank_n):
            for prefix in prefixes:
                key = f"{prefix}{rank}"
                # Map each of the 5 latest aids of a session through this
                # rank's aid -> aid dictionary.
                cand.add(
                    strategy="session_latest",
                    trans_map=trans_maps[key],
                    name=f"session_latest_{key}",
                    k=5,
                )

    return cand.get_dataframe()

In [28]:
@noglobal(excepts=["cfg", "logger"])
def get_pair_df(name, wt_col, week):
    """Load a pair table from ``cfg.prep_dir`` sorted by ``wt_col`` descending.

    Args:
        name: parquet base name (without extension).
        wt_col: column to sort by, largest value first.
        week: when not None, ``_<week>`` is appended to ``name``.

    Returns:
        polars DataFrame read from ``<prep_dir>/<name>[_<week>].parquet``.
    """
    if week is not None:
        name = f"{name}_{week}"
    # os.path.join instead of string concatenation, so the path is correct
    # whether or not PREP_DIR ends with a separator.
    path = os.path.join(cfg.prep_dir, f"{name}.parquet")
    return pl.read_parquet(path).sort(wt_col, reverse=True)

@noglobal(excepts=["cfg", "logger"])
def get_trans_maps(week=None):
    """
    Build the dictionaries used to replace an aid with another aid.

    For every pair table and every depth 0..19 one dict ``aid_x -> aid_y``
    is stored under the key ``"<prefix><depth>"``. A dict is built from the
    first ``depth + 1`` rows of each ``aid_x`` group of the table loaded by
    ``get_pair_df``; because later rows overwrite earlier ones, each aid_x
    ends up mapped to the aid_y of the last of those rows.
    """
    # (parquet base name, sort column, key prefix, week passed to get_pair_df)
    # The word2vec table is not week specific, so it is always loaded with None.
    table_specs = [
        ("co_visitation_matrix", "cnt", "pair", week),
        ("co_visitation_matrix_time_weighted", "wt", "time_wt_pair", week),
        ("co_visitation_matrix_type_weighted", "wt", "type_wt_pair", week),
        ("co_visitation_matrix_clicks2carts", "cnt", "clicks2carts_pair", week),
        ("co_visitation_matrix_clicks2orders", "cnt", "clicks2orders_pair", week),
        ("word2vec_similar", "sim", "word2vec_pair", None),
        ("co_visitation_matrix_1w", "cnt", "pair_1w", week),
        ("co_visitation_matrix_time_weighted_1w", "wt", "time_wt_pair_1w", week),
        ("co_visitation_matrix_type_weighted_1w", "wt", "type_wt_pair_1w", week),
        ("co_visitation_matrix_clicks2carts_1w", "cnt", "clicks2carts_pair_1w", week),
        ("co_visitation_matrix_clicks2orders_1w", "cnt", "clicks2orders_pair_1w", week),
    ]

    trans_maps = {}
    for table_name, weight_col, key_prefix, table_week in table_specs:
        ranked_pairs = get_pair_df(table_name, weight_col, table_week)
        for depth in range(20):
            top_rows = ranked_pairs.groupby("aid_x").head(depth + 1)
            src_aids = top_rows.get_column("aid_x").to_list()
            dst_aids = top_rows.get_column("aid_y").to_list()
            trans_maps[f"{key_prefix}{depth}"] = dict(zip(src_aids, dst_aids))

    return trans_maps

In [29]:
@noglobal(excepts=["cfg", "logger", "cache"])
def make_valid_cand():
    """Create candidates for a random subset of the validation week.

    Reads the validation sessions and labels from ``cfg.prep_dir``, loads the
    pickled aid -> aid maps from the cache directory (the file must already
    exist), samples up to ``cfg.valid_session_n`` sessions and generates
    candidates for them.

    Returns:
        (cand_df, week_sessions, week_labels): the candidate table plus the
        session log and labels restricted to the sampled sessions.
    """
    # Load data
    week_sessions = pl.read_parquet(os.path.join(cfg.prep_dir, f"train_sessions_{cfg.valid_week}.parquet"))
    week_sessions = week_sessions.select([
        pl.col("session").cast(pl.Int32),
        pl.col("aid").cast(pl.Int32),
        pl.col("ts"),
        pl.col("type")
        ])
    
    week_labels = pl.read_parquet(os.path.join(cfg.prep_dir, f"labels_{cfg.valid_week}.parquet"))
    week_labels = week_labels.select([
        pl.col("session").cast(pl.Int32),
        pl.col("clicks_labels").cast(pl.Int32),
        pl.col("carts_labels"),
        pl.col("orders_labels")
        ])
    # Sort the ids: unique() does not promise a stable order, and the seeded
    # sample below is only reproducible for a fixed input order.
    valid_sessions = sorted(week_sessions.get_column("session").unique().to_list())
    
    trans_map_cache_file = f"trans_maps_{cfg.valid_week}.pkl"
    trans_map_cache_path = cache.get_abspath(trans_map_cache_file)
    # NOTE: pickle is only safe here because the file is produced by this
    # notebook itself; never point this at an untrusted file.
    with open(trans_map_cache_path, "rb") as f:
        trans_maps = pickle.load(f)

    # Restrict validation to a sample of sessions
    use_session_n = min(cfg.valid_session_n, len(valid_sessions))
    random.seed(cfg.seed)
    use_sessions = random.sample(valid_sessions, use_session_n)
    week_sessions = week_sessions.filter(pl.col("session").is_in(use_sessions))
    week_labels = week_labels.filter(pl.col("session").is_in(use_sessions))

    # Candidate generation
    cand_df = make_cand_df(week_sessions, trans_maps=trans_maps)
    return cand_df, week_sessions, week_labels

In [30]:
# Driver: make sure the aid -> aid maps are cached, build validation
# candidates and report candidate statistics and recall of the full set.
logger.info("\n\nSTART\n\n")
cache_file = f"trans_maps_{cfg.valid_week}.pkl"
cache_path = cache.get_abspath(cache_file)

if cache.exists(cache_path):
    logger.info(f"[{cache_file}] cache exists. load cache and skip process.")
    with open(cache_path, "rb") as f:
        trans_maps = pickle.load(f)
else:
    trans_maps = get_trans_maps(cfg.valid_week)
    with open(cache_path, "wb") as f:
        pickle.dump(trans_maps, f)

cand_df, _, labels = make_valid_cand()

cand_df = cand_df.to_pandas()
labels = labels.to_pandas()
vl_pred_df = cand_df.groupby("session")["aid"].apply(list).reset_index()

# Align predictions with labels on the session id. evaluate() pairs its
# inputs purely by position, so passing two independently ordered frames
# could silently compare a session's labels with another session's
# candidates. Sessions without candidates get NaN, which evaluate() treats
# as an empty prediction.
eval_df = labels.merge(vl_pred_df, on="session", how="left")

# Number of candidates per session (computed once).
session_counts = cand_df["session"].value_counts()
session_n_max = session_counts.max()
session_n_mean = session_counts.mean()
session_n_median = session_counts.median()

# The same candidate list is scored for all three targets; k is the largest
# candidate count so that no candidate is cut off.
preds = eval_df["aid"].tolist()
score = evaluate(eval_df["clicks_labels"].tolist(),
                 eval_df["carts_labels"].tolist(),
                 eval_df["orders_labels"].tolist(),
                 preds,
                 preds,
                 preds,
                 session_n_max)

logger.info(f"session_n_max : {str(session_n_max)},  session_n_mean : {str(session_n_mean)},  session_n_median : {str(session_n_median)}")
logger.info(f"score : {str(score)}")

0:00:00.409982 	 

START


0:00:00.414029 	 [trans_maps_week4.pkl] cache exists. load cache and skip process.
0:01:44.098079 	 [add_candidate] session_frequent : start
0:01:44.498179 	 [add_candidate] session_latest : start
0:01:44.925879 	 [add_candidate] session_latest_time_wt_pair0 : start
0:01:45.581043 	 [add_candidate] session_latest_word2vec_pair0 : start
0:01:46.053220 	 [add_candidate] session_latest_pair0 : start
0:01:46.858992 	 [add_candidate] session_latest_type_wt_pair0 : start
0:01:47.319356 	 [add_candidate] session_latest_clicks2carts_pair0 : start
0:01:47.718714 	 [add_candidate] session_latest_clicks2orders_pair0 : start
0:01:48.035518 	 [add_candidate] session_latest_time_wt_pair1 : start
0:01:48.661544 	 [add_candidate] session_latest_word2vec_pair1 : start
0:01:49.155694 	 [add_candidate] session_latest_pair1 : start
0:01:49.873621 	 [add_candidate] session_latest_type_wt_pair1 : start
0:01:50.353253 	 [add_candidate] session_latest_clicks2carts_pair1 : start
0:01