In [21]:
import torch 
import numpy as np
import ipytest
ipytest.autoconfig()

In [22]:
def Conv2D(in_channels, out_channels, kernel_size, stride=1, padding=0, dilation=1, groups=1, bias=True, padding_mode='zeros'):
  def convolution(matrix):
    if (in_channels % groups != 0) or (out_channels % groups != 0):
     raise ValueError('in_channels and out_channels must be divisible by groups')

    # Generate bias values
    bias_values = torch.rand(out_channels) if bias else torch.zeros(out_channels)

    # Padding mode exceptions
    valid_padding_modes = ['zeros', 'reflect', 'replicate', 'circular']
    if padding_mode not in valid_padding_modes:
        raise ValueError(f'Invalid padding_mode. Supported modes are: {valid_padding_modes}')    

    def generate_kernel(out_channels, in_channels, kernel_size, groups):
      if isinstance(kernel_size, tuple):
          filters = torch.rand(out_channels, in_channels // groups, kernel_size[0], kernel_size[1])
      elif isinstance(kernel_size, int):
          filters = torch.rand(out_channels, in_channels // groups, kernel_size, kernel_size)
      else:
          raise ValueError('Invalid kernel_size type. Supported types are: int or tuple of ints')

      return filters

      # Generate padding
    if padding_mode == 'zeros':
        pad = torch.nn.ZeroPad2d(padding)
    elif padding_mode == 'reflect':
        pad = torch.nn.ReflectionPad2d(padding)
    elif padding_mode == 'replicate':
        pad = torch.nn.ReplicationPad2d(2)
    elif padding_mode == 'circular':
        pad = torch.nn.CircularPad2d(padding)
    else:
        raise ValueError('Invalid padding_mode. Supported modes are: "zeros", "reflect", "replicate", "circular"')
    matrix = pad(matrix)
    
  # Generate kernel
    filters = generate_kernel(out_channels, in_channels, kernel_size, groups)

    result_list = []
    for l in range(out_channels):

      feature_map = np.array([])  # Инициализация пустой карты признаков

      # Цикл по ширине входной карты признаков с учетом размера фильтра, дилатации и шага
      for i in range(0, matrix.shape[1] - ((filters.shape[2] - 1) *
                                            dilation + 1) + 1, stride):

        # Цикл по высоте входной карты признаков с учетом размера фильтра, дилатации и шага
        for j in range(0, matrix.shape[2] - ((filters.shape[3] - 1) *
                                              dilation + 1) + 1, stride):

          # Инициализация переменной для хранения суммы сверток для текущей позиции
          total = 0
          for c in range(in_channels // groups):
             # Выбор соответствующего среза входной матрицы для операции свертки
            if groups > 1:
              val = matrix[l * (in_channels // groups) + c][i:i + (filters.shape[2] - 1) * dilation + 1:dilation,
                                                             j:j + (filters.shape[3] - 1) * dilation + 1:dilation]
            else:
              # Извлекаем подматрицу из входной матрицы matrix для одного канала (c)
              # с использованием трехмерного среза по ширине, высоте и глубине
              # Срез происходит вдоль каждого измерения с шагом dilation
              # Начальные индексы для каждого измерения определяются переменными i, j
              # и учитывают размеры фильтра и дилатацию
              val = matrix[c]                              [i:i + (filters.shape[2] - 1) * dilation + 1:dilation,
                                                             j:j + (filters.shape[3] - 1) * dilation + 1:dilation]
              
            # Подсчет суммы элементов после умножения на веса фильтра
            local_sum = (val * filters[l][c]).sum()
            total = total + local_sum
            
          #расширяем массив feature_map, добавляя новое значение, которое является суммой total и смещения для конкретного выходного канала
          feature_map = np.append(feature_map, float(total + bias_values[l]))

      # Добавление feature_map в result_list с учетом изменения формы
      result_list.append(feature_map.reshape(
        (matrix.shape[1] - ((filters.shape[2] - 1) * 
                            dilation + 1)) // stride + 1,  # вычитаем из размера второй оси входной матрицы размер свертки с учетом дилатации. Полученное значение представляет собой "виртуальную" длину, на которую мы можем сдвинуть фильтр вдоль входных каналов.
        (matrix.shape[2] - ((filters.shape[3] - 1) * 
                            dilation + 1)) // stride + 1)) # выражение вычитает из размера третьей оси входной матрицы размер свертки с учетом дилатации вдоль ширины. Полученное значение представляет собой "виртуальную" длину, на которую мы можем сдвинуть фильтр вдоль ширины.

    return np.array(result_list), np.array(filters), np.array(bias_values)

  return convolution

In [23]:
def create_random_tensor(shape):
    return torch.rand(shape)

def compare_convolutions(custom_conv, torch_conv, input_tensor):
    result, kernel, bias_val = custom_conv(input_tensor)
    torch_conv.weight.data = torch.tensor(kernel)
    torch_conv.bias.data = torch.tensor(bias_val)

    custom_result = str(np.round(result, 2))
    torch_result = str(np.round(np.array(torch_conv(input_tensor).data), 2))

    assert torch_result == custom_result

In [24]:
def test_1():
    tensor = create_random_tensor((10, 28, 28))
    conv = Conv2D(               in_channels=10, out_channels=2, kernel_size=3, stride=1, padding=0, dilation=1, groups=2, bias=True, padding_mode='zeros')
    torch_conv = torch.nn.Conv2d(in_channels=10, out_channels=2, kernel_size=3, stride=1, padding=0, dilation=1, groups=2, bias=True, padding_mode='zeros')
    compare_convolutions(conv, torch_conv, tensor)

def test_2():
    tensor = create_random_tensor((3, 100, 100))
    conv = Conv2D(               in_channels=3, out_channels=1, kernel_size=4, stride=2, padding=0, dilation=2, groups=1, bias=True, padding_mode='zeros')
    torch_conv = torch.nn.Conv2d(in_channels=3, out_channels=1, kernel_size=4, stride=2, padding=0, dilation=2, groups=1, bias=True, padding_mode='zeros')
    compare_convolutions(conv, torch_conv, tensor)

def test_3():
    tensor = create_random_tensor((4, 4, 4))
    conv = Conv2D(               in_channels=4, out_channels=4, kernel_size=1, stride=1, padding=0, dilation=1, groups=4, bias=True, padding_mode='zeros')
    torch_conv = torch.nn.Conv2d(in_channels=4, out_channels=4, kernel_size=1, stride=1, padding=0, dilation=1, groups=4, bias=True, padding_mode='zeros')
    compare_convolutions(conv, torch_conv, tensor)
    
def test_4():
    tensor = create_random_tensor((3, 128, 128))
    conv = Conv2D(               in_channels=3, out_channels=1, kernel_size=1, stride=1, padding=0, dilation=1, groups=1, bias=True, padding_mode='zeros')
    torch_conv = torch.nn.Conv2d(in_channels=3, out_channels=1, kernel_size=1, stride=1, padding=0, dilation=1, groups=1, bias=True, padding_mode='zeros')
    compare_convolutions(conv, torch_conv, tensor)

In [25]:
ipytest.run()

[32m.[0m[32m.[0m[32m.[0m[32m.[0m[32m                                                                                         [100%][0m
[32m[32m[1m4 passed[0m[32m in 2.85s[0m[0m


<ExitCode.OK: 0>