In [None]:
#!pip install gluonts==0.14.4
#!pip install 'gluonts[torch]'
#!pip install --upgrade gluonts
#!pip install --upgrade transformers

In [None]:
import pandas as pd
import numpy as np
import torch
from sklearn.preprocessing import LabelEncoder
import holidays
from transformers import PatchTSTConfig, PatchTSTForPrediction, TrainingArguments, Trainer

# 💡 라이브러리 변경: tsfm_public의 도구들을 가져옵니다.
from tsfm_public.toolkit.time_series_preprocessor import TimeSeriesPreprocessor
from tsfm_public.toolkit.dataset import ForecastDFDataset

DEVICE = "mps" #"cuda" if torch.cuda.is_available() else "cpu"
print(DEVICE)

In [None]:
import torch
import torch.nn.functional as F
from transformers.models.patchtst.modeling_patchtst import PatchTSTForPrediction

TARGET_CH = 0  # sales_log 채널 인덱스(보통 0)

class PatchTSTSalesOnly(torch.nn.Module):
    def __init__(self, base: PatchTSTForPrediction, target_ch: int = TARGET_CH):
        super().__init__()
        self.base = base
        self.target_ch = target_ch
        self.config = base.config  # HF가 참조

    # ★ Trainer가 state_dict 저장할 때 base만 저장되도록
    def state_dict(self, *args, **kwargs):
        return self.base.state_dict(*args, **kwargs)

    # ★ 로드 시에도 base로 로드되도록
    def load_state_dict(self, state_dict, strict=True):
        return self.base.load_state_dict(state_dict, strict)

    # ★ 명시적으로 HuggingFace 형식으로 저장하고 싶을 때
    def save_pretrained(self, save_directory):
        self.base.save_pretrained(save_directory)

    @staticmethod
    def _extract_preds_from_output(out, pred_len: int, num_in_ch: int):
        # dict-like
        if hasattr(out, "keys"):
            for k in ["logits", "predictions", "prediction_outputs", "y_hat", "yhat", "forecast"]:
                v = out.get(k, None)
                if isinstance(v, torch.Tensor):
                    return v
        # attribute
        for k in ["logits", "predictions", "prediction_outputs", "y_hat", "yhat", "forecast"]:
            v = getattr(out, k, None)
            if isinstance(v, torch.Tensor):
                return v
        # tuple/list
        if isinstance(out, (tuple, list)):
            cand = [t for t in out if isinstance(t, torch.Tensor)]
            for t in cand:
                if t.ndim == 3 and t.shape[-2] == pred_len and (t.shape[-1] in (1, num_in_ch)):
                    return t
            for t in cand:
                if t.ndim == 2 and t.shape[-1] == pred_len:
                    return t
            if cand:
                return max(cand, key=lambda x: x.numel())
        # fallback to tuple conversion
        try:
            tup = out.to_tuple()
            for t in tup:
                if isinstance(t, torch.Tensor) and t.ndim >= 2:
                    return t
        except Exception:
            pass
        raise AttributeError("예측 텐서를 출력에서 찾지 못했습니다.")

    def forward(self, past_values, past_observed_mask=None, future_values=None, **kwargs):
        # 내부 기본 loss는 피하고 예측만 얻기 위해 future_values=None으로 호출
        base_out = self.base(
            past_values=past_values,
            past_observed_mask=past_observed_mask,
            future_values=None,
            **kwargs
        )

        # 예측 텐서 추출
        preds_all = self._extract_preds_from_output(
            base_out,
            pred_len=self.config.prediction_length,
            num_in_ch=self.config.num_input_channels,
        )  # (B, pred_len, C) or (B, pred_len)

        # 타깃 채널만 선택
        if preds_all.ndim == 3:
            preds_target = preds_all[..., self.target_ch]  # (B, pred_len)
        else:
            preds_target = preds_all  # 이미 (B, pred_len)

        # 라벨도 타깃 채널만으로 맞춰서 손실 계산
        loss = None
        if future_values is not None:
            fv = future_values
            if fv.ndim == 3 and fv.shape[-1] == self.config.num_input_channels:
                target = fv[..., self.target_ch].float()      # (B, pred_len)
            elif fv.ndim == 3 and fv.shape[-1] == 1:
                target = fv.squeeze(-1).float()               # (B, pred_len)
            elif fv.ndim == 2:
                target = fv.float()
            else:
                raise RuntimeError(f"future_values shape 예상 밖: {fv.shape}")
            loss = F.mse_loss(preds_target.float(), target)

        # HF Trainer가 인식하는 dict 반환 (loss/logits 필수)
        ret = {
            "logits": preds_target,           # predict/eval에서 사용
            "predictions": preds_target,      # predict() 시 편의
        }
        if loss is not None:
            ret["loss"] = loss
        # 필요하면 loc/scale도 패스스루
        for k in ["loc", "scale"]:
            v = getattr(base_out, k, None) if not isinstance(base_out, dict) else base_out.get(k, None)
            if isinstance(v, torch.Tensor):
                ret[k] = v
        return ret


