## 项目说明

光学不练，假把式；光练不写，傻把式。计算机二进制加法，从低位加到高位，高位的结果受低位运算结果的影响。也就是说，需要有“记忆”，记住前面的运算结果。因此，RNN 应该适合“学习”二进制的加法。

### 软件版本

* Python 3.5
* TensorFlow 1.4

### 数据集说明

随机产生两个整数 a 和 b，计算它们的和 c。这两个整数的二进制序列 A_0 和 B_0 作为输入数据，和的二进制序列作为输出标签 C_0。根据这个方法，随机生成若干数据，分成训练数据和测试数据。

二进制的序列做加法时，是从右往左进行加法；而序列的正常顺序是从左往右。因此，在 RNN “学习”之前，需要对二进制序列进行左右翻转。这里使用 `np.flip(arr, axis=None)` 函数进行操作。

#### 数据维度说明

RNN 单次运算，输入数据为 [a[i], b[i]]， 输出数据为 d[i]，本案例中的二进制序列为 8 位。使用 TensorFlow 时，输入张量为 [batch_size, 8, 2]，输出张量为  [batch_size, 8]。


## TensorFlow RNN 的正确打开方式

参看 [TensorFlow中RNN实现的正确打开方式](https://zhuanlan.zhihu.com/p/28196873)，采用 TensorFlow 封装好的 RNN 模块时，使用起来基本可以是这个套路。

TensorFlow debug 是很困难的，经过一天的各种报错折磨，终于跑通了代码。这里进行一些经验总结。

* 使用 RNN 模块时，需要指定 RNN cell 的记忆单元的数量。而记忆单元的数量，需要和输入数据张量的表示数量的第一维度相等。一般用 batch_size 表示这个维度，这就意味着，这里定义的计算图的输入张量的维度时固定的，必须是一个一个的 batch_size 的数据喂进去。这就使得学习过程变得很呆板了。（应该有解决办法，TODO）
* TensorFlow 进行运算时，对数据类型有要求。很多运算函数，要求输入变量是相同的类型。这个时候，可以使用 `tf.cast()` 来强行转换类型，如 `X = tf.cast(X, tf.float32)`

In [1]:
import numpy as np
import tensorflow as tf
import math
from tensorflow.contrib import rnn as rnn_cell

### 生成数据

In [2]:
def gen_binary_seq(binary_dim):
    int2binary = {}
    max_number = pow(2, binary_dim)
    int_list = range(max_number)
    binary = np.unpackbits(
        np.array([int_list], dtype=np.uint8).T, axis=1)
    for i in int_list:
        int2binary[i] = binary[i]
    return int_list, int2binary

def binary2int(binary_seq):
    value = 0
    for i, bit in enumerate(reversed(binary_seq)):
        value += bit * np.power(2, i)
    return value

def gen_data(n_samples, int_list, int2binary, seed=None):
    X, y = [], []
    if seed:
        np.random.seed(seed)
    for i in range(n_samples):
        a = np.random.choice(int_list[:len(int_list)//2])
        b = np.random.choice(int_list[:len(int_list)//2])
        c = a + b
        X.append([int2binary[a], int2binary[b]])
        y.append(int2binary[c])
    return np.array(X), np.array(y)


In [3]:
def shuffle_data(X, y, seed=None):
    """ Random shuffle of the samples in X and y """
    idx = np.arange(X.shape[0])
    if seed:
        np.random.seed(seed)
    np.random.shuffle(idx)
    return X[idx], y[idx]


def train_test_split(X, y, test_size=0.5, shuffle=True, seed=None):
    """ Split the data into train and test sets """
    if shuffle:
        X, y = shuffle_data(X, y, seed)
    split_id = int(math.ceil(X.shape[0] * (1 - test_size)))
    X_train, X_test = X[:split_id], X[split_id:]
    y_train, y_test = y[:split_id], y[split_id:]
    return X_train, X_test, y_train, y_test

In [4]:
binary_dim = 8
int_list, int2binary = gen_binary_seq(binary_dim)
n_samples = 1000
seed = 0
X, y = gen_data(n_samples, int_list, int2binary, seed)
X = np.flip(X, axis=-1)  # flip
y = np.flip(y, axis=-1)
X = X.transpose((0, 2, 1))  # [n, 2, 8] ==> [n, 8, 2]
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, shuffle=True, seed=seed)

### 定义超参数

In [5]:
class HParam(object):
    batch_size = 64
    seq_length = 8
    num_layers = 2
    state_size = 16
    learning_rate = 0.01
args = HParam()

### 定义 RNN 模型

In [6]:
class RNN(object):
    def __init__(self, sess, args, seed=None):
        # initialize neural network weights
        if seed:
            tf.random.seed(seed)
        self.batch_size = args.batch_size
        self.seq_length = args.seq_length
        self.num_layers = args.num_layers
        self.state_size = args.state_size
        self.learning_rate = args.learning_rate
        self.sess = sess

        with tf.name_scope('inputs'):
            self.X = tf.placeholder(tf.float32, [self.batch_size, self.seq_length, 2])
            self.y = tf.placeholder(tf.float32, [self.batch_size, self.seq_length])

        with tf.name_scope('model'):
            self.cell = rnn_cell.BasicRNNCell(num_units=self.state_size)
            def _get_cell(state_size):
                return rnn_cell.BasicRNNCell(num_units=state_size)
            self.cell = rnn_cell.MultiRNNCell([_get_cell(self.state_size) for _ in range(self.num_layers)])
            self.initial_state = self.cell.zero_state(
                self.batch_size, tf.float32)
            outputs, last_state = tf.nn.dynamic_rnn(
                self.cell, self.X, initial_state=self.initial_state)

        with tf.name_scope('loss'):
            weights = tf.Variable(tf.truncated_normal([self.state_size, 1], stddev=0.01))
            bias = tf.zeros([1])
            outputs = tf.reshape(outputs, [-1, self.state_size])
            logits = tf.sigmoid(tf.matmul(outputs, weights) + bias)
            self.predictions = tf.reshape(logits, [-1, binary_dim])
            self.y_pred = tf.round(self.predictions)
            self.cost = tf.losses.mean_squared_error(self.y, self.predictions)
            # targets = tf.reshape(self.y, [-1])
            tf.summary.scalar('loss', self.cost)

        with tf.name_scope('accuracy'):
            correct_prediction = tf.equal(tf.cast(tf.reduce_sum(self.y_pred, axis=1), tf.float32),
                                          tf.cast(tf.reduce_sum(self.y, axis=1), tf.float32))
            self.accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))

        with tf.name_scope('optimizer'):
            self.optimizer = tf.train.AdamOptimizer(self.learning_rate).minimize(self.cost)

    def predict(self, x):
        return self.sess.run(self.y_pred, feed_dict={self.X: x})

    def fit(self, X_train, y_train, n_epochs=10, display_epoch=10):

        self.sess.run(tf.global_variables_initializer())
        for i in range(1, n_epochs+1):
            X_train, y_train = shuffle_data(X_train, y_train)
            for j in range(0, X_train.shape[0], self.batch_size)[:-1]:
                x = X_train[j: j + self.batch_size]
                y = y_train[j: j + self.batch_size]
                loss, acc, _ = self.sess.run([self.cost, self.accuracy, self.optimizer],
                                             feed_dict={self.X: x, self.y: y})
            if i % display_epoch == 0:
                print('eproch {:<3}, training score: {}'.format(i, self.get_accuracy(X_train, y_train)))
        print('-------------------------------')
        print('Training Finished!')

    def get_accuracy(self, X_test, y_test):
        Acc = []
        for j in range(0, X_test.shape[0], self.batch_size)[:-1]:
            x = X_test[j: j + self.batch_size]
            y = y_test[j: j + self.batch_size]
            acc = self.sess.run(self.accuracy, feed_dict={self.X: x, self.y: y})
            Acc.append(acc)
        return sess.run(tf.reduce_mean(Acc))

### 训练

In [7]:
sess = tf.Session()
clf = RNN(sess=sess, args=args, seed=seed)
clf.fit(X_train, y_train, n_epochs=20, display_epoch=2)

eproch 2  , training score: 0.16875000298023224
eproch 4  , training score: 0.15937499701976776
eproch 6  , training score: 0.18125000596046448
eproch 8  , training score: 0.996874988079071
eproch 10 , training score: 1.0
eproch 12 , training score: 1.0
eproch 14 , training score: 1.0
eproch 16 , training score: 1.0
eproch 18 , training score: 1.0
eproch 20 , training score: 1.0
-------------------------------
Training Finished!


### 测试

In [8]:
print('\nTest accuray: {:.4g} %'.format(clf.get_accuracy(X_test, y_test) * 100))


Test accuray: 100 %


In [9]:
### 验证

In [11]:
y_pred = clf.predict(X_test[0: clf.batch_size])
y_pred = sess.run(clf.y_pred, feed_dict={clf.X: X_test[0: clf.batch_size]})
for i in range(5):
    x_1_binary = np.flip(X_test[i][:, 0], axis=0)
    x_2_binary = np.flip(X_test[i][:, 1], axis=0)
    y_sample_binary = np.flip(y_test[i], axis=0)
    y_pred_binary = np.flip(y_pred[i], axis=0).astype(np.int32)
    print('True binary:', str(y_sample_binary))
    print('Pred binary:', str(y_pred_binary))
    a, b, c, d = map(binary2int, [x_1_binary, x_2_binary, y_pred_binary, y_sample_binary])
    print('guess {} + {} ==> {}, right answer == {}'.format(a, b, c, d))
    print('-------------------------------')

True binary: [1 0 1 0 0 0 0 0]
Pred binary: [1 0 1 0 0 0 0 0]
guess 86 + 74 ==> 160, right answer == 160
-------------------------------
True binary: [0 1 0 0 1 1 0 1]
Pred binary: [0 1 0 0 1 1 0 1]
guess 12 + 65 ==> 77, right answer == 77
-------------------------------
True binary: [1 1 1 1 0 0 0 0]
Pred binary: [1 1 1 1 0 0 0 0]
guess 117 + 123 ==> 240, right answer == 240
-------------------------------
True binary: [1 0 0 1 0 1 1 1]
Pred binary: [1 0 0 1 0 1 1 1]
guess 28 + 123 ==> 151, right answer == 151
-------------------------------
True binary: [0 1 1 0 1 0 1 0]
Pred binary: [0 1 1 0 1 0 1 0]
guess 30 + 76 ==> 106, right answer == 106
-------------------------------
