<a href="https://colab.research.google.com/github/kane-youn/PYT_study/blob/main/TS_250822.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# =========================
# 0) Imports & Seed
# =========================
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import matplotlib.pyplot as plt

np.random.seed(42)
tf.random.set_seed(42)

# =========================
# 1) Synthetic time series
# =========================
T = 1200
t = np.arange(T)
series_raw = (
    np.sin(2*np.pi*t/50)       # 주기 50
    + 0.5*np.sin(2*np.pi*t/12) # 주기 12
    + 0.01*t                   # 완만한 추세
)
series_raw += 0.25*np.random.randn(T)  # 잡음 조금 더 강하게 (극값 학습 확인용)

# 전역 표준화
series = (series_raw - series_raw.mean()) / (series_raw.std() + 1e-8)

# =========================
# 2) Windowing (multi-step)
# =========================
def make_windows(data, window_size=96, horizon=24):
    X, y = [], []
    for i in range(len(data) - window_size - horizon + 1):
        X.append(data[i:i+window_size])
        y.append(data[i+window_size:i+window_size+horizon])
    X = np.array(X)[..., np.newaxis]  # (B, W, 1)
    y = np.array(y)                   # (B, H)
    return X, y

window_size = 96
horizon     = 24
X, y = make_windows(series, window_size, horizon)

# train/val/test = 60/20/20
n = len(X)
n_train = int(n*0.6)
n_val   = int(n*0.2)
X_train, y_train = X[:n_train], y[:n_train]
X_val,   y_val   = X[n_train:n_train+n_val], y[n_train:n_train+n_val]
X_test,  y_test  = X[n_train+n_val:], y[n_train+n_val:]

