In [12]:
import sys

sys.path.append("../../")
%cd /workspace/src

import hashlib
from functools import cached_property, wraps
from pathlib import Path
from typing import Callable

import joblib
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from custom.config_types import CONFIG_TYPES
from logger import Logger
from pytorch_pfn_extras.config import Config
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence, pad_sequence
from torch.utils.data import DataLoader, Dataset
from util import load_yaml, reduce_mem_usage, sort_df_numpy

/workspace/src


In [13]:
# Constants: DEBUG restricts the data-loading cell to a small sample of users.
DEBUG = True

logger = Logger(name="fe")

# Load the YAML settings and wrap them in a typed pytorch-pfn-extras Config.
pre_eval_config = load_yaml()
config = Config(pre_eval_config, types=CONFIG_TYPES)

In [14]:
class TaskDatset:
    """Access point for one raw competition dataset and its train/test split.

    The full data comes from ``<resources>/input/<dataset>.csv.gz`` (through the
    parquet cache of ``read_parquet_from_csv``). Rows with ``x != 999`` form the
    train split and rows with ``x == 999`` the test split. Each split is
    persisted as its own parquet file next to the input.

    NOTE(review): the class name is misspelled ("Datset"); it is kept because
    callers reference it.
    """

    def __init__(self, config, overwrite=False) -> None:
        """
        Args:
            config: ppe ``Config`` providing ``/global/resources``,
                ``/fe/dataset`` and ``/fe/overwrite``.
            overwrite: if True, rebuild the split parquet files instead of
                reading existing ones.
        """
        self.config = config
        self.dirpath = Path(config["/global/resources"]) / "input"
        self.dataset_name = config["/fe/dataset"]

        self.raw_train_filepath = self.dirpath / f"{self.dataset_name}_raw_train.parquet"
        self.raw_test_filepath = self.dirpath / f"{self.dataset_name}_raw_test.parquet"

        self.overwrite = overwrite

    @property
    def raw_train_data(self):
        """Rows with ``x != 999``, index reset to 0..n-1 (read from / written to parquet)."""
        if self.raw_train_filepath.is_file() and (not self.overwrite):
            return pd.read_parquet(self.raw_train_filepath)

        raw_train_df = self.raw_data.query("x != 999").reset_index(drop=True)
        raw_train_df.to_parquet(self.raw_train_filepath)
        return raw_train_df

    @property
    def raw_test_data(self):
        """Rows with ``x == 999``, index reset to 0..n-1 (read from / written to parquet)."""
        if self.raw_test_filepath.is_file() and (not self.overwrite):
            return pd.read_parquet(self.raw_test_filepath)

        # Reset the index *before* saving. DataFrame.to_parquet(path) returns
        # None, so chaining .reset_index() onto it cannot work.
        raw_test_df = self.raw_data.query("x == 999").reset_index(drop=True)
        raw_test_df.to_parquet(self.raw_test_filepath)
        return raw_test_df

    @cached_property
    def raw_data(self):
        """Full dataset, processed with reduce_mem_usage then sort_df_numpy; computed once per instance."""
        return read_parquet_from_csv(
            filepath=self.dirpath / f"{self.dataset_name}.csv.gz",
            dirpath=self.dirpath,
            process_fns=[reduce_mem_usage, sort_df_numpy],
            overwrite=self.config["/fe/overwrite"],
        )

    @property
    def poi_data(self):
        """``cell_POIcat.csv.gz`` via its parquet cache; re-read on every access (not cached)."""
        return read_parquet_from_csv(
            filepath=self.dirpath / "cell_POIcat.csv.gz", dirpath=self.dirpath
        )


def read_parquet_from_csv(
    filepath: Path,
    dirpath: Path,
    process_fns: list[Callable] | None = None,
    overwrite: bool = False,
) -> pd.DataFrame:
    name = filepath.name.split(".")[0]
    parquet_filepath = dirpath / f"{name}.parquet"
    if parquet_filepath.is_file() and (not overwrite):
        logger.info(f"load parquet file ({str(filepath)})")
        return pd.read_parquet(parquet_filepath)

    logger.info(f"load csv & convert to parquet ({str(filepath)})")
    df = pd.read_csv(filepath)

    if process_fns is not None:
        for fn in process_fns:
            logger.info(f"excute {fn.__name__}")
            df = fn(df)

    df.to_parquet(parquet_filepath)
    return df