## 학습 데이터 준비

In [None]:
# open_date.csv의 메뉴 이름 집합
launch_menu_names = set(pd.read_csv('./EDA/open_date.csv')['메뉴'].dropna())

# train.csv의 메뉴 이름 집합
sales_menu_names = set(pd.read_csv("./dataset/train/train.csv")['영업장명_메뉴명'].dropna())

# 출시일에만 있고 판매 데이터에는 없는 메뉴 (문제가 될 가능성은 적음)
print("출시일에만 있는 메뉴:", launch_menu_names - sales_menu_names)

# 판매 데이터에는 있는데 출시일 정보가 없는 메뉴 (이 부분을 확인해야 함)
print("판매 데이터에만 있는 메뉴:", sales_menu_names - launch_menu_names)

In [None]:
'''
- 영업 시작 전 데이터는 결측 처리함 -> 학습에 사용되지 않음.
- 라그로타_까르보나라, 담하 꼬막 비빔밥은 판매수량이 0이어도 판매하지 않는 기간까지 학습하기 위해 시작 시점 수정함.
'''
# ==============================================
# 1. 데이터 로드 및 전처리 (사용자 코드 유지)
# ==============================================

# 1. 신메뉴 출시일 데이터 준비
menu_launch_df = pd.read_csv('./EDA/open_date.csv')
menu_launch_df['출시'] = pd.to_datetime(menu_launch_df['출시'], errors='coerce')
launch_dates = menu_launch_df.set_index('메뉴')['출시'].dropna().to_dict()

def mask_prelaunch_sales(group):
    menu_name = group.name
    launch_date = launch_dates.get(menu_name)
    
    if launch_date:
        group.loc[group['date'] < launch_date, 'sales'] = np.nan
    return group

df = pd.read_csv("./dataset/train/train.csv")
df.columns = ["date", "store_menu", "sales"]
df["date"] = pd.to_datetime(df["date"])

df.loc[df['sales'] < 0, 'sales'] = 0
df["sales"] = df["sales"].astype(float)
# 메뉴별로 그룹화하여 함수 적용 후 인덱스 초기화
df = df.groupby('store_menu').apply(mask_prelaunch_sales).reset_index(drop=True)
df["sales_log"] = np.log1p(df["sales"])     # target은 이제 sales_log

# entity embedding용 ID 인코딩
# LabelEncoder 객체를 저장해두면 나중에 원래 이름으로 복원할 때 유용합니다.
encoder = LabelEncoder()
df["store_menu_id"] = encoder.fit_transform(df["store_menu"])
num_entities = df["store_menu_id"].nunique() # 고유 ID 개수 저장

# feature 추가
kr_holidays = holidays.KR(years=df['date'].dt.year.unique())
df["is_holiday"] = df["date"].isin(kr_holidays).astype(int)
df["is_weekend"] = df["date"].dt.day_of_week.isin([5, 6]).astype(int)
df["is_ski_season"] = df["date"].dt.month.isin([12, 1, 2]).astype(int)

print("데이터 전처리 완료.")
#print(df.head())

In [None]:
tmp = df["sales"].isna().value_counts()
print(f"- 결측치 확인\n {tmp}\n\n")

print(f"- 데이터 샘플")
df.loc[(df["store_menu"] == "느티나무 셀프BBQ_쌈장")]

In [None]:
# ==============================================
# 2. ForecastDFDataset으로 변환
# ==============================================
forecast_horizon = 7
context_length = 28

# 학습/검증 데이터 분리
split_date = df['date'].max() - pd.Timedelta(days=forecast_horizon * 2)
train_data = df[df['date'] < split_date]
valid_data = df[df['date'] >= split_date]  # 검증 데이터는 전체 사용

