In [1]:
import numpy as np

## convolution and pooling operations

In [2]:
'''
- expected input volume to be an array of 3d images
- filters is a list of 3d array filters
- biases is a list of bias terms, one for each filter
- maintain_depth: convolve channel by channel instead of one volume becoming a single depth slice
--> if true, does not use bias, since it should not be applied to each depth slice
'''
def convolution(input_volume, filters, biases, stride=1, zero_padding=0, maintain_depth=False):
    
    # assume square images
    num_images, num_channels, _, img_dim_orig = input_volume.shape
    num_filters, _, __, filter_dim = filters.shape
    
    
    # zero padding adds zeroes around the input, but not along the depth dimension of each image
    image = input_volume
    if zero_padding != 0:
        image = np.zeros(shape=(num_images, num_channels, img_dim_orig + 2 * zero_padding, img_dim_orig + 2 * zero_padding))
        image[:, :, zero_padding:-zero_padding, zero_padding:-zero_padding] = input_volume
    
    img_dim = img_dim_orig + 2 * zero_padding
    
    
    # im2col 3d from:
    # https://stackoverflow.com/questions/50292750/python-the-implementation-of-im2col-which-takes-the-advantages-of-6-dimensional
    img_stride, channel_stride, row_stride, col_stride = image.strides
    out_dim = (img_dim - filter_dim) // stride + 1
    col = np.lib.stride_tricks.as_strided(image, shape=(num_images, out_dim, out_dim, num_channels, filter_dim, filter_dim), strides=(img_stride, stride * row_stride, stride * col_stride, channel_stride, row_stride, col_stride)).astype(float)
    
    if maintain_depth:
        col = col.reshape((num_images * out_dim ** 2 * num_channels, filter_dim ** 2))
    else:
        col = col.reshape(np.multiply.reduceat(col.shape, (0, 3)))
    
    # each 2d slice of col has rows containing each extended receptive field
    # similarly, the filters will be flattened into a 2d array (col: each filter stretched out)
    filt_stride, filt_depth_stride, filt_row_stride, filt_col_stride = filters.strides
                        
    filt_col = None
    if (maintain_depth):
        filt_col = np.lib.stride_tricks.as_strided(filters, 
                                                   shape=(filter_dim ** 2, num_channels * num_filters), 
                                                   strides=(filt_col_stride, filt_depth_stride))
    else:
        filt_col = np.lib.stride_tricks.as_strided(filters, 
                                                   shape=(num_channels * filter_dim ** 2, num_filters), 
                                                   strides=(filt_col_stride, filt_stride))
                              
    # perform matrix multiplication
    # each col is a different filter; every out_dim^2 rows corresponds to one image's convolved activations
    conv = np.dot(col, filt_col)
                          
    if maintain_depth:
        # conv contains convolutions of depth slices with other slices, so the correct ones must be extracted
        # has #columns = num_filters * num_channels
        # up to num_channel th column, shift 1st column up 0, 2nd up 1, 3rd up 2; then repeat for each filter's columns
        # then take every num_channel th row
        rows, cols = conv.shape
        for col in range(cols):
            shift = col % num_channels
            if shift != 0:
                conv[:-shift, [col]] = conv[shift:, [col]]
            
        conv = conv[np.arange(0, rows, step=num_channels), :]
        
        # reshape into a 5d array of outputs
        # 5th dim contains the result for each image 
        # 4th dimension contains convolutions maintaining depth, for each filter
        # 3rd, 2nd, 1st dimensions are the outputs with the depths maintained
        conv_row_stride, conv_col_stride = conv.strides
        conv = np.lib.stride_tricks.as_strided(conv, 
                                               shape=(num_images, num_filters, num_channels, out_dim, out_dim),
                                               strides=(out_dim ** 2 * conv_row_stride, num_channels * conv_col_stride, conv_col_stride, out_dim * conv_row_stride, conv_row_stride))
        
    else:
        # add bias term (each filter should have one)
        conv += biases
    
        # reshape into list of activation volumes (1 volume per image)
        conv_row_stride, conv_col_stride = conv.strides
        conv = np.lib.stride_tricks.as_strided(conv, shape=(num_images, num_filters, out_dim, out_dim), strides=(out_dim ** 2 * conv_row_stride, conv_col_stride, out_dim * conv_row_stride, conv_row_stride))

    return conv
        

