## 1. 导入相关包

In [1]:
import tensorflow as tf
# Short alias for the contrib eager-execution module.
tfe = tf.contrib.eager
# Turn on eager execution mode.
tf.enable_eager_execution()

import numpy as np
import matplotlib.pyplot as plt
import time

## 2. 设置超参数

In [2]:
# --- Sequence / model geometry ---
# Each image row is one time step, so the sequence length equals the row count.
TIME_STEPS = 28
# Each time step receives one full row, so the input width equals the column count.
INPUT_SIZE = 28
# Number of units in the LSTM hidden state.
HIDDEN_SIZE = 100
# One output per image class.
OUTPUT_SIZE = 10

# --- Optimisation settings ---
# Number of samples per mini-batch.
BATCH_SIZE = 50
# Number of passes over the training set.
NUM_EPOCH = 1
# Step size handed to the optimizer.
LEARNING_RATE = 1e-3

## 3. 准备数据

### 3.1. 导入数据集

In [3]:
# Load the MNIST train/test splits through the Keras dataset helper.
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data(
    path='mnist.npz'
)

# Summarise the training split: array shapes and element types.
print('Training set:')
print('       Data :\t shape:', x_train.shape, '\t type:', x_train.dtype)
print('       Label:\t shape:', y_train.shape, '\t\t type:', y_train.dtype)

# Same summary for the held-out test split.
print('Testing set :')
print('       Data :\t shape:', x_test.shape, '\t type:', x_test.dtype)
print('       Label:\t shape:', y_test.shape, '\t\t type:', y_test.dtype)

Training set:
       Data :	 shape: (60000, 28, 28) 	 type: uint8
       Label:	 shape: (60000,) 		 type: uint8
Testing set :
       Data :	 shape: (10000, 28, 28) 	 type: uint8
       Label:	 shape: (10000,) 		 type: uint8


### 3.2. 数据预处理

In [4]:
# Convert the uint8 pixels to float32 and scale them into [0, 1].
# The dtype guards keep this cell idempotent: running it a second time must
# not divide the already-normalised images by 255 again.
if x_train.dtype == np.uint8:
    x_train = x_train.astype(np.float32) / 255
if x_test.dtype == np.uint8:
    x_test = x_test.astype(np.float32) / 255
print(np.shape(x_train), np.shape(x_test))

# One-hot encode the labels, using OUTPUT_SIZE as the number of classes so the
# depth stays in sync with the model's output layer.
# Only 1-D label arrays (integer class ids) are converted; labels that are
# already one-hot (2-D) are left untouched so the cell can be re-run safely.
if len(y_train.shape) == 1:
    y_train = tf.one_hot(y_train, OUTPUT_SIZE, dtype=tf.float32)
if len(y_test.shape) == 1:
    y_test = tf.one_hot(y_test, OUTPUT_SIZE, dtype=tf.float32)
print(y_train, y_test)

(60000, 28, 28) (10000, 28, 28)
tf.Tensor(
[[0. 0. 0. ... 0. 0. 0.]
 [1. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 ...
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 1. 0.]], shape=(60000, 10), dtype=float32) tf.Tensor(
[[0. 0. 0. ... 1. 0. 0.]
 [0. 0. 1. ... 0. 0. 0.]
 [0. 1. 0. ... 0. 0. 0.]
 ...
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]], shape=(10000, 10), dtype=float32)


### 3.3. 生成Dataset对象

## 4. 创建网络

### 4.1. 创建LSTM

In [33]:
class Model(tf.keras.Model):
    """LSTM classifier: run an LSTM over the input sequence, then apply a
    dense layer to the final hidden state.
    """

    def __init__(self, hidden_size, output_size):
        """Create the layers.

        Args:
            hidden_size: number of units in the LSTM cell.
            output_size: number of units in the final dense layer.
        """
        super(Model, self).__init__()
        self.units = hidden_size

        # With a GPU available, 'tf.contrib.cudnn_rnn.CudnnLSTM' can be used instead.
        self.lstm = tf.nn.rnn_cell.LSTMCell(
            num_units=hidden_size,
            forget_bias=1.0,
            name='basic_lstm_cell',
        )

        # No activation is set, so this layer emits raw scores (logits).
        self.fc = tf.keras.layers.Dense(output_size)
        # To output probabilities instead, add:
        # self.softmax = tf.keras.layers.Softmax()

    def call(self, inputs):
        """Forward pass.

        Args:
            inputs: float sequence batch; with dynamic_rnn's default
                time_major=False it is read as (batch, time, features).

        Returns:
            Dense-layer output computed from the LSTM's final hidden state,
            one row per batch element and `output_size` columns.
        """
        # dynamic_rnn returns (per-step outputs, final state). The final state
        # of an LSTM cell is the pair (c, h); only h is needed here.
        _, (_, last_hidden) = tf.nn.dynamic_rnn(self.lstm, inputs, dtype=tf.float32)

        logits = self.fc(last_hidden)

        # To output probabilities instead, add:
        #     logits = self.softmax(logits)
        return logits

