In [None]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim

from sklearn.preprocessing import MinMaxScaler
from torch.utils.data import DataLoader, TensorDataset

In [None]:
# Fix the global RNG state of NumPy and PyTorch (CPU + CUDA) for this run.
seed = 2026
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed_all(seed)

# Run on the GPU when one is visible to PyTorch, otherwise on the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(device)

In [None]:
# Read the KOSPI price table from the local dataset folder and display it.
df = pd.read_csv(filepath_or_buffer="./dataset/kospi.csv")
df

In [None]:
# When a 'Date' column is present, parse it and put the rows in date order.
if 'Date' in df.columns:
    # Convert to datetime; values that cannot be parsed are coerced to NaT.
    df['Date'] = pd.to_datetime(df['Date'], errors='coerce')
    # Sort by date, then renumber the index from 0.
    df = df.sort_values(by='Date').reset_index(drop=True)

In [None]:
def set_seed(seed=2026):
    """Seed every random number generator this notebook may draw from.

    Seeds Python's ``random`` module, NumPy's legacy global generator and
    PyTorch's CPU and CUDA generators. The NumPy and PyTorch calls are the
    same as before, so their random streams are unchanged; seeding ``random``
    additionally covers any code that draws from the standard library.

    Args:
        seed: Integer seed applied to all generators. Defaults to 2026.

    Returns:
        None.
    """
    import random  # local import: the notebook's import cell does not load it

    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)

def make_seq(X, y, L):
    """Build sliding-window samples for one-step-ahead prediction.

    Window ``i`` is ``X[i:i+L]`` (the look-back period) and its target is
    ``y[i+L]`` (the value right after the window).

    Args:
        X: Array-like of per-step features, indexed by time on axis 0.
        y: Array-like of per-step targets, indexed by time on axis 0.
        L: Window length (number of past steps fed to the model).

    Returns:
        Tuple ``(Xs, ys)`` of float32 tensors. ``Xs`` has shape
        ``(len(X) - L, L, *X.shape[1:])`` and ``ys`` is reshaped to
        ``(-1, 1)``. When the series is too short to form a single window,
        correctly shaped empty tensors ``(0, L, *X.shape[1:])`` and ``(0, 1)``
        are returned so downstream shape lookups keep working.

    Raises:
        IndexError: If ``y`` is shorter than ``X`` so that a target is missing.
    """
    X = np.asarray(X)
    y = np.asarray(y)
    n_windows = max(len(X) - L, 0)

    if n_windows == 0:
        # Stacking an empty list would collapse to shape (0,) and lose the
        # window/feature axes, so build the empty result explicitly.
        empty_X = torch.empty((0, L) + tuple(X.shape[1:]), dtype=torch.float32)
        empty_y = torch.empty((0, 1), dtype=torch.float32)
        return empty_X, empty_y

    windows = np.stack([X[i:i + L] for i in range(n_windows)])
    targets = np.stack([y[i + L] for i in range(n_windows)])
    Xs = torch.tensor(windows, dtype=torch.float32)
    ys = torch.tensor(targets, dtype=torch.float32).view(-1, 1)
    return Xs, ys

In [None]:
def fill_missing_after_split(train_df, test_df, cols):
    """Forward-fill missing values after a chronological train/test split.

    The training frame is forward-filled on its own. The test frame is
    forward-filled with the last training row prepended, so a gap at the very
    start of the test period is filled from the most recent training value.
    Only past values are ever used; values that have nothing before them stay
    missing.

    Args:
        train_df: Earlier (training) part of the data.
        test_df: Later (test/validation) part of the data.
        cols: Column label(s) to fill.

    Returns:
        Tuple ``(train_df, test_df)`` of filled copies; the inputs are not
        modified.
    """
    train_df = train_df.copy()
    test_df = test_df.copy()
    train_df[cols] = train_df[cols].ffill()  # fill from earlier training rows

    if train_df.empty:
        # No training row to bridge from: dropping "the first bridge row"
        # would discard a real test row, so fill the test part on its own.
        test_df[cols] = test_df[cols].ffill()
        return train_df, test_df

    # Prepend the last training row to the test rows, forward-fill across the
    # boundary, then keep only the test rows.
    bridge = pd.concat([train_df.tail(1), test_df], axis = 0)
    bridge[cols] = bridge[cols].ffill()
    test_df[cols] = bridge.iloc[1:][cols].values
    return train_df, test_df

