230426 수요일 군AI 실습 자료입니다.   
본 내용은 IIPL (Intelligent Information Processing Lab) 소속 석사과정 유승욱 조교가 작성하였습니다.   
참고 자료: MIT Introduction to Deep Learning 6.S191: Lecture 2

> Lecture 2   
- Custom RNN
- TensorFlow RNN
- Advanced Example: Predict the Next Word


### Libraries

In [None]:
import numpy as np
import tensorflow as tf

from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.utils import to_categorical

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Embedding, Dense, SimpleRNN

### Custom Recurrent Neural Network (RNN)

In [None]:
""" TODO: Make your custom RNN layer """

class OurRNNLayers(tf.keras.layers.Layer):
  """Hand-written vanilla RNN cell that advances one step per ``call``.

  Each call applies the standard recurrence

      h_t = tanh(W_hh @ h_{t-1} + W_xh @ x_t)
      y_t = W_hy @ h_t

  where ``@`` is a matrix product. The hidden state is kept on the instance
  (``self.h``), so successive calls continue from the previous state.

  Args:
    rnn_units: Size of the hidden state.
    input_dim: Size of one input vector.
    output_dim: Size of one output vector.
  """

  def __init__(self, rnn_units, input_dim, output_dim):
    super().__init__()

    # Weight matrices are stored as (rows=target size, cols=source size) so
    # that they left-multiply column vectors.
    self.W_xh = self.add_weight(shape=[rnn_units, input_dim])
    self.W_hh = self.add_weight(shape=[rnn_units, rnn_units])
    self.W_hy = self.add_weight(shape=[output_dim, rnn_units])

    # Hidden state starts as a zero column vector; it broadcasts across the
    # columns of the first input and takes the input's column count afterwards.
    self.h = tf.zeros([rnn_units, 1])

    # Report the tensor data types
    print(f'W_xh dtype: {self.W_xh.dtype}')
    print(f'W_hh dtype: {self.W_hh.dtype}')
    print(f'W_hy dtype: {self.W_hy.dtype}')
    print(f'W_h  dtype: {self.h.dtype}')

  def call(self, x):
    """Advance the hidden state by one step and compute the output.

    Args:
      x: Rank-2 tensor shaped ``[input_dim, k]``; each of the ``k`` columns is
        one input vector.

    Returns:
      A tuple ``(output, hidden_state)`` shaped ``[output_dim, k]`` and
      ``[rnn_units, k]`` respectively.
    """
    # Matrix products, not element-wise `*`: element-wise multiplication only
    # happens to run when every dimension is equal and does not mix features.
    recurrent_term = tf.matmul(self.W_hh, self.h)
    input_term = tf.matmul(self.W_xh, x)
    self.h = tf.math.tanh(recurrent_term + input_term)

    # Project the new hidden state to the output space
    output = tf.matmul(self.W_hy, self.h)

    # Return the current output and hidden state
    return output, self.h

In [None]:
# 3x3 float32 input holding the values 0..8 in row-major order.
x_input = tf.constant(np.arange(9), dtype=tf.float32, shape=[3, 3])

# Build the hand-written layer and run a single step on the input;
# `y` is the (output, hidden state) pair returned by `call`.
custom_layer = OurRNNLayers(rnn_units=3, input_dim=3, output_dim=3)
y = custom_layer.call(x_input)

W_xh dtype: <dtype: 'float32'>
W_hh dtype: <dtype: 'float32'>
W_hy dtype: <dtype: 'float32'>
W_h  dtype: <dtype: 'float32'>


In [None]:
# Show the input tensor followed by its shape.
print('input      :', f'{x_input}', f'input shape: {x_input.shape}\n', sep='\n')

# y[0] is the layer output returned by `call`.
print('output      :', f'{y[0].numpy()}', f'output shape: {y[0].numpy().shape}\n', sep='\n')

# y[1] is the hidden state returned by `call`.
print('hidden state      :', f'{y[1].numpy()}', f'hidden state shape: {y[1].numpy().shape}', sep='\n')

input      :
[[0. 1. 2.]
 [3. 4. 5.]
 [6. 7. 8.]]
input shape: (3, 3)

output      :
[[ 0.         -0.2840717   0.6898853 ]
 [-0.44379663 -0.88085365  0.42636508]
 [ 0.57483065 -0.7634045   0.33912504]]
