# Addition_Subtraction_rnn

In [None]:
from __future__ import print_function
from keras.models import Sequential
from keras import layers
import numpy as np
from six.moves import range

## Data Representation

In [None]:
class CharacterTable(object):
    """Two-way mapping between a character set and one-hot rows.

    + Encodes a string into a one-hot matrix
    + Decodes a one-hot or probability matrix back into a string
    + Decodes a vector of integer character indices back into a string
    """
    def __init__(self, chars):
        """Build the lookup tables.
        # Arguments
            chars: Characters that can appear in the data. Duplicates are
                dropped and the rest sorted, so a character's index is its
                rank in sorted order.
        """
        alphabet = sorted(set(chars))
        self.chars = alphabet
        self.char_indices = {ch: idx for idx, ch in enumerate(alphabet)}
        self.indices_char = {idx: ch for idx, ch in enumerate(alphabet)}

    def encode(self, C, num_rows):
        """Return the one-hot encoding of string C.
        # Arguments
            C: string, to be encoded.
            num_rows: Number of rows of the returned matrix; rows beyond
                len(C) stay all-zero so every sample has the same shape.
        """
        lookup = self.char_indices
        one_hot = np.zeros((num_rows, len(self.chars)))
        for row, ch in enumerate(C):
            one_hot[row, lookup[ch]] = 1
        return one_hot

    def decode(self, x, calc_argmax=True):
        """Turn an encoded sequence back into its string.
        # Arguments
            x: A 2D array of probabilities or one-hot rows; or a vector of
                character indices (used with `calc_argmax=False`).
            calc_argmax: Whether to take the argmax over the last axis to
                obtain the character indices first, defaults to `True`.
        """
        indices = x.argmax(axis=-1) if calc_argmax else x
        return ''.join(self.indices_char[idx] for idx in indices)

In [None]:
class colors:
    """ANSI escape sequences used to colour the printed check marks."""
    close = '\033[0m'   # reset all attributes
    fail = '\033[91m'   # bright red foreground
    ok = '\033[92m'     # bright green foreground

In [None]:
# Dataset / model hyper-parameters.
TRAINING_SIZE = 50000
DIGITS = 3
REVERSE = True

# Longest possible question: three operands of at most DIGITS characters each
# joined by two single-character operators (e.g. '345+678-901').
MAXLEN = 3 * DIGITS + 2

# Vocabulary: the ten digits, the plus and minus signs, and a space for padding.
chars = '0123456789+- '
ctable = CharacterTable(chars)
print(ctable)

## Data Generation

In [None]:
def Generate_Data(DIGITS=3, DATA_SIZE=50000, MAXLEN=None, REVERSE=None):
    """Generate unique three-operand addition/subtraction questions.

    Roughly the first half of the samples has the form 'a+b-c' (kept only
    when a + b >= c); the remaining samples have the form 'a-b+c' (kept only
    when a > b). Every answer is therefore non-negative.

    # Arguments
        DIGITS: Maximum number of digits of each operand.
        DATA_SIZE: Total number of question/answer pairs to generate.
        MAXLEN: Width each question is right-padded to with spaces.
            Defaults to 3 * DIGITS + 2 (three operands, two operators).
        REVERSE: Whether each padded question is reversed. When None, the
            module-level `REVERSE` flag is used if it exists, else True.

    # Returns
        A tuple `(questions, expected)` of two equally long lists of
        strings; answers are right-padded with spaces to DIGITS + 1.

    Note: generation loops until DATA_SIZE unique questions exist, so
    DATA_SIZE must not exceed the number of distinct questions that DIGITS
    allows.
    """
    if MAXLEN is None:
        MAXLEN = 3 * DIGITS + 2
    if REVERSE is None:
        REVERSE = globals().get('REVERSE', True)

    digit_pool = list('0123456789')

    def random_operand():
        # Draw the digit count first, then that many random digits; int()
        # drops any leading zeros.
        n_digits = np.random.randint(1, DIGITS + 1)
        return int(''.join(np.random.choice(digit_pool)
                           for _ in range(n_digits)))

    questions = []
    expected = []
    seen = set()
    print('Generating data...')
    while len(questions) < DATA_SIZE:
        a, b, c = random_operand(), random_operand(), random_operand()
        if len(questions) < DATA_SIZE / 2:
            if a + b < c:
                continue
            # a+b-c equals b+a-c, so only the two addends are order-free.
            key = ('+-', min(a, b), max(a, b), c)
            q = '{}+{}-{}'.format(a, b, c)
            answer = a + b - c
        else:
            if a <= b:
                continue
            # a-b+c equals c-b+a, so the outer operands are order-free.
            key = ('-+', min(a, c), b, max(a, c))
            q = '{}-{}+{}'.format(a, b, c)
            answer = a - b + c
        # Skip questions that are equivalent to one already generated.
        if key in seen:
            continue
        seen.add(key)
        # Pad the question with spaces such that it is always MAXLEN wide.
        query = q.ljust(MAXLEN)
        # Answers can be of maximum size DIGITS + 1.
        ans = str(answer).ljust(DIGITS + 1)
        # Reversing turns e.g. '12+345-6  ' into '  6-543+21'.
        questions.append(query[::-1] if REVERSE else query)
        expected.append(ans)
    print('Total addition questions:', len(questions))

    return questions, expected