In [3]:
'''
- expected input volume to be an array of 3d images
- (square) sliding window dimensions
- modes: max pooling, min pooling, mean pooling
- stride: for sliding viewing window
- zero_padding: zero padding
'''
def pool(input_volumes, filter_dim, mode='max', stride=1, zero_padding=0): 
    # assume square images
    num_images, num_channels, _, img_dim_orig = input_volumes.shape   
    
    # zero padding adds zeroes around the input, but not along the depth dimension of each image
    image = input_volumes
    if zero_padding != 0:
        image = np.zeros(shape=(num_images, num_channels, img_dim_orig + 2 * zero_padding, img_dim_orig + 2 * zero_padding))
        image[:, :, zero_padding:-zero_padding, zero_padding:-zero_padding] = input_volume
    
    img_dim = img_dim_orig + 2 * zero_padding
    
    # im2col 3d from:
    # https://stackoverflow.com/questions/50292750/python-the-implementation-of-im2col-which-takes-the-advantages-of-6-dimensional
    img_stride, channel_stride, row_stride, col_stride = image.strides
    out_dim = (img_dim - filter_dim) // stride + 1
    col = np.lib.stride_tricks.as_strided(image, shape=(num_images, out_dim, out_dim, num_channels, filter_dim, filter_dim), strides=(img_stride, stride * row_stride, stride * col_stride, channel_stride, row_stride, col_stride)).astype(float)
    # col = col.reshape((num_images, num_channels * out_dim ** 2, filter_dim ** 2))
    col = col.reshape((num_images * num_channels * out_dim ** 2, filter_dim ** 2))
    
    # store the indices of max or min values for use in backpropagation 
    # in col receptive areas rolled out horizontally, with each depth in a different row, and then each image's rows stacked
    mask_idx = None
    if mode == 'max':
        mask_idx = np.argmax(col, axis=1)
    elif mode == 'min':
        mask_idx= np.argmin(col, axis=1)
    
    # perform the pooling operations
    result = None
    if mode == 'max':
        result = col.max(axis=1)
    elif mode == 'min':
        result = col.min(axis=1)
    elif mode == 'mean':
        result = col.mean(axis=1)
        
    # reshape result into list of images
    data_stride = result.strides[0]
    result = np.lib.stride_tricks.as_strided(result, 
                                             shape=(num_images, num_channels, out_dim, out_dim), 
                                             strides=(out_dim ** 2 * num_channels * data_stride, data_stride, num_channels * out_dim * data_stride, num_channels * data_stride))
    if mode == 'min' or mode == 'max':
        return result, mask_idx
    
    return result

## forward propagation

In [4]:
# layer class to group information about layers
class Layer:
    '''
    layer_type: 'conv' or 'pool'
    filters_shape: shape tuple if layer_type is 'conv' (expected 4D); single dimension for square window if layer_type is 'pool'
    stride, zero_padding: constants representing the stride and amount of zeroes added to the border of an input
    pooling_mode: method used in pooling: 'max', 'min', or 'mean'
    '''
    def __init__(self, layer_type, filters_shape, stride=1, zero_padding=0, pooling_mode='max'):
        self.layer_type = layer_type
        
        # initialize filter weights
        self.filters = None
        self.filter_dim = None
        
        if layer_type == 'conv':
            self.filters = np.random.normal(size=filters_shape)
            self.biases = np.random.normal(size=self.filters.shape[0])
        elif layer_type == 'pool':
            self.filter_dim = filters_shape
            
        self.stride = stride
        self.zero_padding = zero_padding
        self.pooling_mode = pooling_mode
        
    def __str__(self):
        if self.layer_type == 'conv':
            return f'conv(filters_shape=({self.filters.shape}), stride={self.stride}, zero_padding={self.zero_padding})'
            
        elif self.layer_type == 'pool':
            return f'pool(filter_dim={self.filter_dim}, stride={self.stride}, zero_padding={self.zero_padding}, pooling_mode={self.pooling_mode})'

In [5]:
# makeshift way of specifying structure of the layers
# separate layers with a pipe: |
# start each layer with the type of layer and a semi colon: ie conv; or pool;
# no spaces?
# separate parameters with a semicolon: ;
# conv params: filter f=shape tuple; stride s=num; zero padding z=num
# - note that the depth of the shape will be overridden by the previous layer's depth, since the filter extends through the input volume
# pooling params: filter dimension fdim=num; mode m='max' (or mean or min); stride s=num, zero padding z=num

