In [23]:
import os
import random
import glob
import re
import pandas as pd
import numpy as np
from sklearn.preprocessing import MinMaxScaler
import torch
import torch.nn as nn
from tqdm import tqdm
from pandas.tseries.holiday import AbstractHolidayCalendar, Holiday
import holidays

In [24]:
def set_seed(seed=42):
    """Seed every random number generator this notebook relies on.

    Seeds Python's ``random``, NumPy and PyTorch, and exports
    ``PYTHONHASHSEED``. When CUDA is available the CUDA generators are seeded
    as well and cuDNN is switched to deterministic, non-benchmark mode.

    Args:
        seed: Integer seed shared by all generators.
    """
    os.environ['PYTHONHASHSEED'] = str(seed)
    for seed_fn in (random.seed, np.random.seed, torch.manual_seed):
        seed_fn(seed)

    # Nothing GPU-related to configure on a CPU-only machine.
    if not torch.cuda.is_available():
        return

    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True


set_seed(42)

In [25]:
# Sequence geometry and training hyper-parameters.
LOOKBACK = 28     # number of past time steps in each input window
PREDICT = 7       # number of future steps predicted per window
BATCH_SIZE = 16   # mini-batch size used by the training loop
EPOCHS = 50       # passes over each series' training windows

# Run on the GPU when one is visible, otherwise on the CPU.
DEVICE = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

In [None]:
train_df = pd.read_csv('./train/train.csv')

# Drop rows with a negative sales quantity.
train_df = train_df[train_df['매출수량'] >= 0].reset_index(drop=True)

# Parse the business date (read as object) into datetime64.
train_df['영업일자'] = pd.to_datetime(train_df['영업일자'], format='%Y-%m-%d')

# shift/rolling/diff below are row-based, so every series must be in
# chronological order before any history feature is derived.
train_df = train_df.sort_values(['영업장명_메뉴명', '영업일자']).reset_index(drop=True)

# Split "<store>_<menu>" on the FIRST underscore only, so menu names that
# themselves contain '_' are not truncated.
store_menu_parts = train_df['영업장명_메뉴명'].str.split('_', n=1)
train_df['영업장명'] = store_menu_parts.str[0]
train_df['메뉴명'] = store_menu_parts.str[1]

# Calendar features: day of week (Mon=0 ... Sun=6), month, ISO week, quarter.
train_df['요일'] = train_df['영업일자'].dt.dayofweek
train_df['월'] = train_df['영업일자'].dt.month
train_df['주차'] = train_df['영업일자'].dt.isocalendar().week.astype(int)
train_df['분기'] = train_df['영업일자'].dt.quarter

# Public-holiday flag. Compare plain `date` objects against the holiday dates:
# calling isin() on a datetime64 column with date objects relies on a
# deprecated implicit cast. The calendar always covers 2023-2024 (as before)
# plus any other year present in the data.
holiday_years = sorted({2023, 2024} | {int(y) for y in train_df['영업일자'].dt.year.dropna().unique()})
kr_holidays = holidays.KR(years=holiday_years)
train_df['is_holiday'] = train_df['영업일자'].dt.date.isin(set(kr_holidays.keys())).astype(int)

# Weekend flag (Sat=5, Sun=6).
train_df['is_weekend'] = train_df['요일'].isin([5, 6]).astype(int)

# Previous sales of the same series (lags 1, 2 and 7 rows).
for lag in [1, 2, 7]:
    train_df[f'lag_{lag}'] = train_df.groupby('영업장명_메뉴명')['매출수량'].shift(lag)

# Rolling mean/std over the previous 3 and 7 rows of the same series.
# The shift AND the rolling window both run inside transform(), i.e. per group;
# rolling on the already-ungrouped shifted Series would let windows spill
# across store/menu boundaries.
for win in [3, 7]:
    sales_by_series = train_df.groupby('영업장명_메뉴명')['매출수량']
    train_df[f'rolling_mean_{win}'] = sales_by_series.transform(
        lambda s: s.shift(1).rolling(window=win).mean()
    )
    train_df[f'rolling_std_{win}'] = sales_by_series.transform(
        lambda s: s.shift(1).rolling(window=win).std()
    )

# Days elapsed since the previous row of the same series (day_diff).
train_df['day_diff'] = train_df.groupby('영업장명_메뉴명')['영업일자'].diff().dt.days

# The first rows of each series have no history, which leaves NaN in the
# lag/rolling/day_diff columns. MinMaxScaler keeps NaN as-is and a single NaN
# input turns the LSTM loss (and then every weight) into NaN, so fill with 0.
history_cols = (
    [f'lag_{lag}' for lag in [1, 2, 7]]
    + [f'rolling_{stat}_{win}' for win in [3, 7] for stat in ('mean', 'std')]
    + ['day_diff']
)
train_df[history_cols] = train_df[history_cols].fillna(0)

