In [107]:
%load_ext autoreload
%autoreload 2

import numpy as np
import tensorflow as tf

from utilsSimpleConv2D import*
from tensorflow.keras.layers import Layer
from typing import Tuple,List,Any,Dict

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [95]:
## Data

In [108]:
mnist = tf.keras.datasets.mnist
(x_train, y_train), (x_test, y_test) = mnist.load_data()

x_train, x_test=x_train.reshape(-1,28,28,1), x_test.reshape(-1,28,28,1)

x_train, x_test = x_train / 255.0, x_test / 255.0

flat_train = np.reshape(x_train, [x_train.shape[0], 28 * 28])
flat_test = np.reshape(x_test, [x_test.shape[0], 28 * 28])

In [109]:
## Class PaddingJacobiens

In [116]:
class PaddingJacobiens(Layer):

    def __init__(self,
                 kernel_size=3,
                 strides=1,
                 padding='VALID'):

        super(PaddingJacobiens, self).__init__()
        self.strides = strides
        self.padding = padding
        self.kernel_size = kernel_size


    def build(self, input_shape):
        input_shape = tf.TensorShape(input_shape)
        input_channel = input_shape[-1]
        self.pad = math.floor((self.kernel_size - 1) / 2)
    
        # -----------------------------------------matrix_pad-----------------------------------------------
        if self.padding == "SAME":
            if self.strides > 1:
                raise Exception("Not implemented: paddind=SAME and strides>1. if padding=SAME, strides=1")
            # Right_shape
            self.Right_shape: Tuple = input_shape[1] + 2 * self.pad, input_shape[2] + 2 * self.pad
            inputShape = input_shape[1], input_shape[2]
            # matrix_pad
            self.matrix_pad = build_matrix_padding(input_shape=inputShape, pad=self.pad)
            # Jacobien_strides
            self.Build_J()

        elif self.padding == "VALID":
            # Right_shape
            self.Right_shape: Tuple = input_shape[1], input_shape[2]
            # matrix_pad
            self.matrix_pad = matrix = tf.constant(np.identity(input_shape[1] * input_shape[2]), dtype="float32")
    
            if self.strides>self.kernel_size:
                raise Exception("Not implemented")
            else:
                # Jacobien_strides
                self.Build_J()
        else:
            raise Exception("Padding not found")
        # ------------------------------------------------------------------------------------

        # --------------------------------------------------------------------------------------
        self.build = True
        # --------------------------------------------------------------------------------------

    def Build_J(self,*args):
        out_shape1:int=math.floor((self.Right_shape[0]-self.kernel_size)/self.strides) + 1
        out_shape2:int=math.floor((self.Right_shape[1]-self.kernel_size)/self.strides) + 1
    
        row = tf.Variable(np.zeros(shape=(self.Right_shape[1],1)), dtype=tf.float32, trainable=False)
        row[0,0].assign(1)
        for j in range(out_shape2):
            for k in range(self.kernel_size):
                try:
                    new_line=shift_(row,k,axis=0)
                    self.J1 = tf.concat([self.J1, new_line], 1)
                except:
                    self.J1 = row
            row=shift_(row, self.strides,axis=0)
            
        del new_line
        
        col = tf.Variable(np.zeros(shape=(1,self.Right_shape[0])), dtype=tf.float32, trainable=False)
        col[0,0].assign(1)
        
        for i in range(out_shape1):
            for l in range(self.kernel_size):
                try:
                    new_line=shift_(col,l,axis=1)
                    self.J2 = tf.concat([self.J2, new_line], 0)
                except:
                    self.J2 = col
            col=shift_(col, self.strides,axis=1)

    def call(self, inputs):

        # -----------------------------------------------------------------------------------------------------
        flatten = tf.reshape(inputs, shape=(-1, inputs.shape[1] * inputs.shape[2], inputs.shape[3]))
        upFlatten = tf.matmul(a=self.matrix_pad, b=flatten)
        inputs_x=tf.reshape(upFlatten, shape=(-1, inputs.shape[3],self.Right_shape[0] , self.Right_shape[0]))
        inputs_y=tf.matmul(a=self.J2,b=tf.matmul(a=inputs_x,b=self.J1))
        inputs_y=tf.reshape(inputs_y,shape=(-1, inputs_y.shape[2], inputs_y.shape[3], inputs_y.shape[1]))
        # -----------------------------------------------------------------------------------------------------
        
        # -----------------------------------------------------------------------------------------------------
        return inputs_y

In [130]:
## class SpecConv2D