# ForecastDFDataset 생성
train_dataset = ForecastDFDataset(
    train_data,
    id_columns=["store_menu_id"],
    timestamp_column="date",
    target_columns=["sales_log"],
    control_columns=["is_holiday", "is_weekend", "is_ski_season"],
    context_length=context_length,
    prediction_length=forecast_horizon,
)

valid_dataset = ForecastDFDataset(
    valid_data,
    id_columns=["store_menu_id"],
    timestamp_column="date",
    target_columns=["sales_log"],
    control_columns=["is_holiday", "is_weekend", "is_ski_season"],
    context_length=context_length,
    prediction_length=forecast_horizon,
)

print("데이터셋 변환 완료 ✅")
print("train_dataset 길이:", len(train_dataset))
print("valid_dataset 길이:", len(valid_dataset))

## 모델 및 학습 설정

In [None]:
# ==============================================
# 3. PatchTST 모델 및 학습 설정 (Hugging Face 코드)
# ==============================================
config = PatchTSTConfig(
    # --- 데이터 관련 설정 ---
    num_input_channels=4, # sales + 3 known covariates
    context_length=context_length,
    prediction_length=forecast_horizon,
    # 💡 시간에 따라 변하는 외부 변수의 개수
    num_time_varying_known_reals=3, # is_holiday, is_weekend, is_ski_season

    # --- Entity Embedding 관련 설정 ---
    # 💡 고유 ID를 embedding 하기 위한 설정
    num_static_categorical_features=1, # store_menu_id 1개
    cardinality=[num_entities],      # store_menu_id의 고유값 개수
    embedding_dimension=[32],        # store_menu_id를 32차원으로 임베딩

    # --- 모델 구조 설정 ---
    patch_length=8,
    patch_stride=8,
    d_model=128,
    num_attention_heads=16,
    num_hidden_layers=3,
    ffn_dim=256,
    dropout=0.2,
    head_dropout=0.2,
    scaling="std",
    loss="mse",
)

#model = PatchTSTForPrediction(config)
# 기존 구성
base_model = PatchTSTForPrediction(config)

# 래핑
model = PatchTSTSalesOnly(base_model, target_ch=0)

training_args = TrainingArguments(
    output_dir="./patchtst_sales_forecast",
    overwrite_output_dir=True,
    num_train_epochs=50, # 예시로 에폭 수 줄임
    do_eval=True,
    eval_strategy="epoch",
    per_device_train_batch_size=64,
    per_device_eval_batch_size=64,
    save_strategy="epoch",
    save_total_limit=3,
    load_best_model_at_end=True,
    metric_for_best_model="eval_loss",
    greater_is_better=False,
    label_names=["future_values"],
    dataloader_pin_memory=False,
    use_mps_device=True,
)

# 그대로 Hugging Face Trainer 사용
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=valid_dataset,
)

### Optuna

In [None]:
import optuna
from transformers import Trainer, TrainingArguments, EarlyStoppingCallback
from transformers.models.patchtst import PatchTSTConfig, PatchTSTForPrediction
import os, optuna

STUDY_NAME = "patchtst_sales_forecast"  # 원하는 이름 (기존과 동일해야 이어짐)
STORAGE = f"sqlite:///{os.path.abspath('./patchtst_sales_forecast/optuna.sqlite3')}"

# ---- 1) trial=None 안전한 helper ----
def s_cat(trial, name, choices, default):
    return trial.suggest_categorical(name, choices) if trial else default

def s_int(trial, name, low, high, default):
    return trial.suggest_int(name, low, high) if trial else default

def s_float(trial, name, low, high, default, log=False):
    return trial.suggest_float(name, low, high, log=log) if trial else default


