In [None]:
import numpy as np
import os
import time
from keras import layers, models, optimizers, callbacks, regularizers, activations, layers, initializers
from keras import backend as K
from PIL import Image
from keras.layers import Dense, Input, Activation, Reshape, Add
import tensorflow as tf
from keras.layers import Conv2D, BatchNormalization, Activation, Multiply, Dropout

In [None]:
# Use channels-last tensors: the custom layers defined in this cell read
# height, width and features/capsule-dim from input_shape[1], [2] and [3].
K.set_image_data_format('channels_last')

class Mask(layers.Layer):
    """Zero out every capsule except one and flatten the result.

    Two call forms are supported:

    * ``Mask()([capsules, mask])`` - ``mask`` is supplied by the caller
      (e.g. a one-hot label of shape ``[None, n_classes]``).
    * ``Mask()(capsules)`` - the mask is the one-hot encoding of the capsule with
      the largest L2 norm along the last axis.

    In both cases ``capsules`` is multiplied by the mask (broadcast over the last
    axis) and flattened per sample.
    """

    def call(self, inputs, **kwargs):
        if type(inputs) is not list:
            # No mask given: pick the longest capsule of each sample.
            capsules = inputs
            lengths = K.sqrt(K.sum(K.square(capsules), -1))
            n_capsules = lengths.get_shape().as_list()[1]
            selector = K.one_hot(indices=K.argmax(lengths, 1), num_classes=n_capsules)
        else:
            # Caller supplied [capsules, mask].
            assert len(inputs) == 2
            capsules, selector = inputs

        # Broadcast the per-capsule mask across the capsule dimension, then flatten.
        return K.batch_flatten(capsules * K.expand_dims(selector, -1))

    def compute_output_shape(self, input_shape):
        # A tuple as first element means a list of shapes was passed (mask provided).
        mask_provided = type(input_shape[0]) is tuple
        capsule_shape = input_shape[0] if mask_provided else input_shape
        return (None, capsule_shape[1] * capsule_shape[2])

    def get_config(self):
        # The layer has no constructor arguments of its own.
        return super(Mask, self).get_config()

