In [1]:
# Switch the kernel's working directory to the parent folder; the cell output
# below reports the resulting directory (/kaggle/working).
# NOTE(review): presumably done so that the `utils.*` imports and relative
# paths used later resolve from the project root — confirm.
%cd ..

/kaggle/working


In [2]:
from hydra import compose, initialize
from omegaconf import OmegaConf

# Compose the experiment configuration with hydra's compose API, forcing
# `debug=True` through an override. Only the compose call needs the
# initialize() context; the resulting config object stays usable afterwards.
with initialize(version_base=None, config_path="../generate_datasets/009"):
    cfg = compose(config_name="config.yaml", overrides=["debug=True"])

# Dump the composed config as YAML so the run's settings are visible in the output.
print(OmegaConf.to_yaml(cfg))

debug: true
seed: 7
dir:
  data_dir: /kaggle/working/input/atmaCup16_Dataset
  output_dir: /kaggle/working/output
  exp_dir: /kaggle/working/output/exp
  cand_unsupervised_dir: /kaggle/working/output/cand_unsupervised
  cand_supervised_dir: /kaggle/working/output/cand_supervised
  datasets_dir: /kaggle/working/output/datasets
exp:
  fold_path: /kaggle/working/output/datasets/make_cv/base/train_fold.parquet
  candidate_info_list:
  - name: transition_prob/base
    max_num_candidates: 100
    dir: /kaggle/working/output/cand_unsupervised/transition_prob/base
  - name: transition_prob_all/base
    max_num_candidates: 100
    dir: /kaggle/working/output/cand_unsupervised/transition_prob_all/base
  - name: ranking_location/sml_cd
    max_num_candidates: 20
    dir: /kaggle/working/output/cand_unsupervised/ranking_location/sml_cd
  - name: ranking_location/lrg_cd
    max_num_candidates: 20
    dir: /kaggle/working/output/cand_unsupervised/ranking_location/lrg_cd
  - name: ranking_location/ke

In [3]:
import os
import sys
from pathlib import Path

import hydra
import polars as pl
from hydra.core.hydra_config import HydraConfig
from omegaconf import DictConfig, OmegaConf
from sklearn.preprocessing import OrdinalEncoder

from utils.data import convert_to_32bit
from utils.load import load_label_data, load_log_data, load_yad_data
from utils.logger import get_logger

# Yad columns that are aggregated per session (sum/min/max/std) and attached to
# each candidate as features.
# NOTE: this module-level list is rebound by load_yad_data_with_features(),
# which adds every column coming from the extra yad feature files.
numerical_cols = [  # NOTE: this list is rewritten later, be careful
    "total_room_cnt",
    "wireless_lan_flg",
    "onsen_flg",
    "kd_stn_5min",
    "kd_bch_5min",
    "kd_slp_5min",
]

# Yad columns selected alongside the numerical ones for each candidate and
# used as grouping keys for the "same_<col>_count" / "same_<col>_rate" features.
categorical_cols = [
    "yad_type",
    "wid_cd",
    "ken_cd",
    "lrg_cd",
    "sml_cd",
]

# Module-level placeholders; nothing in the visible cells assigns them.
# NOTE(review): presumably filled in by the script version of this code — confirm.
logger = None
ordinal_encoder = None

In [4]:
def load_yad_data_with_features(cfg):
    """Load the yad table and join every extra yad-level feature file onto it.

    Each parquet file listed in ``cfg.exp.yad_feature_paths`` is joined on
    ``yad_no`` using polars' default join strategy, so yads missing from a
    feature file do not survive the join.

    Side effect:
        The module-level ``numerical_cols`` list is rebound so that it also
        contains every column introduced by the feature files. Existing
        entries keep their position and new columns are appended in the order
        they appear in the joined frame. The previous ``set`` union produced
        an arbitrary order that could differ between runs, which made the
        downstream feature column order unstable.

    Args:
        cfg: Config exposing ``dir.data_dir`` and ``exp.yad_feature_paths``.

    Returns:
        The yad DataFrame with all feature columns joined in.
    """
    global numerical_cols
    yad_df = load_yad_data(Path(cfg.dir.data_dir))
    original_cols = set(yad_df.columns)
    for path in cfg.exp.yad_feature_paths:
        feature_df = pl.read_parquet(path)
        yad_df = yad_df.join(feature_df, on="yad_no")
    new_cols = [col for col in yad_df.columns if col not in original_cols]
    # dict.fromkeys de-duplicates while preserving insertion order, so calling
    # this function repeatedly is idempotent and the order is deterministic.
    numerical_cols = list(dict.fromkeys([*numerical_cols, *new_cols]))
    return yad_df


