In [5]:
import numpy as np

In [47]:
class ConvLayer1D:
    
    
    '''
    
    Input Dims (X): (C, feats per channel)
    
    ''' 
    
    def __init__(self, seed):
        self.seed = seed
        self._set_seed() 
        
    def forward(self, X, output_channels, kernel_size, padding = 0, stride = 1, dilation_rate = 1):
        self.padding = padding
        self.X = self._pad(X)
        self.output_channels = output_channels
        self.kernel_size = kernel_size
        self.stride = stride
        self.dilation_rate = dilation_rate
        
        Y = self._forward_util()
        
        return Y
        
    def _forward_util(self):
       
        self._create_kernel()
        
        if len(self.kernel.shape) == 1 and len(self.X.shape) != 1:
         
            '''
          
            where the kernel is shape 1, meaning 1 output channel, but we have multiple input channels in X:  
            
            '''
          
            output_size = int(((self.X.shape[1] - self.kernel.size) / self.stride) + 1)  
            Y = np.zeros(shape = (output_size))

            for in_ch in range(self.X.shape[0]):
                for i in range(Y.size):
                    conv_out = []
                    current_slice = self.X[in_ch, i * self.stride : (i * self.stride + self.kernel.size)]
                    if current_slice.size != self.kernel.size:
                        break
                    conv_ch = current_slice * self.kernel
                    conv_out.append(conv_ch)
                  
                    for in_ch in range(self.kernel.shape[1]): 
                        Y[i] = np.sum(current_slice * self.kernel[1, in_ch])
                        
        elif len(self.kernel.shape) == 1 and len(self.X.shape) == 1:
            output_size = int(((self.X.size - self.kernel.size) / self.stride) + 1)  
            Y = np.zeros(shape = (output_size))

            for i in range(Y.size):
                current_slice = self.X[i * self.stride : (i * self.stride + self.kernel.size)]
                Y[i] = np.sum(current_slice * self.kernel)
        
        elif len(self.kernel.shape) == 3 and len (self.X.shape) != 1:
            
            output_size = int(((self.X.shape[1] - self.kernel.shape[2]) / self.stride) + 1)
            Y = np.zeros(shape = (self.output_channels, output_size)) 

            for out_ch in range(Y.shape[0]):
                for in_ch in range(self.X.shape[0]):
                    for i in range(Y.shape[1]):
                  
                        slice_idx = i * self.stride
                        current_slice = self.X[in_ch, slice_idx:(slice_idx + self.kernel.shape[2])]
                        conv_out = []
                    
                        if current_slice.size != self.kernel.shape[2]:
                            break
                        
                        for in_ch in range(self.kernel.shape[1]):
                            conv_ch = current_slice * self.kernel[out_ch, in_ch]
                            conv_out.append(conv_ch)
                            
                        Y[out_ch, i] = np.sum(conv_out)

        elif len(self.kernel.shape) == 3 and len(self.X.shape) == 1:
            
            '''
            If we have multiple output channels but a single input channel...
            '''
            
            output_size = int(((self.X.size - self.kernel.shape[2]) / self.stride) + 1)
            Y = np.zeros(shape = (self.output_channels, output_size)) 

            for out_ch in range(Y.shape[0]):
                for i in range(Y.shape[1]):
                    
                    slice_idx = i * self.stride
                    current_slice = self.X[slice_idx:(slice_idx+self.kernel.shape[2])]
                    
                    if current_slice.size != self.kernel.shape[2]:
                        break
                    
                    Y[out_ch, i] = np.sum(current_slice * self.kernel[out_ch])

        return Y
        
    def _create_kernel(self):
        
        if self.dilation_rate != 1 and self.output_channels > 1:
            kernel_mask = np.random.random_sample(size = (self.output_channels, self.X.shape[0], self.kernel_size))
            self.kernel = self._dilate_multiple_channels(kernel_mask)

        elif self.dilation_rate != 1 and self.output_channels == 1:
            kernel_mask = np.random.random_sample(size = (self.kernel_size))
            self.kernel = self._dilate(kernel_mask)
         
        elif self.dilation_rate == 1 and self.output_channels > 1: 
            self.kernel = np.random.random_sample(size = (self._output_channels, self.X.shape[0], self.kernel_size))
           
        elif self.dilation_rate == 1 and self.output_channels == 1 and len(self.X.shape) != 1:
            self.kernel = np.random.random_sample(size = (1, self.X.shape[0], self.kernel_size))
            
        elif self.dilation_rate == 1 and self.output_channels == 1:
            self.kernel = np.random.random_sample(size = (self.kernel_size)) 
            
          
    def _dilate(self, kernel_mask):
       
        if len(kernel_mask) == 1:
            return kernel_mask
       
        dilation_rate = self.dilation_rate - 1  
        i = 0
        
        while i < len(kernel_mask):
            if kernel_mask[i] != 0:
                kernel_mask = np.concatenate((kernel_mask[:i+1], [0 for _ in range(dilation_rate)], kernel_mask[i+1:]))
                i += dilation_rate
            i+= 1
            if i == (len(kernel_mask) - 1):
                return kernel_mask
            
    def _dilate_multiple_channels(self, kernel_mask):
        
        if kernel_mask.shape[2] == 1:
            return kernel_mask
        
        out_kernel = np.zeros(shape = (kernel_mask.shape[0], kernel_mask.shape[1], (kernel_mask.shape[2] * self.dilation_rate - (self.dilation_rate - 1))))
        
        dilation_rate = self.dilation_rate - 1
        
        for out_ch in range(kernel_mask.shape[0]):
            for in_ch in range(kernel_mask.shape[1]):
                i = 0
                dilated_row = kernel_mask[out_ch, in_ch, :]
                while i < len(dilated_row):
                    if dilated_row[i] != 0:
                        dilated_row = np.concatenate((dilated_row[:i+1], [0 for _ in range(dilation_rate)], dilated_row[i+1:]))
                        i += dilation_rate
                        
                    i += 1
                    
                    if i == (len(dilated_row) - 1):
                        out_kernel[out_ch, in_ch, :] = dilated_row
                        break
      
        return out_kernel 
          
    def _pad(self, X):
        x = np.pad(X, pad_width = self.padding) 
        return x
           
    def _set_seed(self):
        if self.seed is not None:
            np.random.seed(self.seed)
            
    @property
    def dilation_rate(self):
        return self._dilation_rate
    
    @dilation_rate.setter
    def dilation_rate(self, dilation_rate):
        assert dilation_rate >= 1, ValueError('Dilation cannot be less than 1 for the Kernel!')
        self._dilation_rate = dilation_rate
    
    @property
    def output_channels(self):
        return self._output_channels
    
    @output_channels.setter
    def output_channels(self, output_channels):
        assert output_channels >= 1, ValueError('Output Channels cannot be less than 1!')
        self._output_channels = output_channels
        
    @property
    def kernel_size(self):
        return self._kernel_size     
   
    @kernel_size.setter
    def kernel_size(self, kernel_size):
        assert self.X.size >= kernel_size, ValueError('Kernel cannot be greater than input_vector!')
        assert isinstance(kernel_size, int), ValueError('kernel_size must be int for 1D Conv')
        self._kernel_size = kernel_size

