## Layer definition

In [1]:
from keras import backend as K
import tensorflow as tf
from keras.engine.topology import Layer
import numpy as np
from keras.utils import conv_utils


class CapsulePooling2D(Layer):
    """2D pooling layer that weights window members by iterative routing.

    Every spatial position is treated as a vector over the channel axis.
    Within each pooling window the layer repeatedly
      1. averages the (weighted) vectors and squashes the result,
      2. scores each position by its dot product with that window summary,
      3. turns the accumulated scores into per-window weights.
    The output is the average pool of the feature map weighted by the
    weights of the last step.  With ``num_steps=1`` no weights are applied
    and the layer reduces to plain average pooling.

    The implementation is hard-wired to ``channels_last`` 4D inputs
    ``(batch, rows, cols, channels)`` and to ``'same'`` padding; strides
    always equal ``pool_size``.

    # Arguments
        pool_size: int or tuple/list of 2 ints, the pooling window size
            (an int is used for both spatial dimensions).
        num_steps: int >= 1, number of routing iterations.

    # Raises
        ValueError: if `num_steps` is smaller than 1 or `pool_size` cannot
            be normalized to a pair of ints.
    """

    def __init__(self, pool_size, num_steps, **kwargs):
        # Normalizing gives a real tuple even for an int or for a list
        # (e.g. a config that went through JSON), so ``pool_size[i]`` and
        # tuple concatenation downstream always work.
        self.pool_size = conv_utils.normalize_tuple(pool_size, 2, 'pool_size')
        self.strides = self.pool_size
        if num_steps < 1:
            raise ValueError(
                'num_steps must be >= 1, got {}'.format(num_steps))
        self.num_steps = num_steps
        super(CapsulePooling2D, self).__init__(**kwargs)

    def get_config(self):
        """Return the constructor arguments merged into the base config."""
        config = {
            'pool_size': self.pool_size,
            'num_steps': self.num_steps
        }
        base_config = super(CapsulePooling2D, self).get_config()
        return dict(list(base_config.items()) + list(config.items()))

    def build(self, input_shape):
        # The layer has no trainable weights; only mark it as built.
        super(CapsulePooling2D, self).build(input_shape)

    @staticmethod
    def _pool(x, pool_size, strides, pool_mode):
        """'same'-padded pooling with the data format pinned to channels_last.

        The rest of the class indexes channels on the last axis, so the
        backend default data format must not be picked up here.
        """
        return K.pool2d(
            x, pool_size=pool_size, strides=strides, padding='same',
            data_format='channels_last', pool_mode=pool_mode
        )

    @staticmethod
    def _upsample_like(pooled, pool_size, reference):
        """Repeat each pooled cell `pool_size` times and crop to `reference`.

        The crop removes the extra rows/columns that appear when the spatial
        size of `reference` is not a multiple of the pool size.
        NOTE(review): the crop keeps the top-left corner; whether that lines
        up exactly with the backend's 'same' padding for every input size
        should be confirmed.
        """
        upsampled = K.resize_images(
            pooled,
            pool_size[0],
            pool_size[1],
            data_format='channels_last'
        )
        ref_shape = K.int_shape(reference)
        return upsampled[:, :ref_shape[1], :ref_shape[2], :]

    @staticmethod
    def squash_fm(fm):
        """Scale every channel vector `v` of `fm` to `v / (1 + |v|)`.

        The epsilon sits inside the square root so the norm stays
        differentiable when a vector is exactly zero.
        """
        squared_norm = K.sum(K.square(fm), axis=-1, keepdims=True)
        v_length = K.sqrt(squared_norm + K.epsilon())
        return fm / (1 + v_length)

    @staticmethod
    def normalize_pool_map(score_map, pool_size=(2, 2), strides=(2, 2), temperature=1):
        """Normalize scores inside every pooling window.

        Computes ``exp((score - window_max) / temperature)`` and divides it
        by its window average, so the returned weights average to ~1 inside
        each window.  Subtracting the window maximum keeps the exponent from
        overflowing; no gradient flows through that maximum.

        Upsampling uses `pool_size` while pooling uses `strides`, so the two
        only describe the same windows when `strides == pool_size`.

        # Arguments
            score_map: 4D channels_last tensor of scores.
            pool_size: pair of ints, pooling window.
            strides: pair of ints, pooling strides.
            temperature: divisor applied to the shifted scores.

        # Returns
            Tensor with the shape of `score_map`.
        """
        # window-wise maximum, broadcast back onto the input grid
        pooled_max_map = CapsulePooling2D._pool(
            score_map, pool_size, strides, 'max')
        max_map = CapsulePooling2D._upsample_like(
            pooled_max_map, pool_size, score_map)
        max_map = K.stop_gradient(max_map)

        exp_map = K.exp((score_map - max_map) / temperature)

        # window-wise mean of the exponentials, broadcast back as well
        pooled_exp_map = CapsulePooling2D._pool(
            exp_map, pool_size, strides, 'avg')
        avg_exp_map = CapsulePooling2D._upsample_like(
            pooled_exp_map, pool_size, score_map)

        return exp_map / (avg_exp_map + K.epsilon())

    @staticmethod
    def routing_step(feature_map, pool_size, strides, score_map):
        """Run one routing iteration.

        # Arguments
            feature_map: 4D channels_last input tensor.
            pool_size: pair of ints, pooling window.
            strides: pair of ints, pooling strides.
            score_map: accumulated scores from earlier steps, or None on the
                first step.

        # Returns
            Tuple ``(score_map, weighted_fm)``: the scores including this
            step's agreement, and the feature map weighted with the scores
            that were passed in (the unweighted input on the first step).
        """
        if score_map is None:
            # first iteration: no scores yet, every position counts equally
            weighted_fm = feature_map
        else:
            normalized_scores = CapsulePooling2D.normalize_pool_map(
                score_map, pool_size=pool_size, strides=strides, temperature=2
            )
            weighted_fm = normalized_scores * feature_map

        # one squashed summary vector per window ...
        avg_feature_map = CapsulePooling2D._pool(
            weighted_fm, pool_size, strides, 'avg')
        avg_feature_map = CapsulePooling2D.squash_fm(avg_feature_map)
        # ... copied onto every position of its window
        avg_feature_columns = CapsulePooling2D._upsample_like(
            avg_feature_map, pool_size, feature_map)

        # agreement: dot product of each position with its window summary
        new_score_map = K.sum(feature_map * avg_feature_columns, -1, keepdims=True)

        if score_map is None:
            score_map = new_score_map
        else:
            score_map = score_map + new_score_map

        return score_map, weighted_fm

    @staticmethod
    def routing_pool(feature_map, pool_size, strides, num_steps):
        """Pool `feature_map` after `num_steps` routing iterations.

        # Returns
            Tuple ``(output_feature_map, steps_history)`` where
            `steps_history` lists the accumulated score map after each step.

        # Raises
            ValueError: if `num_steps` is smaller than 1 (there would be no
                weighted feature map to pool).
        """
        if num_steps < 1:
            raise ValueError(
                'num_steps must be >= 1, got {}'.format(num_steps))

        score_map = None
        steps_history = []
        weighted_fm = None
        for _ in range(num_steps):
            score_map, weighted_fm = CapsulePooling2D.routing_step(
                feature_map,
                pool_size=pool_size,
                strides=strides,
                score_map=score_map
            )
            steps_history.append(score_map)

        output_feature_map = CapsulePooling2D._pool(
            weighted_fm, pool_size, strides, 'avg')
        return output_feature_map, steps_history

    def call(self, x):
        """Apply routing pooling to `x`; the score history is discarded."""
        pooled, _ = self.routing_pool(
            x, self.pool_size, self.strides, self.num_steps)
        return pooled

    def compute_output_shape(self, input_shape):
        """Output shape for 'same' padding: only rows and cols change."""
        padding = 'same'
        rows = conv_utils.conv_output_length(input_shape[1], self.pool_size[0],
                                             padding, self.strides[0])
        cols = conv_utils.conv_output_length(input_shape[2], self.pool_size[1],
                                             padding, self.strides[1])
        return (input_shape[0], rows, cols, input_shape[3])

