<a href="https://colab.research.google.com/github/park-hoyeon/park-hoyeon.github.io/blob/master/skt_7_02_RNN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 순환 신경망(Recurrent Neural Network)
- 시퀀스	데이터(음성,	자연어,	시계열	데이터	등)를	학습시키기	위한	신경망
- RNN의 가장 큰 특징: 은닉층이 이전 데이터를 참조하도록 서로 연결된다.
- 	셀(Cell)	:	은닉층에서	결과를	출력하는	노드를	말한다.	셀은	이전의	값을	기억하려고	하는	일종의	메모리	역할을	수행
- 	은닉	상태(hidden	state)	:	메모리	셀이	출력층	방향	또는	다음	시점인	t+1의	자신에게	보내는	값

### 주가 예측 모델

In [None]:
 pip install -U finance-datareader

In [None]:
import FinanceDataReader as fdr
samsung = fdr.DataReader('005930', '1998-09-01', '2022-10-30')
samsung.shape

In [None]:
samsung.head()

In [None]:
# 데이터 시각화
import matplotlib.pyplot as plt
import seaborn as sns
plt.figure(figsize=(12, 6))
sns.lineplot(y=samsung['Close'], x=samsung.index)
plt.show()

In [None]:
# 특성 추가 - rolling() 함수를 사용해 3일 평균과 5일 평균의 피처를 추가한다.
import numpy as np
samsung['3MA'] = np.around(samsung['Close'].rolling(window=3).mean(), 0)
samsung['5MA'] = np.around(samsung['Close'].rolling(window=5).mean(), 0)

In [None]:
# 최고와 최저의 중간값을 특성으로 추가한다.
samsung['Mid'] = (samsung['High'] + samsung['Low'])/2
samsung.head()

In [None]:
# 이상치 처리 - 거래량 'Voiume'이 0인 데이터는 이상치로 판단하고, Volime이 0인 행만 추출해서 확인한다.
samsung[samsung['Volume'] == 0]

In [None]:
# Volume이 0인 값을 NaN값으로 변경해서 결측치로 처리하기 위함
samsung['Volume'] = samsung['Volume'].replace(0, np.nan)
samsung.isna().sum(axis=0)

In [None]:
# 결측치가 포함된 행을 삭제하고 확인한다.
samsung = samsung.dropna()
samsung.isna().sum(axis=0)

In [None]:
# 데이터 정규화
from sklearn.preprocessing import MinMaxScaler
import pandas as pd # Import pandas
scaler = MinMaxScaler()
scale_cols = ['Close', '3MA', '5MA', 'Mid','Volume']
df_scaled = scaler.fit_transform(samsung[scale_cols])
df_scaled = pd.DataFrame(df_scaled)
df_scaled.columns = scale_cols
print(df_scaled[:5])

In [None]:
# 시퀀스 데이터로 변형
def make_sequene_dataset(feature, label, window_size):
  feature_list = []
  label_list = []
  for i in range(len(feature)-window_size):
    feature_list.append(feature[i:i+window_size])
    label_list.append(label[i+window_size])
  return np.array(feature_list), np.array(label_list)

In [None]:
# 데이터 프레임을 생성한다.
feature_cols = ['3MA', '5MA', 'Mid','Volume']
label_cols = [ 'Close' ]
npX = pd.DataFrame(df_scaled, columns=feature_cols).values
npY = pd.DataFrame(df_scaled, columns=label_cols).values
print(npX.shape, npY.shape)

In [None]:
# 윈도우 사이즈는 타임스텝의 길이가 된다.
window_size = 20
X_data, Y_data = make_sequene_dataset(npX, npY, window_size)
print(X_data.shape, Y_data.shape)

In [None]:
# 테스트 데이터 분리 - 넘파이 슬라이싱 기능을 사용해 데이터를 분리한다.
split = int(len(X_data)*0.8)
X_train = X_data[0:split]
y_train = Y_data[0:split]
X_test = X_data[split:]
y_test = Y_data[split:]
print(X_train.shape, y_train.shape)
print(X_test.shape, y_test.shape)


In [None]:
# 모델 만들기
from tensorflow import keras
from tensorflow.keras import layers
model = keras.Sequential()
model.add(layers.LSTM(64, activation='tanh', input_shape=(20, 4)))
model.add(layers.Dense(16, activation='relu'))
model.add(layers.Dense(1))
model.summary()

In [None]:
# 모델 학습
from tensorflow.keras.losses import Huber
from keras.callbacks import EarlyStopping
model.compile(loss='mse', optimizer='adam', metrics=['mae'])
early_stop = EarlyStopping(monitor='val_loss', patience=10)
EPOCHS = 100
BATCH_SIZE = 16
history = model.fit(X_train, y_train,
           epochs=EPOCHS,
           batch_size=BATCH_SIZE,
           validation_data=(X_test, y_test),
           callbacks=[early_stop])