In [None]:
class EarlyStopping:
    """Track the best validation loss and signal when training should stop.

    An epoch counts as an improvement when its loss is lower than the best
    loss so far by more than ``min_delta``. On every improvement a CPU copy of
    the model's ``state_dict`` is stored so the best weights can be restored
    later with ``load_best``.

    Args:
        patience: Number of consecutive non-improving epochs after which
            ``step`` returns True.
        min_delta: Minimum decrease of the loss that counts as an improvement.
    """

    def __init__(self, patience = 30, min_delta = 1e-6):
        self.patience = patience
        self.min_delta = min_delta
        self.count = 0          # consecutive epochs without improvement
        self.best = None        # best validation loss seen so far
        self.best_state = None  # CPU snapshot of the weights at `best`

    def step(self, val_loss, model):
        """Record one epoch's validation loss.

        A NaN loss is never accepted as the best value: comparisons against
        NaN are always False, so storing it would block every later epoch
        from being recognised as an improvement.

        Args:
            val_loss: Validation loss of the epoch that just finished.
            model: Model whose weights are snapshotted on improvement.

        Returns:
            True when ``patience`` consecutive epochs have passed without an
            improvement (training should stop), otherwise False.
        """
        import math  # local import: not part of the notebook's import cell

        improved = (not math.isnan(val_loss)) and (
            self.best is None or val_loss < self.best - self.min_delta
        )
        if improved:
            self.best = val_loss
            self.count = 0
            # Detached CPU clones, so later optimizer steps cannot alter the snapshot.
            self.best_state = {k: v.detach().cpu().clone() for k, v in model.state_dict().items()}
            return False
        self.count += 1
        return self.count >= self.patience

    def load_best(self, model, device):
        """Load the best recorded weights into ``model`` on ``device``.

        Raises:
            RuntimeError: If no snapshot exists yet, i.e. ``step`` has never
                seen a valid (non-NaN) loss.
        """
        if self.best_state is None:
            raise RuntimeError(
                "EarlyStopping.load_best() called before any improving step() recorded a model state"
            )
        model.load_state_dict({k: v.to(device) for k, v in self.best_state.items()})

In [None]:
class RNNRegressor(nn.Module):
    """Stacked tanh RNN followed by a small MLP head that outputs one value.

    Args:
        input_size: Number of features per time step.
        hidden_size: Width of the recurrent hidden state.
        num_layers: Number of stacked RNN layers.
        dropout: Dropout probability used inside the head and, when
            ``num_layers > 1``, between the RNN layers.
    """

    def __init__(self, input_size, hidden_size = 64, num_layers = 2, dropout = 0.1):
        super().__init__()
        # Inter-layer dropout is only passed for a multi-layer RNN; with a
        # single layer it is set to 0.0.
        self.rnn = nn.RNN(input_size = input_size, hidden_size = hidden_size, num_layers = num_layers
                          , batch_first = True, nonlinearity= 'tanh', dropout = dropout if num_layers > 1 else 0.0)
        self.head = nn.Sequential(nn.LayerNorm(hidden_size), nn.Linear(hidden_size, 64), nn.ReLU(),
                                  nn.Dropout(dropout), nn.Linear(64, 1))

    def forward(self, x):
        """Predict one value per sequence from the last time step.

        Args:
            x: Batch of sequences shaped ``(B, L, input_size)``.

        Returns:
            Tensor of shape ``(B, 1)``.
        """
        # No explicit h0: nn.RNN starts from a zero hidden state by default,
        # created with the input's dtype and device. A hand-built float32 h0
        # raises a dtype mismatch as soon as the model runs in another precision.
        out, _ = self.rnn(x)  # out: (B, L, hidden_size)
        last = out[:, -1, :]  # hidden output at the final time step
        return self.head(last)

In [None]:
# Re-seed, pick the compute device and reload the price table from disk.
set_seed(2026)
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

csv_path = "./dataset/kospi.csv"
df = pd.read_csv(csv_path)
if 'Date' in df.columns:
    # Parse dates (unparseable values become NaT), then sort chronologically.
    df['Date'] = pd.to_datetime(df['Date'], errors='coerce')
    df = df.sort_values(by='Date').reset_index(drop=True)

# Input feature columns and the column to predict.
FEATS = ['Open', 'High', 'Low', 'Close']
TARGET = ['Close']

In [None]:
# Chronological split: the last 30% of rows form the test set, and the last
# 20% of the remaining rows form the validation set.
test_ratio = 0.3
val_ratio = 0.2

test_start = int(len(df) * (1 - test_ratio))
trainval_df, test_df = df.iloc[:test_start].copy(), df.iloc[test_start:].copy()