Using TensorFlow backend.
  return f(*args, **kwds)


## Load MNIST Dataset

In [2]:
from __future__ import print_function
import keras
from keras.datasets import mnist
from keras.models import Sequential
from keras.layers import Dense, Dropout, Flatten
from keras.layers import Conv2D, MaxPooling2D, BatchNormalization
from keras import backend as K

batch_size = 128
num_classes = 10
epochs = 12

# input image dimensions
img_rows, img_cols = 28, 28

# the data, shuffled and split between train and test sets
(x_train, y_train), (x_test, y_test) = mnist.load_data()

# MNIST is grayscale: add the single channel axis where the backend wants it
if K.image_data_format() == 'channels_first':
    input_shape = (1, img_rows, img_cols)
else:
    input_shape = (img_rows, img_cols, 1)

x_train = x_train.reshape((x_train.shape[0],) + input_shape)
x_test = x_test.reshape((x_test.shape[0],) + input_shape)

# scale pixel values from [0, 255] to [0, 1] as float32
x_train = x_train.astype('float32') / 255
x_test = x_test.astype('float32') / 255

print('x_train shape:', x_train.shape)
print(x_train.shape[0], 'train samples')
print(x_test.shape[0], 'test samples')

# convert class vectors to binary class matrices
y_train = keras.utils.to_categorical(y_train, num_classes)
y_test = keras.utils.to_categorical(y_test, num_classes)