# --- 1) 모델 생성 함수: trial로부터 아키텍처/하이퍼파라미터를 받아서 모델 구성 ---
def model_init(trial):
    # [디버깅] 이 함수가 호출될 때마다 실제 사용되는 값을 출력합니다.
    print(f"--- Optuna Trial: Creating model with context={context_length}, horizon={forecast_horizon} ---")
    # ⬇︎ 아키텍처 탐색 공간 (필요한 것만 남기고/늘려도 됨)
    d_model  = s_cat(trial, "d_model", [64, 128, 256], 128)
    # d_model로 나누어떨어지는 head만 허용
    heads_cand = [h for h in [4, 8, 16] if d_model % h == 0]
    num_heads = s_cat(trial, "num_attention_heads", heads_cand, heads_cand[0])
    num_layers = s_int(trial, "num_hidden_layers", 2, 4, 3)
    ffn_dim   = s_cat(trial, "ffn_dim", [128, 256, 512], 256)
    dropout   = s_float(trial, "dropout", 0.0, 0.3, 0.2)
    head_do   = s_float(trial, "head_dropout", 0.0, 0.3, 0.2)
    patch_choices = [1, 7]
    patch_len = s_cat(trial, "patch_length", patch_choices, 7)
    patch_str = patch_len  # stride=length 고정

    cfg = PatchTSTConfig(
        # --- 고정 (네 파이프라인) ---
        num_input_channels=4,
        context_length=context_length,
        prediction_length=forecast_horizon,
        num_time_varying_known_reals=3,
        num_static_categorical_features=1,
        cardinality=[num_entities],
        embedding_dimension=[32],
        scaling="std",
        loss="mse",
        # --- 탐색 대상 ---
        d_model=d_model,
        num_attention_heads=num_heads,
        num_hidden_layers=num_layers,
        ffn_dim=ffn_dim,
        dropout=dropout,
        head_dropout=head_do,
        patch_length=patch_len,
        patch_stride=patch_str,
    )
    import math
    def _eff(L,p,s): return p * math.ceil(L / s)
    def assert_no_padding(cfg):
        ec = _eff(cfg.context_length, cfg.patch_length, cfg.patch_stride)
        ep = _eff(cfg.prediction_length, cfg.patch_length, cfg.patch_stride)
        if (ec, ep) != (cfg.context_length, cfg.prediction_length):
            raise ValueError(f"padding: ctx {cfg.context_length}->{ec}, pred {cfg.prediction_length}->{ep} "
                            f"(p={cfg.patch_length}, s={cfg.patch_stride})")
    # model_init 내부에서 cfg 만든 직후 호출
    assert_no_padding(cfg)

    base = PatchTSTForPrediction(cfg)
    # sales 채널만 loss/예측하도록 만든 래퍼
    return PatchTSTSalesOnly(base, target_ch=0)

# --- 2) 학습 세팅 쪽 탐색 공간 (TrainingArguments) ---
def hp_space(trial):
    return {
        "learning_rate": trial.suggest_float("learning_rate", 1e-5, 5e-4, log=True),
        "weight_decay":  trial.suggest_float("weight_decay", 0.0, 0.1),
        "warmup_ratio":  trial.suggest_float("warmup_ratio", 0.0, 0.2),
        "lr_scheduler_type": trial.suggest_categorical(
            "lr_scheduler_type", ["linear", "cosine", "cosine_with_restarts", "polynomial"]
        ),
        # 필요시 배치/에폭도 탐색
        "per_device_train_batch_size": trial.suggest_categorical("per_device_train_batch_size", [32, 64, 96]),
        "per_device_eval_batch_size":  trial.suggest_categorical("per_device_eval_batch_size",  [32, 64, 96]),
        "num_train_epochs": trial.suggest_int("num_train_epochs", 10, 40),
    }

# --- 3) 목표 메트릭 (작을수록 좋게) ---
def compute_objective(metrics):
    # eval_loss만 최소화
    return metrics["eval_loss"]

# --- 4) HPO용 트레이너: model이 아니라 model_init를 넘겨야 함! ---
trainer_hpo = Trainer(
    model_init=model_init,
    args=training_args,                # 네 기존 args (eval_strategy="epoch" 등 포함)
    train_dataset=train_dataset,
    eval_dataset=valid_dataset,
    callbacks=[EarlyStoppingCallback(early_stopping_patience=2, early_stopping_threshold=0.0)],
)

# --- 5) 탐색 실행 ---
best_run = trainer_hpo.hyperparameter_search(
    direction="minimize",
    backend="optuna",
    n_trials=1,                # 리소스에 맞게 늘리기/줄이기
    hp_space=hp_space,
    study_name=STUDY_NAME,
    storage=STORAGE,
    load_if_exists=True,
    compute_objective=compute_objective,
)
print("BEST:", best_run)
print("BEST params:", best_run.hyperparameters)


### Cross_validation
- Expanding Window

### Fold 개수에 따른 장단점