class PrimaryCap(layers.Layer):
    """Primary capsule layer: 3x3 convolution + ReLU, then one 1x1 convolution per capsule channel.

    The input ``(batch, height, width, features)`` is convolved ('SAME' padding) into
    ``n_channels * dim_capsule`` maps, passed through ReLU and split into ``n_channels``
    groups of ``dim_capsule`` maps. Each group goes through its own 1x1 convolution and
    the results are stacked on a new last axis.

    Output shape: ``(batch, out_h, out_w, dim_capsule, n_channels)`` where
    ``out_h = ceil(height / stride)`` (likewise for width).

    Args:
        n_channels: number of capsule channels produced.
        dim_capsule: dimensionality of each capsule vector.
        decrease_resolution: when true the 3x3 convolution uses stride 2, otherwise 1.
        kernel_regularizer: regularizer (or its identifier) applied to both kernels.
    """

    def __init__(self, n_channels, dim_capsule, decrease_resolution=False, kernel_regularizer=None, **kwargs):
        super(PrimaryCap, self).__init__(**kwargs)

        self.n_channels = n_channels
        self.dim_capsule = dim_capsule
        # Kept so that get_config() can reproduce the stride on reload.
        self.decrease_resolution = bool(decrease_resolution)
        self.stride = 2 if self.decrease_resolution else 1
        self.kernel_regularizer = regularizers.get(kernel_regularizer)

    @staticmethod
    def _same_out_size(size, stride):
        """Spatial size after a 'SAME'-padded convolution: ceil(size / stride); None stays None."""
        if size is None:
            return None
        return (int(size) + stride - 1) // stride

    def build(self, input_shape):
        self.input_height = input_shape[1]
        self.input_width = input_shape[2]
        self.input_num_features = input_shape[3]

        n_maps = self.dim_capsule * self.n_channels

        # 3x3 feature-extraction kernel producing all capsule channels at once.
        self.convW_1 = self.add_weight(shape=[3, 3, self.input_num_features, n_maps],
                                       initializer='glorot_uniform', name='convW_1',
                                       regularizer=self.kernel_regularizer, trainable=True)
        self.bias_1 = self.add_weight(shape=[n_maps], initializer='zeros', name='bias_1', trainable=True)
        # 1x1 kernels, later split into one (dim_capsule -> dim_capsule) kernel per channel.
        self.CapsAct_W = self.add_weight(shape=[1, 1, self.dim_capsule, n_maps],
                                         initializer='glorot_uniform', name='CapsAct_W',
                                         regularizer=self.kernel_regularizer, trainable=True)
        self.CapsAct_B = self.add_weight(shape=[n_maps], initializer='zeros', name='CapsAct_B', trainable=True)
        super(PrimaryCap, self).build(input_shape)

    def call(self, inputs):
        conv1s = tf.nn.conv2d(inputs, self.convW_1, strides=[1, self.stride, self.stride, 1], padding='SAME')
        conv1s = tf.nn.bias_add(conv1s, self.bias_1)
        conv1s = tf.nn.relu(conv1s)
        # One group of dim_capsule maps per capsule channel.
        conv1s = tf.split(conv1s, self.n_channels, axis=-1)

        CapsAct_ws = tf.split(self.CapsAct_W, self.n_channels, axis=-1)
        CapsAct_bs = tf.split(self.CapsAct_B, self.n_channels, axis=-1)

        def func(conv1, CapsAct_w, CapsAct_b):
            # Per-channel 1x1 convolution; the trailing axis becomes the channel axis.
            output = tf.nn.conv2d(conv1, CapsAct_w, strides=[1, 1, 1, 1], padding='SAME')
            output = tf.nn.bias_add(output, CapsAct_b)
            return tf.expand_dims(output, axis=-1)

        outputs = [func(conv1, CapsAct_w, CapsAct_b)
                   for conv1, CapsAct_w, CapsAct_b in zip(conv1s, CapsAct_ws, CapsAct_bs)]
        return tf.concat(outputs, axis=-1)

    def compute_output_shape(self, input_shape):
        # 'SAME' padding yields ceil(size / stride), not floor.
        return (None,
                self._same_out_size(input_shape[1], self.stride),
                self._same_out_size(input_shape[2], self.stride),
                self.dim_capsule,
                self.n_channels)

    def get_config(self):
        # Every constructor argument is serialised so from_config() rebuilds the same layer.
        config = {
            'n_channels': self.n_channels,
            'dim_capsule': self.dim_capsule,
            'decrease_resolution': self.decrease_resolution,
            'kernel_regularizer': regularizers.serialize(self.kernel_regularizer),
        }
        base_config = super(PrimaryCap, self).get_config()
        return dict(list(base_config.items()) + list(config.items()))
    