x_train shape: (60000, 28, 28, 1)
60000 train samples
10000 test samples


## Train the model with CapsulePooling2D

In [3]:
num_routing_steps = 2

# Two conv stages, each followed by batch norm, capsule pooling and dropout,
# then a conv + dense classification head.
model = Sequential([
    Conv2D(32, kernel_size=(3, 3), activation='relu',
           input_shape=input_shape),
    Conv2D(64, (3, 3), activation='relu'),
    BatchNormalization(),
    CapsulePooling2D(pool_size=(3, 3), num_steps=num_routing_steps),
    Dropout(0.4),

    Conv2D(128, (3, 3), activation='relu', padding='same'),
    Conv2D(128, (3, 3), activation='relu', padding='same'),
    BatchNormalization(),
    CapsulePooling2D(pool_size=(3, 3), num_steps=num_routing_steps),
    Dropout(0.4),

    Conv2D(256, (3, 3), activation='relu'),
    Flatten(),
    Dense(128, activation='relu'),
    Dropout(0.3),
    Dense(num_classes, activation='softmax'),
])

model.summary()

_________________________________________________________________
Layer (type)                 Output Shape              Param #   
conv2d_1 (Conv2D)            (None, 26, 26, 32)        320       
_________________________________________________________________
conv2d_2 (Conv2D)            (None, 24, 24, 64)        18496     
_________________________________________________________________
batch_normalization_1 (Batch (None, 24, 24, 64)        256       
_________________________________________________________________
capsule_pooling2d_1 (Capsule (None, 8, 8, 64)          0         
_________________________________________________________________
dropout_1 (Dropout)          (None, 8, 8, 64)          0         
_________________________________________________________________
conv2d_3 (Conv2D)            (None, 8, 8, 128)         73856     
_________________________________________________________________
conv2d_4 (Conv2D)            (None, 8, 8, 128)         147584    
__________

In [4]:
# Three training stages; the same `model` is re-compiled with a fresh Adam
# optimizer at a 10x smaller learning rate before each stage.
for lr in (0.001, 0.0001, 0.00001):
    model.compile(
        optimizer=keras.optimizers.Adam(lr),
        loss=keras.losses.categorical_crossentropy,
        metrics=['accuracy']
    )
    model.fit(
        x_train, y_train,
        validation_data=(x_test, y_test),
        batch_size=batch_size,
        epochs=15,
        verbose=1
    )

    # evaluate() returns [loss, accuracy] for the metrics compiled above
    score = model.evaluate(x_test, y_test, verbose=0)
    print("LR: ", lr)
    print('Test loss:', score[0])
    print('Test accuracy:', score[1])
    print()

Train on 60000 samples, validate on 10000 samples
Epoch 1/15
Epoch 2/15
Epoch 3/15
Epoch 4/15
Epoch 5/15
Epoch 6/15
Epoch 7/15
Epoch 8/15
Epoch 9/15
Epoch 10/15
Epoch 11/15
Epoch 12/15
Epoch 13/15
Epoch 14/15
Epoch 15/15
LR:  0.001
Test loss: 0.0237082752926
Test accuracy: 0.9935

Train on 60000 samples, validate on 10000 samples
Epoch 1/15
Epoch 2/15
Epoch 3/15
Epoch 4/15
Epoch 5/15
Epoch 6/15
Epoch 7/15
Epoch 8/15
Epoch 9/15
Epoch 10/15
Epoch 11/15
Epoch 12/15
Epoch 13/15
Epoch 14/15
Epoch 15/15
LR:  0.0001
Test loss: 0.0205039734615
Test accuracy: 0.996

Train on 60000 samples, validate on 10000 samples
Epoch 1/15
Epoch 2/15
Epoch 3/15
Epoch 4/15
Epoch 5/15
Epoch 6/15
Epoch 7/15
Epoch 8/15
Epoch 9/15
Epoch 10/15
Epoch 11/15
Epoch 12/15
Epoch 13/15
Epoch 14/15
Epoch 15/15
LR:  1e-05
Test loss: 0.0205864349503
Test accuracy: 0.9962



## Train the same model with MaxPooling2D

In [5]:
# Baseline: same architecture as the capsule-pooling model, with
# MaxPooling2D in place of each CapsulePooling2D layer.
model = Sequential([
    Conv2D(32, kernel_size=(3, 3), activation='relu',
           input_shape=input_shape),
    Conv2D(64, (3, 3), activation='relu'),
    BatchNormalization(),
    MaxPooling2D(pool_size=(3, 3), padding='same'),
    Dropout(0.4),

    Conv2D(128, (3, 3), activation='relu', padding='same'),
    Conv2D(128, (3, 3), activation='relu', padding='same'),
    BatchNormalization(),
    MaxPooling2D(pool_size=(3, 3), padding='same'),
    Dropout(0.4),

    Conv2D(256, (3, 3), activation='relu'),
    Flatten(),
    Dense(128, activation='relu'),
    Dropout(0.3),
    Dense(num_classes, activation='softmax'),
])

model.summary()

_________________________________________________________________
Layer (type)                 Output Shape              Param #   
conv2d_6 (Conv2D)            (None, 26, 26, 32)        320       
_________________________________________________________________
conv2d_7 (Conv2D)            (None, 24, 24, 64)        18496     
_________________________________________________________________
batch_normalization_3 (Batch (None, 24, 24, 64)        256       
_________________________________________________________________
max_pooling2d_1 (MaxPooling2 (None, 8, 8, 64)          0         
_________________________________________________________________
dropout_4 (Dropout)          (None, 8, 8, 64)          0         
_________________________________________________________________
conv2d_8 (Conv2D)            (None, 8, 8, 128)         73856     
_________________________________________________________________
conv2d_9 (Conv2D)            (None, 8, 8, 128)         147584    
__________

In [6]:
# Same three-stage schedule as for the capsule-pooling model: re-compile the
# baseline with a fresh Adam optimizer at each learning rate, then fit.
for lr in (0.001, 0.0001, 0.00001):
    model.compile(
        optimizer=keras.optimizers.Adam(lr),
        loss=keras.losses.categorical_crossentropy,
        metrics=['accuracy']
    )
    model.fit(
        x_train, y_train,
        validation_data=(x_test, y_test),
        batch_size=batch_size,
        epochs=15,
        verbose=1
    )

    # evaluate() returns [loss, accuracy] for the metrics compiled above
    score = model.evaluate(x_test, y_test, verbose=0)
    print("LR: ", lr)
    print('Test loss:', score[0])
    print('Test accuracy:', score[1])

Train on 60000 samples, validate on 10000 samples
Epoch 1/15
Epoch 2/15
Epoch 3/15
Epoch 4/15
Epoch 5/15
Epoch 6/15
Epoch 7/15
Epoch 8/15
Epoch 9/15
Epoch 10/15
Epoch 11/15
Epoch 12/15
Epoch 13/15
Epoch 14/15
Epoch 15/15
LR:  0.001
Test loss: 0.029453239757
Test accuracy: 0.9913
Train on 60000 samples, validate on 10000 samples
Epoch 1/15
Epoch 2/15
Epoch 3/15
Epoch 4/15
Epoch 5/15
Epoch 6/15
Epoch 7/15
Epoch 8/15
Epoch 9/15
Epoch 10/15
Epoch 11/15
Epoch 12/15
Epoch 13/15
Epoch 14/15
Epoch 15/15
LR:  0.0001
Test loss: 0.0159289752842
Test accuracy: 0.9964
Train on 60000 samples, validate on 10000 samples
Epoch 1/15
Epoch 2/15
Epoch 3/15
Epoch 4/15
Epoch 5/15
Epoch 6/15
Epoch 7/15
Epoch 8/15
Epoch 9/15
Epoch 10/15
Epoch 11/15
Epoch 12/15
Epoch 13/15
Epoch 14/15
Epoch 15/15
LR:  1e-05
Test loss: 0.0174079063178
Test accuracy: 0.9964