def cache(out_dir: Path, overwrite: bool = False, no_cache: bool = False):
    """Decorator factory that memoizes ``func(data, extractor, ...)`` on disk with joblib.

    The wrapped function must receive the data as its first and the extractor
    as its second positional argument. The cache key combines:

    - the extractor's class name and ``str(extractor.__dict__)``;
    - when the first argument is a DataFrame, its columns plus a hash of its
      index and values.

    Including the data means the same extractor applied to different data does
    not return a stale result. Cache files are
    ``out_dir / "<ExtractorName>_<sha256>.pkl"``.

    NOTE(review): ``str(extractor.__dict__)`` embeds attribute reprs; any repr
    containing a memory address would change the key on every run.
    NOTE(review): joblib.load unpickles; only point this at trusted directories.

    Args:
        out_dir: cache directory; created eagerly, even when ``no_cache`` is set.
        overwrite: recompute and rewrite the cache file even if it exists.
        no_cache: bypass caching entirely and just call ``func``.
    """
    out_dir.mkdir(parents=True, exist_ok=True)

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            if no_cache:
                return func(*args, **kwargs)

            data, extractor = args[0], args[1]
            extractor_name = extractor.__class__.__name__

            # Feeding sha256 piecewise equals hashing the concatenation, so keys
            # for non-DataFrame inputs are identical to the previous scheme.
            hasher = hashlib.sha256()
            hasher.update((extractor_name + str(extractor.__dict__)).encode())
            if isinstance(data, pd.DataFrame):
                hasher.update(str(list(data.columns)).encode())
                hasher.update(
                    pd.util.hash_pandas_object(data, index=True).to_numpy().tobytes()
                )
            extractor_id = hasher.hexdigest()
            filename = f"{extractor_name}_{extractor_id}"
            cache_file = out_dir / f"{filename}.pkl"

            if cache_file.exists() and not overwrite:
                logger.debug(f"use cache : {filename}")
                result = joblib.load(cache_file)
            else:
                result = func(*args, **kwargs)
                joblib.dump(result, cache_file)
            return result

        return wrapper

    return decorator


def make_features(config, df, overwrite=False, no_cache=True):
    """Run every configured feature extractor on ``df`` and append the results column-wise.

    Args:
        config: ppe ``Config`` providing ``fe/extractors`` (callables applied as
            ``extractor(df)``), ``/global/resources`` and ``fe/out_dir``.
        df: input frame; it forms the leftmost columns of the result.
        overwrite: forwarded to ``cache``; recompute even if a cache file exists.
        no_cache: forwarded to ``cache``. The default True disables caching,
            matching the previously hard-coded behaviour.

    Returns:
        ``pd.concat([df, *extractor_outputs], axis=1)``. Extractor outputs are
        presumably indexed like ``df``, since concat aligns on the index
        (TODO confirm for every extractor).
    """
    extractors = config["fe/extractors"]
    out_dir = Path(config["/global/resources"]) / "output" / config["fe/out_dir"]

    @cache(out_dir=out_dir, overwrite=overwrite, no_cache=no_cache)
    def _extract(df, extractor):
        # logger.time_log logs start/end and elapsed time per extractor.
        with logger.time_log(target=extractor.__class__.__name__):
            return extractor(df)

    feature_frames = [_extract(df, extractor) for extractor in extractors]
    return pd.concat([df, *feature_frames], axis=1)


