In [None]:
import json
import logging

import lightgbm as lgb
import numpy as np
import polars as pl
import rootutils
import seaborn as sns
from sklearn.metrics import roc_auc_score
from sklearn.model_selection import KFold

logging.basicConfig(level=logging.INFO)
sns.set_style("whitegrid")

# Resolve the project root with rootutils (adding it to the Python path and making it the
# working directory, per the pythonpath/cwd flags) so the `src.*` imports below resolve.
ROOT = rootutils.setup_root(".", pythonpath=True, cwd=True)

from src.feature.tabular import OrdinalEncoder, RawEncoder
from src.feature.utils import cache
from src.model.sklearn_like import LightGBMWapper
from src.trainer.tabular.simple import single_inference_fn_v2, single_train_fn

# Directory layout, all rooted at ROOT; every directory is created if missing.
DATA_DIR = ROOT / "data"
INPUT_DIR = DATA_DIR / "atmacup19_dataset"
OUTPUT_DIR = DATA_DIR / "output"
CACHE_DIR = DATA_DIR / "cache"

for directory in (DATA_DIR, INPUT_DIR, OUTPUT_DIR, CACHE_DIR):
    directory.mkdir(parents=True, exist_ok=True)

# Wider polars printing: long strings, and up to 50 columns / 50 rows per table.
pl.Config.set_fmt_str_lengths(200)
pl.Config.set_tbl_cols(50)
pl.Config.set_tbl_rows(50)


In [2]:
# Experiment identity and reproducibility knobs.
EXP_NAME = "006"
SEED = 42
N_SPLITS = 3  # number of CV folds
DEBUG = False  # True: use a 10,000-session sample of train (see the load-data cell)

In [3]:
EXP_OUTPUT_DIR = OUTPUT_DIR / EXP_NAME
EXP_OUTPUT_DIR.mkdir(exist_ok=True, parents=True)

# Feature-column naming: model features start with "f_", followed by "n_" (numerical)
# or "c_" (categorical).
FEATURE_PREFIX = "f_"
NUMERICAL_FEATURE_PREFIX = FEATURE_PREFIX + "n_"
CATEGORICAL_FEATURE_PREFIX = FEATURE_PREFIX + "c_"

# Binary targets, one model per column (presumably the label columns of train_target.csv).
TARGET_COLS = [
    "ビール",
    "ヘアケア",
    "チョコレート",
    "米（5㎏以下）",
]
META_COLS = ["session_id", "顧客CD", *TARGET_COLS]
FOLD_COL = "fold"


### load data


In [None]:
ec_log_df = pl.read_csv(INPUT_DIR / "ec_log.csv", infer_schema_length=200000)
jan_df = pl.read_csv(INPUT_DIR / "jan.csv")
test_session_df = pl.read_csv(INPUT_DIR / "test_session.csv")

# Session attributes and their labels come from two files glued column-wise,
# i.e. the two CSVs are assumed to be row-aligned.
train_session_df = pl.concat(
    [
        pl.read_csv(INPUT_DIR / "train_session.csv"),
        pl.read_csv(INPUT_DIR / "train_target.csv"),
    ],
    how="horizontal",
)

if DEBUG:
    # Quick-iteration mode: keep a fixed random subset of training sessions.
    train_session_df = train_session_df.sample(10000, seed=SEED)

train_log_df = pl.read_csv(INPUT_DIR / "train_log.csv")

# Stack train and test sessions (tagged in `dataset`), parse the sale date and build an
# hour-resolution session timestamp from the date and the `時刻` column.
raw_full_session_df = (
    pl.concat(
        [
            frame.with_columns(dataset=pl.lit(tag))
            for tag, frame in (("TRAIN", train_session_df), ("TEST", test_session_df))
        ],
        how="diagonal_relaxed",
    )
    .with_columns(pl.col("売上日").cast(pl.Date))
    .with_columns(
        pl.datetime(
            year=pl.col("売上日").dt.year(),
            month=pl.col("売上日").dt.month(),
            day=pl.col("売上日").dt.day(),
            hour=pl.col("時刻"),
        ).alias("session_datetime")
    )
)

print(f"train_session_df: {train_session_df.shape}")
print(f"test_session_df: {test_session_df.shape}")