In [48]:
# This portion was written by my PHD-level teacher, GPT


# Initialize ConvLayer1D with a fixed seed for reproducibility
op = ConvLayer1D(seed=1)

# Test Case 1: Single Channel Input (flattened)
x1 = np.array([1, 2, 3])  # Shape (3,)
output1 = op.forward(X=x1, output_channels=2, kernel_size=2, padding=0, stride=1, dilation_rate=1)
print("Output Shape 1 (Single Channel):", output1.shape)  # Expected shape: (2, 2)

# Test Case 2: Multiple Channels
x2 = np.array([[1, 2, 3], [4, 5, 6]])  # Shape (2, 3) - 2 channels
output2 = op.forward(X=x2, output_channels=3, kernel_size=2, padding=0, stride=1, dilation_rate=1)
print("Output Shape 2 (Multiple Channels):", output2.shape)  # Expected shape: (3, 2)

# Test Case 3: Single Channel Input with Padding
x3 = np.array([1, 2, 3])  # Shape (3,)
output3 = op.forward(X=x3, output_channels=1, kernel_size=2, padding=1, stride=1, dilation_rate=1)
print("Output Shape 3 (Single Channel with Padding):", output3.shape)  # Expected shape: (4, )

# Test Case 4: Multiple Channels with Stride > 1
x4 = np.array([[1, 2, 3, 4], [5, 6, 7, 8]])  # Shape (2, 4) - 2 channels
output4 = op.forward(X=x4, output_channels=1, kernel_size=2, padding=0, stride=2, dilation_rate=1)
print("Output Shape 4 (Multiple Channels with Stride):", output4.shape)  # Expected shape: (2, )
print(output4)

# Test Case 5: Dilation with Multiple Channels
x5 = np.array([1, 2, 3, 4, 5])  # Shape (5, ) - 1 channel
output5 = op.forward(X=x5, output_channels=1, kernel_size=2, padding=0, stride=1, dilation_rate=2)
print("Output Shape 5 (Dilation with Multiple Channels):", output5.shape)  # Expected shape: (3, )

# Test Case 6: Larger Kernel Size with Single Channel
x6 = np.array([1, 2, 3, 4, 5])  # Shape (5,)
output6 = op.forward(X=x6, output_channels=1, kernel_size=3, padding=0, stride=1, dilation_rate=1)
print("Output Shape 6 (Larger Kernel Single Channel):", output6.shape)  # Expected shape: (3, )

# Test Case 7: Single Channel Input - Edge Case
x7 = np.array([1])  # Shape (1,)
output7 = op.forward(X=x7, output_channels=1, kernel_size=1, padding=0, stride=1, dilation_rate=1)
print("Output Shape 7 (Edge Case Single Element):", output7.shape)  # Expected shape: (1, )

# Test Case 8: Multiple Channels - Edge Case with Kernel Larger than Input
try:
    x8 = np.array([[1, 2]])  # Shape (1, 2) - 1 channel, too small for kernel_size 3
    output8 = op.forward(X=x8, output_channels=1, kernel_size=3, padding=0, stride=1, dilation_rate=1)
except ValueError as e:
    print("Output Shape 8 (Error):", e)  # Should raise an error


Output Shape 1 (Single Channel): (2, 2)
Output Shape 2 (Multiple Channels): (3, 2)
Output Shape 3 (Single Channel with Padding): (4,)
Output Shape 4 (Multiple Channels with Stride): (1, 2)
[[6.77755687 9.12170071]]
Output Shape 5 (Dilation with Multiple Channels): (3,)
Output Shape 6 (Larger Kernel Single Channel): (3,)
Output Shape 7 (Edge Case Single Element): (1,)


AssertionError: Kernel cannot be greater than input_vector!