def load_and_union_candidates(cfg, mode: str):
    """Build the long-format (session_id, candidates) table for one split.

    Candidate sources that are merged:
      * every ``yad_no`` that appears in the session's own log, and
      * for each entry of ``cfg.exp.candidate_info_list``, the list stored in
        ``<dir>/{mode}_candidate.parquet`` truncated to ``max_num_candidates``
        (sessions whose truncated list is empty are skipped).

    The per-session lists are concatenated, de-duplicated and exploded to one
    row per (session_id, candidate). Finally the yad the session ended on
    (the row with the largest ``seq_no``) is removed from that session's
    candidates.

    Args:
        cfg: Config exposing ``dir.data_dir`` and ``exp.candidate_info_list``.
        mode: Split name passed to ``load_log_data`` and used in the candidate
            file name.

    Returns:
        DataFrame with columns ``session_id`` and ``candidates``.
    """
    key_cols = ["session_id", "candidates"]

    # The log is needed twice (in-session candidates and last-yad exclusion),
    # so load it once.
    log_df = load_log_data(Path(cfg.dir.data_dir), mode)

    # Add the yad_no values seen during the session to the candidates.
    candidate_lists = [
        log_df.group_by("session_id").agg(pl.col("yad_no").alias("candidates"))
    ]
    for candidate_info in cfg.exp.candidate_info_list:
        source_df = pl.read_parquet(
            Path(candidate_info["dir"]) / f"{mode}_candidate.parquet"
        )
        candidate_lists.append(
            source_df.with_columns(
                pl.col("candidates")
                .list.head(candidate_info["max_num_candidates"])
                .alias("candidates")
            )
            .filter(pl.col("candidates").list.len() > 0)
            # Keep only the two shared columns so the vertical concat below
            # does not depend on extra columns a candidate file may carry.
            .select(key_cols)
        )

    union_df = (
        pl.concat(candidate_lists)
        .group_by("session_id")
        .agg(pl.col("candidates").flatten())
        .with_columns(pl.col("candidates").list.unique())
        .select(key_cols)
    )

    # Expand the lists to one row per candidate.
    candidate_df = union_df.explode("candidates")

    # Exclude the last yad_no of each session. Sort by seq_no explicitly
    # instead of relying on the row order of the loaded log.
    last_df = log_df.group_by("session_id").agg(
        pl.col("yad_no").sort_by("seq_no").last().alias("candidates")
    )
    # An anti join keeps exactly the candidate rows with no match in last_df.
    return candidate_df.join(last_df, on=key_cols, how="anti")


def concat_label_fold(cfg, mode: str, candidate_df):
    """
    Attach ``original``, ``label`` and the fold columns to the train candidates.

    Generated candidates are flagged ``original=True, label=False``; the rows
    from the label data are added as ``original=False, label=True``. Both flags
    are then summed per (session_id, candidates) pair, and the fold table read
    from ``cfg.exp.fold_path`` is joined on ``session_id``.

    Per the original note, ``original`` is taken out when the validation score
    is computed.

    For any mode other than ``"train"`` the input is returned unchanged.
    """
    # Labels and folds only exist for the train split.
    if mode != "train":
        return candidate_df

    generated_rows = candidate_df.with_columns(
        pl.lit(True).alias("original"), pl.lit(False).alias("label")
    )
    label_rows = (
        load_label_data(Path(cfg.dir.data_dir))
        .with_columns(
            pl.col("yad_no").alias("candidates"),
            pl.lit(False).alias("original"),
            pl.lit(True).alias("label"),
        )
        .drop("yad_no")
    )

    # Collapse duplicates so that each pair appears once with both flags summed.
    stacked = pl.concat([generated_rows, label_rows])
    flagged = stacked.group_by(["session_id", "candidates"]).agg(
        pl.sum("original"), pl.sum("label")
    )

    fold_df = pl.read_parquet(cfg.exp.fold_path)
    return flagged.join(fold_df, on="session_id")


