In [0]:
import numpy as np
import cv2
import sys
import keras
from keras import backend as K
from keras import activations
from keras import regularizers
from keras.models import Model, Sequential
from keras.layers import Conv2D, MaxPooling2D, Dense,Input
from keras.engine import InputSpec, Layer
from keras.optimizers import SGD, Adam
from keras.utils.conv_utils import conv_output_length

In [0]:
class Normalized_Correlation_Layer(Layer):
    '''
    Custom Keras layer (a ``Layer`` subclass) that correlates standardized
    patches of two feature maps.

    For every channel, each ``patch_size`` patch of the first input is
    standardized (zero mean, unit std over the patch) and dot-multiplied with
    the standardized patches of the second input that lie in a horizontal band
    of ``patch_size[0]`` patch rows centred on the same row and spanning the
    full width. The per-channel results are concatenated on the last axis.

    Arguments:
        patch_size: (rows, cols) of the patches that are compared.
        dim_ordering: stored and returned by ``get_config`` only; the
            computation indexes tensors as (batch, rows, cols, channels).
        border_mode: only consulted by ``compute_output_shape``.
        stride: (row, col) sub-sampling of the patches taken from the first
            input. NOTE(review): ``call`` reshapes its result to the full
            ``rows x cols`` grid, so strides other than (1, 1) look
            unsupported - confirm before relying on them.
        activation: name or callable resolved through ``activations.get``
            and applied to the concatenated output.
    '''
    def __init__(self, patch_size=(5,5), dim_ordering='tf', border_mode='same',
                 stride=(1, 1), activation=None, **kwargs):

        self.kernel_size = patch_size
        self.subsample = stride
        self.dim_ordering = dim_ordering
        self.border_mode = border_mode
        self.activation = activations.get(activation)
        super(Normalized_Correlation_Layer, self).__init__(**kwargs)


    def compute_output_shape(self, input_shape):
        '''
        Return (batch, rows, cols, patch_size[0] * cols * channels), derived
        from the shape of the first input (``input_shape[0]``).
        '''
        inp_rows = input_shape[0][1]
        inp_cols = input_shape[0][2]
        if self.border_mode != "same":
            rows = conv_output_length(inp_rows, self.kernel_size[0],
                                       self.border_mode, 1)
            cols = conv_output_length(inp_cols, self.kernel_size[1],
                                       self.border_mode, 1)
        else:
            rows = inp_rows
            cols = inp_cols

        return (input_shape[0][0], rows, cols, self.kernel_size[0]*cols*input_shape[0][-1])


    @staticmethod
    def _standardize_patches(patches):
        '''Standardize each flattened patch (last axis) to zero mean, unit std.'''
        mean = K.mean(patches, axis=-1, keepdims=True)
        # Constant patches (e.g. all-zero padding or dead ReLU regions) have
        # std == 0; clamp the divisor so they map to zeros instead of NaN.
        std = K.maximum(K.std(patches, axis=-1, keepdims=True), K.epsilon())
        return (patches - mean) / std


    def call(self, x):
        '''
        Compute the correlation volume for the pair of tensors ``x``.

        ``x`` is a list/tuple ``[input_1, input_2]``; both are indexed as
        (batch, rows, cols, channels) and the static shape of ``input_1`` is
        used for both.
        '''
        input_1, input_2 = x
        stride_row, stride_col = self.subsample
        patch_rows, patch_cols = self.kernel_size
        patch_len = patch_rows * patch_cols

        # Public accessor for the static shape (instead of the private
        # `_keras_shape` attribute).
        inp_shape = K.int_shape(input_1)
        output_shape = self.compute_output_shape([inp_shape, inp_shape])
        output_row = output_shape[1]
        output_col = output_shape[2]

        padding_row = (patch_rows // 2, patch_rows)
        padding_col = (patch_cols // 2, patch_cols)
        input_1 = K.spatial_2d_padding(input_1, padding=(padding_row, padding_col))
        # The second input gets twice the row padding so that patches centred
        # up to `padding_row[0]` rows above/below the image can be extracted.
        input_2 = K.spatial_2d_padding(
            input_2,
            padding=((padding_row[0] * 2, padding_row[1] * 2), padding_col))

        # Patch rows taken from input_2: the image rows plus `padding_row[0]`
        # extra rows above and below.
        search_rows = output_row + 2 * padding_row[0]
        # Number of input_2 patches compared with one input_1 patch; this is
        # the per-channel output depth reported by compute_output_shape.
        band_len = output_col * patch_rows

        output = []
        for k in range(inp_shape[-1]):
            # Flattened patches of the first input, row-major over (row, col).
            xc_1 = [
                K.reshape(input_1[:, i:i + patch_rows, j:j + patch_cols, k],
                          (-1, 1, patch_len))
                for i in range(output_row)
                for j in range(output_col)
                if i % stride_row == 0 and j % stride_col == 0
            ]
            # Flattened patches of the second input. Entry `t` covers padded
            # rows t..t+patch_rows, uniformly for the rows above, inside and
            # below the image (the trailing rows previously re-used the offset
            # of the in-image rows and duplicated them).
            xc_2 = [
                K.reshape(input_2[:, t:t + patch_rows, j:j + patch_cols, k],
                          (-1, 1, patch_len))
                for t in range(search_rows)
                for j in range(output_col)
            ]

            # batch x (number of patches) x patch_len
            xc_1_aggregate = self._standardize_patches(K.concatenate(xc_1, axis=1))
            xc_2_aggregate = self._standardize_patches(K.concatenate(xc_2, axis=1))
            xc_1_aggregate = K.permute_dimensions(xc_1_aggregate, (0, 2, 1))

            block = []
            for i in range(len(xc_1)):
                # xc_2 holds `output_col` patches per row, so the band for the
                # i-th input_1 patch starts at the beginning of its own row.
                band_start = (i // output_col) * output_col
                sl1 = slice(band_start, band_start + band_len)
                block.append(K.reshape(K.batch_dot(xc_2_aggregate[:, sl1, :],
                                                   xc_1_aggregate[:, :, i]),
                                       (-1, 1, 1, band_len)))

            block = K.concatenate(block, axis=1)
            block = K.reshape(block, (-1, output_row, output_col, band_len))
            output.append(block)
        output = K.concatenate(output, axis=-1)
        output = self.activation(output)
        return output


    def get_config(self):
        '''Return the constructor arguments merged with the base layer config.'''
        config = {'patch_size': self.kernel_size,
                  'activation': activations.serialize(self.activation),
                  'border_mode': self.border_mode,
                  'stride': self.subsample,
                  'dim_ordering': self.dim_ordering}
        # Must name this class; the previous reference to `Correlation_Layer`
        # was an undefined name and raised NameError.
        base_config = super(Normalized_Correlation_Layer, self).get_config()

        return dict(list(base_config.items()) + list(config.items()))

In [0]:
def normalized_X_corr_model(input_shape=(300, 300, 3)):
    '''
    Build the two-input model around ``Normalized_Correlation_Layer``.

    Both inputs go through the same (weight-sharing) convolutional tower, the
    two feature maps are combined by the correlation layer, and the result is
    passed through two convolutions, max-pooling and two dense layers ending
    in a 2-way softmax.

    Arguments:
        input_shape: (rows, cols, channels) of each input image. Defaults to
            (300, 300, 3).

    Returns:
        An uncompiled ``keras.models.Model`` with inputs ``[a, b]``.
    '''
    a = Input(input_shape)
    b = Input(input_shape)

    # Shared feature extractor; declare the same shape as the real inputs
    # (it previously advertised an unrelated (160, 60, 3)).
    model = Sequential()
    model.add(Conv2D(kernel_size = (5,5), filters = 20,input_shape = input_shape, activation = 'relu'))
    model.add(MaxPooling2D((2,2)))
    model.add(Conv2D(kernel_size = (5,5), filters =  25, activation = 'relu'))
    model.add(MaxPooling2D((2,2)))
    model1 = model(b)
    model2 = model(a)

    normalized_layer = Normalized_Correlation_Layer(stride = (1,1), patch_size = (5, 5))([model1, model2])
    conv_layer = Conv2D(kernel_size=(1,1), filters=25, activation='relu')(normalized_layer)
    conv_layer = Conv2D(kernel_size=(3,3), filters=25, activation=None)(conv_layer)
    pool_layer = MaxPooling2D((2,2))(conv_layer)
    # NOTE(review): Dense is applied to the un-flattened 4-D pooling output, so
    # it acts on the last axis only and the model output keeps its spatial
    # dimensions - confirm whether a Flatten layer was intended here.
    fc_layer = Dense(500)(pool_layer)
    output_layer = Dense(2, activation = "softmax")(fc_layer)
    x_corr_model = Model(inputs=[a,b], outputs=output_layer)

    return x_corr_model


In [0]:
if __name__ == "__main__":
    # Smoke test: run one forward pass of the untrained model on two images.
    # NOTE(review): absolute, user-specific paths - adjust for your environment.
    IMAGE_PATHS = (
        '/mnt/azmnt/code/Users/sachin.choudhary58/1.jpeg',
        '/mnt/azmnt/code/Users/sachin.choudhary58/3.jpeg',
    )
    IMAGE_SIZE = (300, 300)  # matches the default model input size

    test_model = normalized_X_corr_model()

    batches = []
    for image_path in IMAGE_PATHS:
        image = cv2.imread(image_path)
        # cv2.imread signals an unreadable/missing file by returning None
        # rather than raising; fail here with a message naming the file.
        if image is None:
            raise IOError('Could not read image: ' + image_path)
        resized = cv2.resize(image, IMAGE_SIZE)
        # float32 batch of one image: (1, rows, cols, channels)
        batches.append(np.expand_dims(resized.astype('float32'), axis=0))
    X1, X2 = batches

    Y1 = test_model.predict([X1, X2])   # Predict output
    print(Y1)