## Feature Engineering

In [None]:
def Vectorization(questions, expected):
    """One-hot encode question and answer strings into boolean tensors.

    Reads the module-level `MAXLEN`, `DIGITS`, `chars` and `ctable`.

    # Arguments
        questions: list of question strings, each at most MAXLEN long.
        expected: list of answer strings, each at most DIGITS + 1 long.

    # Returns
        A tuple `(x, y)` where `x` has shape
        (len(questions), MAXLEN, len(chars)) and `y` has shape
        (len(questions), DIGITS + 1, len(chars)).
    """
    print('Vectorization...')
    # The builtin `bool` is used as dtype: the `np.bool` alias was deprecated
    # in NumPy 1.20 and removed in 1.24, while `bool` works on every version.
    x = np.zeros((len(questions), MAXLEN, len(chars)), dtype=bool)
    y = np.zeros((len(questions), DIGITS + 1, len(chars)), dtype=bool)
    for i, sentence in enumerate(questions):
        x[i] = ctable.encode(sentence, MAXLEN)
    for i, sentence in enumerate(expected):
        y[i] = ctable.encode(sentence, DIGITS + 1)

    return x, y

## Get Training Data and Validation Data
* total data = 50000, 45000 for training, 5000 for validation

In [None]:
# Build the full dataset and convert it to one-hot tensors.
questions, expected = Generate_Data(
    DIGITS=DIGITS, DATA_SIZE=TRAINING_SIZE, MAXLEN=MAXLEN)
x, y = Vectorization(questions, expected)

# Apply one random permutation to both x and y so the pairs stay aligned and
# the validation slice taken below is a random sample of the data.
indices = np.arange(len(y))
np.random.shuffle(indices)
x, y = x[indices], y[indices]

# Hold out the last 10% as validation data that is never trained on.
split_at = len(x) - len(x) // 10
x_train, y_train = x[:split_at], y[:split_at]
x_val, y_val = x[split_at:], y[split_at:]

for heading, features, labels in (('Training Data:', x_train, y_train),
                                  ('Validation Data:', x_val, y_val)):
    print(heading)
    print(features.shape)
    print(labels.shape)

## Build Model

In [None]:
def Build_Model(RNN, HIDDEN_SIZE, BATCH_SIZE, LAYERS, MAXLEN, DIGITS, chars):
    """Build and compile the sequence-to-sequence model.

    The stack is: a recurrent layer over the input sequence, a Dense layer
    whose output is reshaped into (DIGITS + 1) steps of width 512, a second
    recurrent layer returning the full sequence, and a per-step softmax over
    the characters.

    # Arguments
        RNN: Recurrent layer class to instantiate (e.g. `layers.LSTM`).
        HIDDEN_SIZE: Number of units of both recurrent layers.
        BATCH_SIZE: Accepted for call compatibility; not used here.
        LAYERS: Accepted for call compatibility; not used here.
        MAXLEN: Number of time steps of the input.
        DIGITS: Maximum operand length; the output has DIGITS + 1 steps.
        chars: Character vocabulary; its length is the one-hot width.

    # Returns
        The compiled model (its summary is printed as a side effect).
    """
    print('Build model...')
    vocab_size = len(chars)
    answer_len = DIGITS + 1

    stack = [
        RNN(HIDDEN_SIZE, input_shape=(MAXLEN, vocab_size)),
        layers.Dense(answer_len * 512),
        layers.Reshape((answer_len, 512)),
        RNN(HIDDEN_SIZE, return_sequences=True),
        layers.TimeDistributed(layers.Dense(vocab_size, activation='softmax')),
    ]

    model = Sequential()
    for layer in stack:
        model.add(layer)

    model.compile(loss='categorical_crossentropy',
                  optimizer='adam',
                  metrics=['accuracy'])
    model.summary()

    return model