def concat_session_feature(cfg, mode: str, candidate_df: pl.DataFrame):
    """
    Attach per-session aggregates of the numerical yad columns to the candidates.

    The log (session_id, seq_no, yad_no) is joined with the yad table (nulls
    filled with 0) and, for every column in ``numerical_cols``, the sum, min,
    max and std over the session are computed. The result is joined onto
    ``candidate_df`` by ``session_id``.

    TODO: add the information from ``categorical_cols`` later as well.

    Args:
        cfg: Config exposing ``dir.data_dir`` (plus whatever
            ``load_yad_data_with_features`` reads).
        mode: Split name passed to ``load_log_data``.
        candidate_df: Candidate table containing a ``session_id`` column.

    Returns:
        ``candidate_df`` with ``<col>_session_{sum,min,max,std}`` columns added.
    """
    log_df = load_log_data(Path(cfg.dir.data_dir), mode)
    # Must run before numerical_cols is read: this call extends that global
    # with the columns coming from the extra feature files.
    yad_df = load_yad_data_with_features(cfg)
    log_yad_df = log_df.join(yad_df.fill_null(0), on="yad_no")

    # Same column order as before: all sums, then mins, maxes and stds.
    aggregations = [
        agg_fn(col).name.suffix(f"_session_{stat}")
        for stat, agg_fn in (
            ("sum", pl.sum),
            ("min", pl.min),
            ("max", pl.max),
            ("std", pl.std),
        )
        for col in numerical_cols
    ]

    # Pass the key positionally, like the other group_by calls in this file.
    # On polars releases where group_by takes **named_by, `by=` would name the
    # key column "by" and the join on "session_id" below would fail.
    session_df = log_yad_df.group_by("session_id").agg(aggregations)

    return candidate_df.join(session_df, on="session_id")


def concat_candidate_feature(cfg, mode: str, candidate_df: pl.DataFrame):
    """
    Attach the yad attributes of each candidate to the candidate table.

    The columns in ``numerical_cols`` and ``categorical_cols`` are selected
    from the yad table and joined with ``candidates`` matched against
    ``yad_no``. The names of the columns that were added are printed.

    TODO: add the information from ``categorical_cols`` later as well.
    """
    columns_before = set(candidate_df.columns)

    # Loading first matters: it extends the global numerical_cols that is
    # read on the next statement.
    yad_df = load_yad_data_with_features(cfg)
    wanted_cols = ["yad_no", *numerical_cols, *categorical_cols]
    yad_features = yad_df.select(wanted_cols)

    enriched_df = candidate_df.join(
        yad_features, left_on="candidates", right_on="yad_no"
    )

    added = [name for name in enriched_df.columns if name not in columns_before]
    print(f"new_cols: {added}")
    return enriched_df

In [6]:
# Run the feature pipeline step by step for the train split, keeping every
# intermediate frame so each stage can be inspected on its own.
mode = "train"
# 1) Union of all candidate sources, minus each session's last yad_no.
candidate_df = load_and_union_candidates(cfg, mode)

# 2) Add the original / label flags and the fold assignment (train only).
candidate_df2 = concat_label_fold(cfg, mode, candidate_df)

# 3) Add per-session aggregates of the numerical yad columns.
candidate_df3 = concat_session_feature(cfg, mode, candidate_df2)

# 4) Add the candidate's own yad attributes; prints the added column names.
candidate_df4 = concat_candidate_feature(cfg, mode, candidate_df3)

new_cols: ['rank_ranking_location/wid_cd', 'total_room_cnt', 'rank_ranking_location/lrg_cd', 'kd_stn_5min', 'rank_ranking/base', 'onsen_flg', 'rank_ranking_location/sml_cd', 'kd_slp_5min', 'counts_ranking_location/wid_cd', 'rank_ranking_location/ken_cd', 'counts_ranking_location/sml_cd', 'counts_ranking/base', 'wireless_lan_flg', 'kd_bch_5min', 'counts_ranking_location/ken_cd', 'counts_ranking_location/lrg_cd', 'yad_type', 'wid_cd', 'ken_cd', 'lrg_cd', 'sml_cd']


