In [1]:
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
from sklearn.datasets import load_sample_image
from tensorflow import keras
from tensorflow.keras import layers

# A Sample Time Series (Dataset) - p7

## 1. Random Data 생성하기 (주파수와 위상이 무작위인 sin 파형 2개 + noise, 예측하기 어려운 시계열)

In [13]:
def generate_time_series(batch_size, n_steps):
    """Create a batch of synthetic univariate time series.

    Each series is the sum of two sine waves whose frequency and phase offset
    are drawn at random, plus a small amount of uniform noise.

    Args:
        batch_size: Number of independent series to generate.
        n_steps: Number of time steps in each series.

    Returns:
        A float32 array of shape (batch_size, n_steps, 1).
    """
    # One uniform [0, 1) draw per series for each of the four wave parameters.
    # Every parameter has shape (batch_size, 1), so it broadcasts along time.
    freq1, freq2, offsets1, offsets2 = np.random.rand(4, batch_size, 1)
    time = np.linspace(0, 1, n_steps)  # shared time axis with n_steps points on [0, 1]

    wave1 = 0.5 * np.sin((time - offsets1) * (freq1 * 10 + 10))
    wave2 = 0.2 * np.sin((time - offsets2) * (freq2 * 20 + 20))
    noise = 0.1 * (np.random.rand(batch_size, n_steps) - 0.5)  # uniform in [-0.05, 0.05)

    series = wave1 + wave2 + noise
    # Append a trailing feature axis so the result is (batch_size, n_steps, 1).
    return series[..., np.newaxis].astype(np.float32)

In [14]:
# Build the one-step-ahead dataset: each series provides n_steps inputs and a
# single target, which is the value immediately after those inputs.
np.random.seed(42)
n_steps = 50
series = generate_time_series(10000, n_steps + 1)

# Separate inputs from targets once, then split 7000 / 2000 / 1000 into
# train / validation / test.
inputs, targets = series[:, :n_steps], series[:, -1]
X_train, y_train = inputs[:7000], targets[:7000]
X_valid, y_valid = inputs[7000:9000], targets[7000:9000]
X_test, y_test = inputs[9000:], targets[9000:]

In [4]:
# Sanity-check the split: inputs are (samples, n_steps, 1) and targets are (samples, 1).
X_train.shape, y_train.shape

((7000, 50, 1), (7000, 1))

## 2 - 1 Option1. RNN Model 생성(Deep RNN)

In [22]:
# Deep RNN used for one-step-ahead forecasting (sequence-to-vector).
model = keras.models.Sequential()
# The first recurrent layer emits an output at every time step so that the
# next recurrent layer receives a full sequence.
model.add(layers.SimpleRNN(20, return_sequences=True, input_shape=[None, 1]))
# The recurrent layer feeding the Dense head keeps the default
# return_sequences=False, so only its final output is passed on.
model.add(layers.SimpleRNN(20))
# A single unit because one value is predicted per series.
model.add(layers.Dense(1))

In [23]:
# Train the one-step model with MSE loss and the Adam optimizer, tracking the
# validation split after every epoch.
model.compile(optimizer='adam', loss='mse')
history = model.fit(
    X_train,
    y_train,
    epochs=20,
    validation_data=(X_valid, y_valid),
)

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


In [24]:
# Forecast 10 steps ahead, option 1: reuse the one-step (sequence-to-vector)
# model and feed each prediction back in as input for the next step.
np.random.seed(43)  # different seed from the training data, so the series is new
n_steps = 50  # number of observed values given to the model
series = generate_time_series(1, n_steps + 10)  # one series: 50 observed + 10 future steps
X_new = series[:, :n_steps]  # the 50 observed inputs
Y_new = series[:, n_steps:]  # ground truth for the 10 future steps
X = X_new
print(X.shape) # (1, 50, 1)

for step_ahead in range(10):
    # Drop the oldest values so the model always sees the latest 50 steps.
    window = X[:, step_ahead:]
    # predict() returns shape (1, 1); insert a time axis to get (1, 1, 1)
    # so the new value can be appended along axis 1.
    y_pred_one = model.predict(window)[:, np.newaxis, :]
    X = np.concatenate([X, y_pred_one], axis=1)
print(X.shape) # (1, 60, 1): the 10 predicted values have been appended
Y_pred = X[:, n_steps:] # keep only the 10 predicted values
Y_pred.shape # (1, 10, 1): 10 predictions for the single series in the batch

(1, 50, 1)
(1, 60, 1)


(1, 10, 1)

## 2 - 2 Option2. RNN Model 생성