In [None]:
def Train_Step(model, epoch, REVERSE = True):
    """Train `model` for `epoch` rounds, printing sample predictions each round.

    Each round fits one epoch on the module-level `x_train` / `y_train`
    with batch size `BATCH_SIZE`, validates on `x_val` / `y_val`, and then
    prints 10 randomly chosen validation questions with the true answer,
    a coloured check mark, and the model's guess.

    # Arguments
        model: Compiled Keras model to train in place.
        epoch: Number of training rounds (one Keras epoch each).
        REVERSE: Whether the encoded questions are reversed; if so they are
            flipped back before being printed.
    """
    # range(1, epoch + 1) so that exactly `epoch` rounds are run.
    for iteration in range(1, epoch + 1):
        print()
        print('-' * 50)
        print('Iteration', iteration)
        model.fit(x_train, y_train,
                  batch_size=BATCH_SIZE,
                  epochs=1,
                  validation_data=(x_val, y_val))
        # Select 10 samples from the validation set at random so we can
        # visualize errors.
        for _ in range(10):
            ind = np.random.randint(0, len(x_val))
            rowx, rowy = x_val[np.array([ind])], y_val[np.array([ind])]
            # argmax over the class axis replaces Sequential.predict_classes,
            # which current Keras releases no longer provide.
            preds = np.argmax(model.predict(rowx, verbose=0), axis=-1)
            q = ctable.decode(rowx[0])
            correct = ctable.decode(rowy[0])
            guess = ctable.decode(preds[0], calc_argmax=False)
            print('Q', q[::-1] if REVERSE else q, end=' ')
            print('T', correct, end=' ')
            if correct == guess:
                print(colors.ok + '☑' + colors.close, end=' ')
            else:
                print(colors.fail + '☒' + colors.close, end=' ')
            print(guess)

## String Matching
* training 300 epochs

In [None]:
# Network configuration for this run.
RNN = layers.LSTM
HIDDEN_SIZE = 512  # original note: 128 (presumably an alternative size)
BATCH_SIZE = 128
LAYERS = 1

model = Build_Model(
    RNN=RNN,
    HIDDEN_SIZE=HIDDEN_SIZE,
    BATCH_SIZE=BATCH_SIZE,
    LAYERS=LAYERS,
    MAXLEN=MAXLEN,
    DIGITS=DIGITS,
    chars=chars,
)

In [None]:
# Run the training loop on the current model.
Train_Step(model, epoch=300)

## Testing

In [None]:
## Generate Testing Data
# Request 1000 questions and one-hot encode them for evaluation.
questions_test, expected_test = Generate_Data(
    DIGITS=DIGITS, DATA_SIZE=1000, MAXLEN=MAXLEN)
x_test, y_test = Vectorization(questions_test, expected_test)

In [None]:
## Testing
# Score the model on every test sample and print the first 10 predictions.
count_correct = 0
# argmax over the class axis replaces Sequential.predict_classes, which
# current Keras releases no longer provide.
preds = np.argmax(model.predict(x_test, verbose=0), axis=-1)
# Iterate over the real test-set size so the hit count and the denominator
# of the accuracy always refer to the same samples.
num_test = len(x_test)
print("Visualize 10 Data (Total Testing Data = {})".format(num_test))
for i in range(num_test):
    q = ctable.decode(x_test[i])
    correct = ctable.decode(y_test[i])
    guess = ctable.decode(preds[i], calc_argmax=False)
    if correct == guess:
        count_correct += 1
    if i < 10:
        print('Q', q[::-1] if REVERSE else q, end=' ')
        print('T', correct, end=' ')
        if correct == guess:
            print(colors.ok + '☑' + colors.close, end=' ')
        else:
            print(colors.fail + '☒' + colors.close, end=' ')
        print(guess)