# =========================
# 3) Positional Encoding
# =========================
def positional_encoding(length, d_model):
    pos = np.arange(length)[:, np.newaxis].astype(np.float32)      # (W,1)
    i   = np.arange(d_model)[np.newaxis, :].astype(np.float32)     # (1,D)
    angle_rates = 1.0 / np.power(10000.0, (2*(i//2))/d_model)      # (1,D)
    angles = pos * angle_rates                                     # (W,D)
    pe = np.zeros((length, d_model), dtype=np.float32)
    pe[:, 0::2] = np.sin(angles[:, 0::2])
    pe[:, 1::2] = np.cos(angles[:, 1::2])
    return tf.constant(pe)  # (W,D)

# =========================
# 4) Transformer Encoder (Encoder-only)
# =========================
class TransformerEncoder(layers.Layer):
    def __init__(self, d_model, num_heads, dff, dropout=0.1):
        super().__init__()
        self.mha = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=d_model//num_heads, dropout=dropout
        )
        self.dropout1 = layers.Dropout(dropout)
        self.norm1 = layers.LayerNormalization(epsilon=1e-6)

        self.ffn = keras.Sequential([
            layers.Dense(dff, activation="relu"),
            layers.Dense(d_model),
        ])
        self.dropout2 = layers.Dropout(dropout)
        self.norm2 = layers.LayerNormalization(epsilon=1e-6)

    def call(self, x, training=False, mask=None):
        attn_output = self.mha(query=x, value=x, key=x, attention_mask=mask, training=training)
        out1 = self.norm1(x + self.dropout1(attn_output, training=training))
        ffn_output = self.ffn(out1)
        out2 = self.norm2(out1 + self.dropout2(ffn_output, training=training))
        return out2

# =========================
# 5) Quantile Loss (pinball) & Non-crossing head
# =========================
def pinball_loss(y_true, y_pred, q):
    # y_true, y_pred: (B, H)
    e = y_true - y_pred
    return tf.reduce_mean(tf.maximum(q*e, (q-1.0)*e))

class MultiQuantileHead(layers.Layer):
    """
    q10 <= q50 <= q90 를 강제:
    q50 = base
    q10 = base - softplus(delta_lo)
    q90 = base + softplus(delta_hi)
    """
    def __init__(self, horizon):
        super().__init__()
        self.base = layers.Dense(horizon, activation="linear")      # q50
        self.delta_lo = layers.Dense(horizon, activation="linear")  # for q10
        self.delta_hi = layers.Dense(horizon, activation="linear")  # for q90

    def call(self, x):
        base = self.base(x)                    # (B,H)
        lo   = base - tf.nn.softplus(self.delta_lo(x))
        hi   = base + tf.nn.softplus(self.delta_hi(x))
        return lo, base, hi                    # q10, q50, q90

def quantile_model(window_size, d_model=128, num_heads=8, dff=256,
                   num_layers=3, dropout=0.1, horizon=24):
    inp = layers.Input(shape=(window_size, 1))   # (B,W,1)
    x = layers.Dense(d_model)(inp)
    x = x + positional_encoding(window_size, d_model)

    for _ in range(num_layers):
        x = TransformerEncoder(d_model, num_heads, dff, dropout)(x)

    x = x[:, -1, :]                               # last token
    x = layers.Dense(128, activation="relu")(x)
    x = layers.Dropout(dropout)(x)

    head = MultiQuantileHead(horizon)
    q10, q50, q90 = head(x)                       # (B,H) each
    model = keras.Model(inp, [q10, q50, q90])
    return model

# 커스텀 훈련 루프 대신, keras loss 3개로 합산
q_list = [0.1, 0.5, 0.9]
def make_losses():
    def loss_q10(y_true, y_pred): return pinball_loss(y_true, y_pred, q_list[0])
    def loss_q50(y_true, y_pred): return pinball_loss(y_true, y_pred, q_list[1])
    def loss_q90(y_true, y_pred): return pinball_loss(y_true, y_pred, q_list[2])
    return loss_q10, loss_q50, loss_q90

# =========================
# 6) Build & Train
# =========================
model = quantile_model(window_size, d_model=128, num_heads=8, dff=256,
                       num_layers=3, dropout=0.1, horizon=horizon)
losses = make_losses()
model.compile(
    optimizer=keras.optimizers.Adam(5e-4),
    loss=list(losses),              # [loss_q10, loss_q50, loss_q90]
    loss_weights=[1.0, 2.0, 1.0]    # 중앙(0.5)에 가중치를 더 주어 안정화
)
model.summary()

es = keras.callbacks.EarlyStopping(patience=20, restore_best_weights=True, monitor="val_loss")
hist = model.fit(
    X_train, [y_train, y_train, y_train],    # 세 개의 타깃에 동일한 y
    validation_data=(X_val, [y_val, y_val, y_val]),
    epochs=300, batch_size=64, callbacks=[es], verbose=1
)

# =========================
# 7) Predict
# =========================
q10_pred, q50_pred, q90_pred = model.predict(X_test)   # 각 (Ntest, H)
y_true = y_test                                        # (Ntest, H)

# =========================
# 8) Metrics & Coverage
# =========================
# 중앙선 기준 점오차
mae_med  = np.mean(np.abs(y_true - q50_pred))
rmse_med = np.sqrt(np.mean((y_true - q50_pred)**2))
print(f"Median (q50) Overall MAE : {mae_med:.4f}")
print(f"Median (q50) Overall RMSE: {rmse_med:.4f}")

# 스텝별 MAE
mae_per_h = np.mean(np.abs(y_true - q50_pred), axis=0)
print("MAE per step:", np.round(mae_per_h, 4))

# 10~90% 밴드 커버리지 (이론상 ~0.8 근처가 이상적)
coverage_10_90 = np.mean((y_true >= q10_pred) & (y_true <= q90_pred))
print(f"Central 80% band coverage: {coverage_10_90:.3f}")

# =========================
# 9) Plots
# =========================
# (a) 한 윈도우의 H-step 경로
idx = min(50, len(X_test)-1)
xs = np.arange(horizon)
plt.figure(figsize=(9,4))
plt.plot(xs, y_true[idx], label="Actual (next H)")
plt.plot(xs, q50_pred[idx], label="q50 (median)")
plt.fill_between(xs, q10_pred[idx], q90_pred[idx], alpha=0.2, label="80% band (q10~q90)")
plt.title(f"Multi-step Quantile Forecast (idx={idx})")
plt.legend(); plt.tight_layout(); plt.show()

# (b) 겹치지 않는 롤링 백테스트
sel = np.arange(0, len(y_true), horizon)
true_seq = y_true[sel].reshape(-1)
q50_seq  = q50_pred[sel].reshape(-1)
q10_seq  = q10_pred[sel].reshape(-1)
q90_seq  = q90_pred[sel].reshape(-1)

plt.figure(figsize=(10,4))
plt.plot(true_seq, label="Actual")
plt.plot(q50_seq,  label="q50")
plt.fill_between(np.arange(len(q50_seq)), q10_seq, q90_seq, alpha=0.2, label="80% band")
plt.title(f"Rolling multi-step backtest (stride = horizon={horizon})")
plt.legend(); plt.tight_layout(); plt.show()





In [None]:
# plt.plot(true_seq, label="Actual")
# plt.plot(X, label="RAW")
plt.plot(series, label="STD")