class ConvCaps(layers.Layer):
    """Convolutional capsule layer with attention routing.

    Input shape: ``(batch, height, width, input_dim, input_ch)``.
    Output shape: ``(batch, out_h, out_w, dim_capsule, n_channels)`` with
    ``out_h = ceil(height / stride)`` (likewise for width).

    Steps performed in ``call``:
      1. Dropout (rate 0.5) on the input capsules.
      2. A 3x3 'SAME' convolution per input channel that produces a candidate capsule
         for every output channel.
      3. For each output channel, a softmax attention over the input channels and a
         weighted sum of the candidates.
      4. A 1x1 convolution per output channel.

    Args:
        n_channels: number of output capsule channels.
        dim_capsule: dimensionality of each output capsule.
        decrease_resolution: when true the 3x3 convolution uses stride 2, otherwise 1.
        kernel_regularizer: regularizer (or its identifier) for ConvTrans_W and CapsAct_W.
    """

    def __init__(self, n_channels, dim_capsule, decrease_resolution=False, kernel_regularizer=None, **kwargs):
        super(ConvCaps, self).__init__(**kwargs)

        self.n_channels = n_channels
        self.dim_capsule = dim_capsule
        # Kept so that get_config() can reproduce the stride on reload.
        self.decrease_resolution = bool(decrease_resolution)
        if self.decrease_resolution:
            self.stride = 2
            self.padding = 'VALID'
        else:
            self.stride = 1
            self.padding = 'SAME'
        # NOTE(review): self.padding is retained for compatibility but is never read;
        # the convolution in call() always uses 'SAME' padding.
        self.kernel_regularizer = regularizers.get(kernel_regularizer)

    @staticmethod
    def _same_out_size(size, stride):
        """Spatial size after a 'SAME'-padded convolution: ceil(size / stride); None stays None."""
        if size is None:
            return None
        return (int(size) + stride - 1) // stride

    def build(self, input_shape):
        self.height = input_shape[1]
        self.width = input_shape[2]
        self.input_dim = input_shape[3]
        self.input_ch = input_shape[4]

        # Attention kernel; split into one [1, 1, dim_capsule, input_ch, input_ch] kernel per output channel.
        self.Att_W = self.add_weight(shape=[1, 1, self.dim_capsule, self.input_ch, self.input_ch*self.n_channels],
                                     initializer='glorot_uniform', name='Att_W', trainable=True)
        # 3x3 transform kernel; split into one kernel per input channel.
        self.ConvTrans_W = self.add_weight(shape=[3, 3, self.input_dim, self.input_ch*self.dim_capsule*self.n_channels],
                                           initializer='glorot_uniform', name='ConvTrans_W',
                                           regularizer=self.kernel_regularizer, trainable=True)
        self.ConvTrans_B = self.add_weight(shape=[self.input_ch*self.dim_capsule*self.n_channels],
                                           initializer='zeros', name='ConvTrans_B', trainable=True)
        # 1x1 activation kernel; split into one kernel per output channel.
        self.CapsAct_W = self.add_weight(shape=[1, 1, self.dim_capsule, self.dim_capsule*self.n_channels],
                                         initializer='glorot_uniform', name='CapsAct_W',
                                         regularizer=self.kernel_regularizer, trainable=True)
        self.CapsAct_B = self.add_weight(shape=[self.dim_capsule*self.n_channels], initializer='zeros',
                                         name='CapsAct_B', trainable=True)
        super(ConvCaps, self).build(input_shape)

    def call(self, inputs):
        # NOTE(review): a new Dropout layer with a hard-coded rate is created on every call.
        inputs = Dropout(rate=0.5)(inputs)
        input_caps = tf.split(inputs, self.input_ch, axis=-1)
        ConvTrans_ws = tf.split(self.ConvTrans_W, self.input_ch, axis=-1)
        ConvTrans_bs = tf.split(self.ConvTrans_B, self.input_ch, axis=-1)

        # The convolution below is 'SAME'-padded, so its spatial output is ceil(size / stride).
        # Using floor here would mis-shape odd sizes (and, with the -1 batch dimension,
        # could silently regroup samples instead of failing).
        out_h = self._same_out_size(self.height, self.stride)
        out_w = self._same_out_size(self.width, self.stride)

        # Convolutional transform by 3x3 conv, one per input capsule channel.
        conv1s = [tf.nn.conv2d(tf.squeeze(input_cap, axis=-1), ConvTrans_w,
                               strides=[1, self.stride, self.stride, 1], padding='SAME')
                  for input_cap, ConvTrans_w in zip(input_caps, ConvTrans_ws)]
        conv1s = [tf.reshape(tf.nn.bias_add(conv1, ConvTrans_b),
                             [-1, out_h, out_w, self.dim_capsule, self.n_channels, 1])
                  for conv1, ConvTrans_b in zip(conv1s, ConvTrans_bs)]
        conv1s = tf.concat(conv1s, axis=-1)             # (..., dim_capsule, n_channels, input_ch)
        conv1s = tf.transpose(conv1s, [0, 1, 2, 3, 5, 4])  # (..., dim_capsule, input_ch, n_channels)

        # Each element of Att_inputs: (batch_sz, h, w, dim_cap, input_ch, 1)
        Att_inputs = tf.split(conv1s, self.n_channels, axis=-1)
        Att_ws = tf.split(self.Att_W, self.n_channels, axis=-1)
        CapsAct_ws = tf.split(self.CapsAct_W, self.n_channels, axis=-1)
        CapsAct_bs = tf.split(self.CapsAct_B, self.n_channels, axis=-1)

        def func(conv1, Att_w, CapsAct_w, CapsAct_b):
            x = tf.squeeze(conv1, axis=-1)  # x.shape = (batch_sz, height, width, dim_cap, input_ch)

            # Attention routing
            # attentions shape = (batch_sz, height, width, 1, input_ch)
            attentions = tf.nn.conv3d(x, Att_w, strides=[1, 1, 1, 1, 1], padding='VALID')
            attentions = tf.nn.softmax(attentions, axis=-1)
            final_attentions = Multiply()([x, attentions])
            # Weighted sum over input channels -> (batch_sz, height, width, dim_cap)
            final_attentions = tf.reduce_sum(final_attentions, axis=-1)

            conv3 = tf.nn.conv2d(final_attentions, CapsAct_w, strides=[1, 1, 1, 1], padding='SAME')
            conv3 = tf.nn.bias_add(conv3, CapsAct_b)
            return tf.expand_dims(conv3, axis=-1)

        outputs = [func(Att_input, Att_w, CapsAct_w, CapsAct_b)
                   for Att_input, Att_w, CapsAct_w, CapsAct_b
                   in zip(Att_inputs, Att_ws, CapsAct_ws, CapsAct_bs)]
        return tf.concat(outputs, axis=-1)

    def compute_output_shape(self, input_shape):
        return (None,
                self._same_out_size(input_shape[1], self.stride),
                self._same_out_size(input_shape[2], self.stride),
                self.dim_capsule,
                self.n_channels)

    def get_config(self):
        # Every constructor argument is serialised so from_config() rebuilds the same layer.
        config = {
            'n_channels': self.n_channels,
            'dim_capsule': self.dim_capsule,
            'decrease_resolution': self.decrease_resolution,
            'kernel_regularizer': regularizers.serialize(self.kernel_regularizer),
        }
        base_config = super(ConvCaps, self).get_config()
        return dict(list(base_config.items()) + list(config.items()))
    