# Convert before dividing so the ratio is not truncated by integer division.
print("Testing Accuracy : ", float(count_correct) / num_test)

## Result
### * 利用LSTM做encoder-decoder的seq2seq架構來實現加減混合
### * Training data總數為50000筆，45000為training，5000為validation
### * Batch size = 128
### * RNN hidden layer = 512
### * Optimizer使用adam
### * Epochs = 300
### * Validation result可達99.97%
### * Testing data總數為1000筆，Testing accuracy可達 99%

## Other Discussion
* 實驗使用不同的epoch和batch size訓練
* 實驗不同位數的數字
* 實驗三個數字相加相減 ("more number subtract")

### 1.1 實驗使用不同的epoch和batch size訓練
* with batch = 64, epoch = 300

In [None]:
# Network configuration for this run (batch size 64).
RNN = layers.LSTM
HIDDEN_SIZE = 512  # original note: 128 (presumably an alternative size)
BATCH_SIZE = 64
LAYERS = 1

model = Build_Model(
    RNN=RNN,
    HIDDEN_SIZE=HIDDEN_SIZE,
    BATCH_SIZE=BATCH_SIZE,
    LAYERS=LAYERS,
    MAXLEN=MAXLEN,
    DIGITS=DIGITS,
    chars=chars,
)

In [None]:
# Run the training loop on the current model.
Train_Step(model, epoch=300)

In [None]:
## Testing
# Score the model on every test sample and print the first 10 predictions.
count_correct = 0
# argmax over the class axis replaces Sequential.predict_classes, which
# current Keras releases no longer provide.
preds = np.argmax(model.predict(x_test, verbose=0), axis=-1)
# Iterate over the real test-set size so the hit count and the denominator
# of the accuracy always refer to the same samples.
num_test = len(x_test)
print("Visualize 10 Data (Total Testing Data = {})".format(num_test))
for i in range(num_test):
    q = ctable.decode(x_test[i])
    correct = ctable.decode(y_test[i])
    guess = ctable.decode(preds[i], calc_argmax=False)
    if correct == guess:
        count_correct += 1
    if i < 10:
        print('Q', q[::-1] if REVERSE else q, end=' ')
        print('T', correct, end=' ')
        if correct == guess:
            print(colors.ok + '☑' + colors.close, end=' ')
        else:
            print(colors.fail + '☒' + colors.close, end=' ')
        print(guess)

# Convert before dividing so the ratio is not truncated by integer division.
print("Testing Accuracy : ", float(count_correct) / num_test)

### 1.2 實驗使用不同的epoch和batch size訓練
* with batch = 256, epoch = 300

In [None]:
# Network configuration for this run (batch size 256).
RNN = layers.LSTM
HIDDEN_SIZE = 512  # original note: 128 (presumably an alternative size)
BATCH_SIZE = 256
LAYERS = 1

model = Build_Model(
    RNN=RNN,
    HIDDEN_SIZE=HIDDEN_SIZE,
    BATCH_SIZE=BATCH_SIZE,
    LAYERS=LAYERS,
    MAXLEN=MAXLEN,
    DIGITS=DIGITS,
    chars=chars,
)

In [None]:
# Run the training loop on the current model.
Train_Step(model, epoch=300)

In [None]:
## Testing
# Score the model on every test sample and print the first 10 predictions.
count_correct = 0
# argmax over the class axis replaces Sequential.predict_classes, which
# current Keras releases no longer provide.
preds = np.argmax(model.predict(x_test, verbose=0), axis=-1)
# Iterate over the real test-set size so the hit count and the denominator
# of the accuracy always refer to the same samples.
num_test = len(x_test)
print("Visualize 10 Data (Total Testing Data = {})".format(num_test))
for i in range(num_test):
    q = ctable.decode(x_test[i])
    correct = ctable.decode(y_test[i])
    guess = ctable.decode(preds[i], calc_argmax=False)
    if correct == guess:
        count_correct += 1
    if i < 10:
        print('Q', q[::-1] if REVERSE else q, end=' ')
        print('T', correct, end=' ')
        if correct == guess:
            print(colors.ok + '☑' + colors.close, end=' ')
        else:
            print(colors.fail + '☒' + colors.close, end=' ')
        print(guess)

