In [None]:
import torch

In [None]:
# Pin the global RNG seed so the randomly initialised weights are reproducible.
torch.manual_seed(0)

# Toy training data: 2 samples (batch), 5 time steps each, 1 feature per step.
#   sample 1 -> 1, 2, 3, 4, 5
#   sample 2 -> 6, 7, 8, 9, 10
# The values 1..10 are laid out row by row, giving x.shape == (2, 5, 1).
x = torch.arange(1.0, 11.0).reshape(2, 5, 1)

In [None]:
# Read the dimensions straight off the input tensor: (samples, time steps, features).
batch_size, seq_len, input_size = x.shape
output_size = 1
hidden_size = 1  # chosen length of the hidden state vector h

print(batch_size, seq_len, input_size, output_size)

# Trainable parameters, each initialised from a standard normal N(0, 1).
# Keep this creation order: it determines which random draws each tensor receives.

# Recurrent cell: input -> hidden, hidden -> hidden (the recurrent link), hidden bias.
W_xh = torch.nn.Parameter(torch.randn((input_size, hidden_size)))
W_hh = torch.nn.Parameter(torch.randn((hidden_size, hidden_size)))
b_h = torch.nn.Parameter(torch.randn((hidden_size,)))

# Read-out layer: hidden -> output, output bias.
W_hy = torch.nn.Parameter(torch.randn((hidden_size, output_size)))
b_y = torch.nn.Parameter(torch.randn((output_size,)))



2 5 1 1


In [129]:
# Show the initial value of every trainable parameter, in creation order.
for param in (W_xh, W_hh, b_h, W_hy, b_y):
    print(param)

Parameter containing:
tensor([[1.4593]], requires_grad=True)
Parameter containing:
tensor([[1.3651]], requires_grad=True)
Parameter containing:
tensor([-0.9279], requires_grad=True)
Parameter containing:
tensor([[0.4106]], requires_grad=True)
Parameter containing:
tensor([0.5223], requires_grad=True)


In [None]:
# RNN layer, unrolled by hand: first time step.
h = torch.zeros(batch_size, hidden_size)  # h0: the hidden state starts at zero

t = 0
x_t = x[:, t]  # x1 for every sample, shape (batch, input_size)
print(x_t)

# h1 = tanh(x1 W_xh + h0 W_hh + b_h)
h = (torch.matmul(x_t, W_xh) + torch.matmul(h, W_hh) + b_h).tanh()

# y1 = h1 W_hy + b_y (plays the role of x2 in a next-step prediction model)
y_t = torch.matmul(h, W_hy) + b_y

tensor([[1.],
        [6.]])


In [None]:
# Second time step: same update rule, now fed the hidden state left by the step before.
t = 1
x_t = x[:, t]  # x2 for every sample
print(x_t)

# h2 = tanh(x2 W_xh + h1 W_hh + b_h)
h = (torch.matmul(x_t, W_xh) + torch.matmul(h, W_hh) + b_h).tanh()

# y2 = h2 W_hy + b_y
y_t = torch.matmul(h, W_hy) + b_y

tensor([[2.],
        [7.]])


In [132]:
# Adam updates all five RNN parameters with a learning rate of 0.01.
optimizer = torch.optim.Adam(params=[W_xh, W_hh, b_h, W_hy, b_y], lr=1e-2)

In [133]:
# Mean squared error, averaged over every element (the default reduction).
loss_fn = torch.nn.MSELoss(reduction="mean")

In [None]:
# Next-step prediction targets: the output at step t should equal the input at step t+1,
# so the targets are the inputs with the first time step dropped.
# Example: at t=0 the input is 1 and the target is 2.
y_target = x[:, 1:]

In [None]:

for epoch in range(5000):
    # Clear the gradients left over from the previous update.
    optimizer.zero_grad()

    # Every epoch restarts from a zero hidden state and walks the whole sequence.
    h = torch.zeros(batch_size, hidden_size)
    outputs = []

    # Feed every input except the last one; each step predicts the value that follows it.
    for t in range(seq_len - 1):
        x_t = x[:, t]  # (batch, input_size)
        h = (torch.matmul(x_t, W_xh) + torch.matmul(h, W_hh) + b_h).tanh()
        y_t = torch.matmul(h, W_hy) + b_y
        outputs.append(y_t[:, None, :])  # insert a time axis so the steps can be concatenated

    # Join the per-step predictions along time: (batch, seq_len-1, output_size).
    y_pred = torch.cat(outputs, dim=1)

    # MSE between the predictions and the true next values x_{t+1};
    # training moves the parameters in the direction that shrinks this gap.
    loss = loss_fn(y_pred, y_target)
    loss.backward()
    optimizer.step()

    # Report progress every 200 epochs.
    if not epoch % 200:
        print(f"Epoch {epoch} | Loss: {loss.item():.4f}")