In [None]:
def concat_session_candidate_feature(cfg, mode: str, candidate_df: pl.DataFrame):
    """
    Extract features describing how each candidate relates to its session.

    Example: how often (count and ratio) the candidate itself appears in the
    session, and how often yads from the same area as the candidate were viewed.

    Added columns:
      * ``appear_count`` / ``appear_rate``: occurrences of the candidate in
        the session log, and their share of the session's log rows.
      * ``same_<col>_count`` / ``same_<col>_rate`` for every ``<col>`` in
        ``categorical_cols``: log rows of the session whose yad has the same
        ``<col>`` value as the candidate.
      * the columns of the table at ``cfg.exp.transition_prob_path`` for the
        transition from the session's last yad to the candidate.
      * ``transition_prob_transition_prob/base_from_all``: that probability
        summed over every log row of the session.

    All joins are left joins, so candidates without a match get nulls.

    Args:
        cfg: Config exposing ``dir.data_dir`` and ``exp.transition_prob_path``.
        mode: Split name passed to ``load_log_data``.
        candidate_df: Candidate table holding ``session_id``, ``candidates``
            and the ``categorical_cols`` of each candidate.

    Returns:
        ``candidate_df`` with the columns listed above added.
    """
    original_cols = candidate_df.columns
    print(original_cols)

    key_cols = ["session_id", "candidates"]
    transition_keys = ["session_id", "to_yad_no"]

    # Every section below starts from the same log, so load it once.
    log_df = load_log_data(Path(cfg.dir.data_dir), mode)

    # Occurrences of the candidate itself within the session.
    # Keys are passed positionally: on polars releases where group_by takes
    # **named_by, `by=[...]` would not group by these columns under their
    # own names.
    appear_df = (
        log_df.group_by(["session_id", "yad_no"])
        .agg(pl.count("session_id").alias("appear_count"))
        .with_columns(
            (
                pl.col("appear_count") / pl.col("appear_count").sum().over("session_id")
            ).alias("appear_rate"),
            pl.col("yad_no").alias("candidates"),
        )
    )
    candidate_df = candidate_df.join(
        appear_df.select(key_cols + ["appear_count", "appear_rate"]),
        on=key_cols,
        how="left",
    )

    # Occurrences of the same categorical value within the session:
    # group by (session_id, categorical) and count the log rows per session.
    yad_df = load_yad_data(Path(cfg.dir.data_dir))
    log_yad_df = log_df.join(yad_df.fill_null(0), on="yad_no")
    for col in categorical_cols:
        count_col = f"same_{col}_count"
        rate_col = f"same_{col}_rate"
        same_df = (
            log_yad_df.group_by(["session_id", col])
            .agg(pl.count("session_id").alias(count_col))
            .with_columns(
                (
                    pl.col(count_col) / pl.col(count_col).sum().over("session_id")
                ).alias(rate_col)
            )
        )
        candidate_df = candidate_df.join(
            same_df.select(["session_id", col, count_col, rate_col]),
            on=["session_id", col],
            how="left",
        )

    # Transition probability from the session's last yad to the candidate.
    yad2yad_prob = pl.read_parquet(cfg.exp.transition_prob_path)
    last_log_df = log_df.group_by("session_id").agg(
        pl.col("yad_no").sort_by("seq_no").last().alias("from_yad_no")
    )
    last_log_prob_df = last_log_df.join(yad2yad_prob, on="from_yad_no")
    candidate_df = candidate_df.join(
        last_log_prob_df,
        left_on=key_cols,
        right_on=transition_keys,
        how="left",
    ).drop("from_yad_no")

    # Transition probability summed over every yad of the session, not only
    # the last one.
    prob_col = "transition_prob_transition_prob/base"
    all_prob_df = (
        log_df.select(pl.col("session_id"), pl.col("yad_no").alias("from_yad_no"))
        .join(yad2yad_prob, on="from_yad_no")
        .group_by(transition_keys)
        .agg(pl.sum(prob_col).alias(prob_col + "_from_all"))
    )
    # No drop here: "from_yad_no" is aggregated away above and was already
    # removed from candidate_df, so dropping it again would fail on strict drop.
    candidate_df = candidate_df.join(
        all_prob_df,
        left_on=key_cols,
        right_on=transition_keys,
        how="left",
    )

    # Report the columns that were added.
    new_cols = [col for col in candidate_df.columns if col not in original_cols]
    print(f"new_cols: {new_cols}")

    return candidate_df


# Try the session x candidate features on the output of the previous stage;
# the returned frame is not assigned, so it is only shown as the cell result.
concat_session_candidate_feature(cfg, mode, candidate_df4)