class FullyConvCaps(layers.Layer):
    """Final capsule layer: collapses the spatial grid into ``n_channels`` output capsules.

    Input shape: ``(batch, height, width, input_dim, input_ch)``.
    Output shape: ``(batch, n_channels, dim_capsule)``.

    Works like the convolutional capsule layer's attention routing, except that the
    transform kernel spans the whole ``height x width`` grid with 'VALID' padding, so
    each sample is reduced to a single spatial position.

    Note: per-channel results are concatenated channel-major and are therefore reshaped
    directly to ``[n_channels, dim_capsule]``. Reshaping them as
    ``[dim_capsule, n_channels]`` and transposing (as an earlier revision did)
    interleaves components of different capsule channels, so weights trained with that
    revision will not reproduce the same outputs here.

    Args:
        n_channels: number of output capsules.
        dim_capsule: dimensionality of each output capsule.
        kernel_regularizer: regularizer (or its identifier) for ConvTrans_W and CapsAct_W.
    """

    def __init__(self, n_channels, dim_capsule, kernel_regularizer=None, **kwargs):
        super(FullyConvCaps, self).__init__(**kwargs)

        self.n_channels = n_channels
        self.dim_capsule = dim_capsule
        self.kernel_regularizer = regularizers.get(kernel_regularizer)

    def build(self, input_shape):
        self.height = input_shape[1]
        self.width = input_shape[2]
        self.input_dim = input_shape[3]
        self.input_ch = input_shape[4]

        # Attention kernel; split into one [1, 1, dim_capsule, input_ch, input_ch] kernel per output channel.
        self.Att_W = self.add_weight(shape=[1, 1, self.dim_capsule, self.input_ch, self.input_ch*self.n_channels],
                                     initializer='glorot_uniform', name='Att_W', trainable=True)
        # Transform kernel covering the full spatial grid; split into one kernel per input channel.
        self.ConvTrans_W = self.add_weight(shape=[self.height, self.width, self.input_dim,
                                                  self.input_ch*self.dim_capsule*self.n_channels],
                                           initializer='glorot_uniform', name='ConvTrans_W',
                                           regularizer=self.kernel_regularizer, trainable=True)
        self.ConvTrans_B = self.add_weight(shape=[self.input_ch*self.dim_capsule*self.n_channels],
                                           initializer='zeros', name='ConvTrans_B', trainable=True)
        # 1x1 activation kernel; split into one kernel per output channel.
        self.CapsAct_W = self.add_weight(shape=[1, 1, self.dim_capsule, self.dim_capsule*self.n_channels],
                                         initializer='glorot_uniform', name='CapsAct_W',
                                         regularizer=self.kernel_regularizer, trainable=True)
        self.CapsAct_B = self.add_weight(shape=[self.dim_capsule*self.n_channels], initializer='zeros',
                                         name='CapsAct_B', trainable=True)
        super(FullyConvCaps, self).build(input_shape)

    def call(self, inputs):
        # NOTE(review): a new Dropout layer with a hard-coded rate is created on every call.
        inputs = Dropout(rate=0.5)(inputs)
        input_caps = tf.split(inputs, self.input_ch, axis=-1)
        ConvTrans_ws = tf.split(self.ConvTrans_W, self.input_ch, axis=-1)
        ConvTrans_bs = tf.split(self.ConvTrans_B, self.input_ch, axis=-1)

        # Convolutional transform with a (height x width) kernel and 'VALID' padding:
        # every input channel is reduced to a single spatial position.
        conv1s = [tf.nn.conv2d(tf.squeeze(input_cap, axis=-1), ConvTrans_w, strides=[1, 1, 1, 1], padding='VALID')
                  for input_cap, ConvTrans_w in zip(input_caps, ConvTrans_ws)]
        conv1s = [tf.reshape(tf.nn.bias_add(conv1, ConvTrans_b), [-1, 1, 1, self.dim_capsule, self.n_channels, 1])
                  for conv1, ConvTrans_b in zip(conv1s, ConvTrans_bs)]
        conv1s = tf.concat(conv1s, axis=-1)                # (..., dim_capsule, n_channels, input_ch)
        conv1s = tf.transpose(conv1s, [0, 1, 2, 3, 5, 4])  # (..., dim_capsule, input_ch, n_channels)

        # Each element of Att_inputs: (batch_sz, 1, 1, dim_cap, input_ch, 1)
        Att_inputs = tf.split(conv1s, self.n_channels, axis=-1)
        Att_ws = tf.split(self.Att_W, self.n_channels, axis=-1)
        CapsAct_ws = tf.split(self.CapsAct_W, self.n_channels, axis=-1)
        CapsAct_bs = tf.split(self.CapsAct_B, self.n_channels, axis=-1)

        def func(conv1, Att_w, CapsAct_w, CapsAct_b):
            x = tf.squeeze(conv1, axis=-1)  # x.shape = (batch_sz, height=1, width=1, dim, input_ch)

            # Attention routing
            # attentions shape = (batch_sz, height, width, 1, input_ch)
            attentions = tf.nn.conv3d(x, Att_w, strides=[1, 1, 1, 1, 1], padding='VALID')
            attentions = tf.nn.softmax(attentions, axis=-1)
            final_attentions = Multiply()([x, attentions])
            # Weighted sum over input channels -> (batch_sz, height, width, dim)
            final_attentions = tf.reduce_sum(final_attentions, axis=-1)

            conv3 = tf.nn.conv2d(final_attentions, CapsAct_w, strides=[1, 1, 1, 1], padding='SAME')
            conv3 = tf.nn.bias_add(conv3, CapsAct_b)
            return conv3  # (batch_sz, 1, 1, dim)

        outputs = [func(Att_input, Att_w, CapsAct_w, CapsAct_b)
                   for Att_input, Att_w, CapsAct_w, CapsAct_b
                   in zip(Att_inputs, Att_ws, CapsAct_ws, CapsAct_bs)]
        # Concatenation lays the values out channel-major:
        # [channel 0 dims..., channel 1 dims..., ...], i.e. (n_channels, dim_capsule).
        outputs = tf.concat(outputs, axis=-1)
        outputs = tf.reshape(outputs, [-1, self.n_channels, self.dim_capsule])
        return outputs

    def compute_output_shape(self, input_shape):
        return (None, self.n_channels, self.dim_capsule)

    def get_config(self):
        # Every constructor argument is serialised so from_config() rebuilds the same layer.
        config = {
            'n_channels': self.n_channels,
            'dim_capsule': self.dim_capsule,
            'kernel_regularizer': regularizers.serialize(self.kernel_regularizer),
        }
        base_config = super(FullyConvCaps, self).get_config()
        return dict(list(base_config.items()) + list(config.items()))