structure_str = 'conv;f=4,1,3,3;s=1;z=1|conv;f=3,1,3,3;s=1;z=0|pool;fdim=2;m=max;s=2;z=0'
imgs = np.arange(108).reshape((3, 1, 6, 6))
prev_layer_depth = imgs.shape[1]

layers = np.asarray([])
for layer_str in structure_str.split('|'):
    param_str = layer_str.split(';')
    layer_type = param_str[0]
    
    # read in expected params for conv
    if layer_type == 'conv':
        shape = stride = padding = None
        
        for param in param_str[1:]:
            p, value = param.split('=')
            if p == 'f':
                shape = [int(v) for v in value.split(',')]
                # the depth of the filter is equal to the depth of the input volume; the depth of the lext layer will equal number of filters
                shape[1] = prev_layer_depth
                prev_layer_depth = shape[0] 
            elif p == 's':
                stride = int(value)
            elif p == 'z':
                padding = int(value)
                
        layers = np.append(layers, Layer('conv', shape, stride, padding, pooling_mode=None))
                
    # read in expected params for pool
    elif layer_type == 'pool':
        window_dim = stride = padding = pooling_method = None
        
        for param in param_str[1:]:
            p, value = param.split('=')
            if p == 'fdim':
                window_dim = int(value)
            elif p == 's':
                stride = int(value)
            elif p == 'z':
                padding = int(value)
            elif p == 'm':
                pooling_method = value
                
        layers = np.append(layers, Layer('pool', window_dim, stride, padding, pooling_method))
        
for l in layers:
    print(l)

conv(filters_shape=((4, 1, 3, 3)), stride=1, zero_padding=1)
conv(filters_shape=((3, 4, 3, 3)), stride=1, zero_padding=0)
pool(filter_dim=2, stride=2, zero_padding=0, pooling_mode=max)


In [6]:
def conv_fprop(imgs, layers):
    output = imgs.copy()
    activation_volumes = [output]
    for l in layers:
        if l.layer_type =='conv':
            output = convolution(output, l.filters, l.biases, l.stride, l.zero_padding)

            # i think this goes before the activation function, for use in backprop
            activation_volumes.append(output)
            # todo activation function
            
            # ie ReLU
            output[output < 0] = 0 
            
            
        elif l.layer_type == 'pool':
            output = pool(output, l.filter_dim, mode=l.pooling_mode, stride=l.stride, zero_padding=l.zero_padding)
            activation_volumes.append(output)
            
    return activation_volumes
        
# note that the first element contains the input images
activations = conv_fprop(imgs, layers) 
print(activations[-1].shape)
activations[-1]

(3, 3, 2, 2)


array([[[[ 20.97333621, 118.94082573],
         [133.63921631, 182.54240548]],

        [[145.67059479, 128.08008664],
         [114.21088904,  94.25781463]],

        [[  0.        ,   0.        ],
         [  0.        ,   0.        ]]],


       [[[221.66616401, 484.97551843],
         [393.29542472, 545.98130055]],

        [[222.50004556, 155.54086702],
         [ 28.72348642,  26.85107705]],

        [[  0.        ,   0.        ],
         [  0.        ,   0.        ]]],


       [[[431.38368445, 851.01021113],
         [652.95163312, 912.01599325]],

        [[304.26338517, 192.07550155],
         [  0.        ,   0.        ]],

        [[  0.        ,   0.        ],
         [  0.        ,   0.        ]]]])