# Convert before dividing so the ratio is not truncated by integer division.
print("Testing Accuracy : ", float(count_correct) / num_test)

### 1.3 實驗使用不同的epoch和batch size訓練
* with batch = 128, epoch = 150

In [None]:
# Network configuration for this run (batch size 128).
RNN = layers.LSTM
HIDDEN_SIZE = 512  # original note: 128 (presumably an alternative size)
BATCH_SIZE = 128
LAYERS = 1

model = Build_Model(
    RNN=RNN,
    HIDDEN_SIZE=HIDDEN_SIZE,
    BATCH_SIZE=BATCH_SIZE,
    LAYERS=LAYERS,
    MAXLEN=MAXLEN,
    DIGITS=DIGITS,
    chars=chars,
)

In [None]:
# Run the shorter training loop on the current model.
Train_Step(model, epoch=150)

In [None]:
## Testing
# Score the model on every test sample and print the first 10 predictions.
count_correct = 0
# argmax over the class axis replaces Sequential.predict_classes, which
# current Keras releases no longer provide.
preds = np.argmax(model.predict(x_test, verbose=0), axis=-1)
# Iterate over the real test-set size so the hit count and the denominator
# of the accuracy always refer to the same samples.
num_test = len(x_test)
print("Visualize 10 Data (Total Testing Data = {})".format(num_test))
for i in range(num_test):
    q = ctable.decode(x_test[i])
    correct = ctable.decode(y_test[i])
    guess = ctable.decode(preds[i], calc_argmax=False)
    if correct == guess:
        count_correct += 1
    if i < 10:
        print('Q', q[::-1] if REVERSE else q, end=' ')
        print('T', correct, end=' ')
        if correct == guess:
            print(colors.ok + '☑' + colors.close, end=' ')
        else:
            print(colors.fail + '☒' + colors.close, end=' ')
        print(guess)

# Convert before dividing so the ratio is not truncated by integer division.
print("Testing Accuracy : ", float(count_correct) / num_test)

## 2. 實驗不同位數的數字
* The digits of input number = 4

In [None]:
# Regenerate the dataset with operands of up to 4 digits.

# Dataset / model hyper-parameters.
TRAINING_SIZE = 50000
DIGITS = 4
REVERSE = True

# Longest possible question: three operands of at most DIGITS characters each
# joined by two single-character operators.
MAXLEN = 3 * DIGITS + 2

# Vocabulary: the ten digits, the plus and minus signs, and a space for padding.
chars = '0123456789+- '
ctable = CharacterTable(chars)
print(ctable)

questions, expected = Generate_Data(
    DIGITS=DIGITS, DATA_SIZE=TRAINING_SIZE, MAXLEN=MAXLEN)
x, y = Vectorization(questions, expected)

# Apply one random permutation to both x and y so the pairs stay aligned and
# the validation slice taken below is a random sample of the data.
indices = np.arange(len(y))
np.random.shuffle(indices)
x, y = x[indices], y[indices]

# Hold out the last 10% as validation data that is never trained on.
split_at = len(x) - len(x) // 10
x_train, y_train = x[:split_at], y[:split_at]
x_val, y_val = x[split_at:], y[split_at:]

for heading, features, labels in (('Training Data:', x_train, y_train),
                                  ('Validation Data:', x_val, y_val)):
    print(heading)
    print(features.shape)
    print(labels.shape)

In [None]:
# Network configuration for the 4-digit run.
RNN = layers.LSTM
HIDDEN_SIZE = 512  # original note: 128 (presumably an alternative size)
BATCH_SIZE = 128
LAYERS = 1

model = Build_Model(
    RNN=RNN,
    HIDDEN_SIZE=HIDDEN_SIZE,
    BATCH_SIZE=BATCH_SIZE,
    LAYERS=LAYERS,
    MAXLEN=MAXLEN,
    DIGITS=DIGITS,
    chars=chars,
)

In [None]:
# Run the training loop on the current model.
Train_Step(model, epoch=300)

In [None]:
## Generate Testing Data
# Request 1000 questions and one-hot encode them for evaluation.
questions_test, expected_test = Generate_Data(
    DIGITS=DIGITS, DATA_SIZE=1000, MAXLEN=MAXLEN)