def Conv2d_bn(input_tensor, filters, kernel_size=3, strides=1, padding='same', activation='relu', kernel_regularizer=None):
    """Apply Conv2D (no built-in activation) -> BatchNormalization -> Activation.

    :param input_tensor: tensor fed to the convolution.
    :param filters: number of convolution filters.
    :param kernel_size: convolution kernel size.
    :param strides: convolution stride.
    :param padding: convolution padding mode.
    :param activation: activation applied after batch normalisation.
    :param kernel_regularizer: regularizer for the convolution kernel.
    :return: the activated tensor.
    """
    convolution = Conv2D(
        filters,
        kernel_size=kernel_size,
        strides=strides,
        padding=padding,
        activation=None,  # the non-linearity is applied after batch normalisation
        kernel_regularizer=kernel_regularizer,
    )
    features = convolution(input_tensor)
    normalized = BatchNormalization(axis=-1)(features)
    return Activation(activation)(normalized)


class Length(layers.Layer):
    """
    Turn capsule vectors into scalars: the L2 norm along the last axis divided by
    sqrt(dim_vector), where dim_vector is the size of that last axis.
    The result has the same shape as y_true, so labels can be predicted with
    `y_pred = np.argmax(model.predict(x), 1)`.
    inputs: shape=[None, num_vectors, dim_vector]
    output: shape=[None, num_vectors]
    """
    def build(self, input_shape):
        # Remember the vector size; call() uses it as the normaliser.
        self.dim_capsule = input_shape[-1]

    def call(self, inputs, **kwargs):
        scale = tf.constant(np.sqrt(self.dim_capsule), dtype=tf.float32)
        norms = K.sqrt(K.sum(K.square(inputs), -1))
        return norms / scale

    def compute_output_shape(self, input_shape):
        # The vector axis is reduced away.
        return input_shape[:-1]

    def get_config(self):
        # No constructor arguments beyond the base layer's.
        return super(Length, self).get_config()