In [None]:
def compare_lists(list_a, list_b):
    """Partition the distinct elements of two iterables.

    Returns a 3-tuple of lists ``(common, only_a, only_b)``: elements present in both,
    only in ``list_a``, and only in ``list_b``. Each list is built from a set, so
    duplicates are collapsed and element order is unspecified.
    """
    left, right = set(list_a), set(list_b)
    return (
        list(left.intersection(right)),
        list(left.difference(right)),
        list(right.difference(left)),
    )


common_user_ids, only_train_user_ids, only_test_user_ids = compare_lists(
    list_a=train_session_df["顧客CD"].unique().to_numpy(),
    list_b=test_session_df["顧客CD"].unique().to_numpy(),
)
print(len(common_user_ids), len(only_train_user_ids), len(only_test_user_ids))

# Customers that are not present in both train and test are collapsed into a single
# "__NULL__" id; the unmasked id is kept alongside as `original_顧客CD`.
_is_common_user = pl.col("顧客CD").is_in(common_user_ids)
full_session_df = raw_full_session_df.with_columns(
    pl.when(_is_common_user).then(pl.col("顧客CD")).otherwise(pl.lit("__NULL__")).alias("顧客CD"),
    pl.col("顧客CD").alias("original_顧客CD"),
)


### Feature engineering


In [7]:
# @cache(cache_dir=CACHE_DIR, overwrite=False)
def create_amount_pivot_df(
    train_log_df: pl.DataFrame,
    jan_df: pl.DataFrame,
    jan_type: str = "部門",
    amount_types: tuple[str, ...] | None = None,
    prefix: str = "",
    agg_method: str = "sum",
) -> pl.DataFrame:
    """Build a per-session wide table of aggregated amounts, one column per ``jan_type`` value.

    Log rows are inner-joined to ``jan_df`` on ``JAN`` (log rows whose JAN is not in
    ``jan_df`` are dropped), aggregated per ``(session_id, jan_type)`` and pivoted so that
    each distinct ``jan_type`` value becomes a column.

    Args:
        train_log_df: purchase log with ``session_id``, ``JAN`` and the amount columns.
        jan_df: product master with ``JAN`` and the ``jan_type`` column.
        jan_type: column of ``jan_df`` whose values become output columns.
        amount_types: amount columns to aggregate; an empty/None value falls back to
            ``("売上数量", "売上金額", "値割金額", "値割数量")``.
        prefix: string prepended to every generated column name.
        agg_method: ``"sum"`` or ``"mean"``.

    Returns:
        One row per ``session_id`` (sorted ascending). Value columns are named
        ``f"{prefix}{jan_type}_{amount_type}_{agg_method}_{value}"`` and are ordered by
        amount type, then by value name. A session with no rows for a value gets null there.

    Raises:
        ValueError: if ``agg_method`` is neither ``"sum"`` nor ``"mean"``.
    """
    amount_types = amount_types or ("売上数量", "売上金額", "値割金額", "値割数量")

    if agg_method == "sum":
        agg_exprs = [pl.col(amount_type).sum() for amount_type in amount_types]
    elif agg_method == "mean":
        agg_exprs = [pl.col(amount_type).mean() for amount_type in amount_types]
    else:
        raise ValueError(f"Invalid agg_method: {agg_method}")

    # Per (session_id, jan_type) aggregate of every requested amount column.
    session_agg_jan_df = (
        train_log_df.lazy()
        .join(jan_df.lazy(), on="JAN", how="inner")
        .select(
            pl.col("session_id"),
            *[pl.col(amount_type) for amount_type in amount_types],
            pl.col(jan_type),
        )
        .group_by(["session_id", jan_type])
        .agg(agg_exprs)
        .collect()
    )

    # One pivot per amount type (index: session_id, columns: jan_type values), glued side by side.
    pieces = []
    for i, amount_type in enumerate(amount_types):
        renamed_values = pl.exclude("session_id").name.prefix(f"{prefix}{jan_type}_{amount_type}_{agg_method}_")
        # Only the first piece keeps the session_id key so it appears once after concatenation.
        select_exprs = [pl.col("session_id"), renamed_values] if i == 0 else [renamed_values]
        pieces.append(
            session_agg_jan_df
            # sort_columns=True: pivot otherwise orders columns by discovery, and the group_by
            # above does not maintain order, so the feature column order could vary per run.
            .pivot(jan_type, index="session_id", values=amount_type, sort_columns=True)
            .sort("session_id")
            .select(*select_exprs)
        )

    return pl.concat(pieces, how="horizontal")