In [218]:
class SpecConv2D(Layer):

    def __init__(self, filters,
                 kernel_size=3,
                 use_lambda_in=True,
                 use_bias=False,
                 activation="relu"):

        super(SpecConv2D, self).__init__()

        self.filters = filters
        self.use_bias = use_bias
        self.kernel_size = kernel_size
        self.use_lambda_in = use_lambda_in
        self.activation = activations.get(activation)

        
        self.initializer = initializers.RandomUniform(-1, 1)

    def build(self, input_shape):
        input_shape = tf.TensorShape(input_shape)
        self.input_channel = input_shape[-1]
        # ------------------------------out_in_shape_phi_indices---------------------------
        self.set_indices_phi(N=input_shape[1],M=input_shape[2])
        # -----------------------------------------------------------------------------------
    
        # --------------------------------noyau_of_phi----------------------------------------
        self.noyau_of_phi = tf.constant(np.ones((self.filters,
                                                self.kernel_size * self.kernel_size)),
                                                dtype="float32")
        
        # -----------------------------------kernel-------------------------------------------
        kernel = tf.repeat(self.noyau_of_phi, repeats=self.output_lenght, axis=0, name=None)
        
        kernel = tf.reshape(kernel, shape=(-1, self.filters * self.output_lenght * self.kernel_size * self.kernel_size))
        
        kernel = tf.sparse.SparseTensor(
        indices=self.indices, values=kernel[0],
        dense_shape=(self.filters, self.output_lenght, input_shape[1]*input_shape[2])
        )
        
        self.kernel = tf.sparse.to_dense(kernel)
        # -------------------------------------------------------------------------------------

        # Lambda_in
        if self.use_lambda_in:
            self.Lambda_in = self.add_weight(
                name='Lambda_in',
                shape=(1, input_shape[1]*input_shape[2]),
                initializer=self.initializer,
                dtype=tf.float32,
                trainable=self.use_lambda_in)

        else:
            raise Exception("Not implemented.")

        # --------------------------------------------bias---------------------------------------
        if self.use_bias:
            self.bias = self.add_weight(
                name='bias',
                shape=(self.filters,),
                dtype=tf.float32,
                trainable=self.use_bias)
        else:
            self.bias = None
        # ---------------------------------------------------------------------------------------

        # ---------------------------------------------------------------------------------------
        self.build = True
        # ---------------------------------------------------------------------------------------
    
    def set_indices_phi(self,N:int,M:int, *args):
        self.indices: List[Tuple] = list()
    
        self.out_shape1: int = math.floor((N - self.kernel_size) / self.kernel_size) + 1
        self.out_shape2: int = math.floor((M - self.kernel_size) / self.kernel_size) + 1
        self.output_lenght: int = self.out_shape1 * self.out_shape2
    
        for filters in range(self.filters):
            count: int = 1
            shift: int = 0
            for i in range(self.output_lenght):
                if i == count * (self.out_shape2):
                    count += 1
                    shift += self.kernel_size + (self.kernel_size - 1) * M
                else:
                    if shift:
                        shift += self.kernel_size
                    else:
                        shift += 1
                for block in range(self.kernel_size):
                    for j in range(self.kernel_size):
                        self.indices.append((filters, i, block * M + shift - 1 + j))

    def get_indices_phi(self, *args):
        return self.indices

    def call(self, inputs):

        
        # -----------------------------------------------------------------------------------------------------
        flatten = tf.reshape(inputs, shape=(-1, inputs.shape[1] * inputs.shape[2], inputs.shape[3]))
        # -----------------------------------------------------------------------------------------------------
        # -----------------------------------------------------------------------------------------------------
        if 0<self.filters<2:
            outputs = tf.matmul(a=tf.linalg.matmul(self.kernel, tf.linalg.diag(self.Lambda_in[0, :], k=0)), b=flatten)
            print(outputs.shape)
            outputs = tf.reshape(outputs, shape=(-1, self.out_shape1, self.out_shape2,self.input_channel))
            # -----------------------------------------------------------------------------------------------------#
        else:
             raise Exception("Not implemented for this filter.")

       
        if self.use_bias:
            outputs = tf.nn.bias_add(outputs, self.bias)

        if self.activation is not None:
            outputs = self.activation(outputs)
        else:
            pass

        return outputs


    def get_kernel(self, *args):
        return self.kernel
        



In [220]:
kernel_size=3
inputs=tf.keras.layers.Input(shape=(28,28,2))
layer1=PaddingJacobiens(kernel_size=kernel_size,
                       strides=1,
                        padding='VALID')
outputs1=layer1(inputs)
print(f"Outputs1: {outputs1}")
layer2=SpecConv2D(filters=1,
                 kernel_size=kernel_size,
                 use_lambda_in=True,
                 use_bias=False,
                 activation="relu")
outputs2=layer2(outputs1)
print(f"Outputs2: {outputs2}")

Outputs1: KerasTensor(type_spec=TensorSpec(shape=(None, 78, 78, 2), dtype=tf.float32, name=None), name='padding_jacobiens_20/Reshape_2:0', description="created by layer 'padding_jacobiens_20'")
(None, 676, 2)
Outputs2: KerasTensor(type_spec=TensorSpec(shape=(None, 26, 26, 2), dtype=tf.float32, name=None), name='spec_conv2d_45/Relu:0', description="created by layer 'spec_conv2d_45'")