def margin_loss(y_true, y_pred):
    """
    Margin loss: present classes are penalised when their score falls below 0.9,
    absent classes (down-weighted by 0.5) when their score rises above 0.1.
    Per-class terms are summed over axis 1 and averaged over the batch.
    :param y_true: [None, n_classes]
    :param y_pred: [None, num_capsule]
    :return: a scalar loss value.
    """
    present_term = y_true * K.square(K.maximum(0., 0.9 - y_pred))
    absent_term = 0.5 * (1 - y_true) * K.square(K.maximum(0., y_pred - 0.1))
    per_sample = K.sum(present_term + absent_term, 1)
    return K.mean(per_sample)

In [None]:
# Keep Keras in channels-last mode; this notebook builds the model with
# input_shape=(256, 256, 1), i.e. (height, width, channels).
K.set_image_data_format('channels_last')

def AR_CapsNet(input_shape, args):
    """Build and compile the attention-routing capsule network.

    Architecture:
      two 3x3 Conv-BN-ReLU blocks (32 filters)
      -> PrimaryCap (8 channels of 16-D capsules, stride 2) + tanh
      -> ``args.layernum`` ConvCaps layers (8 channels of ``args.dimcaps``-D capsules);
         the first one halves the resolution, every later one keeps it and is added
         to its input (residual connection) before the tanh
      -> FullyConvCaps (2 capsules of ``args.dimcaps`` dimensions) + tanh
      -> Length.

    :param input_shape: shape of one input image, e.g. (height, width, channels).
    :param args: object exposing ``dimcaps`` and ``layernum`` (both converted with int()).
    :return: a Keras model compiled with SGD(lr=0.01), binary cross-entropy and accuracy.
    :raises ValueError: if ``args.layernum`` is negative.
    """
    dim_caps = int(args.dimcaps)
    layernum = int(args.layernum)
    if layernum < 0:
        # Previously an unsupported value surfaced as an UnboundLocalError further down.
        raise ValueError('args.layernum must be >= 0, got %d' % layernum)
    print('layer num : ', layernum)
    print('dim_caps : ', dim_caps)

    kernel_regularizer = regularizers.l2(0)
    input_layer = Input(shape=input_shape)

    # Convolutional stem: two identical Conv-BN-ReLU blocks.
    conv1 = input_layer
    for _ in range(2):
        conv1 = Conv2d_bn(input_tensor=conv1, filters=32, kernel_size=3, strides=1, padding='same',
                          activation='relu', kernel_regularizer=kernel_regularizer)

    ## Primary Capsules
    primarycaps = PrimaryCap(n_channels=8, dim_capsule=16, decrease_resolution=True,
                             kernel_regularizer=kernel_regularizer)(conv1)
    primarycaps = Activation('tanh')(primarycaps)
    print('primary caps shape : ', primarycaps.shape)

    ## Convolutional Capsules
    out = primarycaps
    for idx in range(1, layernum + 1):
        is_first = idx == 1
        caps = ConvCaps(n_channels=8, dim_capsule=dim_caps, decrease_resolution=is_first,
                        kernel_regularizer=kernel_regularizer)(out)
        if not is_first:
            # Resolution-preserving layers are residual: input + output, then tanh.
            caps = Add()([out, caps])
        out = Activation('tanh')(caps)
        print('ConvCaps%d shape : ' % idx, out.shape)

    ## Fully Convolutional Capsules
    output_dim_capsule = dim_caps
    outputs = FullyConvCaps(n_channels=2, dim_capsule=output_dim_capsule,
                            kernel_regularizer=kernel_regularizer)(out)
    outputs = Activation('tanh')(outputs)
    print('Final Routing caps shape : ', outputs.shape)

    ## Length Capsules
    real_outputs = Length()(outputs)
    print('Length shape : ', real_outputs.shape)

    train_model = models.Model([input_layer], [real_outputs])

    train_model.compile(optimizer=optimizers.SGD(lr=0.01),
                        loss='binary_crossentropy',
                        metrics=['accuracy'])
    return train_model