In [None]:
# Per-customer and per-(store, age band, gender) totals of the target categories' purchase
# quantities over TRAIN sessions, attached to every session.
amount_pivot_df = create_amount_pivot_df(
    train_log_df=train_log_df,
    # Same categories as the targets; reuse TARGET_COLS so the two lists cannot drift apart.
    jan_df=jan_df.filter(pl.col("カテゴリ名").is_in(TARGET_COLS)),
    jan_type="カテゴリ名",
    amount_types=("売上数量",),
    prefix="amount_",
)

# NOTE(review): likely target leakage. These sums are built from the target categories' own
# purchase quantities and include the current TRAIN session. A train row's feature therefore
# contains that session's own quantity (presumably what its label is derived from), while test
# rows only see train sessions. Expect optimistic CV; out-of-fold sums would avoid this.
# Also note `顧客CD` is the masked id here, so all non-shared customers share one "__NULL__" total.
for group_key in [
    ["顧客CD"],
    ["店舗名", "年代", "性別"],
    # ["店舗名", "年代", "性別", "時刻"],
]:
    group_key_str = "_".join(group_key)
    user_amount_pivot_df = (
        full_session_df.filter(pl.col("dataset") == "TRAIN")
        .select(["session_id"] + group_key)
        .join(amount_pivot_df, on="session_id", how="inner")
        .drop("session_id")
        .group_by(group_key)
        .agg(pl.col("*").sum())
    )

    value_cols = [x for x in user_amount_pivot_df.columns if x not in group_key]

    # Left join keeps sessions whose group has no TRAIN purchases (filled with 0); columns get a
    # `_grpby_<keys>` suffix so each grouping yields distinct feature names.
    full_session_df = (
        full_session_df.join(user_amount_pivot_df, on=group_key, how="left")
        .with_columns([pl.col(x).fill_null(0).name.suffix(f"_grpby_{group_key_str}") for x in value_cols])
        .drop(value_cols)
    )


# Calendar, demographic and visit-history features.
# Per-customer features are keyed on the unmasked `original_顧客CD`. The masked `顧客CD`
# maps every customer not shared by train and test to one "__NULL__" id, which would pool
# their visit histories. It would also make `is_hot` true for every row as soon as any
# test-only customer exists (common customers are all in test, and so is "__NULL__").
# `cum_count` and `diff` depend on row order, so rows are put in chronological order
# (stable sort) while the features are computed, then returned to their incoming order.
_test_customer_ids = full_session_df.filter(pl.col("dataset") != "TRAIN")["original_顧客CD"].unique()
full_session_df = (
    full_session_df.with_row_index("__row_idx")
    .sort("session_datetime", maintain_order=True)
    .with_columns(pl.lit(1).alias("dummy"))
    .with_columns(
        pl.col("original_顧客CD").is_in(_test_customer_ids).alias("is_hot"),
        pl.col("売上日").dt.day().alias("day"),
        pl.col("売上日").dt.weekday().alias("weekday"),
        pl.col("年代")
        .replace(
            {
                "10代以下": 10,
                "20代": 20,
                "30代": 30,
                "40代": 40,
                "50代": 50,
                "60代": 60,
                "70代": 70,
                "80代以上": 80,
                "不明": None,
            }
        )
        .cast(pl.Float32)
        .alias("age"),
        pl.col("session_id").cum_count().over("original_顧客CD").alias("cum_visit_count"),
        pl.col("session_datetime").diff().over("original_顧客CD").dt.total_days().alias("days_since_last_visit"),
        pl.col("dummy")
        .rolling_sum_by("session_datetime", window_size="1mo")
        .over("original_顧客CD")
        .alias("visit_count_1mo"),
        pl.col("dummy")
        .rolling_sum_by("session_datetime", window_size="1w")
        .over("original_顧客CD")
        .alias("visit_count_1w"),
        pl.col("dummy").sum().over(["original_顧客CD", "売上日"]).alias("visit_count_today"),
        pl.col("dummy").sum().over(["original_顧客CD"]).alias("visit_count_total"),
    )
    .sort("__row_idx")
    .drop(["dummy", "__row_idx"])
)