output shape: (3, 3)

hidden state      :
[[-0.         -0.6978307   0.92345816]
 [-0.84835994  0.95663047 -0.9988465 ]
 [ 0.9999625   0.9999967  -0.99998415]]
hidden state shape: (3, 3)


### TensorFlow Recurrent Neural Network (RNN)

In [None]:
""" TODO: Make your RNN layer using TensorFlow """

class TFRNNLayers(tf.keras.Model):
  """Keras model that wraps a single built-in ``SimpleRNN`` layer.

  The wrapped layer is configured with ``return_sequences=True`` and
  ``return_state=True``, so a forward pass produces two values.
  """

  def __init__(self, rnn_units):
    super().__init__()
    # Request both the per-step outputs and the final state from the layer.
    rnn_options = dict(return_sequences=True, return_state=True)
    self.rnn_layer = tf.keras.layers.SimpleRNN(rnn_units, **rnn_options)

  def call(self, x):
    """Run ``x`` through the RNN and return ``(per_step_states, final_state)``."""
    per_step_states, final_state = self.rnn_layer(x)
    return per_step_states, final_state

In [None]:
# Same 0..8 values as before, now with a leading axis of size 1: shape (1, 3, 3).
x_input = tf.constant(np.arange(9), dtype=tf.float32, shape=[1, 3, 3])

# Build the Keras-based model and run the input through it once;
# `y` is the pair returned by `call`.
custom_layer = TFRNNLayers(rnn_units=3)
y = custom_layer.call(x_input)

In [None]:
# Show the input tensor followed by its shape.
print('input      :', f'{x_input}', f'input shape: {x_input.shape}\n', sep='\n')

# y[0] is the first value returned by `call` (the per-step hidden states).
print('hidden state      :', f'{y[0].numpy()}', f'hidden state shape: {y[0].numpy().shape}\n', sep='\n')

# y[1] is the second value returned by `call` (the last hidden state).
print('last hidden state      :', f'{y[1].numpy()}', f'last hidden state shape: {y[1].numpy().shape}', sep='\n')

input      :
[[[0. 1. 2.]
  [3. 4. 5.]
  [6. 7. 8.]]]
input shape: (1, 3, 3)

hidden state      :
[[[-0.96983844  0.5901406  -0.53735435]
  [-1.          0.02622417  0.9913523 ]
  [-1.          0.05540484  0.99913245]]]
hidden state shape: (1, 3, 3)

last hidden state      :
[[-1.          0.05540484  0.99913245]]
last hidden state shape: (1, 3)


### Advanced Example: Predict the Next Word

In [None]:
# Toy training corpus: ten short Korean sentences, one string per entry.
sentences = [
  '저는 지금 배가 너무 고파요',
  '오늘은 배를 채우러 갈 시간이에요',
  '지금 점심을 먹으러 가고 싶어요',
  '오늘은 점심을 먹으러 갈 거에요',
  '배가 고프니까 저는 점심을 먹을래요',
  '오늘은 내가 짜파게티 요리사',
  '내가 만든 점심을 누가 먹고 싶나요',
  '배가 고프면 먹으러 갈까요 점심을',
  '저는 점심 시간이 너무 좋아요',
  '저는 짜파게티를 점심으로 먹고 싶어',
]

In [None]:
""" TODO: Make vocabulary using train data """

# Fit a tokenizer on the corpus; `word_index` maps each word to an integer id.
tokenizer = Tokenizer()
tokenizer.fit_on_texts(sentences)

# One slot more than the number of distinct words.
vocab_size = 1 + len(tokenizer.word_index)

print(f'vocab size: {vocab_size}')
print(f'all tokens in vocab: {tokenizer.word_index}')

vocab size: 33
all tokens in vocab: {'점심을': 1, '저는': 2, '배가': 3, '오늘은': 4, '먹으러': 5, '지금': 6, '너무': 7, '갈': 8, '내가': 9, '먹고': 10, '고파요': 11, '배를': 12, '채우러': 13, '시간이에요': 14, '가고': 15, '싶어요': 16, '거에요': 17, '고프니까': 18, '먹을래요': 19, '짜파게티': 20, '요리사': 21, '만든': 22, '누가': 23, '싶나요': 24, '고프면': 25, '갈까요': 26, '점심': 27, '시간이': 28, '좋아요': 29, '짜파게티를': 30, '점심으로': 31, '싶어': 32}