# First / last day of month flags (is_month_start / is_month_end).
train_df['is_month_start'] = train_df['영업일자'].dt.is_month_start.astype(int)
train_df['is_month_end'] = train_df['영업일자'].dt.is_month_end.astype(int)


  test_df['is_holiday'] = test_df['영업일자'].isin(kr_holidays).astype(int)


<pandas.core.groupby.generic.DataFrameGroupBy object at 0x0000018D69C507D0>

In [27]:
class MultiOutputLSTM(nn.Module):
    """Stacked LSTM followed by a linear head that emits all horizons at once.

    Args:
        input_dim: Number of features per time step.
        hidden_dim: Size of the LSTM hidden state.
        num_layers: Number of stacked LSTM layers.
        output_dim: Number of values produced per input sequence.
    """

    def __init__(self, input_dim=1, hidden_dim=64, num_layers=2, output_dim=7):
        super().__init__()
        self.lstm = nn.LSTM(
            input_size=input_dim,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            batch_first=True,
        )
        self.fc = nn.Linear(in_features=hidden_dim, out_features=output_dim)

    def forward(self, x):
        """Map a batch-first sequence tensor to a (batch, output_dim) tensor."""
        hidden_seq, _state = self.lstm(x)
        # Only the hidden state of the final time step feeds the linear head.
        last_step = hidden_seq[:, -1, :]
        return self.fc(last_step)

In [28]:
def train_lstm(train_df):
    """Train one MultiOutputLSTM per store/menu series.

    Each series is sorted by date, has missing feature values filled with 0,
    is min-max scaled with its own scaler and is cut into sliding windows of
    LOOKBACK input steps followed by PREDICT target steps. Series shorter than
    LOOKBACK + PREDICT rows are skipped.

    Args:
        train_df: Frame holding '영업장명_메뉴명', '영업일자' and every column
            listed in ``features`` below.

    Returns:
        dict keyed by the groupby key of each series. Every value is a dict
        with 'model' (in eval mode), 'scaler' (the fitted MinMaxScaler),
        'last_sequence' (the last LOOKBACK scaled rows, shape
        (LOOKBACK, n_features)) and 'features' (the column list).
    """
    # Feature columns. '매출수량' must stay first: column 0 is the target.
    features = ['매출수량', '요일', '월', '주차', '분기',
                'is_holiday', 'is_weekend', 'lag_1', 'lag_2', 'lag_7',
                'rolling_mean_3', 'rolling_std_3', 'rolling_mean_7', 'rolling_std_7',
                'day_diff', 'is_month_start',
                'is_month_end']

    trained_models = {}     # groupby key -> trained artefacts for that series

    # Grouping by a one-element LIST is kept on purpose: predict_lstm groups
    # the same way, so the dictionary keys of both functions line up.
    for store_menu, group in tqdm(train_df.groupby(['영업장명_메뉴명']), desc ='Training LSTM'):
        store_train = group.sort_values('영업일자').copy()
        if len(store_train) < LOOKBACK + PREDICT:
            continue

        # Lag/rolling/diff features are NaN where a series has no history yet.
        # MinMaxScaler passes NaN through, and one NaN in a batch makes the
        # loss and then every weight NaN, so replace them before scaling.
        store_train[features] = store_train[features].fillna(0)

        # Scale every feature of this series into [0, 1].
        scaler = MinMaxScaler()
        store_train[features] = scaler.fit_transform(store_train[features])

        train_vals = store_train[features].values  # shape: (N, n_features)

        # Sliding windows: LOOKBACK rows of all features in, the next PREDICT
        # values of column 0 (scaled sales) out.
        n_windows = len(train_vals) - LOOKBACK - PREDICT + 1
        window_inputs = np.stack(
            [train_vals[i:i + LOOKBACK, :] for i in range(n_windows)]
        )
        window_targets = np.stack(
            [train_vals[i + LOOKBACK:i + LOOKBACK + PREDICT, 0] for i in range(n_windows)]
        )

        # Convert via one contiguous ndarray; building a tensor from a Python
        # list of ndarrays is very slow.
        X_train = torch.from_numpy(window_inputs).float().to(DEVICE)
        y_train = torch.from_numpy(window_targets).float().to(DEVICE)

        model = MultiOutputLSTM(input_dim=len(features), output_dim=PREDICT).to(DEVICE)
        optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
        criterion = nn.MSELoss()
        model.train()

        # Mini-batch training with a fresh shuffle every epoch.
        for epoch in range(EPOCHS):
            idx = torch.randperm(len(X_train))
            for i in range(0, len(X_train), BATCH_SIZE):
                batch_idx = idx[i:i+BATCH_SIZE]
                X_batch, y_batch = X_train[batch_idx], y_train[batch_idx]
                output = model(X_batch)
                loss = criterion(output, y_batch)
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

        # Keep everything needed to predict for this series later on.
        trained_models[store_menu] = {
            'model': model.eval(),
            'scaler': scaler,
            'last_sequence': train_vals[-LOOKBACK:],  # (LOOKBACK, n_features)
            'features': features
        }

    return trained_models

