In [None]:
## library import
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.utils import to_categorical
import numpy as np
import matplotlib.pyplot as plt
import os

# Report the TensorFlow and Keras versions in use, one per line.
print(tf.__version__, keras.__version__, sep="\n")

# RNN을 이용하여 MNIST Classification 학습하기 
  

MNIST image를 한 행(row)씩 RNN에 입력하여 classification을 하는 실습을 해보겠습니다. 즉, 28×28 image를 길이 28의 sequence(각 time step마다 28개의 pixel 값)로 취급합니다.

In [None]:
## Hyper-parameters for the MNIST row-sequence classifier
learning_rate = 1e-3  # passed to the Adam optimizer when the model is compiled
N_EPOCHS = 20         # number of epochs given to model.fit
N_BATCH = 100         # batch size of the train/test tf.data pipelines
N_CLASS = 10          # number of classes (length of class_names)

In [None]:
## Prepare data: choose the Keras dataset module used by the cells below.
## Exactly one of the two options should be active at a time.

## Option 1: MNIST digits ################################################
mnist = keras.datasets.mnist
class_names = [str(digit) for digit in range(10)]
##########################################################################

## Option 2: Fashion MNIST (uncomment to use instead) ####################
#mnist = keras.datasets.fashion_mnist
#class_names = ['T-shirt/top', 'Trouser', 'Pullover', 'Dress', 'Coat', 'Sandal', 'Shirt', 'Sneaker', 'Bag', 'Ankle boot']
##########################################################################

In [None]:
## Build the dataset: fetch the train/test splits from the selected module
(train_images, train_labels), (test_images, test_labels) = mnist.load_data()

# Sample counts (first axis of each image array); N_TEST is used later to
# pick a random test batch.
N_TRAIN, N_TEST = train_images.shape[0], test_images.shape[0]
print(train_images.shape, test_images.shape)

In [None]:
# Scale pixel values into the 0~1 range
train_images = train_images.astype(np.float32) / 255.
test_images = test_images.astype(np.float32) / 255.

# One-hot encode the labels. The width comes from the N_CLASS hyper-parameter
# instead of a repeated literal, so labels and hyper-parameters cannot drift apart.
# NOTE: this cell rebinds its own inputs; re-running it on its own rescales and
# re-encodes data that has already been processed.
train_labels = to_categorical(train_labels, N_CLASS)
test_labels = to_categorical(test_labels, N_CLASS)

# Assemble the tf.data pipelines (only the training set is shuffled)
train_dataset = tf.data.Dataset.from_tensor_slices((train_images, train_labels)).shuffle(
                buffer_size=100000).batch(N_BATCH)
test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(N_BATCH)

In [None]:
## Build the model
def create_model():
    """Return an uncompiled LSTM classifier for 28-step, 28-feature sequences.

    Architecture: one LSTM layer with 256 units that emits only its final
    output (``return_sequences=False``), followed by a softmax ``Dense``
    layer with ``N_CLASS`` units.

    Returns
    -------
    keras.Sequential
        The model; the caller is responsible for compiling it.
    """
    return keras.Sequential([
        layers.LSTM(units=256, return_sequences=False, input_shape=(28, 28)),
        # Output width follows the N_CLASS hyper-parameter rather than a literal 10.
        layers.Dense(units=N_CLASS, activation='softmax'),
    ])

In [None]:
## Instantiate the model, compile it, and print its summary
model = create_model()
model.compile(
    optimizer=keras.optimizers.Adam(learning_rate=learning_rate),
    # Labels were one-hot encoded above, hence the categorical loss.
    loss='categorical_crossentropy',
    metrics=['accuracy'],
)
model.summary()

In [None]:
## Check the results before training
# Evaluates the freshly compiled (still untrained) model on the test pipeline;
# the value returned by evaluate() is shown as the cell output.
model.evaluate(test_dataset)

In [None]:
## Training
# Fit for N_EPOCHS epochs. The test pipeline is passed as validation data and
# the value returned by fit() is kept in `history`.
history = model.fit(train_dataset, epochs=N_EPOCHS, 
                    validation_data=test_dataset)