In [1]:
# adjust layer weights and biases
learning_rate = 0.1
def conv_bprop_gradient(layers, incoming_errors, activations):
    # https://towardsdatascience.com/backpropagation-in-a-convolutional-layer-24c8d64d8509
    # https://medium.com/@mayank.utexas/backpropagation-for-convolution-with-strides-8137e4fc2710
    
    curr_error = incoming_errors
    bias_grad = filter_grad = next_errors = None
    
    # todo account for activation function
    
    for l, activation in reversed(zip(layers, activations[:-1])):
        if l.layer_type == 'conv':
            # **bias gradient** for each filter is the sum of the errors for that filter (across all images)
            bias_grad = np.sum(curr_error, axis=(0, 2, 3))
            
            # incoming error depth slices are associated with each filter and each depth slice of the image... so each slice 
            # will be stacked to match the depth of the image, and then used as a "filter" since the gradient can be computed
            # using a convolution
            num_images, num_filters, rows, cols = curr_error.shape
            new_dim = cols + (cols - 1) * (l.stride - 1)
            
            # **weight and input gradients**
            
            # add stride - 1 zeroes between error elements so that the convolutions work out
            strided_error = curr_error
            if l.stride > 1:
                # array with new dimensions (same number of rows and cols)
                err = np.zeros(shape=(num_images, num_filters, new_dim, new_dim))

                for i in range(rows):
                    for j in range(cols):
                        err[:, :, [i * l.stride], [j * l.stride]] = curr_error[:, :, [i], [j]]

                strided_error = err
            
        
            # 5d array: first 3 dims are filter, 4th dim is each image's incoming error, 5th dim is each "stacked" slice
            image_stride, channel_stride, row_stride, col_stride = strided_error.strides
            depth = l.weights.shape[1]
            strided_error = np.lib.stride_tricks.as_strided(strided_error,
                                                            shape=(num_filters, num_images, depth, new_dim, new_dim), 
                                                            strides=(channel_stride, image_stride, 0, row_stride, col_stride))
            
            
            # calculate the next layer's error:
            new_shape = strided_error.shape
            padding = l.filters[0].shape[-1] - 1
            
            input_grad = np.zeros(shape=activation.shape)
            for f in range(len(l.filters)):
                w = np.rot90(l.filters[f], 2, axes=(1, 2))
                w = np.asarray([w])
                dx = convolution(strided_error[f], w, None, stride=1, zero_padding=padding, maintain_depth=True)
                input_grad += dx[:, 0, :, :, :]
            

            # calculate the weight gradient
            
            # each incoming error is associated with one image only, and so that error should only 
            # be convolved with its respective image
            weight_error = np.zeros(l.filters.shape)

            # iterate through images.. might be slow :(
            for i in range(len(num_images)):
                dw = convolution(activation, strided_error[:, i, :, :, :], biases=None, stride=1, zero_padding=0, maintain_depth=True)
                weight_error += dw
                
            # todo update the weights and biases using the calculated gradient
            l.biases -= learning_rate * bias_grad
            l.filters -= learning_rate * weight_error
        
            curr_error = input_grad
            

SyntaxError: invalid syntax (<ipython-input-1-6d15ce88bb9d>, line 30)

In [13]:
# testing for backprop code
strided_error = np.random.normal(size=(2, 3, 3, 3))
stride = 2
curr_error = strided_error

num_images, num_filters, rows, cols = strided_error.shape
new_dim = cols + (cols - 1) * (stride - 1)

# array with new dimensions (same number of rows and cols)
err = np.zeros(shape=(num_images, num_filters, new_dim, new_dim))

for i in range(rows):
    for j in range(cols):
        err[:, :, [i * stride], [j * stride]] = curr_error[:, :, [i], [j]]

strided_error = err
                
image_stride, channel_stride, row_stride, col_stride = strided_error.strides
depth = 3
strided_error = np.lib.stride_tricks.as_strided(strided_error,
                                                shape=(4, 2, depth, new_dim, new_dim), 
                                                strides=(channel_stride, image_stride, 0, row_stride, col_stride))

strided_error

array([[[[[-2.10889924e-001,  0.00000000e+000, -4.81065959e-001,
            0.00000000e+000, -4.43586104e-001],
          [ 0.00000000e+000,  0.00000000e+000,  0.00000000e+000,
            0.00000000e+000,  0.00000000e+000],
          [ 1.47904212e+000,  0.00000000e+000, -2.53627079e-001,
            0.00000000e+000,  1.53193938e+000],
          [ 0.00000000e+000,  0.00000000e+000,  0.00000000e+000,
            0.00000000e+000,  0.00000000e+000],
          [ 1.64205935e+000,  0.00000000e+000, -1.44471887e+000,
            0.00000000e+000,  6.55718226e-001]],

         [[-2.10889924e-001,  0.00000000e+000, -4.81065959e-001,
            0.00000000e+000, -4.43586104e-001],
          [ 0.00000000e+000,  0.00000000e+000,  0.00000000e+000,
            0.00000000e+000,  0.00000000e+000],
          [ 1.47904212e+000,  0.00000000e+000, -2.53627079e-001,
            0.00000000e+000,  1.53193938e+000],
          [ 0.00000000e+000,  0.00000000e+000,  0.00000000e+000,
            0.00000000e+000, 