In [None]:
# 학습 곡선
import matplotlib.pyplot as plt
def plot_history(history):
  his_dict = history.history
  loss = his_dict['loss']
  val_loss = his_dict['val_loss']
  epochs = range(1, len(loss) + 1)
  fig = plt.figure(figsize = (12, 5))
  ax1 = fig.add_subplot(1, 2, 1)
  ax1.plot(epochs, loss, 'b-', label = 'train_loss')
  ax1.plot(epochs, val_loss, 'r-', label = 'val_loss')
  ax1.set_title('train and val loss')
  ax1.set_xlabel('epochs')
  ax1.set_ylabel('loss')
  ax1.legend()
  acc = his_dict['mae']
  val_acc = his_dict['val_mae']
  ax2 = fig.add_subplot(1, 2, 2)
  ax2.plot(epochs, acc, 'b-', label = 'train_mae')
  ax2.plot(epochs, val_acc, 'r-', label = 'val_mae')
  ax2.set_title('train and val mae')
  ax2.set_xlabel('epochs')
  ax2.set_ylabel('mae')
  ax2.legend()
  plt.show()
plot_history(history)

In [None]:
# 평가 및 예측 - 테스트 데이터를 사용해 평가한다.
loss, mae = model.evaluate(X_test, y_test)

In [None]:
# 테스트 데이터를 사용해 예측한다.
y_pred = model.predict(X_test)
for i in range(5):
  print('Close: ', y_test[i], ' Predict: ', y_pred[i])

In [None]:
# 결과 시각화
plt.figure(figsize=(12, 6))
plt.ylabel('Close')
plt.xlabel('period')
plt.plot(y_test[20:], label='actual')
plt.plot(y_pred, label='prediction')
plt.grid()
plt.legend(loc='best')
plt.show()

In [None]:
# MAPE(평균 절대값 백분율 오차)
# MAPE는	퍼센트	값을	가지며	0에	가까울수록	회귀	모형의	성능이	좋다

loss, mae = model.evaluate(X_test, y_test)

In [None]:
# 테스트 데이터를 사용해 예측한다.
mape1 = np.sum(abs(y_test-y_pred)/y_test) / len(X_test)
print(mape1)

# 멀티레이어 LSTM


In [None]:
# 모델 만들기
# LSTM을	여러층을	사용할	경우는	return_sequences=True의	설정이	필요
from tensorflow import keras
from tensorflow.keras import layers
model = keras.Sequential()
model.add(layers.LSTM(64, activation='tanh',
                      return_sequences=True,
                      input_shape=(20, 4)))
model.add(layers.LSTM(128, activation='tanh',
                      return_sequences=False))
model.add(layers.Dense(16, activation='relu'))
model.add(layers.Dense(1))
model.summary()

In [None]:
# 모델의 학습 - 시퀀스데이터의 학습에 성능이 좋은 Huber를 손실함수로 사용한다.
from tensorflow.keras.losses import Huber
from keras.callbacks import EarlyStopping
model.compile(loss=Huber(), optimizer='adam', metrics=['mae'])
early_stop = EarlyStopping(monitor='val_loss', patience=10)
EPOCHS = 100
BATCH_SIZE = 16
history = model.fit(X_train, y_train,
           epochs=EPOCHS,
           batch_size=BATCH_SIZE,
           validation_data=(X_test, y_test),
           callbacks=[early_stop])

In [None]:
# 예측
y_pred = model.predict(X_test)
for i in range(5):
   print('Close: ', y_test[i], ' Predict: ', y_pred[i])

In [None]:
# 결과의 시각화
plt.figure(figsize=(12, 6))
plt.ylabel('Close')
plt.xlabel('period')
plt.plot(y_test[20:], label='actual')
plt.plot(y_pred, label='prediction')
plt.grid()
plt.legend(loc='best')
plt.show()

In [None]:
# MAPE 계산
mape2 = np.sum(abs(y_test-y_pred)/y_test) / len(y_test)

In [None]:
# MAPE 비교
print('MAPE 1 : ', mape1)
print('MAPE 2 : ', mape2)

# RNN으로 이미지 분류

In [None]:
# 데이터 준비
from tensorflow.keras.datasets import mnist
(X_train, y_train), (X_test, y_test) = mnist.load_data()
print(X_train.shape, y_train.shape)
print(X_test.shape, y_test.shape)

In [None]:
# 데이터 표시
import matplotlib.pyplot as plt
_, axes = plt.subplots(nrows=1, ncols=3, figsize=(15, 7))
for ax, image, label in zip(axes, X_train, y_train):
    ax.set_axis_off()
    ax.imshow(image, cmap='gray')
    ax.set_title(label)

In [None]:
# 정규화
import numpy as np

X_train = X_train/255.
X_test = X_test/255.
print(np.min(X_train), np.max(X_train))

In [None]:
# 원-핫 인코딩
from tensorflow.keras.utils import to_categorical
y_train_oh = to_categorical(y_train)
y_test_oh = to_categorical(y_test)
y_train_oh[:5]

In [None]:
#  모델 만들기
from tensorflow import keras
from tensorflow.keras import layers
model = keras.Sequential([
   layers.LSTM(64, input_shape=(28, 28)),
   layers.Dense(10, activation='softmax')
])
model.summary()

In [None]:
# 모델 학습
model.compile(loss='categorical_crossentropy',
        optimizer='adam',
        metrics=['accuracy'])
epochs = 30
batch_size = 64
history = model.fit(X_train, y_train_oh,
           validation_data=(X_test, y_test_oh),
           epochs=epochs,
           batch_size=batch_size,
           verbose=1)

In [None]:
plot_history(history)