In [None]:
# Training: fit the per-series models on the preprocessed training frame
trained_models = train_lstm(train_df)

In [None]:
def predict_lstm(test_df: pd.DataFrame, trained_models: dict, test_prefix: str):
    """Predict the next PREDICT values for every series that has a model.

    Args:
        test_df: Preprocessed test frame containing '영업장명_메뉴명',
            '영업일자' and every feature column the models were trained on.
        trained_models: Mapping produced by ``train_lstm``.
        test_prefix: Label (e.g. "TEST_00") used to build the prediction
            date strings "<prefix>+1일" ... "<prefix>+<PREDICT>일".

    Returns:
        DataFrame with one row per (prediction date, series) and the columns
        '영업일자', '영업장명_메뉴명' and '매출수량'. Series without a trained
        model or with fewer than LOOKBACK rows are omitted.
    """
    results = []

    # Group exactly like train_lstm does so the keys match trained_models.
    for store_menu, store_test in test_df.groupby(['영업장명_메뉴명']):
        if store_menu not in trained_models:
            continue

        entry = trained_models[store_menu]
        model = entry['model']
        scaler = entry['scaler']
        features = entry['features']

        # Most recent LOOKBACK rows in chronological order.
        store_test_sorted = store_test.sort_values('영업일자')
        recent = store_test_sorted[features].tail(LOOKBACK)
        if len(recent) < LOOKBACK:
            continue

        # Lag/rolling features are NaN at the start of the window; a NaN input
        # would make every prediction NaN, so fill before scaling. Passing the
        # DataFrame keeps the column names the scaler was fitted with.
        recent_vals = np.asarray(scaler.transform(recent.fillna(0)))
        x_input = torch.from_numpy(recent_vals[None, ...]).float().to(DEVICE)

        # squeeze(0) drops only the batch axis, so PREDICT == 1 still yields
        # an indexable 1-D array.
        with torch.no_grad():
            pred_scaled = model(x_input).squeeze(0).cpu().numpy()

        # Undo the scaling of column 0 (sales) for all horizons in one call;
        # the other columns of the dummy matrix are irrelevant placeholders.
        dummy = np.zeros((PREDICT, len(features)))
        dummy[:, 0] = pred_scaled[:PREDICT]
        restored = np.maximum(scaler.inverse_transform(dummy)[:, 0], 0)

        # Prediction labels: <prefix>+1일 ... <prefix>+<PREDICT>일
        pred_dates = [f"{test_prefix}+{i+1}일" for i in range(PREDICT)]
        for d, val in zip(pred_dates, restored):
            results.append({
                '영업일자': d,
                '영업장명_메뉴명': store_menu,
                '매출수량': float(val)
            })

    return pd.DataFrame(results)