1. Fold 개수를 늘리면 (예: 5~10개)

- 장점 ✅: 더 많은 기간에 걸쳐 모델을 테스트하고 평균을 내므로, 평가 결과의 신뢰도가 높아지고 더 안정적인 점수를 얻을 수 있습니다. 특정 기간에 운 좋게 점수가 잘 나오는 상황을 방지할 수 있습니다.

- 단점 ❌: 모델 전체를 5번, 10번 학습시켜야 하므로 검증에 걸리는 시간이 크게 늘어납니다.

2. Fold 개수를 줄이면 (예: 2~3개)

- 장점 ✅: 전체 학습을 2~3번만 실행하므로 검증이 매우 빠릅니다.

- 단점 ❌: 적은 수의 기간만으로 성능을 판단하므로, 검증 기간에 특이한 패턴이 있었다면 평가 결과가 불안정할 수 있습니다. (예: 3번의 모의고사 중 1번만 유독 쉽게 나와 평균 점수가 부풀려지는 현상)

In [None]:
from transformers import Trainer, TrainingArguments, PatchTSTConfig, PatchTSTForPrediction
import numpy as np
import pandas as pd

# ===================================================================
# 0. 사전 준비: Optuna 실행 후 best_run에서 파라미터 가져오기
# ===================================================================
best_params = best_run.hyperparameters
print("✅ Optuna로 찾은 최적 하이퍼파라미터를 사용합니다.")
print(best_params)


# ===================================================================
# 1. 최적 파라미터로 Config 및 TrainingArguments 객체 생성
# 이 객체들은 모든 Fold에서 동일하게 사용됩니다.
# ===================================================================

# --- 1-1. 최적 모델 구조로 PatchTSTConfig 생성 ---
patch_length = best_params["patch_length"]
config_best = PatchTSTConfig(
    # --- 고정 파라미터 ---
    num_input_channels=4,
    context_length=context_length,
    prediction_length=forecast_horizon,
    num_time_varying_known_reals=3,
    num_static_categorical_features=1,
    cardinality=[num_entities],
    embedding_dimension=[32],
    scaling="std",
    loss="mse",
    # --- HPO로 찾은 최적 파라미터 적용 ---
    d_model=best_params["d_model"],
    num_attention_heads=best_params["num_attention_heads"],
    num_hidden_layers=best_params["num_hidden_layers"],
    ffn_dim=best_params["ffn_dim"],
    dropout=best_params["dropout"],
    head_dropout=best_params["head_dropout"],
    patch_length=patch_length,
    patch_stride=patch_length, # stride=length 가정
)

# --- 1-2. 최적 학습 설정으로 TrainingArguments 생성 ---
# 기존 training_args를 복사하여 HPO 결과로 업데이트
args_dict = training_args.to_dict()
for k, v in best_params.items():
    if k in args_dict:
        args_dict[k] = v

# TrainingArguments 객체를 한 번만 생성
# (단, output_dir은 루프 안에서 fold별로 덮어쓸 예정)
best_args = TrainingArguments(**args_dict)


# ===================================================================
# 2. 시계열 교차검증(TSCV) 루프 실행
# ===================================================================
N_SPLITS = 3  # How many folds to run
VALIDATION_DAYS = 14  # Use 14 days for each validation set
full_data_df = df # Use the preprocessed DataFrame from your previous step

all_eval_losses = []
end_of_all_data = full_data_df['date'].max()