In [None]:
class Args:
    """Run configuration for the notebook (a plain attribute container)."""
    epochs = 30
    batch_size = 32
    debug = 0
    save_dir = './result'
    # NOTE(review): this is the string 'False', which is truthy - compare explicitly
    # rather than using it in a boolean context.
    augment = 'False'
    gpu = '0'
    layernum = 1
    dimcaps = 16
    validratio = 1
    shift_fraction = 0.1

    def __repr__(self):
        # List every public setting so that print(args) shows the configuration
        # instead of the default "<... object at 0x...>" text.
        names = [name for name in dir(self) if not name.startswith('_')]
        settings = ', '.join('%s=%r' % (name, getattr(self, name)) for name in names)
        return '%s(%s)' % (type(self).__name__, settings)


# The instance keeps the name `args` used by the rest of the notebook; the class has
# its own name so re-binding `args` no longer shadows it.
args = Args()
print(args)

In [None]:
def resize(path):
    """Write a 256x256 JPEG copy of every regular file found directly inside ``path``.

    For a source ``<path>/<name>.<ext>`` the copy is saved next to it as
    ``<path>/<name> resized.jpg`` (quality 90, Lanczos resampling). Sub-directories
    are ignored, and files whose stem already ends in `` resized`` are skipped so that
    running the function again does not resize its own output.

    ``path`` may be given with or without a trailing separator. The directory must be
    writable, and directory-based loaders pointed at ``path`` afterwards will see both
    the originals and the resized copies.

    :param path: directory containing the images.
    """
    for item in os.listdir(path):
        # os.path.join inserts the separator; plain `path + item` silently matched
        # nothing whenever `path` lacked a trailing slash.
        source = os.path.join(path, item)
        if not os.path.isfile(source):
            continue
        stem, _ext = os.path.splitext(source)
        if stem.endswith(' resized'):
            continue  # output of a previous run
        # The context manager closes the underlying file once the copy is written.
        with Image.open(source) as im:
            # Image.LANCZOS is the same filter as the ANTIALIAS alias removed in Pillow 10.
            im_resized = im.resize((256, 256), Image.LANCZOS)
            im_resized.save(stem + ' resized.jpg', 'JPEG', quality=90)