x_test, y_test = Vectorization(questions_test, expected_test)

In [None]:
## Testing
# Score the model on every test sample and print the first 10 predictions.
count_correct = 0
# argmax over the class axis replaces Sequential.predict_classes, which
# current Keras releases no longer provide.
preds = np.argmax(model.predict(x_test, verbose=0), axis=-1)
# Iterate over the real test-set size so the hit count and the denominator
# of the accuracy always refer to the same samples.
num_test = len(x_test)
print("Visualize 10 Data (Total Testing Data = {})".format(num_test))
for i in range(num_test):
    q = ctable.decode(x_test[i])
    correct = ctable.decode(y_test[i])
    guess = ctable.decode(preds[i], calc_argmax=False)
    if correct == guess:
        count_correct += 1
    if i < 10:
        print('Q', q[::-1] if REVERSE else q, end=' ')
        print('T', correct, end=' ')
        if correct == guess:
            print(colors.ok + '☑' + colors.close, end=' ')
        else:
            print(colors.fail + '☒' + colors.close, end=' ')
        print(guess)

# Convert before dividing so the ratio is not truncated by integer division.
print("Testing Accuracy : ", float(count_correct) / num_test)

## 3. 三個數字相加相減
* Add/subtract three numbers of up to 3 digits each

In [None]:
def Generate_OtherData(DIGITS=3, DATA_SIZE=50000, MAXLEN=None, REVERSE=None):
    """Generate unique three-operand addition/subtraction questions.

    Roughly the first half of the samples has the form 'a+b-c' (kept only
    when a + b >= c); the remaining samples have the form 'a-b+c' (kept only
    when a > b). Every answer is therefore non-negative.

    # Arguments
        DIGITS: Maximum number of digits of each operand.
        DATA_SIZE: Total number of question/answer pairs to generate.
        MAXLEN: Width each question is right-padded to with spaces.
            Defaults to 3 * DIGITS + 2 (three operands, two operators).
        REVERSE: Whether each padded question is reversed. When None, the
            module-level `REVERSE` flag is used if it exists, else True.

    # Returns
        A tuple `(questions, expected)` of two equally long lists of
        strings; answers are right-padded with spaces to DIGITS + 1.

    Note: generation loops until DATA_SIZE unique questions exist, so
    DATA_SIZE must not exceed the number of distinct questions that DIGITS
    allows.
    """
    if MAXLEN is None:
        MAXLEN = 3 * DIGITS + 2
    if REVERSE is None:
        REVERSE = globals().get('REVERSE', True)

    digit_pool = list('0123456789')

    def random_operand():
        # Draw the digit count first, then that many random digits; int()
        # drops any leading zeros.
        n_digits = np.random.randint(1, DIGITS + 1)
        return int(''.join(np.random.choice(digit_pool)
                           for _ in range(n_digits)))

    questions = []
    expected = []
    seen = set()
    print('Generating data...')
    while len(questions) < DATA_SIZE:
        a, b, c = random_operand(), random_operand(), random_operand()
        if len(questions) < DATA_SIZE / 2:
            if a + b < c:
                continue
            # a+b-c equals b+a-c, so only the two addends are order-free.
            key = ('+-', min(a, b), max(a, b), c)
            q = '{}+{}-{}'.format(a, b, c)
            answer = a + b - c
        else:
            if a <= b:
                continue
            # a-b+c equals c-b+a, so the outer operands are order-free.
            key = ('-+', min(a, c), b, max(a, c))
            q = '{}-{}+{}'.format(a, b, c)
            answer = a - b + c
        # Skip questions that are equivalent to one already generated.
        if key in seen:
            continue
        seen.add(key)
        # Pad the question with spaces such that it is always MAXLEN wide.
        query = q.ljust(MAXLEN)
        # Answers can be of maximum size DIGITS + 1.
        ans = str(answer).ljust(DIGITS + 1)
        # Reversing turns e.g. '12+345-6  ' into '  6-543+21'.
        questions.append(query[::-1] if REVERSE else query)
        expected.append(ans)
    print('Total addition questions:', len(questions))

    return questions, expected

In [None]:
# Regenerate the dataset with Generate_OtherData.

# Dataset / model hyper-parameters.
TRAINING_SIZE = 50000
DIGITS = 3
REVERSE = True

