# Import

In [None]:
import os
import random
import glob
import re
import holidays

import pandas as pd
import numpy as np

from sklearn.preprocessing import MinMaxScaler

import torch
import torch.nn as nn
from tqdm import tqdm


# Fix Random Seed & Set Hyperparameters

In [None]:
def set_seed(seed=42):
    """Seed every random number generator used in this notebook.

    Seeds Python's ``random``, NumPy and PyTorch with ``seed`` and exports it
    as ``PYTHONHASHSEED``. When CUDA is available, the CUDA generators are
    seeded too and cuDNN is put in deterministic mode with benchmarking off.
    """
    os.environ['PYTHONHASHSEED'] = str(seed)
    for seeder in (random.seed, np.random.seed, torch.manual_seed):
        seeder(seed)

    # Nothing GPU-specific to configure on a CPU-only machine.
    if not torch.cuda.is_available():
        return

    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    cudnn = torch.backends.cudnn
    cudnn.deterministic = True
    cudnn.benchmark = False


set_seed(42)

In [None]:
# Number of past days fed to the model and number of future days it predicts.
LOOKBACK = 28
PREDICT = 7
# Mini-batch size and number of passes over each training series.
BATCH_SIZE = 16
EPOCHS = 30
# Run on the GPU when one is available, otherwise on the CPU.
DEVICE = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

# Feature Engineering

In [None]:
def add_date_features(df):
    """Return a date-sorted copy of ``df`` with calendar and holiday features.

    Args:
        df: DataFrame with an '영업일자' column that ``pd.to_datetime`` can
            parse. Several rows may share the same date (for example one row
            per store/menu); the holiday neighbour flags are derived from the
            calendar, not from adjacent rows, so they stay correct in that case.

    Returns:
        A new DataFrame sorted by '영업일자' with a fresh 0..n-1 index and these
        added columns: is_holiday, month, weekday, weekday_name, is_weekend,
        weekofyear, dayofmonth, quarter, holiday_prev, holiday_next,
        is_holiday_eve, is_holiday_after, holiday_cluster_length,
        holiday_effect_score. The input frame is not modified.
    """
    df = df.copy()
    df['영업일자'] = pd.to_datetime(df['영업일자'])
    df = df.sort_values('영업일자').reset_index(drop=True)
    dates = df['영업일자']

    # =================================
    # 1. Basic date features
    # =================================

    # South Korean public holidays; the calendar is looked up per date.
    kr_holidays = holidays.KR()

    def holiday_flag(days):
        """Return 1/0 per entry of ``days`` depending on holiday membership."""
        return days.apply(lambda day: 1 if day in kr_holidays else 0).astype(int)

    df['is_holiday'] = holiday_flag(dates)

    df['month'] = dates.dt.month
    df['weekday'] = dates.dt.weekday
    # Korean single-character weekday labels.
    df['weekday_name'] = dates.dt.day_name().map({
        'Monday': '월', 'Tuesday': '화', 'Wednesday': '수',
        'Thursday': '목', 'Friday': '금', 'Saturday': '토', 'Sunday': '일'
    })
    # weekday is 0 (Monday) .. 6 (Sunday), so 5 and 6 are the weekend.
    df['is_weekend'] = (df['weekday'] >= 5).astype(int)
    df['weekofyear'] = dates.dt.isocalendar().week.astype(int)
    df['dayofmonth'] = dates.dt.day
    df['quarter'] = dates.dt.quarter

    # =================================
    # 2. Holiday cluster features
    # =================================

    # Look at the actual previous/next calendar day. Shifting rows instead
    # would compare against a row of the same date whenever several series
    # share a day, and would be wrong across date gaps and at the frame edges.
    one_day = pd.Timedelta(days=1)
    df['holiday_prev'] = holiday_flag(dates - one_day)  # yesterday was a holiday
    df['holiday_next'] = holiday_flag(dates + one_day)  # tomorrow is a holiday

    # Day before a holiday (tomorrow is a holiday).
    df['is_holiday_eve'] = df['holiday_next']
    # Day after a holiday (yesterday was a holiday).
    df['is_holiday_after'] = df['holiday_prev']

    # Number of holidays among yesterday, today and tomorrow (0..3).
    df['holiday_cluster_length'] = (
        df['is_holiday_eve'] + df['is_holiday'] + df['is_holiday_after']
    )

    # Holiday effect multipliers. Per the original notes these come from a
    # prior analysis (e.g. 28.12 / 10.25 = 2.74x); the first matching rule wins.
    # NOTE(review): a holiday whose next day is also a holiday but whose
    # previous day is not matches no rule and gets 1.0 - confirm that is intended.
    today = df['is_holiday'].astype(bool)
    eve = df['is_holiday_eve'].astype(bool)
    after = df['is_holiday_after'].astype(bool)
    df['holiday_effect_score'] = np.select(
        [
            df['holiday_cluster_length'] == 3,  # three-day run: 2.74x
            today & after,                      # holiday following a holiday: 1.43x
            after & ~today,                     # only the day after: 1.72x
            eve & ~today,                       # only the day before: 1.32x
            today & ~eve & ~after,              # isolated holiday: 1.23x
        ],
        [2.74, 1.43, 1.72, 1.32, 1.23],
        default=1.0,                            # ordinary day
    )

    return df