# The main loop for cross-validation
for i in range(N_SPLITS):
    print(f" BOLD-START BOLD-END --- Starting Fold {i+1}/{N_SPLITS} --- BOLD-START BOLD-END ")

    # --- 1. Calculate split dates for the current fold ---
    # We work backwards from the end of the dataset to define our validation splits
    validation_end_date = end_of_all_data - pd.Timedelta(days=i * VALIDATION_DAYS)
    validation_start_date = validation_end_date - pd.Timedelta(days=VALIDATION_DAYS)
    
    # Training data is everything before the current validation period starts
    train_data_fold = full_data_df[full_data_df['date'] < validation_start_date]
    valid_data_fold = full_data_df[
        (full_data_df['date'] >= validation_start_date) & 
        (full_data_df['date'] < validation_end_date)
    ]

    print(f"Train period: {train_data_fold['date'].min().date()} to {train_data_fold['date'].max().date()}")
    print(f"Valid period: {valid_data_fold['date'].min().date()} to {valid_data_fold['date'].max().date()}\n")

    # --- 2. Create datasets for this fold ---
    # (This is the same logic as your original code, just with the fold's data)
    train_dataset_fold = ForecastDFDataset(
        train_data_fold,
        id_columns=["store_menu_id"],
        timestamp_column="date",
        target_columns=["sales_log"],
        control_columns=["is_holiday", "is_weekend", "is_ski_season"],
        context_length=context_length,
        prediction_length=forecast_horizon,
    )

    valid_dataset_fold = ForecastDFDataset(
        valid_data_fold,
        id_columns=["store_menu_id"],
        timestamp_column="date",
        target_columns=["sales_log"],
        control_columns=["is_holiday", "is_weekend", "is_ski_season"],
        context_length=context_length,
        prediction_length=forecast_horizon,
    )

    # --- 2-3. 모델과 Trainer 재생성 ---
    # ★★★ 항상 위에서 정의한 최적의 config_best를 사용합니다 ★★★
    base_model_fold = PatchTSTForPrediction(config_best)
    model_fold = PatchTSTSalesOnly(base_model_fold, target_ch=0)

    # ★★★ 위에서 정의한 best_args를 사용하되, 결과 저장 경로는 Fold별로 지정 ★★★
    training_args_fold = best_args.to_dict()
    training_args_fold['output_dir'] = f"./patchtst_sales_final_eval_fold_{i+1}"
    training_args_fold = TrainingArguments(**training_args_fold)

    trainer_fold = Trainer(
        model=model_fold,
        args=training_args_fold,
        train_dataset=train_dataset_fold,
        eval_dataset=valid_dataset_fold,
    )

    # --- 4. Train and evaluate the model for this fold ---
    trainer_fold.train()

    # --- 5. Store the best evaluation metric from this fold ---
    best_loss = trainer_fold.state.best_metric
    print(f"\n BOLD-START ✅ Fold {i+1} Best Validation Loss: {best_loss:.4f} BOLD-END \n")
    all_eval_losses.append(best_loss)


# ==============================================
# 2. Final Cross-Validation Results
# ==============================================
mean_loss = np.mean(all_eval_losses)
std_loss = np.std(all_eval_losses)

print(" BOLD-START --- Time Series Cross-Validation Final Report --- BOLD-END ")
print(f"Validation losses across all folds: {[f'{loss:.4f}' for loss in all_eval_losses]}")
print(f" BOLD-START Average Validation Loss: {mean_loss:.4f} BOLD-END ")
print(f"Standard Deviation of Validation Loss: {std_loss:.4f}")

### 최종 학습

In [None]:
best = best_run.hyperparameters

# TrainingArguments 반영

args_dict = training_args.to_dict()
for k, v in best.items():
    if k in args_dict:
        args_dict[k] = v
best_args = TrainingArguments(**args_dict)

def model_init_best():
    # best 값으로 동일하게 구성
    trial_like = None
    # 그냥 model_init(None) 쓰면 기본값이 들어가므로,
    # 아래처럼 직접 config를 만드는 게 안전. (간단히는 best를 model_init에서 읽도록 바꿔도 OK)
    patch_length = best["patch_length"]
    cfg_best = PatchTSTConfig(
        num_input_channels=4,
        context_length=context_length,
        prediction_length=forecast_horizon,
        num_time_varying_known_reals=3,
        num_static_categorical_features=1,
        cardinality=[num_entities],
        embedding_dimension=[32],
        d_model=best["d_model"],
        num_attention_heads=best["num_attention_heads"],
        num_hidden_layers=best["num_hidden_layers"],
        ffn_dim=best["ffn_dim"],
        dropout=best["dropout"],
        head_dropout=best["head_dropout"],
        patch_length=patch_length,
        patch_stride=patch_length,
        scaling="std",
        loss="mse",
    )

    # 안전가드
    import math
    def _eff(L,p,s): return p * math.ceil(L/s)
    assert _eff(cfg_best.context_length, cfg_best.patch_length, cfg_best.patch_stride) == cfg_best.context_length
    assert _eff(cfg_best.prediction_length, cfg_best.patch_length, cfg_best.patch_stride) == cfg_best.prediction_length

    base = PatchTSTForPrediction(cfg_best)
    return PatchTSTSalesOnly(base, target_ch=0)