class TrainValidDataset:
    """Split a frame into train / validation parts by user id, with joblib caching.

    A seeded random sample of ``/cv/n_valid_uids`` distinct ``uid`` values forms
    the validation set; every other user goes to the training set. Each part is
    dumped with joblib under ``<resources>/output/<fe/out_dir>``. It is
    reloaded from there when ``overwrite`` is False; joblib.load unpickles, so
    only use trusted cache files.
    """

    def __init__(self, config, uids, overwrite=True):
        """
        Args:
            config: ppe Config providing ``/global/resources``, ``fe/out_dir``,
                ``/cv/n_valid_uids`` and ``/global/seed``.
            uids: user ids (repeats allowed) to draw validation users from.
            overwrite: if True, always recompute and re-dump both splits.
        """
        self.config = config
        out_dir = Path(config["/global/resources"]) / "output" / config["fe/out_dir"]
        # The dumps below need the directory; do not rely on another step creating it.
        out_dir.mkdir(parents=True, exist_ok=True)
        # NOTE(review): "feaures" is a typo, kept so existing cache files are still found.
        self.train_filepath = out_dir / "train_feaures_df.pkl"
        self.valid_filepath = out_dir / "valid_features_df.pkl"

        self.uids = uids
        self.overwrite = overwrite

    @cached_property
    def valid_uids(self):
        """Seeded sample of distinct uids assigned to the validation split."""
        unique_uids = pd.Series(np.unique(self.uids))
        return unique_uids.sample(
            self.config["/cv/n_valid_uids"], random_state=self.config["/global/seed"]
        ).tolist()

    def load_valid_data(self, df):
        """Rows of ``df`` whose uid is in ``valid_uids`` (index reset), cached with joblib."""
        if self.valid_filepath.is_file() and (not self.overwrite):
            # Load the *validation* cache; this used to read train_filepath.
            return joblib.load(self.valid_filepath)

        valid_df = df[df["uid"].isin(self.valid_uids)].reset_index(drop=True)
        joblib.dump(valid_df, self.valid_filepath)
        return valid_df

    def load_train_data(self, df):
        """Rows of ``df`` whose uid is NOT in ``valid_uids`` (index reset), cached with joblib."""
        if self.train_filepath.is_file() and (not self.overwrite):
            return joblib.load(self.train_filepath)

        train_df = df[~df["uid"].isin(self.valid_uids)].reset_index(drop=True)
        joblib.dump(train_df, self.train_filepath)
        return train_df


# load data
task_dataset = TaskDatset(config=config, overwrite=True)
raw_train_df = task_dataset.raw_train_data
poi_df = task_dataset.poi_data

N_DEBUG_USERS = 100

if DEBUG:
    # Sample distinct users, not rows. Sampling the raw "uid" column weights
    # users by their row count and can repeat ids, leaving fewer than
    # N_DEBUG_USERS users.
    unique_uids = pd.Series(raw_train_df["uid"].unique())
    user_ids = unique_uids.sample(
        min(N_DEBUG_USERS, len(unique_uids)), random_state=config["/global/seed"]
    ).tolist()
    raw_train_df = raw_train_df[raw_train_df["uid"].isin(user_ids)].reset_index(drop=True)

# feature engineering
features_df = make_features(config=config, df=raw_train_df, overwrite=True)
train_valid_dataset = TrainValidDataset(config=config, uids=features_df["uid"], overwrite=True)
valid_df = train_valid_dataset.load_valid_data(df=features_df)
train_df = train_valid_dataset.load_train_data(df=features_df)