In [None]:
def fe(train_df: pl.DataFrame, test_df: pl.DataFrame) -> tuple[pl.DataFrame, pl.DataFrame]:
    """Encode the train/test session tables into model-ready feature frames.

    Three encoders are applied and their outputs concatenated column-wise:
    META_COLS with an empty prefix (presumably passed through as-is), the listed numerical
    columns with NUMERICAL_FEATURE_PREFIX, and 性別 / 顧客CD / 店舗名 ordinal-encoded with
    CATEGORICAL_FEATURE_PREFIX (encoder internals live in src.feature.tabular).

    Every encoder is fitted on ``train_df`` only and then used unchanged to transform both
    frames, so train and test share one encoding.

    Returns:
        ``(train_feature_df, test_feature_df)``.
    """
    encoders = [
        RawEncoder(
            columns=META_COLS,
            prefix="",
        ),
        RawEncoder(
            columns=[
                *[x for x in train_df.columns if x.startswith("amount_")],
                "時刻",
                "day",
                "weekday",
                "age",
                "cum_visit_count",
                "days_since_last_visit",
                "visit_count_1mo",
                "visit_count_1w",
                "visit_count_today",
                "visit_count_total",
                "is_hot",
            ],
            prefix=NUMERICAL_FEATURE_PREFIX,
        ),
        OrdinalEncoder(
            columns=[
                "性別",
                "顧客CD",
                "店舗名",
            ],
            prefix=CATEGORICAL_FEATURE_PREFIX,
        ),
    ]

    # Fit on train only; a single fit whose state is then reused for both frames.
    for encoder in encoders:
        print(f"fit: {encoder}")
        encoder.fit(train_df)

    # Transform train and test with the same fitted encoders.
    train_feature_df = pl.concat(
        [encoder.transform(train_df) for encoder in encoders],
        how="horizontal",
    )
    test_feature_df = pl.concat(
        [encoder.transform(test_df) for encoder in encoders],
        how="horizontal",
    )
    return train_feature_df, test_feature_df


train_df = full_session_df.filter(pl.col("dataset") != "TEST")
test_df = full_session_df.filter(pl.col("dataset") == "TEST")
train_feature_df, test_feature_df = fe(train_df, test_df)

# Feature groups are identified purely by the prefixes that `fe` assigns;
# categorical features come first in the final feature list.
cat_feature_cols = [col for col in train_feature_df.columns if col.startswith(CATEGORICAL_FEATURE_PREFIX)]
num_feature_cols = [col for col in train_feature_df.columns if col.startswith(NUMERICAL_FEATURE_PREFIX)]
feature_cols = [*cat_feature_cols, *num_feature_cols]

### train


In [11]:
class EvalFn:
    """Validation metric: ROC-AUC of the ``pred_col`` column against ``target_col``.

    ``__name__`` is exposed as a property (returning the class name), presumably for callers
    that read ``eval_fn.__name__`` as they would on a plain function.
    """

    def __init__(self, target_col: str, pred_col: str = "pred"):
        self.target_col = target_col
        # Column holding the model scores; previously accepted but ignored.
        self.pred_col = pred_col

    def __call__(self, input_df: pl.DataFrame) -> dict[str, float]:
        """Return ``{"rocauc": score}`` computed on ``input_df``."""
        y_true = input_df[self.target_col].to_numpy()
        y_pred = input_df[self.pred_col].to_numpy()
        return {"rocauc": roc_auc_score(y_true, y_pred)}

    @property
    def __name__(self) -> str:
        return self.__class__.__name__


def add_kfold(
    input_df: pl.DataFrame,
    n_splits: int,
    random_state: int,
    fold_col: str,
) -> pl.DataFrame:
    """Append an int32 ``fold_col`` giving each row's shuffled KFold validation fold (0..n_splits-1).

    NOTE: plain KFold ignores customer grouping; GroupKFold on the customer id may be more
    appropriate (open question from the original author).
    """
    splitter = KFold(n_splits=n_splits, shuffle=True, random_state=random_state)
    fold_ids = np.zeros(len(input_df), dtype=np.int32)
    for fold_id, (_train_idx, valid_idx) in enumerate(splitter.split(X=input_df)):
        fold_ids[valid_idx] = fold_id
    return input_df.with_columns(pl.Series(name=fold_col, values=fold_ids))

In [None]:
# Fold assignment depends only on the feature frame and SEED, not on the target, so it is
# computed once and shared by every per-target model.
train_fold_df = add_kfold(
    train_feature_df,
    n_splits=N_SPLITS,
    random_state=SEED,
    fold_col=FOLD_COL,
)