In [None]:
## Helpers for checking the results
def plot_image(i, predictions_array, true_label, img):
    """Draw sample ``i`` and caption it with the predicted and true class.

    Parameters
    ----------
    i : int
        Position of the sample inside the three batch-level arguments.
    predictions_array : indexable
        Per-sample class scores; ``predictions_array[i]`` is arg-maxed to
        obtain the predicted class and max-ed to obtain the shown percentage.
    true_label : indexable
        Per-sample class indices; ``true_label[i]`` indexes ``class_names``.
    img : indexable
        Per-sample images; ``img[i]`` is passed to ``plt.imshow``.

    The caption reads ``"<predicted> <score>% (<true>)"`` and is blue when the
    prediction matches the true label, red otherwise. Draws on the current
    matplotlib axes and returns nothing.
    """
    scores, target, picture = predictions_array[i], true_label[i], img[i]

    # Bare image: no grid and no tick marks.
    plt.grid(False)
    plt.xticks([])
    plt.yticks([])
    plt.imshow(picture, cmap=plt.cm.binary)

    guess = np.argmax(scores)
    caption_color = 'blue' if guess == target else 'red'
    caption = "{} {:2.0f}% ({})".format(class_names[guess],
                                        100*np.max(scores),
                                        class_names[target])
    plt.xlabel(caption, color=caption_color)

def plot_value_array(i, predictions_array, true_label):
    """Draw a bar chart of the class scores predicted for sample ``i``.

    Parameters
    ----------
    i : int
        Position of the sample inside the two batch-level arguments.
    predictions_array : indexable
        Per-sample class scores; ``predictions_array[i]`` supplies the bar
        heights (one bar per entry of ``range(N_CLASS)``).
    true_label : indexable
        Per-sample class indices; ``true_label[i]`` selects the bar to mark.

    The predicted class bar is coloured red and the true class bar blue; all
    other bars stay grey. Draws on the current matplotlib axes and returns
    nothing.
    """
    scores, target = predictions_array[i], true_label[i]
    class_ids = range(N_CLASS)

    plt.grid(False)
    plt.xticks(class_ids, class_names, rotation=90)
    plt.yticks([])
    bars = plt.bar(class_ids, scores, color="#777777")
    plt.ylim([0, 1])

    # Red is applied first and blue second, so a correct prediction ends up blue.
    bars[np.argmax(scores)].set_color('red')
    bars[target].set_color('blue')