[2023-08-05 21:57:08] [32m[fe] [INFO] - load parquet file (/workspace/resources/input/task2_dataset.csv.gz)[0m
[2023-08-05 21:57:14] [32m[fe] [INFO] - load parquet file (/workspace/resources/input/cell_POIcat.csv.gz)[0m
[2023-08-05 21:57:15] [32m[fe] [INFO] - start GroupedDiffFeatureExtractor 🚀[0m
[2023-08-05 21:57:15] [32m[fe] [INFO] - end GroupedDiffFeatureExtractor ✨ - elapsed time: 0.02 seconds ⏰[0m


In [15]:
def make_sequences(df: pd.DataFrame, group_key: str, group_values: list[str]):
    """Split ``df`` into one tensor per ``group_key`` value.

    Groups keep their order of first appearance (``sort=False``). Each tensor
    holds that group's ``group_values`` columns in row order, with shape
    (n_rows_in_group, len(group_values)).
    """
    sequences = []
    for _key, rows in df.groupby(group_key, sort=False):
        values = rows[group_values].to_numpy()
        sequences.append(torch.tensor(values))
    return sequences


# Model inputs are the engineered "f_" columns; targets are the "x" / "y" columns.
feature_names = [col for col in train_df.columns if col.startswith("f_")]
feature_seqs, target_seqs = (
    make_sequences(df=train_df, group_key="uid", group_values=columns)
    for columns in (feature_names, ["x", "y"])
)

In [23]:
class TrainDataset(Dataset):
    """Training dataset pairing each feature sequence with its target sequence.

    Args:
        feature_seqs: per-item feature sequences (tensors or arrays; presumably
            (seq_len, n_features) as produced by ``make_sequences``).
        target_seqs: per-item target sequences aligned with ``feature_seqs``.
    """

    def __init__(self, feature_seqs, target_seqs):
        self.feature_seqs = feature_seqs
        self.target_seqs = target_seqs

    def __len__(self):
        return len(self.feature_seqs)

    def __getitem__(self, index: int) -> dict[str, torch.Tensor]:
        """Return item ``index`` as float32 tensors under ``feature_seqs`` / ``target_seqs``."""
        # torch.as_tensor accepts tensors or arrays of any dtype. The legacy
        # torch.Tensor(...) constructor is discouraged, and older PyTorch
        # releases may reject non-float32 tensors passed to it.
        feature_seqs = torch.as_tensor(self.feature_seqs[index], dtype=torch.float32)
        target_seqs = torch.as_tensor(self.target_seqs[index], dtype=torch.float32)
        return {"feature_seqs": feature_seqs, "target_seqs": target_seqs}


class TestDataset(Dataset):
    """Inference dataset: one variable-length feature sequence per item.

    Args:
        feature_seqs: per-item feature sequences (tensors or arrays; presumably
            (seq_len, n_features) as produced by ``make_sequences``).
    """

    def __init__(self, feature_seqs):
        self.feature_seqs = feature_seqs

    def __len__(self):
        return len(self.feature_seqs)

    def __getitem__(self, index: int) -> dict[str, torch.Tensor]:
        """Return ``{"feature_seqs": float32 tensor}`` for item ``index``."""
        # Same conversion as TrainDataset: dtype-agnostic, no legacy constructor.
        feature_seqs = torch.as_tensor(self.feature_seqs[index], dtype=torch.float32)
        return {"feature_seqs": feature_seqs}


class CustomCollate:
    """``collate_fn`` that right-pads variable-length sequences with zeros.

    Args:
        is_train_mode: when True, ``target_seqs`` are padded and returned too;
            when False, only features and lengths are returned (inference).
    """

    def __init__(self, is_train_mode=True):
        self.is_train_mode = is_train_mode

    def __call__(self, batch):
        """Pad a list of dataset items into batch tensors.

        For 2-D items of shape (seq_len, dim), the returned dict holds:

        - ``feature_seqs``: (batch, max_len, feature_dim);
        - in train mode, ``target_seqs``: (batch, max_len, target_dim);
        - ``lengths``: a Python list of each item's original ``len``, which
          pack_padded_sequence needs on the CPU.
        """
        lengths = [len(item["feature_seqs"]) for item in batch]
        collated = {
            "feature_seqs": pad_sequence(
                [item["feature_seqs"] for item in batch], batch_first=True
            ),
        }
        if self.is_train_mode:
            collated["target_seqs"] = pad_sequence(
                [item["target_seqs"] for item in batch], batch_first=True
            )
        collated["lengths"] = lengths
        return collated

In [None]:
class CustomLSTMModelV1(nn.Module):
    """Sequence-to-sequence LSTM regressor over padded, variable-length batches.

    A (batch_first) LSTM reads each sequence, and a linear head maps every
    time step's hidden state to ``output_size`` values.
    """

    def __init__(self, input_size, hidden_size, num_layers, output_size):
        super().__init__()

        self.hidden_size = hidden_size
        self.num_layers = num_layers

        # Build the LSTM before the head so parameter initialisation consumes
        # the RNG in the same order.
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, batch):
        """Predict one output vector per time step.

        Args:
            batch: dict with ``feature_seqs`` — a padded (batch, max_len,
                input_size) tensor — and ``lengths``, the true length of each
                sequence (must live on the CPU for pack_padded_sequence).

        Returns:
            Tensor of shape (batch, max(lengths), output_size). Positions past
            a sequence's length hold ``fc`` applied to zero vectors.
        """
        padded_inputs, seq_lengths = batch["feature_seqs"], batch["lengths"]
        # Pack so the LSTM skips the padding of shorter sequences.
        packed_inputs = pack_padded_sequence(
            padded_inputs, seq_lengths, batch_first=True, enforce_sorted=False
        )
        packed_hidden, _state = self.lstm(packed_inputs)
        # Back to a dense, zero-padded tensor for the linear head.
        hidden_seq, _out_lengths = pad_packed_sequence(packed_hidden, batch_first=True)
        return self.fc(hidden_seq)

In [None]:
def train_fn(
    config,
    model,
    dataloader,
    criterion,
    optimizer,
    scheduler,
    device,
    wandb_logger,
    _custom_step,
):
    """Run one training epoch with AMP and optional gradient accumulation.

    Args:
        config: object exposing ``gradient_accumulation_steps``,
            ``clip_grad_norm`` (None disables clipping) and ``batch_scheduler``.
        model: module called as ``model(batch)`` (e.g. CustomLSTMModelV1).
        dataloader: yields dicts from ``CustomCollate(is_train_mode=True)``:
            padded ``feature_seqs`` / ``target_seqs`` tensors and a ``lengths`` list.
        criterion: loss taking ``(outputs, target_seqs)``.
        optimizer: torch optimizer.
        scheduler: LR scheduler; stepped after each optimizer step when
            ``config.batch_scheduler`` is truthy.
        device: device that tensors in the batch are moved to.
        wandb_logger: object with a ``log(dict)`` method.
        _custom_step: running optimizer-step counter.

    Returns:
        ``(mean per-batch loss, updated _custom_step)``.
    """
    model.train()
    accum_steps = config.gradient_accumulation_steps
    n_batches = len(dataloader)
    scaler = torch.cuda.amp.GradScaler()
    losses = []

    # NOTE(review): `tqdm` is not imported anywhere in this notebook; add
    # `from tqdm.auto import tqdm` to the imports cell.
    tbar = tqdm(enumerate(dataloader), total=n_batches)
    for step, batch in tbar:
        # Only tensors can move; `lengths` is a list that pack_padded_sequence
        # needs on the CPU.
        batch = {k: v.to(device) if torch.is_tensor(v) else v for k, v in batch.items()}
        targets = batch["target_seqs"]

        with torch.cuda.amp.autocast():
            batch_outputs = model(batch)
            loss = criterion(batch_outputs, targets)

        # Log/report the true batch loss; only the backward pass is divided.
        loss_value = loss.item()
        if accum_steps > 1:
            loss = loss / accum_steps

        scaler.scale(loss).backward()

        # Also step on the last batch so a partial accumulation does not leak
        # into the next epoch.
        if (step + 1) % accum_steps == 0 or (step + 1) == n_batches:
            if config.clip_grad_norm is not None:
                # Gradients are still multiplied by the loss scale: unscale
                # first so the threshold applies to the real gradient norm.
                scaler.unscale_(optimizer)
                torch.nn.utils.clip_grad_norm_(model.parameters(), config.clip_grad_norm)
            scaler.step(optimizer)
            scaler.update()
            optimizer.zero_grad()
            _custom_step += 1

            if config.batch_scheduler:
                scheduler.step()

        lr = scheduler.get_last_lr()[0]
        wandb_logger.log({"train_loss": loss_value, "lr": lr, "train_step": _custom_step})

        losses.append(loss_value)
        tbar.set_description(f"loss: {np.mean(losses):.4f} lr: {lr:.6f}")

    loss = np.mean(losses)
    return loss, _custom_step


def valid_fn(
    config,
    model,
    dataloader,
    criterion,
    device,
    wandb_logger,
    _custom_step,
):
    """Evaluate ``model`` on ``dataloader`` without gradient tracking.

    Args:
        config: unused; kept so the signature mirrors ``train_fn``.
        model: module called as ``model(batch)`` (e.g. CustomLSTMModelV1).
        dataloader: yields dicts from ``CustomCollate(is_train_mode=True)``
            (``feature_seqs``, ``target_seqs``, ``lengths``).
        criterion: loss taking ``(outputs, target_seqs)``.
        device: device that tensors in the batch are moved to.
        wandb_logger: object with a ``log(dict)`` method.
        _custom_step: running validation-step counter; incremented per batch.

    Returns:
        ``(mean_loss, outputs, targets, _custom_step)``. ``outputs`` and
        ``targets`` are numpy arrays of shape (sum of all lengths, dim) holding
        the non-padded time steps of every sequence, concatenated in loader order.
    """
    model.eval()
    outputs, targets = [], []
    losses = []

    # NOTE(review): `tqdm` is not imported anywhere in this notebook; add
    # `from tqdm.auto import tqdm` to the imports cell.
    tbar = tqdm(dataloader, total=len(dataloader))
    for batch in tbar:
        lengths = batch["lengths"]
        # Only tensors can move; `lengths` stays a CPU-side list.
        batch = {k: v.to(device) if torch.is_tensor(v) else v for k, v in batch.items()}

        with torch.no_grad():
            batch_outputs = model(batch)
            loss = criterion(batch_outputs, batch["target_seqs"])

        # Regression outputs are kept as-is (no softmax). Padded steps are
        # dropped so batches with different max lengths can be concatenated.
        batch_outputs = batch_outputs.float().cpu().numpy()
        batch_targets = batch["target_seqs"].float().cpu().numpy()
        for i, length in enumerate(lengths):
            outputs.append(batch_outputs[i, :length])
            targets.append(batch_targets[i, :length])

        loss_value = loss.item()
        wandb_logger.log({"valid_loss": loss_value, "valid_step": _custom_step})
        _custom_step += 1
        losses.append(loss_value)

        tbar.set_description(f"loss: {np.mean(losses):.4f}")

    outputs = np.concatenate(outputs)
    targets = np.concatenate(targets)

    loss = np.mean(losses)
    return (loss, outputs, targets, _custom_step)

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

max_epochs = 2
# Single source of truth for the DataLoader below (it previously hard-coded 5
# while this constant said 2; the effective value 5 is kept).
batch_size = 5
learning_rate = 1e-3

input_dim = len(feature_names)  # one input channel per "f_" feature column
output_dim = 2  # predicts the "x" and "y" target columns
hidden_dim = 32
num_layers = 2

train_dataset = TrainDataset(feature_seqs=feature_seqs, target_seqs=target_seqs)
train_dataloader = DataLoader(
    train_dataset,
    batch_size=batch_size,
    collate_fn=CustomCollate(is_train_mode=True),
    shuffle=False,
)

criterion = nn.MSELoss()
# `CustomLSTMModel` is not defined in this notebook; CustomLSTMModelV1 takes
# *_size keyword arguments.
model = CustomLSTMModelV1(
    input_size=input_dim,
    hidden_size=hidden_dim,
    num_layers=num_layers,
    output_size=output_dim,
).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

# Minimal training loop (the previously called `noopt_train_loop` is undefined).
# NOTE(review): MSE also covers padded time steps; mask with `lengths` if needed.
for _ in range(max_epochs):
    model.train()
    epoch_losses = []
    for batch in train_dataloader:
        # `lengths` must stay a CPU-side list for pack_padded_sequence.
        batch = {k: v.to(device) if torch.is_tensor(v) else v for k, v in batch.items()}
        optimizer.zero_grad()
        loss = criterion(model(batch), batch["target_seqs"])
        loss.backward()
        optimizer.step()
        epoch_losses.append(loss.item())
    train_loss = float(np.mean(epoch_losses))
    print(train_loss)

In [None]:
def infer(model, dataloader, device):
    """Run ``model`` over ``dataloader`` in eval mode without gradients.

    Args:
        model: module called as ``model(batch)`` with ``feature_seqs`` and
            ``lengths`` keys (e.g. CustomLSTMModelV1, which packs the padded
            input itself).
        dataloader: yields dicts from ``CustomCollate(is_train_mode=False)``.
        device: device that the padded features are moved to.

    Returns:
        List with one output tensor per batch, left on ``device``.
    """
    model.eval()  # set the model to evaluation mode
    all_outputs = []

    with torch.no_grad():  # disable gradient computation
        for data in dataloader:
            # The model packs the sequence itself, so pass the padded tensor
            # plus CPU-side lengths rather than a PackedSequence.
            batch = {
                "feature_seqs": data["feature_seqs"].to(device),
                "lengths": data["lengths"],
            }
            all_outputs.append(model(batch))

    return all_outputs


# NOTE(review): placeholder — this runs inference on the *training* feature
# sequences; swap in test-set sequences once they are built.
test_dataset = TestDataset(feature_seqs=feature_seqs)
test_dataloader = DataLoader(
    test_dataset,
    batch_size=5,
    collate_fn=CustomCollate(is_train_mode=False),
    shuffle=False,
)
# Reuse the model trained in the previous cell. Creating a fresh one here
# would predict with random weights (and `CustomLSTMModel` is undefined).
outputs = infer(model=model, dataloader=test_dataloader, device=device)

In [None]:
print(f"{outputs[0].shape} {len(outputs)}")  # first batch's output shape, number of batches

In [None]:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader


def sequential_prediction(model, initial_features, steps_to_predict):
    """Autoregressively roll ``model`` forward ``steps_to_predict`` steps.

    The observed window is read once, starting from a zero LSTM state. After
    that, each prediction is fed back as the single next input while the
    hidden/cell state is carried over. This equals running the LSTM over
    ``initial_features`` followed by all previous predictions. The old version
    re-fed the whole sliding window on top of the carried state and so
    double-counted history.

    Args:
        model: module exposing ``lstm`` (a batch_first, unidirectional
            ``nn.LSTM``) and ``fc``. Predictions are fed back as inputs, so
            ``fc``'s output size must equal the LSTM input size.
        initial_features: tensor of shape (batch, seq_len, feature_dim).
        steps_to_predict: number of future steps to generate (>= 1).

    Returns:
        Tensor of shape (batch, steps_to_predict, output_dim).
    """
    lstm = model.lstm
    batch_size = initial_features.size(0)
    # Read sizes from the LSTM itself so any model with `.lstm` / `.fc` works.
    state_shape = (lstm.num_layers, batch_size, lstm.hidden_size)
    state = (initial_features.new_zeros(state_shape), initial_features.new_zeros(state_shape))

    # Consume the observed window once.
    output, state = lstm(initial_features, state)

    predictions = []
    for step in range(steps_to_predict):
        # Predict from the most recent time step's output.
        prediction = model.fc(output[:, -1, :])
        predictions.append(prediction)
        if step + 1 < steps_to_predict:
            # Feed only the new prediction; earlier history lives in `state`.
            output, state = lstm(prediction.unsqueeze(1), state)

    return torch.stack(predictions, dim=1)


# Toy data (random, seeded) to smoke-test sequential_prediction, independent of
# the real features. The `demo_` prefix keeps this cell from overwriting
# `model`, `train_dataset`, `input_dim`, ... defined by the cells above.
torch.manual_seed(config["/global/seed"])
demo_features = torch.randn(100, 10, 5)  # 100 samples, 10 time steps, 5 features

# Model definition. Predictions are fed back as the next input, so the output
# size must equal the input size.
demo_input_dim = 5
demo_hidden_dim = 20
demo_num_layers = 2
demo_output_dim = demo_input_dim
demo_model = CustomLSTMModelV1(demo_input_dim, demo_hidden_dim, demo_num_layers, demo_output_dim)

# Sequential prediction from the first sample's 10 observed steps (t0..t9).
demo_initial_features = demo_features[0:1]
steps_to_predict = 10
with torch.no_grad():
    demo_predictions = sequential_prediction(demo_model, demo_initial_features, steps_to_predict)
print("Sequential predictions:", demo_predictions)

In [None]:
# raw_train_df only holds rows with x != 999 (and may be DEBUG-subsampled),
# so querying it for x == 999 is always empty; those rows are in the full data.
test_df = task_dataset.raw_data.query("x == 999")
test_df

In [None]:
# raw_train_df already excludes x == 999 rows; the mask is a no-op safety net.
train_df_ = raw_train_df[raw_train_df["x"] != 999]
train_df_["d"].max()

In [None]:
# Inspect the validation split produced by TrainValidDataset.
valid_df