In [25]:
# Forecast 10 steps ahead, option 2: sequence-to-vector of size 10.
# The model predicts all 10 future values at once, and only from the last
# time step of the input sequence.
np.random.seed(42)
tf.random.set_seed(42)

model = keras.models.Sequential([
    # return_sequences=True makes this layer emit an output at every time step.
    keras.layers.SimpleRNN(20, return_sequences=True, input_shape=[None, 1]),
    # The layer before Dense keeps return_sequences=False (the default), so
    # Dense only receives the final time step's output.
    keras.layers.SimpleRNN(20),
    # One output unit per forecast step.
    keras.layers.Dense(10)
])

In [26]:
# The network ends in Dense(10), so each training target must hold the next
# 10 values of its series. The earlier one-step targets have shape (N, 1);
# fitting on them would broadcast inside the MSE loss and teach every one of
# the 10 outputs to predict the same single value. Regenerate the data with
# 10 future steps and build (N, 10) targets instead.
np.random.seed(42)
n_steps = 50
series = generate_time_series(10000, n_steps + 10)
X_train, Y_train = series[:7000, :n_steps], series[:7000, -10:, 0]
X_valid, Y_valid = series[7000:9000, :n_steps], series[7000:9000, -10:, 0]
X_test, Y_test = series[9000:, :n_steps], series[9000:, -10:, 0]

model.compile(loss="mse", optimizer="adam")
history = model.fit(X_train, Y_train, epochs=20, validation_data=(X_valid, Y_valid))

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


In [31]:
# Try option 2 on a freshly generated series: 50 observed steps in,
# 10 future steps predicted in a single call.
np.random.seed(43)
series = generate_time_series(1, 50 + 10)
X_new = series[:, :50, :]   # observed inputs
Y_new = series[:, -10:, :]  # ground truth for the last 10 steps
# Append a trailing axis to the prediction so it has the same
# (1, 10, 1) layout as Y_new.
Y_pred = model.predict(X_new)[..., np.newaxis]
Y_pred.shape

(1, 10, 1)

## 2 - 3 Option3. RNN Model 생성

In [32]:
# Forecast 10 steps ahead, option 3: sequence-to-sequence targets.
# At every input time step t the target is the vector of the next 10 values
# (t+1 ... t+10), so Y has shape (samples, n_steps, 10).
np.random.seed(42)
n_steps = 50
series = generate_time_series(10000, n_steps + 10)
X_train = series[:7000, :n_steps]
X_valid = series[7000:9000, :n_steps]
# Regenerate X_test from this series as well; otherwise Y_test below would be
# paired with a stale X_test that was sliced from a different series.
X_test = series[9000:, :n_steps]
Y = np.empty((len(series), n_steps, 10))  # one 10-dimensional target vector per time step
for step_ahead in range(1, 10 + 1):
    # Channel (step_ahead - 1) holds the series shifted forward by step_ahead.
    Y[..., step_ahead - 1] = series[..., step_ahead:step_ahead + n_steps, 0]
Y_train = Y[:7000]
Y_valid = Y[7000:9000]
Y_test = Y[9000:]

In [33]:
# Sequence-to-sequence model: both recurrent layers return full sequences and
# the Dense head is applied independently at every time step.
np.random.seed(42)
tf.random.set_seed(42)
model = keras.models.Sequential()
model.add(layers.SimpleRNN(20, return_sequences=True, input_shape=[None, 1]))
model.add(layers.SimpleRNN(20, return_sequences=True))
# The Dense layer is wrapped in TimeDistributed rather than used directly, so
# it yields a 10-value forecast for each time step.
model.add(layers.TimeDistributed(layers.Dense(10)))

In [34]:
def last_time_step_mse(Y_true, Y_pred):
    """Mean squared error computed on the final time step only.

    Args:
        Y_true: Target tensor; indexed as [:, -1] to select the last time step.
        Y_pred: Prediction tensor; indexed the same way as Y_true.

    Returns:
        The result of keras.metrics.mean_squared_error on the two last-step slices.
    """
    final_true = Y_true[:, -1]
    final_pred = Y_pred[:, -1]
    return keras.metrics.mean_squared_error(final_true, final_pred)

# Train on every time step's 10-value target (MSE over all steps) while also
# reporting the MSE of the last time step via the custom metric.
# `learning_rate` replaces the deprecated `lr` keyword of the Adam optimizer.
model.compile(
    loss="mse",
    optimizer=keras.optimizers.Adam(learning_rate=0.01),
    metrics=[last_time_step_mse],
)
history = model.fit(X_train, Y_train, epochs=20, validation_data=(X_valid, Y_valid))

  "The `lr` argument is deprecated, use `learning_rate` instead.")


Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20