In [None]:
# Encode one sentence to show the text -> id-sequence mapping.
sentence = sentences[0]
# A single text goes in, so take the first entry of the returned list.
encoded_batch = tokenizer.texts_to_sequences([sentence])
encoded = encoded_batch[0]

print(f'origin sentence: {sentence}')
print(f'encoded sentence: {encoded}')

origin sentence: 저는 지금 배가 너무 고파요
encoded sentence: [2, 6, 3, 7, 11]


In [None]:
# Build the training samples: every prefix (length >= 2) of every encoded sentence.
# The last id of each prefix later serves as the label, the rest as the input.
sequences = []
for line in sentences:
  encoded = tokenizer.texts_to_sequences([line])[0]
  sequences.extend(encoded[:end] for end in range(2, len(encoded) + 1))

print(f'how many training samples: {len(sequences)}')
print(f'training samples: {sequences}')

how many training samples: 40
training samples: [[2, 6], [2, 6, 3], [2, 6, 3, 7], [2, 6, 3, 7, 11], [4, 12], [4, 12, 13], [4, 12, 13, 8], [4, 12, 13, 8, 14], [6, 1], [6, 1, 5], [6, 1, 5, 15], [6, 1, 5, 15, 16], [4, 1], [4, 1, 5], [4, 1, 5, 8], [4, 1, 5, 8, 17], [3, 18], [3, 18, 2], [3, 18, 2, 1], [3, 18, 2, 1, 19], [4, 9], [4, 9, 20], [4, 9, 20, 21], [9, 22], [9, 22, 1], [9, 22, 1, 23], [9, 22, 1, 23, 10], [9, 22, 1, 23, 10, 24], [3, 25], [3, 25, 5], [3, 25, 5, 26], [3, 25, 5, 26, 1], [2, 27], [2, 27, 28], [2, 27, 28, 7], [2, 27, 28, 7, 29], [2, 30], [2, 30, 31], [2, 30, 31, 10], [2, 30, 31, 10, 32]]


In [None]:
""" TODO: Pad all train data with max length """

# Left-pad every sample to the length of the longest one.
before_sequences = sequences  # keep the unpadded list for the comparison print below
max_len = len(max(before_sequences, key=len))
sequences = pad_sequences(before_sequences, maxlen=max_len, padding='pre')

print(f'max length of sequences: {max_len}')
print(f'before padding: {before_sequences[0]}')
print(f'after padding: {sequences[0]}')

max length of sequences: 6
before padding: [2, 6]
after padding: [0 0 0 0 2 6]


In [None]:
# Split each padded sample: all columns but the last are the input,
# the last column is the label (the next word's id).
sequences = np.array(sequences)
X, y = sequences[:, :-1], sequences[:, -1]

print(f'input example: {X[0]}')
print(f'output example: {y[0]}')

input example: [0 0 0 0 2]
output example: 6


In [None]:
""" TODO: Convert labels to one-hot vectors using to_categorical() """

# One-hot encode the integer labels over the vocabulary; keep the integer
# labels in `before_y` for the comparison print below.
before_y, y = y, to_categorical(y, num_classes=vocab_size)

print(f'before one-hot encoding: {before_y[0]}')
print(f'after one-hot encoding: {y[0]}')

before one-hot encoding: 6
after one-hot encoding: [0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
 0. 0. 0. 0. 0. 0. 0. 0. 0.]


In [None]:
""" TODO: Change hyper-parameters on your own """

# Hyper-parameters
embedding_dim = 10
hidden_units  = 32

# Embedding -> SimpleRNN -> softmax over the vocabulary
model = Sequential([
  Embedding(vocab_size, embedding_dim),
  SimpleRNN(hidden_units),
  Dense(vocab_size, activation='softmax'),
])

# Compile with cross-entropy on the one-hot labels, then train
model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
model.fit(X, y, epochs=200, verbose=1)