### 4.2. 损失函数

In [30]:
def Loss(y_pred, y_true):
    '''
    Softmax cross-entropy between predicted logits and one-hot labels.

    Input:
        y_pred - [BATCH_SIZE, NUM_CLASS] logits
        y_true - [BATCH_SIZE, NUM_CLASS] one-hot labels
    Output:
        the value returned by tf.losses.softmax_cross_entropy
    '''
    # For labels that are not one-hot vectors, use sparse_softmax_cross_entropy.
    cross_entropy = tf.losses.softmax_cross_entropy(
        onehot_labels=y_true,
        logits=y_pred,
    )
    return cross_entropy

### 4.3. 评估函数

In [8]:
def Accuracy(y_pred, y_true):
    '''
    Fraction of rows whose arg-max class in y_pred matches that in y_true.

    Input:
        y_pred - [BATCH_SIZE, NUM_CLASS]
        y_true - [BATCH_SIZE, NUM_CLASS]
    '''
    # Index of the largest entry along the class axis for labels and predictions.
    true_classes = tf.argmax(y_true, axis=1)
    predicted_classes = tf.argmax(y_pred, axis=1)

    # 1.0 where the classes agree, 0.0 otherwise; the mean is the accuracy.
    hits = tf.cast(tf.equal(true_classes, predicted_classes), tf.float32)
    return tf.reduce_mean(hits)

### 4.4. 优化器

In [9]:
# Adam optimizer; every setting other than the learning rate keeps its default.
optimizer = tf.train.AdamOptimizer(learning_rate=LEARNING_RATE)

### 4.5. 训练

In [11]:
# NOTE(review): no cell in this notebook defines `TrainDataset` (section 3.3 is
# empty), so the loop below raises NameError on a fresh kernel. It is rebuilt
# here as shuffled mini-batches of BATCH_SIZE samples — TODO confirm this
# matches the pipeline that was originally used.
TrainDataset = (
    tf.data.Dataset
    .from_tensor_slices((x_train, y_train))
    .shuffle(buffer_size=10000)
    .batch(BATCH_SIZE)
)

model = Model(HIDDEN_SIZE, OUTPUT_SIZE)

# Training
for epoch in range(NUM_EPOCH):
    start = time.time()

    # Reset the states of any stateful layers at the start of the epoch.
    # The call's result is not needed, so it is not bound to a name.
    model.reset_states()

    for (batch, (x, y)) in enumerate(TrainDataset):
        with tf.GradientTape() as tape:
            # Forward pass (recorded on the tape)
            y_pred = model(x)
            loss = Loss(y_pred, y)
        # Backward pass: differentiate only with respect to trainable weights.
        # The list is read after the forward pass because the model creates
        # its variables lazily on the first call.
        train_vars = model.trainable_variables
        grads = tape.gradient(loss, train_vars)
        optimizer.apply_gradients(zip(grads, train_vars))

        if batch % 500 == 0:
            print('Epoch {:2d} - Batch {:5d} Loss {:.4f}'.format(epoch+1,
                                                                 batch,
                                                                 loss))

    # `loss` here is the loss of the last mini-batch, not an epoch average.
    print('Epoch {:2d} Loss {:.4f}'.format(epoch+1, loss))
    print('Time taken for 1 epoch {} sec'.format(time.time() - start))

    # Evaluate on the whole test set at the end of every epoch
    y_pred = model(x_test)
    cost = Loss(y_pred, y_test)
    accuracy = Accuracy(y_pred, y_test)
    print('Test Cost: ', np.mean(cost))
    print('Test Accuracy: ', np.mean(accuracy))

Epoch  1 - Batch     0 Loss 2.2991
Epoch  1 - Batch   500 Loss 0.4631
Epoch  1 - Batch  1000 Loss 0.1054
Epoch  1 Loss 0.0913
Time taken for 1 epoch 126.14080810546875 sec
Test Cost:  0.1692553
Test Accuracy:  0.9489