In [None]:
def preprocess_test(test_df):
    """Derive the model's feature columns for one test file.

    Works on a sorted copy, so the caller's frame is left untouched.

    Args:
        test_df: Raw test frame with '영업일자', '영업장명_메뉴명' and '매출수량'.

    Returns:
        A new DataFrame sorted by series and date with the calendar, holiday,
        lag, rolling and day_diff features added. Missing history values
        (first rows of every series) are filled with 0.
    """
    series_col, date_col, sales_col = '영업장명_메뉴명', '영업일자', '매출수량'

    # 1. Parse dates and sort: shift/rolling/diff below are row-based, so each
    #    series must be in chronological order.
    test_processed = test_df.copy()
    test_processed[date_col] = pd.to_datetime(test_processed[date_col])
    test_processed = test_processed.sort_values([series_col, date_col], kind='stable')
    dates = test_processed[date_col]

    # 2. Calendar features
    test_processed['요일'] = dates.dt.dayofweek
    test_processed['월'] = dates.dt.month
    test_processed['주차'] = dates.dt.isocalendar().week.astype(int)
    test_processed['분기'] = dates.dt.quarter
    test_processed['is_weekend'] = test_processed['요일'].isin([5, 6]).astype(int)
    test_processed['is_month_start'] = dates.dt.is_month_start.astype(int)
    test_processed['is_month_end'] = dates.dt.is_month_end.astype(int)

    # 3. Holiday flag. Compare plain `date` objects: isin() on a datetime64
    #    column with date objects relies on a deprecated implicit cast. The
    #    calendar covers 2023-2024 (as before) plus every year in the data.
    holiday_years = sorted({2023, 2024} | {int(y) for y in dates.dt.year.dropna().unique()})
    kr_holidays = holidays.KR(years=holiday_years)
    test_processed['is_holiday'] = dates.dt.date.isin(set(kr_holidays.keys())).astype(int)

    # 4. Previous sales of the same series (lags 1, 2, 7)
    history_cols = []
    for lag in [1, 2, 7]:
        test_processed[f'lag_{lag}'] = test_processed.groupby(series_col)[sales_col].shift(lag)
        history_cols.append(f'lag_{lag}')

    # 5. Rolling mean/std over the previous 3 and 7 rows, computed per series.
    #    Shift and window both run inside transform(); rolling over the
    #    ungrouped shifted Series would mix neighbouring series.
    for win in [3, 7]:
        sales_by_series = test_processed.groupby(series_col)[sales_col]
        test_processed[f'rolling_mean_{win}'] = sales_by_series.transform(
            lambda s: s.shift(1).rolling(window=win).mean()
        )
        test_processed[f'rolling_std_{win}'] = sales_by_series.transform(
            lambda s: s.shift(1).rolling(window=win).std()
        )
        history_cols += [f'rolling_mean_{win}', f'rolling_std_{win}']

    # 6. Days since the previous row of the same series
    test_processed['day_diff'] = test_processed.groupby(series_col)[date_col].diff().dt.days
    history_cols.append('day_diff')

    # 7. Rows without enough history hold NaN, which would turn the model
    #    output into NaN; fill them with 0.
    test_processed[history_cols] = test_processed[history_cols].fillna(0)

    return test_processed


In [None]:
all_preds = []

# Iterate over every test file in a stable order
test_files = sorted(glob.glob('./test/TEST_*.csv'))
if not test_files:
    # pd.concat([]) below would otherwise fail with an unrelated-looking error.
    raise FileNotFoundError("No test files matched './test/TEST_*.csv'")

for path in test_files:
    # Extract the prefix from the file name (e.g. TEST_00.csv -> TEST_00)
    filename = os.path.basename(path)
    prefix_match = re.search(r'(TEST_\d+)', filename)
    if prefix_match is None:
        # The glob also matches names without digits (e.g. TEST_x.csv), for
        # which no prediction label can be built.
        raise ValueError(f"Cannot derive a TEST_<n> prefix from file name: {filename}")
    test_prefix = prefix_match.group(1)

    # Load and preprocess the test file
    test_df = pd.read_csv(path)
    test_df = preprocess_test(test_df)

    # Predict for every series in this file
    pred_df = predict_lstm(test_df, trained_models, test_prefix)

    all_preds.append(pred_df)

full_pred_df = pd.concat(all_preds, ignore_index=True)

In [None]:
def convert_to_submission_format(pred_df: pd.DataFrame, sample_submission: pd.DataFrame):
    """Pivot long-format predictions into the wide submission layout.

    Args:
        pred_df: Long frame with the columns '영업일자', '영업장명_메뉴명' and
            '매출수량'. Series names may be plain values or 1-tuples (what
            ``groupby([...])`` with a single-element list yields); both are
            accepted. An empty frame is allowed.
        sample_submission: Template whose first column is '영업일자' and whose
            remaining columns are series names.

    Returns:
        A copy of ``sample_submission`` in which every series column holds the
        predicted quantity for that row's date, or 0 where no prediction
        exists. The template itself is not modified.
    """
    def _series_name(key):
        # Unwrap the 1-tuple keys produced by grouping on a one-element list.
        if isinstance(key, tuple) and len(key) == 1:
            return key[0]
        return key

    # (date label, series name) -> predicted quantity
    if pred_df.empty:
        pred_dict = {}
    else:
        pred_dict = {
            (date, _series_name(name)): qty
            for date, name, qty in zip(
                pred_df['영업일자'], pred_df['영업장명_메뉴명'], pred_df['매출수량']
            )
        }

    final_df = sample_submission.copy()
    dates = final_df['영업일자'].tolist()

    # Replace each series column wholesale. Writing floats cell by cell into
    # the template's integer columns is a deprecated incompatible-dtype
    # assignment and is also much slower.
    for col in final_df.columns[1:]:
        final_df[col] = [pred_dict.get((date, col), 0) for date in dates]

    return final_df

In [None]:
# Load the submission template, fill it with the predictions and write it out.
sample_submission = pd.read_csv('./sample_submission.csv')
submission = convert_to_submission_format(full_pred_df, sample_submission)
# 'utf-8-sig' writes a UTF-8 byte-order mark at the start of the CSV
# (presumably so spreadsheet tools detect the encoding of the Korean headers).
submission.to_csv('baseline_submission.csv', index=False, encoding='utf-8-sig')