# Longest possible question: three operands of at most DIGITS characters each
# joined by two single-character operators.
MAXLEN = 3 * DIGITS + 2

# Vocabulary: the ten digits, the plus and minus signs, and a space for padding.
chars = '0123456789+- '
ctable = CharacterTable(chars)
print(ctable)

questions, expected = Generate_OtherData(
    DIGITS=DIGITS, DATA_SIZE=TRAINING_SIZE, MAXLEN=MAXLEN)
x, y = Vectorization(questions, expected)

# Apply one random permutation to both x and y so the pairs stay aligned and
# the validation slice taken below is a random sample of the data.
indices = np.arange(len(y))
np.random.shuffle(indices)
x, y = x[indices], y[indices]

# Hold out the last 10% as validation data that is never trained on.
split_at = len(x) - len(x) // 10
x_train, y_train = x[:split_at], y[:split_at]
x_val, y_val = x[split_at:], y[split_at:]

for heading, features, labels in (('Training Data:', x_train, y_train),
                                  ('Validation Data:', x_val, y_val)):
    print(heading)
    print(features.shape)
    print(labels.shape)

In [None]:
# Network configuration for this run.
RNN = layers.LSTM
HIDDEN_SIZE = 512  # original note: 128 (presumably an alternative size)
BATCH_SIZE = 128
LAYERS = 1

model = Build_Model(
    RNN=RNN,
    HIDDEN_SIZE=HIDDEN_SIZE,
    BATCH_SIZE=BATCH_SIZE,
    LAYERS=LAYERS,
    MAXLEN=MAXLEN,
    DIGITS=DIGITS,
    chars=chars,
)

In [None]:
# Run the training loop on the current model.
Train_Step(model, epoch=300)

In [None]:
## Generate Testing Data
# Request 1000 questions and one-hot encode them for evaluation.
questions_test, expected_test = Generate_OtherData(
    DIGITS=DIGITS, DATA_SIZE=1000, MAXLEN=MAXLEN)
x_test, y_test = Vectorization(questions_test, expected_test)

In [None]:
## Testing
# Score the model on every test sample and print the first 10 predictions.
count_correct = 0
# argmax over the class axis replaces Sequential.predict_classes, which
# current Keras releases no longer provide.
preds = np.argmax(model.predict(x_test, verbose=0), axis=-1)
# Iterate over the real test-set size so the hit count and the denominator
# of the accuracy always refer to the same samples.
num_test = len(x_test)
print("Visualize 10 Data (Total Testing Data = {})".format(num_test))
for i in range(num_test):
    q = ctable.decode(x_test[i])
    correct = ctable.decode(y_test[i])
    guess = ctable.decode(preds[i], calc_argmax=False)
    if correct == guess:
        count_correct += 1
    if i < 10:
        print('Q', q[::-1] if REVERSE else q, end=' ')
        print('T', correct, end=' ')
        if correct == guess:
            print(colors.ok + '☑' + colors.close, end=' ')
        else:
            print(colors.fail + '☒' + colors.close, end=' ')
        print(guess)

# Convert before dividing so the ratio is not truncated by integer division.
print("Testing Accuracy : ", float(count_correct) / num_test)

# 總結
## 在Addition_Subtraction_rnn中採用lstm實現加減混合
## 實驗:
### 1. 採用四種不同的batch size和訓練epochs (兩個三位數相加相減)
###     * [ batch size = 128, epoch = 300 ] : Validation acc = 99.98% / Testing acc (1000 testing data) = 0.99
###     * [ batch size = 64,   epoch = 300 ] : Validation acc = 99.98% / Testing acc (1000 testing data) = 1.0
###     * [ batch size = 256, epoch = 300 ] : Validation acc = 99.97% / Testing acc (1000 testing data) = 1.0
###     * [ batch size = 128, epoch = 150 ] : Validation acc = 99.97% / Testing acc (1000 testing data) = 1.0
### 2. 兩個四位數相加相減
###     * [ batch size = 128, epoch = 300 ] : Validation acc = 99.58% / Testing acc (1000 testing data) = 0.96
### 3. 三個三位數相加相減
###     * [ batch size = 128, epoch = 300 ] : Validation acc = 99.24% / Testing acc (1000 testing data) = 0.98 