# Data Load

In [None]:
from google.colab import drive

# Make Google Drive visible under /content/drive.
drive.mount('/content/drive')

# Read the training table, clamp negative sales quantities to zero and
# attach the calendar/holiday features.
train = add_date_features(
    pd.read_csv('/content/drive/MyDrive/open/train/train.csv').assign(
        **{'매출수량': lambda frame: frame['매출수량'].clip(lower=0)}
    )
)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


# Define Model

In [None]:
class MultiOutputLSTM(nn.Module):
    """LSTM that maps an input sequence to ``output_dim`` values in one shot.

    The sequence is run through a batch-first LSTM; the hidden output of the
    last time step goes through dropout and a linear layer.
    """

    def __init__(self, input_dim, hidden_dim=96, num_layers=2, output_dim=7, dropout_rate=0.3):
        super().__init__()
        # Dropout between LSTM layers is only applied when there is more than one layer.
        if num_layers > 1:
            inter_layer_dropout = dropout_rate
        else:
            inter_layer_dropout = 0
        self.lstm = nn.LSTM(
            input_dim,
            hidden_dim,
            num_layers,
            batch_first=True,
            dropout=inter_layer_dropout,
        )
        self.dropout = nn.Dropout(dropout_rate)
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        """Return the linear head applied to the last time step of the LSTM output."""
        sequence_out, _state = self.lstm(x)
        last_step = sequence_out[:, -1, :]
        return self.fc(self.dropout(last_step))

# Train

In [None]:
def train_lstm(train_df):
    """Train one MultiOutputLSTM per '영업장명_메뉴명' group of ``train_df``.

    For every group the feature columns are min-max scaled, cut into sliding
    windows of LOOKBACK input rows followed by PREDICT target rows (target is
    the first feature, '매출수량'), and a model is trained for EPOCHS epochs
    with Adam (lr=0.001) and MSE loss on shuffled mini-batches of BATCH_SIZE.
    Groups with fewer than LOOKBACK + PREDICT rows are skipped.

    Args:
        train_df: DataFrame containing '영업장명_메뉴명', '영업일자' and every
            column named in ``features`` below.

    Returns:
        dict keyed by the groupby key (a 1-tuple on pandas versions that
        return tuples for a list grouper). Each value is a dict with:
        'model' (the trained network in eval mode), 'scaler' (the fitted
        MinMaxScaler), 'last_sequence' (the last LOOKBACK scaled rows) and
        'last_date' (the group's latest '영업일자').
    """
    # Column 0 must stay '매출수량': it is the prediction target.
    features = ['매출수량', 'month', 'is_holiday', 'is_weekend', 'weekofyear', 'dayofmonth',
                'quarter', 'holiday_effect_score']
    trained_models = {}

    for store_menu, group in tqdm(train_df.groupby(['영업장명_메뉴명']), desc ='Training LSTM'):
        store_train = group.sort_values('영업일자').copy()
        if len(store_train) < LOOKBACK + PREDICT:
            continue

        scaler = MinMaxScaler()
        store_train[features] = scaler.fit_transform(store_train[features])
        train_vals = store_train[features].values  # shape: (N, len(features))

        # Build the sliding windows as two contiguous arrays. Handing
        # torch.tensor a Python list of ndarrays is very slow and makes
        # PyTorch emit a warning for every series.
        n_windows = len(train_vals) - LOOKBACK - PREDICT + 1
        window_inputs = np.stack(
            [train_vals[i:i + LOOKBACK] for i in range(n_windows)]
        )
        window_targets = np.stack(
            [train_vals[i + LOOKBACK:i + LOOKBACK + PREDICT, 0] for i in range(n_windows)]
        )

        X_train = torch.from_numpy(window_inputs).float().to(DEVICE)
        y_train = torch.from_numpy(window_targets).float().to(DEVICE)

        model = MultiOutputLSTM(input_dim=len(features), output_dim=PREDICT).to(DEVICE)
        optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
        criterion = nn.MSELoss()

        model.train()
        for epoch in range(EPOCHS):
            # New random sample order every epoch.
            idx = torch.randperm(len(X_train))
            for i in range(0, len(X_train), BATCH_SIZE):
                batch_idx = idx[i:i+BATCH_SIZE]
                X_batch, y_batch = X_train[batch_idx], y_train[batch_idx]
                output = model(X_batch)
                loss = criterion(output, y_batch)
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

        trained_models[store_menu] = {
            'model': model.eval(),
            'scaler': scaler,
            'last_sequence': train_vals[-LOOKBACK:],  # (LOOKBACK, len(features))
            'last_date': store_train['영업일자'].iloc[-1]
        }

    return trained_models