train_dir = "../input/balanced-capsnet/Data/Train"
val_dir = "../input/balanced-capsnet/Data/Test"

# Run the resize step over every class folder of both splits
# (train first, then test; "Normal" before "PNEUMONIA" within each).
for split_dir in (train_dir, val_dir):
    for class_name in ("Normal", "PNEUMONIA"):
        resize(os.path.join(split_dir, class_name))

from keras.preprocessing.image import ImageDataGenerator

# Pixel values are scaled by 1/255; no other preprocessing or augmentation is configured.
datagen = ImageDataGenerator(rescale=1./255)

# Both generators read single-channel images in batches of 32.
train_generator = datagen.flow_from_directory(
    directory=train_dir,
    batch_size=32,
    color_mode='grayscale',
)
# The test generator is not shuffled; the evaluation cell compares predictions
# against test_generator.classes.
test_generator = datagen.flow_from_directory(
    directory=val_dir,
    batch_size=32,
    shuffle=False,
    color_mode='grayscale',
)

In [None]:
# Grayscale 256x256 images, channels last.
input_shape = (256, 256, 1)
model = AR_CapsNet(input_shape, args)

# summary() prints the table itself and returns None, so it is called directly;
# wrapping it in print() added a stray "None" line to the output.
model.summary()

In [None]:
# Evaluation helpers are imported before training so that a missing scikit-learn
# installation fails immediately instead of after the training run has finished.
from sklearn.metrics import classification_report, confusion_matrix

# NOTE(review): the epoch count is hard-coded to 10 although args.epochs is 30 - confirm which is intended.
history = model.fit_generator(train_generator,epochs=10)

# Ground-truth class indices in the order the files are read by the test generator.
y_true = test_generator.classes

result = model.predict_generator(test_generator)

# Each output row holds one score per class; the largest one is the predicted class.
y_pred = np.argmax(result, axis=-1)
print(confusion_matrix(y_true, y_pred))

print(classification_report(y_true,y_pred))