# One binary LightGBM model per target; validation results, scores and fitted models are
# stored under the key f"lgb_{target}".
results = {}
scores = {}
for target_col in TARGET_COLS:
    name = f"lgb_{target_col}"
    _va_result_df, _va_scores, trained_models = single_train_fn(
        model=LightGBMWapper(
            name=name,
            model=lgb.LGBMModel(
                objective="binary",
                boosting="gbdt",
                n_estimators=5000,
                learning_rate=0.1,
                num_leaves=31,
                colsample_bytree=0.1,
                # NOTE(review): LightGBM only bags rows when `subsample_freq` > 0 (default 0),
                # so `subsample=0.1` presumably has no effect here. Set `subsample_freq`
                # explicitly if row subsampling is intended.
                subsample=0.1,
                importance_type="gain",
                random_state=SEED,
                force_col_wise=True,
            ),
            fit_params={
                "callbacks": [
                    lgb.early_stopping(100, first_metric_only=True),
                    lgb.log_evaluation(period=100),
                ],
                "categorical_feature": cat_feature_cols,
                "feature_name": feature_cols,
                "eval_metric": "auc",
            },
        ),
        features_df=train_fold_df,
        feature_cols=feature_cols,
        target_col=target_col,
        fold_col=FOLD_COL,
        meta_cols=META_COLS + [FOLD_COL],
        out_dir=EXP_OUTPUT_DIR,
        eval_fn=EvalFn(target_col=target_col),
        overwrite=False,
    )

    results[name] = {
        "result_df": _va_result_df,
        "models": trained_models,
    }
    scores[name] = _va_scores

In [None]:
def construct_va_result_df(results: dict) -> pl.DataFrame:
    """Merge the per-target validation results into one wide frame.

    ``results`` maps ``f"lgb_{target}"`` to a dict whose ``"result_df"`` is assumed to hold
    ``session_id``, ``顧客CD``, ``pred`` and the ``target`` column.

    Returns ``session_id`` and ``顧客CD`` followed by ``pred_{target}`` and ``{target}`` for
    each entry, in ``results`` iteration order. Rows are matched on ``session_id`` rather than
    by position, so the per-target frames may arrive in different row orders. An empty
    ``results`` yields an empty DataFrame.
    """
    va_result_df = None
    for key, entry in results.items():
        target = key.split("_", 1)[1]  # drop the "lgb_" prefix; the target itself may contain "_"
        target_df = entry["result_df"].select(
            pl.col("session_id"),
            pl.col("顧客CD"),
            pl.col("pred").alias(f"pred_{target}"),
            pl.col(target),
        )
        if va_result_df is None:
            va_result_df = target_df
        else:
            va_result_df = va_result_df.join(target_df.drop("顧客CD"), on="session_id", how="left")
    return va_result_df if va_result_df is not None else pl.DataFrame()


va_result_df = construct_va_result_df(results)
pred_cols = [f"pred_{target}" for target in TARGET_COLS]

# Macro-averaged ROC-AUC across all targets (2-D label and score matrices).
score = roc_auc_score(
    va_result_df.select(TARGET_COLS).to_numpy(),
    va_result_df.select(pred_cols).to_numpy(),
    average="macro",
)
scores["final_metric"] = score

va_result_df.write_parquet(EXP_OUTPUT_DIR / "va_result_df.parquet")
with open(EXP_OUTPUT_DIR / "scores.json", "w") as fp:
    json.dump(scores, fp, indent=4)

scores

### inference


In [None]:
# Predict every target on the test features and line the predictions up by session_id.
te_result_df = None
for name, res in results.items():
    target_name = name.split("_", 1)[1]  # strip the "lgb_" model prefix
    _te_result_df = single_inference_fn_v2(
        models=res["models"],
        features_df=test_feature_df,
        feature_names=feature_cols,
    ).select(pl.col("session_id"), pl.col("pred").alias(target_name))
    te_result_df = (
        _te_result_df
        if te_result_df is None
        else te_result_df.join(_te_result_df, on="session_id", how="left")
    )

# submission.csv carries no session_id, so its rows must follow test_session_df exactly.
# Polars does not guarantee join output order, hence the explicit row index + sort.
submission_df = (
    test_session_df.select("session_id")
    .with_row_index("__row_idx")
    .join(te_result_df, on="session_id", how="inner")
    .sort("__row_idx")
    .select(["session_id"] + TARGET_COLS)
)
assert submission_df.height == test_session_df.height, "some test sessions have no prediction"

submission_df.write_parquet(EXP_OUTPUT_DIR / "te_result_df.parquet")
submission_df.select(
    [
        "チョコレート",
        "ビール",
        "ヘアケア",
        "米（5㎏以下）",
    ]
).write_csv(EXP_OUTPUT_DIR / "submission.csv")