In [None]:
# Train one LSTM per store/menu group of the training data.
trained_models = train_lstm(train)

  X_train = torch.tensor(X_train).float().to(DEVICE)
Training LSTM: 100%|██████████| 193/193 [08:24<00:00,  2.61s/it]


# Prediction

In [None]:
def predict_lstm(test_df, trained_models, test_prefix: str):
    """Predict the next PREDICT days for every group of ``test_df`` that has a model.

    For each '영업장명_메뉴명' group the last LOOKBACK rows (by '영업일자') are
    scaled with the group's fitted scaler and fed to its model; the scaled
    outputs are mapped back to sales quantities and floored at 0. Groups
    without a trained model or with fewer than LOOKBACK rows are skipped.

    Args:
        test_df: DataFrame with '영업장명_메뉴명', '영업일자' and '매출수량'.
        trained_models: mapping produced by ``train_lstm``; each value must
            provide 'model' and 'scaler'.
        test_prefix: label used to build the output dates, e.g. 'TEST_00'.

    Returns:
        DataFrame with columns '영업일자' (``f"{test_prefix}+{k}일"`` for
        k = 1..PREDICT), '영업장명_메뉴명' (the groupby key, unchanged) and
        '매출수량'. The columns are present even when no group was predicted.
    """
    # Column 0 must stay '매출수량': the scaler's first column is the target.
    features = ['매출수량', 'month', 'is_holiday', 'is_weekend', 'weekofyear', 'dayofmonth',
                'quarter', 'holiday_effect_score']
    # Output labels: <prefix>+1일 .. <prefix>+<PREDICT>일.
    pred_dates = [f"{test_prefix}+{i+1}일" for i in range(PREDICT)]
    results = []

    for store_menu, store_test in test_df.groupby(['영업장명_메뉴명']):
        entry = trained_models.get(store_menu)
        if entry is None:
            continue
        model, scaler = entry['model'], entry['scaler']

        # Compute the date features on this group's rows only.
        store_test = add_date_features(store_test)
        recent = store_test.sort_values('영업일자')[features].iloc[-LOOKBACK:]
        if len(recent) < LOOKBACK:
            continue

        # Pass the DataFrame (not .values) so the column names match the ones
        # the scaler was fitted with; result has shape (LOOKBACK, len(features)).
        recent_scaled = np.asarray(scaler.transform(recent))
        x_input = torch.from_numpy(recent_scaled).float().unsqueeze(0).to(DEVICE)

        with torch.no_grad():
            # Index the batch axis explicitly; a bare squeeze() would also
            # drop the prediction axis when PREDICT == 1.
            pred_scaled = model(x_input)[0].cpu().numpy()

        # Invert the scaling for the target only: place the predictions in
        # column 0 of a zero matrix and read column 0 back.
        padded = np.zeros((PREDICT, len(features)))
        padded[:, 0] = pred_scaled[:PREDICT]
        restored = np.clip(scaler.inverse_transform(padded)[:, 0], 0, None)

        for d, val in zip(pred_dates, restored):
            results.append({
                '영업일자': d,
                '영업장명_메뉴명': store_menu,
                '매출수량': float(val)
            })

    return pd.DataFrame(results, columns=['영업일자', '영업장명_메뉴명', '매출수량'])