In [None]:
## Plot the predictions for one randomly chosen test batch
# test_dataset is batched without drop_remainder, so a trailing partial batch
# counts as a batch too (ceil division).
n_test_batches = -(-N_TEST // N_BATCH)
# 1-based batch number. randint's upper bound is exclusive, hence the +1: the
# last batch can be drawn as well, and a single-batch dataset no longer raises.
rnd_idx = np.random.randint(1, n_test_batches + 1)
num_rows = 5
num_cols = 3
# Jump straight to the chosen batch instead of counting through the dataset.
for images, labels in test_dataset.skip(rnd_idx - 1).take(1):
    # Convert to NumPy once rather than once per subplot.
    predictions = model(images, training=False).numpy()
    labels = tf.argmax(labels, axis=-1).numpy()  # one-hot -> class index
    images = images.numpy()

    # Never request more panels than the batch actually holds.
    num_images = num_rows * num_cols
    if len(images) < num_images:
        num_images = len(images)

    plt.figure(figsize=(3*2*num_cols, 4*num_rows))
    plt.subplots_adjust(hspace=1.0)
    for i in range(num_images):
        # Each sample takes two adjacent panels: the image, then its score bars.
        plt.subplot(num_rows, 2*num_cols, 2*i+1)
        plot_image(i, predictions, labels, images)
        plt.subplot(num_rows, 2*num_cols, 2*i+2)
        plot_value_array(i, predictions, labels)

# Simple Language Model with RNN

RNN을 이용하여 간단한 language model을 학습시켜 보겠습니다.

In [None]:
## Sentence to learn
sentence = ("if you want to build a ship, don't drum up people together to "
            "collect wood and don't assign them tasks and work, but rather "
            "teach them to long for the endless immensity of the sea.")

## List that maps an index to its character.
## Sorted so the character <-> index mapping is identical on every run;
## iterating a bare set of strings can yield a different order per interpreter session.
idx2char = sorted(set(sentence))
## Dictionary that maps a character to its index (inverse of idx2char)
char2idx = {w: i for i, w in enumerate(idx2char)}

In [None]:
# Display the index -> character list together with the vocabulary size.
idx2char, len(idx2char)

In [None]:
# Display the character -> index dictionary.
char2idx

In [None]:
## Hyper-parameters for the character-level language model
# Input width, LSTM width and number of output classes all equal the vocabulary size.
data_dim = hidden_size = num_classes = len(idx2char)
sequence_length = 10  # Any arbitrary number
learning_rate = 0.1   # rebinds the value used by the MNIST section
training_epochs = 200
print(num_classes)

In [None]:
## Dataset: slide a window of sequence_length characters over the sentence;
## the target is the same window shifted one character to the right.
dataX = []
dataY = []
for start in range(len(sentence) - sequence_length):
    stop = start + sequence_length
    x_str, y_str = sentence[start:stop], sentence[start + 1:stop + 1]
    print(start, x_str, '->', y_str)

    # Characters -> vocabulary indices
    dataX.append([char2idx[c] for c in x_str])
    dataY.append([char2idx[c] for c in y_str])

# One batch holds every window.
batch_size = len(dataX)

In [None]:
# One-hot encode the index sequences (inputs and targets alike).
# NOTE: this cell rebinds dataX/dataY, so re-running it on its own would
# one-hot encode data that is already encoded.
dataX = np.array(to_categorical(dataX, num_classes=num_classes))
dataY = np.array(to_categorical(dataY, num_classes=num_classes))
print(dataX.shape, dataY.shape)
print(batch_size)

In [None]:
# Training pipeline for the language model: shuffle the windows, then emit them
# in batches of batch_size (set above to the number of windows).
train_dataset = tf.data.Dataset.from_tensor_slices((dataX, dataY))
train_dataset = train_dataset.shuffle(buffer_size=1000).batch(batch_size)

In [None]:
## Build the model
def create_model():
    """Return an uncompiled two-layer LSTM language model.

    Note that this rebinds the notebook-level name ``create_model`` used by
    the MNIST section.

    Architecture: two stacked LSTM layers with ``hidden_size`` units each, both
    returning the full sequence, followed by a softmax ``Dense`` layer with
    ``num_classes`` units. The input shape is taken from ``dataX`` (its second
    and third axes).

    Returns
    -------
    keras.Sequential
        The model; the caller is responsible for compiling it.
    """
    steps, features = dataX.shape[1], dataX.shape[2]
    return keras.Sequential([
        layers.LSTM(units=hidden_size, return_sequences=True,
                    input_shape=(steps, features)),
        layers.LSTM(units=hidden_size, return_sequences=True),
        layers.Dense(units=num_classes, activation='softmax'),
    ])

In [None]:
# Instantiate and compile the language model, then print its summary.
model = create_model()
model.compile(
    optimizer=keras.optimizers.Adam(learning_rate=learning_rate),
    loss='categorical_crossentropy',
)
model.summary()

In [None]:
## Training
# Fit the language model on the window pipeline for training_epochs epochs.
model.fit(train_dataset, epochs=training_epochs)

In [None]:
## Check the results
# Predict every window in one explicit batch, rather than relying on `steps=1`
# with the default batch size, so no window is left out of the reconstruction.
results = model.predict(dataX, batch_size=len(dataX))
for j, result in enumerate(results):
    # Most likely character index at each position of window j
    index = np.argmax(result, axis=1)
    if j == 0:  # print all for the first result to make a sentence
        print(''.join(idx2char[t] for t in index), end='')
    else:
        # Later windows overlap the previous one; only the last character is new.
        print(idx2char[index[-1]], end='')

# Stock Prediction with RNN  
RNN을 이용한 간단한 주식 예측 모델을 학습해보겠습니다.

In [None]:
## Mount Google Drive
# Requires the Google Colab runtime: `google.colab` is imported here rather than
# in the top import cell. The CSV read in the data-preparation cell lives under
# this mount point.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
## Hyper-parameters for the stock model
## (several names rebind values used by the earlier sections)
seq_length = 7         # rows per input window
data_dim = 5           # values per row
hidden_size = 10       # units in each LSTM layer
output_dim = 1         # units in the final Dense layer
learning_rate = 1e-3   # passed to the RMSprop optimizer
training_epochs = 100  # number of epochs given to model.fit
batch_size = 32        # batch size of the train/test tf.data pipelines

In [None]:
## Data Preprocessing
def MinMaxScaler(data):
    ''' Column-wise min-max normalization.

    Every column is shifted by its minimum and divided by its range
    (max - min) plus a small constant. The per-column range and minimum are
    printed as a side effect.

    Parameters
    ----------
    data : numpy.ndarray
        input data to be normalized
        shape: [Batch size, dimension]

    Returns
    ----------
    result : numpy.ndarray
        normalized data
        shape: [Batch size, dimension]
    last_range : scalar
        range (max - min) of the last column
    last_min : scalar
        minimum of the last column

    References
    ----------
    .. [1] http://sebastianraschka.com/Articles/2014_about_feature_scaling.html
    '''
    col_min = np.min(data, 0)
    col_range = np.max(data, 0) - col_min
    print(col_range)
    print(col_min)
    # The small constant prevents a zero division for constant columns.
    scaled = (data - col_min) / (col_range + 1e-7)
    return scaled, col_range[-1], col_min[-1]

In [None]:
## Prepare the data
# Column order as noted by the author: Open, High, Low, Volume, Close
xy = np.loadtxt('/content/drive/My Drive/ssac/data/hyundai.csv', delimiter=',')
# MinMaxScaler also returns the range and minimum of the last column; they are
# kept so the final cell can map a prediction back to the original scale.
# NOTE(review): binding the name `min` here shadows the built-in min() for the
# rest of the session; the final cell reads this name, so it is left unchanged.
xy, denom, min = MinMaxScaler(xy)
xy = xy.astype(np.float32)
x = xy
y = xy[:, [-1]]  # Close as label

In [None]:
# Build the dataset: each sample is a window of seq_length consecutive rows,
# and its label is the close value of the row right after the window.
dataX = []
dataY = []
for start in range(len(y) - seq_length):
    stop = start + seq_length
    _x = x[start:stop]
    _y = y[stop]  # Next close price
    # Print every 100th pair as a sanity check.
    if start % 100 == 0:
        print(_x, "->", _y)
    dataX.append(_x)
    dataY.append(_y)

In [None]:
## Train/Test split (chronological: the first part trains, the rest tests)
train_size = int(0.7 * len(dataY) - 2)
test_size = len(dataY) - train_size

trainX = np.array(dataX[:train_size])
testX = np.array(dataX[train_size:])
trainY = np.array(dataY[:train_size])
testY = np.array(dataY[train_size:])

print(trainX.shape, trainY.shape)
print(testX.shape, testY.shape)

In [None]:
## Build the tf.data pipelines
# prefetch is placed last so it buffers whole batches ahead of the consumer.
# In the previous order (prefetch -> batch) it only buffered single samples
# before batching. The elements produced are unchanged.
train_dataset = tf.data.Dataset.from_tensor_slices((trainX, trainY)).shuffle(
                buffer_size=3000).batch(batch_size).prefetch(buffer_size=1)
# The test pipeline is not shuffled, so predictions stay aligned with testY.
test_dataset = tf.data.Dataset.from_tensor_slices((testX, testY)).batch(
                batch_size).prefetch(buffer_size=1)

In [None]:
## Build the model
def create_model():
    """Return an uncompiled two-layer LSTM regressor for the stock windows.

    Note that this rebinds the notebook-level name ``create_model`` used by
    the earlier sections.

    Architecture: an LSTM layer with ``hidden_size`` units returning the full
    sequence, a second LSTM layer with ``hidden_size`` units returning only its
    final output, and a ``Dense`` layer with ``output_dim`` units and no
    activation. The input shape is taken from ``trainX`` (its second and third
    axes).

    Returns
    -------
    keras.Sequential
        The model; the caller is responsible for compiling it.
    """
    steps, features = trainX.shape[1], trainX.shape[2]
    return keras.Sequential([
        keras.layers.LSTM(units=hidden_size, return_sequences=True,
                          input_shape=(steps, features)),
        keras.layers.LSTM(units=hidden_size),
        keras.layers.Dense(units=output_dim),
    ])

In [None]:
# Instantiate and compile the stock model, then print its summary.
# Trained on mean squared error; RMSE is additionally reported as a metric.
model = create_model()
model.compile(
    optimizer=keras.optimizers.RMSprop(learning_rate=learning_rate),
    loss='mse',
    metrics=[keras.metrics.RootMeanSquaredError()],
)
model.summary()

In [None]:
## Training
# Fit for training_epochs epochs; the held-out windows are passed as validation data.
model.fit(train_dataset, epochs=training_epochs,
          validation_data=test_dataset)

In [None]:
## Check the results: overlay the predictions on the true (scaled) test targets
prediction = model.predict(test_dataset)

fig, ax = plt.subplots(figsize=(15, 7))
ax.plot(testY)
ax.plot(prediction)
ax.set_xlabel("Time Period")
ax.set_ylabel("Stock Price")
ax.legend(['real', 'prediction'])
plt.show()

In [None]:
# Turn the most recent seq_length rows into a single-sample batch for the model.
# The window length and row width come from the hyper-parameter cell instead of
# repeating the literals 7 and 5, so changing seq_length/data_dim stays consistent.
test_input = np.reshape(x[-seq_length:], (-1, seq_length, data_dim))
test_input.shape

In [None]:
# Predict from the most recent window; the output is still on the scaled axis.
final_predict = model.predict(test_input)

In [None]:
# Undo the min-max scaling of the last column: multiply by (range + 1e-7), the
# same divisor MinMaxScaler used, then add back that column's minimum.
# `min` here is the value returned by MinMaxScaler, not the built-in.
final_predict * (denom + 1e-7) + min