val_start = int(len(trainval_df) * (1 - val_ratio))
train_df, val_df = trainval_df.iloc[:val_start].copy(), trainval_df.iloc[val_start:].copy()

# Fill gaps only after splitting, so each later part is filled from its past.
train_df, val_df = fill_missing_after_split(train_df, val_df, cols=FEATS)
trainval_df, test_df = fill_missing_after_split(trainval_df, test_df, cols=FEATS)

In [None]:
# One min-max scaler for the input features and one for the target.
scaler_x, scaler_y = MinMaxScaler(), MinMaxScaler()

# Fit the feature scaler on the training split only, then apply the same
# fitted transform to the validation and test splits.
X_train = scaler_x.fit_transform(train_df[FEATS].to_numpy())
X_val = scaler_x.transform(val_df[FEATS].to_numpy())
X_test = scaler_x.transform(test_df[FEATS].to_numpy())
X_train
# Recorded output:
# array([[0.23348791, 0.2330902 , 0.22130596, 0.22017413],
#        [0.12607878, 0.13132982, 0.12446595, 0.12910621],
#        [0.11628783, 0.1221546 , 0.11416516, 0.12365438],
#        ...,
#        [0.21824269, 0.21755404, 0.2196509 , 0.21625878],
#        [0.21816626, 0.21748705, 0.21687001, 0.21420059],
#        [0.21197648, 0.21184884, 0.21360321, 0.2108979 ]], shape=(2527, 4))

In [None]:
# Fit the target scaler on the training split only; reuse it for val and test.
y_train = scaler_y.fit_transform(train_df[TARGET].to_numpy())
y_val, y_test = (
    scaler_y.transform(part[TARGET].to_numpy()) for part in (val_df, test_df)
)

In [None]:
# Window length: each sample uses the previous L time steps as input.
L = 20
X_train_seq, y_train_seq = make_seq(X_train, y_train, L)
X_val_seq, y_val_seq = make_seq(X_val, y_val, L)
X_test_seq, y_test_seq = make_seq(X_test, y_test, L)

# Show input and target shapes side by side for every split.
# (Previously the val/test lines printed the X shape twice, hiding the y shape.)
print(X_train_seq.shape, y_train_seq.shape)
print(X_val_seq.shape, y_val_seq.shape)
print(X_test_seq.shape, y_test_seq.shape)

In [None]:
batch_size = 64

# Only the training loader shuffles; validation and test keep their order.
train_loader = DataLoader(
    TensorDataset(X_train_seq, y_train_seq), batch_size=batch_size, shuffle=True
)
val_loader, test_loader = (
    DataLoader(TensorDataset(seq_x, seq_y), batch_size=batch_size, shuffle=False)
    for seq_x, seq_y in ((X_val_seq, y_val_seq), (X_test_seq, y_test_seq))
)

In [None]:
# The input width is read from the third axis of the training windows.
model = RNNRegressor(
    input_size=X_train_seq.shape[2],
    hidden_size=64,
    num_layers=2,
    dropout=0.1,
)
model = model.to(device)
model

# Recorded repr:
# RNNRegressor(
#   (rnn): RNN(4, 64, num_layers=2, batch_first=True, dropout=0.1)
#   (head): Sequential(
#     (0): LayerNorm((64,), eps=1e-05, elementwise_affine=True)
#     (1): Linear(in_features=64, out_features=64, bias=True)
#     (2): ReLU()
#     (3): Dropout(p=0.1, inplace=False)
#     (4): Linear(in_features=64, out_features=1, bias=True)
#   )
# )

In [None]:
# Mean-squared-error loss, AdamW optimizer, and a scheduler that halves the
# learning rate when the monitored value has not decreased for 10 scheduler steps.
criterion = nn.MSELoss()
optimizer = optim.AdamW(params=model.parameters(), lr=2e-3, weight_decay=1e-4)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(
    optimizer=optimizer, mode='min', factor=0.5, patience=10
)

In [None]:
# Training-loop settings.
epochs = 300     # maximum number of epochs
clip_norm = 1.0  # max_norm used for gradient clipping
early = EarlyStopping(min_delta=1e-6, patience=30)