final_trainer = Trainer(
    model_init=model_init_best,
    args=best_args,
    train_dataset=train_dataset,
    eval_dataset=valid_dataset,
)
final_trainer.train()

# 훈련 직후
SAVE_DIR = "./patchtst_sales_forecast_best/base"   # 새 폴더
final_trainer.model.base.save_pretrained(SAVE_DIR) # ★ config.json까지 생성됨
print("saved to:", SAVE_DIR)

## Predict

In [None]:
import numpy as np
import torch

def _to_numpy(x):
    return x.detach().cpu().numpy() if isinstance(x, torch.Tensor) else x

def _pick_pred_array(preds, horizon=forecast_horizon, target_ch=0):
    """
    pred_output.predictions가 tuple/list/dict/object ndarray인 다양한 경우를 모두 커버해서
    (N, horizon) 형태의 sales 채널만 꺼내 반환.
    """
    # dict-like
    if isinstance(preds, dict):
        for k in ["predictions", "logits", "prediction_outputs", "y_hat", "forecast"]:
            if k in preds:
                arr = _to_numpy(preds[k])
                if isinstance(arr, np.ndarray):
                    return arr

    # tuple/list
    if isinstance(preds, (list, tuple)):
        for x in preds:
            arr = _to_numpy(x)
            if isinstance(arr, np.ndarray) and arr.ndim >= 2:
                return arr

    # numpy object 배열 (ragged)
    if isinstance(preds, np.ndarray) and preds.dtype == object:
        for x in preds.tolist():
            arr = _to_numpy(x)
            if isinstance(arr, np.ndarray) and arr.ndim >= 2:
                return arr

    # 이미 ndarray인 경우
    if isinstance(preds, np.ndarray):
        return preds

    # 마지막 수단
    arr = np.asarray(preds, dtype=object)
    raise ValueError(f"예측 배열을 추출하지 못함: type={type(preds)}, dtype={getattr(arr,'dtype',None)}")

'''
🏷️ 매출 예측이라면?
소수점 0.5 기준 반올림 (np.rint) 이 가장 많이 씁니다.
다만 0.07, 0.08 같은 작은 값들이 실제로는 “0건 매출”인 경우가 많기 때문에, 
임계값(threshold) 규칙을 추가하면 더 좋아요.
'''

def round_with_threshold(x, threshold=0.3):
    if x < threshold:
        return 0
    return int(np.rint(x))

#df_result["y_pred_int"] = df_result["y_pred"].apply(round_with_threshold)

In [None]:
import pandas as pd
import numpy as np
import torch
from transformers import Trainer, AutoConfig
from transformers.models.patchtst.modeling_patchtst import PatchTSTForPrediction


# 1. 학습 시 TrainingArguments의 output_dir에 저장된 'best' 모델 경로를 지정합니다.
#    보통 output_dir 내부에 'checkpoint-...' 형태의 폴더로 저장됩니다.
MODEL_PATH = "./patchtst_sales_forecast_best/base"

# 2. 저장된 경로에서 config.json을 명시적으로 먼저 불러옵니다.
print(f"Loading configuration from: {MODEL_PATH}")
config = AutoConfig.from_pretrained(MODEL_PATH)

# 3. 위에서 불러온 config 객체를 사용하여 모델을 생성합니다.
# 이렇게 하면 context_length 등이 올바르게 설정됩니다.
print("Loading model with specified configuration...")
model = PatchTSTForPrediction.from_pretrained(MODEL_PATH, config=config)

# 3. 예측 전용 Trainer를 생성합니다.
trainer = Trainer(model=model)

print(f"✅ 모델 로드 완료: {MODEL_PATH}")

# 2) 모델이 기대하는 길이 읽기
CTX = getattr(model.config, "context_length", None) or getattr(model.config, "sequence_length", None)
H   = getattr(model.config, "prediction_length", None)
print(f"model expects context_length={CTX}, prediction_length={H}, num_input_channels={model.config.num_input_channels}")

In [None]:
import os

path = "./dataset/test"
files = os.listdir(path)
rows = []