In [None]:
# Run the prediction for every TEST_*.csv file, in sorted path order.
test_files = sorted(glob.glob('/content/drive/MyDrive/open/test/TEST_*.csv'))

all_preds = []
for path in test_files:
    # Clamp negative sales quantities to zero, then add the date features.
    test_df = add_date_features(
        pd.read_csv(path).assign(
            **{'매출수량': lambda frame: frame['매출수량'].clip(lower=0)}
        )
    )

    # The file-name prefix (e.g. TEST_00) labels the predicted days.
    test_prefix = re.search(r'(TEST_\d+)', os.path.basename(path)).group(1)

    all_preds.append(predict_lstm(test_df, trained_models, test_prefix))

# One long table holding the predictions of all test files.
full_pred_df = pd.concat(all_preds, ignore_index=True)



# Submission

In [None]:
def convert_to_submission_format(pred_df: pd.DataFrame, sample_submission: pd.DataFrame):
    """Pivot long-format predictions into the wide sample-submission layout.

    Args:
        pred_df: DataFrame with columns '영업일자', '영업장명_메뉴명' and
            '매출수량'. The menu key may be a plain name or a 1-tuple holding
            the name (as produced by grouping on a one-element list). An
            empty frame is accepted.
        sample_submission: DataFrame with an '영업일자' column; every column
            after the first one is treated as a menu name.

    Returns:
        A copy of ``sample_submission`` in which each menu column holds the
        predicted quantity for that row's date, or 0 where no prediction
        exists. ``sample_submission`` itself is not modified.
    """
    def menu_name(key):
        # Unwrap 1-tuple group keys so they match the submission column names.
        if isinstance(key, tuple) and len(key) == 1:
            return key[0]
        return key

    # (date label, menu name) -> predicted quantity; later rows win on duplicates.
    pred_dict = {}
    if not pred_df.empty:
        pred_dict = {
            (date, menu_name(menu)): qty
            for date, menu, qty in zip(
                pred_df['영업일자'], pred_df['영업장명_메뉴명'], pred_df['매출수량']
            )
        }

    final_df = sample_submission.copy()
    dates = final_df['영업일자'].tolist()

    # Assign whole columns at once: this avoids writing floats cell by cell
    # into integer columns (incompatible-dtype warnings) and is much faster.
    for col in final_df.columns[1:]:
        final_df[col] = [pred_dict.get((date, col), 0) for date in dates]

    return final_df

In [None]:
# Load the sample submission, which provides the dates and menu columns to fill.
sample_submission = pd.read_csv('/content/drive/MyDrive/open/sample_submission.csv')
# Fill that layout with the predictions collected in full_pred_df.
submission = convert_to_submission_format(full_pred_df, sample_submission)
# Write the result without the index column, encoded as UTF-8 with a BOM (utf-8-sig).
submission.to_csv('L_baseline_submission.csv', index=False, encoding='utf-8-sig')

  final_df.loc[row_idx, col] = pred_dict.get((date, (col,)), 0) # get((date, col), 0) 수정
  final_df.loc[row_idx, col] = pred_dict.get((date, (col,)), 0) # get((date, col), 0) 수정
  final_df.loc[row_idx, col] = pred_dict.get((date, (col,)), 0) # get((date, col), 0) 수정
  final_df.loc[row_idx, col] = pred_dict.get((date, (col,)), 0) # get((date, col), 0) 수정
  final_df.loc[row_idx, col] = pred_dict.get((date, (col,)), 0) # get((date, col), 0) 수정
  final_df.loc[row_idx, col] = pred_dict.get((date, (col,)), 0) # get((date, col), 0) 수정
  final_df.loc[row_idx, col] = pred_dict.get((date, (col,)), 0) # get((date, col), 0) 수정
  final_df.loc[row_idx, col] = pred_dict.get((date, (col,)), 0) # get((date, col), 0) 수정
  final_df.loc[row_idx, col] = pred_dict.get((date, (col,)), 0) # get((date, col), 0) 수정
  final_df.loc[row_idx, col] = pred_dict.get((date, (col,)), 0) # get((date, col), 0) 수정
  final_df.loc[row_idx, col] = pred_dict.get((date, (col,)), 0) # get((date, col), 0) 수정
  final_df.loc[row_id