Epoch 0 | Loss: 33.2401
Epoch 200 | Loss: 9.6892
Epoch 400 | Loss: 3.0524
Epoch 600 | Loss: 1.0645
Epoch 800 | Loss: 0.4837
Epoch 1000 | Loss: 0.2867
Epoch 1200 | Loss: 0.1962
Epoch 1400 | Loss: 0.1446
Epoch 1600 | Loss: 0.1111
Epoch 1800 | Loss: 0.0875
Epoch 2000 | Loss: 0.0700
Epoch 2200 | Loss: 0.0564
Epoch 2400 | Loss: 0.0458
Epoch 2600 | Loss: 0.0372
Epoch 2800 | Loss: 0.0303
Epoch 3000 | Loss: 0.0248
Epoch 3200 | Loss: 0.0204
Epoch 3400 | Loss: 0.0172
Epoch 3600 | Loss: 0.0145
Epoch 3800 | Loss: 0.0126
Epoch 4000 | Loss: 0.0114
Epoch 4200 | Loss: 0.0105
Epoch 4400 | Loss: 0.0099
Epoch 4600 | Loss: 0.0095
Epoch 4800 | Loss: 0.0091


In [135]:
y_pred # RNN output: next-step predictions from the final training epoch, (batch, seq_len-1, output_size)

tensor([[[2.0117],
         [3.1029],
         [3.9099],
         [4.9106]],

        [[7.0098],
         [8.1354],
         [9.0577],
         [9.8524]]], grad_fn=<CatBackward0>)

In [136]:
y_target # ground truth x_{t+1}: the inputs shifted forward by one time step

tensor([[[ 2.],
         [ 3.],
         [ 4.],
         [ 5.]],

        [[ 7.],
         [ 8.],
         [ 9.],
         [10.]]])

In [137]:
# What if we get a new sample?

In [None]:
# One unseen sample (3, 4, 5, 6, 7) with shape (1, 5, 1), used to check how the
# trained model predicts x_{t+1} on data it was not trained on.
x_new = torch.arange(3.0, 8.0).reshape(1, 5, 1)

In [139]:
x_new.shape  # (batch, seq_len, input_size) of the new sample

torch.Size([1, 5, 1])

In [None]:
# Run the trained RNN over the new sample one time step at a time, reusing the learned
# parameters unchanged and printing each input next to its next-step prediction.
#
# The number of steps is taken from x_new itself rather than from the training-time
# `seq_len`, so a sample whose length differs from the training data is handled correctly.
#
# NOTE: gradients are still tracked here (the printed tensors carry a grad_fn);
# wrap the loop in torch.no_grad() if pure inference is wanted.
h = torch.zeros(x_new.size(0), hidden_size)  # zero initial hidden state, one row per sample
for t in range(x_new.size(1) - 1):  # as in training: every input except the last
    x_t = x_new[:, t, :]  # (batch, input_size)
    print(x_t)
    h = torch.tanh(x_t @ W_xh + h @ W_hh + b_h)  # (batch, hidden)
    y_t = h @ W_hy + b_y  # prediction for the value at step t+1
    print(y_t.unsqueeze(1))

tensor([[3.]])
tensor([[[3.4610]]], grad_fn=<UnsqueezeBackward0>)
tensor([[4.]])
tensor([[[4.9852]]], grad_fn=<UnsqueezeBackward0>)
tensor([[5.]])
tensor([[[5.9748]]], grad_fn=<UnsqueezeBackward0>)
tensor([[6.]])
tensor([[[7.0844]]], grad_fn=<UnsqueezeBackward0>)


In [141]:
## RNN with a shorter sequence length? (Sliding Window, Rolling Window)

In [147]:
# Input data again: batch size (number of samples) = 2, seq_len = 5, input_size = 1.
#   sample 1 -> 1, 2, 3, 4, 5
#   sample 2 -> 6, 7, 8, 9, 10
x = torch.arange(1.0, 11.0).reshape(2, 5, 1)  # shape: (2, 5, 1)

In [None]:
window_size = 3

# Cut every sample into overlapping windows of `window_size` consecutive steps,
# turning each long series into several shorter sequences.
# Example: [1, 2, 3, 4, 5] -> [1, 2, 3], [2, 3, 4], [3, 4, 5]
num_windows = x.size(1) - window_size + 1  # windows that fit inside one sample
sequences = [
    sample[start:start + window_size]
    for sample in x
    for start in range(num_windows)
]

# Stack the windows into one tensor of shape (number of windows, window_size, 1).
x_small = torch.stack(sequences)

In [163]:
x_small.shape  # (number of windows, window_size, input_size)

torch.Size([6, 3, 1])

In [165]:
x_small  # display every sliding window: three per original sample

tensor([[[ 1.],
         [ 2.],
         [ 3.]],

        [[ 2.],
         [ 3.],
         [ 4.]],

        [[ 3.],
         [ 4.],
         [ 5.]],

        [[ 6.],
         [ 7.],
         [ 8.]],

        [[ 7.],
         [ 8.],
         [ 9.]],

        [[ 8.],
         [ 9.],
         [10.]]])