for file in files:

    test_df = pd.read_csv(os.path.join(path, file))
    test_df.columns = ["date", "store_menu", "sales"]
    test_df["date"] = pd.to_datetime(test_df["date"])

    test_df.loc[test_df['sales'] < 0, 'sales'] = 0
    test_df["sales"] = test_df["sales"].astype(float)
    test_df["sales_log"] = np.log1p(test_df["sales"])     # target은 이제 sales_log

    # 기존 인코더 사용 (encoder는 train 단계에서 fit된 걸 그대로 써야 consistency 보장)
    test_df["store_menu_id"] = encoder.transform(test_df["store_menu"])

    # 동일한 feature 생성
    kr_holidays = holidays.KR(years=test_df['date'].dt.year.unique())
    test_df["is_holiday"] = test_df["date"].isin(kr_holidays).astype(int)
    test_df["is_weekend"] = test_df["date"].dt.day_of_week.isin([5, 6]).astype(int)
    test_df["is_ski_season"] = test_df["date"].dt.month.isin([12, 1, 2]).astype(int)

    print(f"{file}테스트 데이터 전처리 완료")

    # ==============================================
    # 2. ForecastDFDataset 변환
    # ==============================================
    test_dataset = ForecastDFDataset(
        test_df,
        id_columns=["store_menu_id"],
        timestamp_column="date",
        target_columns=["sales_log"],
        control_columns=["is_holiday", "is_weekend", "is_ski_season"],
        context_length=context_length,
        prediction_length=forecast_horizon,
    )

    print("테스트 데이터셋 길이:", len(test_dataset))

    # ==============================================
    # 3. 예측 실행 (견고 추출 버전)
    # ==============================================
    pred_output = trainer.predict(test_dataset)
    preds_raw = pred_output.predictions  # 컨테이너일 수 있음


    arr = _pick_pred_array(preds_raw, horizon=forecast_horizon, target_ch=0)

    # ---- (N, 7)로 정규화 ----
    if arr.ndim == 3:
        # 흔한 케이스 1: (N, horizon, C)
        if arr.shape[-2] == forecast_horizon:
            arr = arr[..., 0]                 # sales 채널만
        # 흔한 케이스 2: (N, C, horizon)
        elif arr.shape[-1] == forecast_horizon:
            arr = arr[:, 0, :]                # sales 채널만
        # 백업: 두 번째 축이 horizon이면 3번째 축을 잘라본다
        elif arr.shape[1] == forecast_horizon:
            arr = arr[:, :, 0]
        else:
            raise ValueError(f"예상 밖 3D shape: {arr.shape}")
    elif arr.ndim == 2:
        # (horizon, N) 이면 전치
        if arr.shape[0] == forecast_horizon and arr.shape[1] != forecast_horizon:
            arr = arr.T

    # 이제 (N, 7)이어야 정상
    assert arr.ndim == 2 and arr.shape[1] == forecast_horizon, f"정규화 실패: {arr.shape}"

    # 로그 역변환 + 음수 방지
    y_pred_log = arr
    y_pred_sales = np.expm1(y_pred_log)
    y_pred_sales = np.clip(y_pred_sales, 0, None)

    # 7일 미래 날짜
    last_date = test_df["date"].max()
    future_dates = pd.date_range(start=last_date + pd.Timedelta(days=1),
                                periods=forecast_horizon)

    # 매장명 순서 고정 (id 정렬)
    keys_df = (
        test_df.sort_values(["store_menu_id", "date"])
            .drop_duplicates("store_menu_id")[["store_menu_id", "store_menu"]]
    )
    store_names = keys_df["store_menu"].to_numpy()

    # N 검증
    assert y_pred_sales.shape[0] == len(store_names), (
        f"N 불일치: preds={y_pred_sales.shape[0]} vs stores={len(store_names)}"
    )

    # 매장×7일 테이블
    file_name = file.split(".c")[0]
    for store_name, pred_row in zip(store_names, y_pred_sales):   # pred_row: (7,)
        for day_num, (d, yhat) in enumerate(zip(future_dates, pred_row), start=1):
            date_str = f"{file_name}+{day_num}일"
            rows.append({"date": date_str, "store_menu": store_name, "y_pred": float(yhat)})

df_result = pd.DataFrame(rows).pivot(index="date", columns="store_menu", values="y_pred")
#df_result["y_pred_int"] = df_result["y_pred"].apply(round_with_threshold)  # 결과 정수화를 원할 경우 사용
df_result.index.name = "영업일자"
df_result.to_csv("test_predictions.csv", index=True, encoding="utf-8-sig")

print("예측 테이블 shape:", df_result.shape)  # (7, 매장수)
print(df_result.head(7))
