<a href="https://colab.research.google.com/github/jdowner212/cs577_addernet/blob/main/AdderNet_functions.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import numpy as np
import torch
import tensorflow as tf
import tensorflow.keras.backend as K
from tensorflow.python.ops.numpy_ops import np_config
np_config.enable_numpy_behavior()


In [314]:
'''
Functions described in paper
'''

def L1(a,b):
    """Negative L1 distance between a and b (element-wise for arrays).

    Used as the similarity measure of the adder layer: the closer the two
    inputs, the larger (closer to zero) the result.
    """
    gap = a-b
    return -1*np.abs(gap)

def hard_tanh(array):
    """Clip values to the interval [-1, 1], leaving values already inside it untouched."""
    # Lower bound is applied first, then the upper bound on top of that result.
    floored = np.where(array<-1, -1, array)
    return np.where(floored>1, 1, floored)

'''Equation 1'''
# modified by Equation 2

'''Equation 5'''
def dY_dF_element(image,filters,m,n,i,j,k,t):
    """Full-precision gradient of output Y[m,n,t] w.r.t. filter weight F[t,k,i,j].

    image   -- indexed as image[channel, row, col]
    filters -- indexed as filters[filter#, channel, row, col]
    m, n    -- top-left row/col of the window in the image
    i, j    -- row/col offset inside the window
    k, t    -- channel index, filter index

    Returns image[k,m+i,n+j] - filters[t,k,i,j].
    """
    # Index the `filters` argument, not the builtin `filter`.
    return image[k,m+i,n+j] - filters[t,k,i,j]

'''Equation 6''' # clipped, full-precision gradient
def dY_dImage_element(image,filters,m,n,i,j,k,t):
    """Clipped gradient of output Y[m,n,t] w.r.t. input pixel image[k,m+i,n+j].

    image   -- indexed as image[channel, row, col]
    filters -- indexed as filters[filter#, channel, row, col]
    m, n    -- top-left row/col of the window in the image
    i, j    -- row/col offset inside the window
    k, t    -- channel index, filter index

    Returns hard_tanh(filters[t,k,i,j] - image[k,m+i,n+j]), i.e. the
    difference clipped to [-1, 1].
    """
    # Index the `filters` argument, not the builtin `filter`.
    return hard_tanh(filters[t,k,i,j] - image[k,m+i,n+j])

def window_gradients(image,filters,m,n,t):
    """Gradients of output Y[m,n,t] for the whole window under filter t.

    image   -- indexed as image[channel, row, col]
    filters -- indexed as filters[filter#, channel, row, col]
    m, n    -- top-left row/col of the window in the image
    t       -- index of the filter being applied

    Returns (dy_df_window, dy_dx_window):
    dy_df_window -- same shape as filters; slot t holds image_window - filters[t]
                    (Equation 5), every other filter's slot stays zero
    dy_dx_window -- same shape as image; the window's position holds
                    hard_tanh(filters[t] - image_window) (Equation 6),
                    everything outside the window stays zero
    """
    dy_df_window = np.zeros_like(filters)
    dy_dx_window = np.zeros_like(image)

    _, k_depth, k_height, k_width = filters.shape
    rows = slice(m, m+k_height)
    cols = slice(n, n+k_width)

    # Patch of the image covered by the filter when its corner sits at (m, n).
    patch = image[:k_depth, rows, cols]

    dy_df_window[t] = patch - filters[t]
    # The input gradient is the clipped *opposite* difference (F - X), and it
    # belongs at the window's location in the image-shaped array.
    dy_dx_window[:k_depth, rows, cols] = hard_tanh(filters[t] - patch)

    return dy_df_window, dy_dx_window

'''Equation 2'''
def Y_adder(image, F, m, n, t, similarity_f=L1): # image, group of filters, row#, col#, filter#, similarity function
    """Adder-layer output at position (m, n) for filter t.

    Sums similarity_f(image[k, m+i, n+j], F[t, k, i, j]) over every channel k
    and every offset (i, j) of the filter window.
    """
    _, depth, height, width = F.shape
    # Same visiting order as a nested loop: channel, then column, then row.
    return sum(
        similarity_f(image[chan, m+row, n+col], F[t, chan, row, col])
        for chan in range(depth)
        for col in range(width)
        for row in range(height)
    )

In [247]:
# Random integer test data in [0, 10): one 3x5x5 image and five 3x3x3 filters.
images = np.random.randint(low=0, high=10, size=(1, 3, 5, 5))
filters = np.random.randint(low=0, high=10, size=(5, 3, 3, 3))
# NOTE(review): forward_batch is not defined in the visible part of this file.
forward_batch(images, filters, 1, 0)#.shape


array([[[[[ -85.,  -95.,  -71.],
          [-107.,  -90., -103.],
          [-109.,  -57., -101.]]],


        [[[ -96.,  -82.,  -90.],
          [ -88.,  -77.,  -82.],
          [ -94.,  -94.,  -86.]]],


        [[[ -92.,  -82.,  -96.],
          [ -90.,  -85.,  -86.],
          [-100.,  -82.,  -78.]]],


        [[[-105.,  -83.,  -95.],
          [ -93.,  -76., -111.],
          [ -95.,  -97., -101.]]],


        [[[ -94.,  -96., -100.],
          [ -74.,  -87.,  -84.],
          [ -88.,  -72.,  -84.]]]]])

In [256]:
# Empty Keras Sequential container; no layers are attached at this point.
model = tf.keras.models.Sequential()

In [None]:
# `Sequential.add` is a method, so the layer is passed as its argument
# (`model.add.layers(...)` raises AttributeError). Conv2D takes an integer
# filter count and a kernel size, not the weight array itself.
# NOTE(review): assumes `filters` is laid out (num_filters, depth, height, width),
# as unpacked in window_gradients -- confirm before relying on this.
model.add(tf.keras.layers.Conv2D(filters=filters.shape[0], kernel_size=filters.shape[2:]))

In [260]:
# Bare expression: echoes the Conv2D class object so the notebook displays its qualified name.
tf.keras.layers.Conv2D

keras.layers.convolutional.conv2d.Conv2D

In [None]:

'''Equation 3'''
# ignore -- CNN formula

'''Equation 4'''
# ignore -- updated with equation 5

'''Equation 7'''
# ignore -- hard_tanh implemented previously

'''Equation 8'''
# ignore -- CNN formula

'''Equation 9'''
# def var_Y_adder(X,F,variance_f=torch.var):
    # check torch.var documentation: https://pytorch.org/docs/stable/generated/torch.var.html
    # not sure if we can call torch.var(X) with default parameters
    # or if we need to specify. Does this output a scalar or a tensor?
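# Trying K.var as the TensorFlow substitute for torch.var -- make sure they behave the same
# (torch.var applies Bessel's correction by default, whereas K.var returns the population variance).
# Or use tf.math.reduce_variance? (there is no tf.var)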
def var_Y_adder(X,F,variance_f=K.var):
    """Variance of the adder-layer output as given by Equation 9.

    X          -- layer input
    F          -- filters; its shape is unpacked as (_, c_in, d, _)
    variance_f -- callable returning the variance of its argument

    Returns sqrt(pi/2) * d^2 * c_in * (Var[X] + Var[F]).
    """
    input_variance = variance_f(X)
    filter_variance = variance_f(F)
    _n_out, c_in, d, _d_other = F.shape

    scale = np.sqrt(np.pi/2)*(d**2)*(c_in)
    return scale*(input_variance + filter_variance)

'''Equation 10'''
def batch_norm(minibatch, gamma, beta):
    """Batch-normalise a minibatch along its first axis (Equation 10).

    minibatch -- array-like whose first axis indexes the samples
    gamma     -- scale applied after normalisation
    beta      -- shift applied after normalisation

    Returns gamma * (x - mean) / sigma + beta, where mean and sigma are the
    per-position mean and standard deviation over the minibatch. A position
    whose values are all equal has sigma == 0 and yields nan/inf.
    """
    batch = np.asarray(minibatch, dtype=float)
    mean = batch.mean(axis=0)
    variance = ((batch - mean)**2).mean(axis=0)
    # Normalise by the standard deviation, i.e. the square root of the
    # mean squared deviation -- not by the variance itself.
    sigma = np.sqrt(variance)
    return gamma*(batch - mean)/sigma + beta

'''Equation 11'''
def dL_dMinibatch_i(minibatch,dL_dy,i,L,gamma):
    """Gradient of the loss w.r.t. sample i of a batch-normalised minibatch (Equation 11).

    minibatch -- array-like whose first axis indexes the samples x_1..x_m
    dL_dy     -- gradient of the loss w.r.t. each normalised output y_j,
                 where y is the result of applying batch_norm to the minibatch
    i         -- index of the sample to differentiate with respect to
    L         -- unused; kept so existing call sites keep working
    gamma     -- batch-norm scale

    Returns
        gamma / (m^2 * sigma) * sum_j { dL_dy[i] - dL_dy[j] * (1 + (x_i - x_j)(x_j - mean) / sigma) }
    with sigma the standard deviation over the minibatch. sigma == 0 yields nan/inf.
    """
    x = np.asarray(minibatch, dtype=float)
    g = np.asarray(dL_dy, dtype=float)
    m = len(x)

    mean = x.mean(axis=0)
    # sigma is the standard deviation: square root of the mean squared deviation.
    sigma = np.sqrt(((x - mean)**2).mean(axis=0))

    # All j terms at once; the first axis plays the role of the index j.
    x_term = (x[i] - x)*(x - mean)/sigma
    total = (g[i] - g*(1 + x_term)).sum(axis=0)

    return total*gamma/((m**2)*sigma)

'''Equation 12'''
# update rule for F
def delta_F_l(adaptive_lr_l, dL_dF_l, gamma):
    """Update step for the filters of layer l (Equation 12).

    Scales the filter gradient dL_dF_l by the global factor gamma and the
    layer's adaptive learning rate adaptive_lr_l.
    """
    step_size = gamma*adaptive_lr_l
    return step_size*dL_dF_l

'''Equation 13'''
def adaptive_lr_l(dL_dF_l, eta, k=None):
    """Adaptive learning rate for layer l (Equation 13): eta * sqrt(k) / ||dL_dF_l||_2.

    dL_dF_l -- gradient of the loss w.r.t. the filters of layer l (any shape)
    eta     -- hyper-parameter controlling the learning rate of the adder filters
    k       -- number of elements in F_l; defaults to the number of elements
               in dL_dF_l when omitted

    Returns a scalar. An all-zero gradient has zero norm and yields inf/nan.
    """
    grad = np.asarray(dL_dF_l, dtype=float)
    if k is None:
        k = grad.size

    # L2 norm over *all* entries: square, sum, then a single square root.
    # (A square root of each squared entry would just be the element-wise |g|.)
    l2_norm = np.sqrt(np.sum(grad**2))

    return eta*np.sqrt(k)/l2_norm

In [386]:
# dimensions: NxHxWxC

def relu(array):
    """Rectified linear unit: keep entries that are >= 0, replace the rest with 0."""
    non_negative = array>=0
    return np.where(non_negative, array, 0)

def addernet_single_step(window, filter, b=None, similarity_f=L1):
    """
    Apply one filter to one window of the input.

    window       -- k_h x k_w x k_d
    filter       -- k_h x k_w x k_d
    b            -- bias, 1x1x1 (treated as zero when omitted)
    similarity_f -- called once per element pair (window value, filter value)

    Returns the summed similarity plus the bias, as a float array with the
    shape of b (1x1x1 for the default bias).
    """
    # Visit the elements in (h, w, d) order, one similarity call per element.
    out = sum(similarity_f(window[idx], filter[idx]) for idx in np.ndindex(filter.shape))

    # Only substitute the default when no bias was supplied; truth-testing an
    # array raises for more than one element.
    if b is None:
        b = np.zeros((1,1,1))

    return out + np.asarray(b).astype(float)

class addernet_layer:
    """
    One AdderNet layer: slides each filter over a batch of images and scores
    every window with addernet_single_step, then applies an activation.

    F -- c_out x k_H x k_W x c_in      filters
    X -- n_tensors x H x W x c_in      input batch
    B -- c_out x 1 x 1 x 1             biases (zeros when omitted)
    """

    def __init__(self,F,X,stride=1,padding=0,activation=relu,B=None):
        self.F = F
        self.X = X
        self.s = stride
        self.p = padding
        self.act = activation
        # Test for None explicitly: truth-testing a multi-element bias array raises.
        self.B = np.zeros((F.shape[0],1,1,1)) if B is None else B

    def forward(self):
        """
        X -- n_tensors x H x W x c_in
        F -- c_out x k_H x k_W x c_in
        b -- c_out x 1 x 1 x 1
        Z -- n_tensors x H_new x W_new x c_out (after the activation)
        cache -- (X, F, B, s, p), the values backward() needs
        """
        X,F,s,p,act,B = self.X, self.F, self.s, self.p, self.act, self.B
        n_tensors, H, W, _ = X.shape
        n_filters, k_H, k_W, _ = F.shape

        H_new = (H + 2*p - k_H)//s + 1
        W_new = (W + 2*p - k_W)//s + 1

        Z = np.zeros([n_tensors, H_new, W_new, n_filters])
        X_padded = np.pad(X, ((0,0), (p,p), (p,p), (0,0)), 'constant', constant_values = (0,0))

        for i in range(n_tensors):           # traverse batch
            this_img = X_padded[i,:,:,:]     # select ith image in batch
            for f in range(n_filters):       # traverse filters
                this_filter = F[f,:,:,:]
                this_B = B[f,:,:,:]
                for h in range(H_new):       # traverse height
                    v0 = h*s
                    v1 = v0 + k_H
                    for w in range(W_new):   # traverse width
                        h0 = w*s
                        h1 = h0 + k_W

                        this_window = this_img[v0:v1,h0:h1,:]
                        step = addernet_single_step(this_window, this_filter, this_B)
                        # The step result may be a one-element array; store it as a scalar.
                        Z[i, h, w, f] = np.asarray(step).item()

        Z = act(Z)
        assert Z.shape == (n_tensors, H_new, W_new, n_filters)
        # Cache the filters F (not the image width W) for the backward pass.
        cache = (X, F, B, s, p)
        return Z, cache

    @staticmethod
    def backward(upstream_g, cache):
        """
        upstream_g -- n_tensors x H_up x W_up x c_out, gradient of the loss
                      w.r.t. the adder output. The activation's derivative is
                      NOT applied here; the caller must already have
                      back-propagated through it.
        cache      -- (X, F, B, s, p) as returned by forward()

        Output:
        dX -- dL/dX, shape of X (n_tensors x H x W x c_in)
        dF -- dL/dF, shape of F (c_out x k_H x k_W x c_in)
        dB -- dL/dB, shape c_out x 1 x 1 x 1

        Callable both as addernet_layer.backward(g, cache) and layer.backward(g, cache).
        """
        X, F, B, s, p = cache
        n_tensors, H_down, W_down, c_down = X.shape
        n_filters, k_H, k_W, _ = F.shape
        _, H_up, W_up, c_up = upstream_g.shape

        dX_down = np.zeros((n_tensors, H_down, W_down, c_down))
        dF = np.zeros(F.shape)
        dB = np.zeros((n_filters, 1, 1, 1))

        X_padded = np.pad(X, ((0,0), (p,p), (p,p), (0,0)), 'constant', constant_values = (0,0))
        dX_down_padded = np.pad(dX_down, ((0,0), (p,p), (p,p), (0,0)), 'constant', constant_values = (0,0))

        for i in range(n_tensors):
            x = X_padded[i]
            dx = dX_down_padded[i]           # view: accumulated into in place

            for h in range(H_up):            # vertical axis of the output volume
                v0 = h*s
                v1 = v0 + k_H
                for w in range(W_up):        # horizontal axis of the output volume
                    h0 = w*s
                    h1 = h0 + k_W
                    x_window = x[v0:v1, h0:h1, :]

                    for c in range(c_up):    # output channels, one per filter
                        g = upstream_g[i, h, w, c]

                        # Equation 5: full-precision filter gradient, X - F.
                        dF[c] += (x_window - F[c]) * g
                        # Equation 6: clipped input gradient, HT(F - X).
                        dx[v0:v1, h0:h1, :] += hard_tanh(F[c] - x_window) * g
                        dB[c] += g

            # Strip the padding with explicit bounds so that p == 0 also works.
            dX_down[i, :, :, :] = dx[p:p+H_down, p:p+W_down, :]

        assert(dX_down.shape == (n_tensors, H_down, W_down, c_down))
        return dX_down, dF, dB

In [387]:
# Layer over the current filters/images: stride 1, padding 1, ReLU activation, default bias.
layer = addernet_layer(F=filters, X=images, stride=1, padding=1, activation=relu, B=None)

In [388]:
# Run the forward pass; it returns the activated output and the cache tuple.
Z,cache = layer.forward()
# Bare expression: the notebook echoes the output shape.
Z.shape

(1, 5, 5, 5)