In [None]:
def run_epoch(loader, train=True):
    """Run one pass over ``loader`` and return the mean loss per sample.

    Uses the notebook-level ``model``, ``criterion``, ``optimizer``,
    ``device`` and ``clip_norm``.

    Args:
        loader: Iterable yielding ``(xb, yb)`` tensor batches.
        train: When True, the model is put in training mode and its weights
            are updated. When False, the model is put in eval mode and the
            pass runs with gradient tracking disabled.

    Returns:
        Sum of per-batch losses weighted by batch size, divided by the number
        of samples seen. This assumes ``criterion`` returns the mean over the
        batch; weighting keeps a smaller final batch from being over-counted.

    Raises:
        ZeroDivisionError: If ``loader`` yields no batches.
    """
    model.train(train)
    total, seen = 0.0, 0
    # Disable autograd during evaluation: no graph is needed there, so
    # building one only costs memory and time.
    with torch.set_grad_enabled(train):
        for xb, yb in loader:
            xb, yb = xb.to(device), yb.to(device)
            if train:
                optimizer.zero_grad()
            pred = model(xb)
            loss = criterion(pred, yb)
            if train:
                loss.backward()
                # Gradient clipping: takes the norm over all parameter
                # gradients and scales them down when it exceeds max_norm.
                # Example: with a norm of 5.0 and max_norm 1.0, every
                # gradient is multiplied by 1/5.
                # Author's note: values between 0.5 and 5.0 tend to be stable.
                nn.utils.clip_grad_norm_(model.parameters(), max_norm=clip_norm)
                optimizer.step()
            batch_n = xb.size(0)
            total += loss.item() * batch_n
            seen += batch_n
    return total / seen

In [None]:
# Train for up to `epochs` epochs, stepping the LR scheduler on the validation
# loss and stopping early once the validation loss stops improving.
for epoch in range(1, epochs + 1):
    tr_loss = run_epoch(train_loader, train=True)
    va_loss = run_epoch(val_loader, train=False)
    scheduler.step(va_loss)

    # Log the first epoch and then every 20th epoch.
    if epoch % 20 == 0 or epoch == 1:
        lr = optimizer.param_groups[0]['lr']
        print(f"[{epoch:03d}/{epochs}] train MSE: {tr_loss:.6f} | val MSE: {va_loss:.6f} | lr: {lr:.2e}")

    if not early.step(va_loss, model):
        continue
    print(f'Early stop at epoch {epoch}. BEST val MSE: {early.best:.6f}')
    break

# Restore the weights from the epoch with the best validation loss.
early.load_best(model, device)

# Recorded output:
# [001/300] train MSE: 0.022115 | val MSE: 0.011054 | lr: 2.00e-03
# [020/300] train MSE: 0.003933 | val MSE: 0.014996 | lr: 1.00e-03
# Early stop at epoch 31. BEST val MSE: 0.011054

In [None]:
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score

In [None]:
# Predict on the test set without gradient tracking.
model.eval()
pred_list, true_list = [], []
with torch.no_grad():
    for xb, yb in test_loader:
        xb = xb.to(device)
        yb = yb.to(device)
        pred = model(xb)
        pred_list.append(pred.cpu().numpy())
        true_list.append(yb.cpu().numpy())

# Join the per-batch results into single arrays along the sample axis.
pred_scaled = np.concatenate(pred_list, axis=0)
true_scaled = np.concatenate(true_list, axis=0)

# Undo the target scaling so the metrics are in the original units.
pred_unscaled, true_unscaled = (
    scaler_y.inverse_transform(scaled) for scaled in (pred_scaled, true_scaled)
)

mse = mean_squared_error(true_unscaled, pred_unscaled)
mae = mean_absolute_error(true_unscaled, pred_unscaled)
r2 = r2_score(true_unscaled, pred_unscaled)
rmse = np.sqrt(mse)
print(f"Test MSE: {mse:.6f} | RMSE: {rmse:.6f} | MAE: {mae:.6f} | R2: {r2:.6f}")

# Recorded output:
# Test MSE: 19748.472656 | RMSE: 140.529259 | MAE: 127.858833 | R2: 0.571647


In [None]:
import matplotlib.pyplot as plt
# Place the predictions on the x-axis of the full series: the first test
# target sits L rows after the start of the test split.
t0 = test_start
N_pred = len(pred_unscaled)  # number of predictions (1334 per the original note)
x_test = np.arange(t0 + L, t0 + L + N_pred)
y_actual_test = df['Close'].values[t0 + L : t0 + L + N_pred]  # actual closes for the same rows
y_pred_test = pred_unscaled.ravel()  # flatten predictions to 1-D

fig, ax = plt.subplots(figsize=(20, 8))
ax.plot(df["Close"].values, alpha=0.25, label="full series")
ax.plot(x_test, y_actual_test, label="actual (test)")
ax.plot(x_test, y_pred_test, linewidth=0.9, label="prediction")
ax.legend()
ax.set_title("KOSPI Close — Test segment (aligned)")
plt.show()