Epoch 1/200
Epoch 2/200
Epoch 3/200
Epoch 4/200
Epoch 5/200
Epoch 6/200
Epoch 7/200
Epoch 8/200
Epoch 9/200
Epoch 10/200
Epoch 11/200
Epoch 12/200
Epoch 13/200
Epoch 14/200
Epoch 15/200
Epoch 16/200
Epoch 17/200
Epoch 18/200
Epoch 19/200
Epoch 20/200
Epoch 21/200
Epoch 22/200
Epoch 23/200
Epoch 24/200
Epoch 25/200
Epoch 26/200
Epoch 27/200
Epoch 28/200
Epoch 29/200
Epoch 30/200
Epoch 31/200
Epoch 32/200
Epoch 33/200
Epoch 34/200
Epoch 35/200
Epoch 36/200
Epoch 37/200
Epoch 38/200
Epoch 39/200
Epoch 40/200
Epoch 41/200
Epoch 42/200
Epoch 43/200
Epoch 44/200
Epoch 45/200
Epoch 46/200
Epoch 47/200
Epoch 48/200
Epoch 49/200
Epoch 50/200
Epoch 51/200
Epoch 52/200
Epoch 53/200
Epoch 54/200
Epoch 55/200
Epoch 56/200
Epoch 57/200
Epoch 58/200
Epoch 59/200
Epoch 60/200
Epoch 61/200
Epoch 62/200
Epoch 63/200
Epoch 64/200
Epoch 65/200
Epoch 66/200
Epoch 67/200
Epoch 68/200
Epoch 69/200
Epoch 70/200
Epoch 71/200
Epoch 72/200
Epoch 73/200
Epoch 74/200
Epoch 75/200
Epoch 76/200
Epoch 77/200
Epoch 78

<keras.callbacks.History at 0x7f1d90cacd30>

In [None]:
# Generate sentence using my RNN model
def generate_sent(model, tokenizer, max_len, current_word, n):
  """Greedily extend a seed text with up to ``n`` predicted words.

  At each step the text generated so far is tokenized, left-padded, fed to
  ``model.predict``, and the highest-scoring word id is appended.

  Args:
    model: Trained next-word model exposing ``predict(inputs, verbose=0)``
      and returning one row of class scores per input row.
    tokenizer: Fitted tokenizer exposing ``texts_to_sequences`` and
      ``word_index`` (word -> integer id).
    max_len: Length of the longest full training sequence, i.e. context plus
      the target word. The model input is padded/truncated to ``max_len - 1``
      so that it matches inputs built by dropping the last (label) column.
    current_word: Seed text to continue.
    n: Maximum number of words to generate.

  Returns:
    The seed text followed by the generated words, separated by single
    spaces. Generation stops early if the predicted id has no word in the
    vocabulary.
  """
  init_word = current_word
  generated = []

  # Reverse lookup built once instead of scanning word_index on every step.
  index_to_word = {idx: word for word, idx in tokenizer.word_index.items()}

  # Training inputs exclude the label column, so the context is one shorter
  # than the longest full sequence.
  context_len = max(max_len - 1, 1)

  for _ in range(n):
    # Tokenize and pad the text generated so far
    encoded = tokenizer.texts_to_sequences([current_word])[0]
    encoded = pad_sequences([encoded], maxlen=context_len, padding='pre')

    # Predict the id of the next word
    scores = model.predict(encoded, verbose=0)
    predicted = int(np.argmax(scores, axis=1)[0])

    # An id without a vocabulary entry cannot be turned into text; stop
    # rather than appending an unrelated word.
    word = index_to_word.get(predicted)
    if word is None:
      break

    # Feed the extended text back in on the next step
    current_word = current_word + ' ' + word
    generated.append(word)

  return ' '.join([init_word, *generated])

In [None]:
# Seed with '나는', a word that does not occur in `sentences`, and ask for 4 more words.
generate_sent(model, tokenizer, max_len, '나는', 4)

'나는 먹으러 먹으러 먹으러 점심을'

In [None]:
# Seed with '짜파게티', a word that occurs in `sentences`, and ask for 3 more words.
generate_sent(model, tokenizer, max_len, '짜파게티', 3)

'짜파게티 먹으러 먹으러 점심을'

In [None]:
# Seed with '지금', a word that occurs in `sentences`, and ask for 5 more words.
generate_sent(model, tokenizer, max_len, '지금', 5)

'지금 점심을 먹으러 가고 싶어요 점심을'