In [1]:
import numpy as np
from scipy import special

**Module** is an abstract class which defines fundamental methods necessary for a training a neural network. You do not need to change anything here, just read the comments.

In [2]:
import numpy as np
from scipy import special

class Module(object):
    # Базовый класс для всех слоев
    def __init__ (self):
        self.output = None
        self.gradInput = None
        self.training = True

    def forward(self, input):
        return self.updateOutput(input)

    def backward(self, input, gradOutput):
        self.updateGradInput(input, gradOutput)
        self.accGradParameters(input, gradOutput)
        return self.gradInput

    def updateOutput(self, input):
        # Тут будет логика прямого прохода
        pass

    def updateGradInput(self, input, gradOutput):
        # Тут будет логика расчета градиента по входу
        pass

    def accGradParameters(self, input, gradOutput):
        # Тут будет логика расчета градиента по параметрам (если они есть)
        pass

    def zeroGradParameters(self):
        # Обнуление градиентов
        pass

    def getParameters(self):
        # Вернуть параметры слоя
        return []

    def getGradParameters(self):
        # Вернуть градиенты по параметрам
        return []

    def train(self):
        # Режим обучения
        self.training = True

    def evaluate(self):
        # Режим оценки (например для dropout, batchnorm)
        self.training = False

    def __repr__(self):
        return "Module"

# Sequential container

**Define** a forward and backward pass procedures.

In [3]:
class Sequential(Module):
    # Контейнер для слоев
    def __init__ (self):
        super(Sequential, self).__init__()
        self.modules = []
        self.inputs = [] # Запоминаем входы для backward

    def add(self, module):
        self.modules.append(module)

    def updateOutput(self, input):
        # Прямой проход по всем модулям
        self.inputs = [input]
        current_output = input
        for module in self.modules:
            current_output = module.forward(current_output)
            self.inputs.append(current_output) # Сохраняем вход для следующего слоя
        self.output = self.inputs.pop() # Последний выход
        return self.output

    def updateGradInput(self, input, gradOutput):
        # Обратный проход
        current_grad = gradOutput
        for i in range(len(self.modules) - 1, -1, -1):
            current_grad = self.modules[i].backward(self.inputs[i], current_grad)
        self.gradInput = current_grad
        return self.gradInput

    def zeroGradParameters(self):
        for module in self.modules:
            module.zeroGradParameters()

    def getParameters(self):
        # Собираем параметры со всех слоев в один плоский список
        params = []
        for module in self.modules:
            params.extend(module.getParameters())
        return params

    def getGradParameters(self):
        # аналогично для градиентов
        grad_params = []
        for module in self.modules:
            grad_params.extend(module.getGradParameters())
        return grad_params

    def __repr__(self):
        string = "".join([repr(x) + '\n' for x in self.modules])
        return string

    def __getitem__(self, x):
        return self.modules.__getitem__(x)

    def train(self):
        # переключаем все слои в режим обучения
        self.training = True
        for module in self.modules:
            module.train()

    def evaluate(self):
        # Аналогично для режима оценки
        self.training = False
        for module in self.modules:
            module.evaluate()

# Layers

## 1 (0.2). Linear transform layer
Also known as dense layer, fully-connected layer, FC-layer, InnerProductLayer (in caffe), affine transform
- input:   **`batch_size x n_feats1`**
- output: **`batch_size x n_feats2`**

In [4]:
class Linear(Module):
    # Полносвязный слой
    def __init__(self, n_in, n_out):
        super(Linear, self).__init__()

        stdv = 1./np.sqrt(n_in)
        self.W = np.random.uniform(-stdv, stdv, size = (n_out, n_in))
        self.b = np.random.uniform(-stdv, stdv, size = n_out)

        self.gradW = np.zeros_like(self.W)
        self.gradb = np.zeros_like(self.b)

        self.input_cache = None

    def updateOutput(self, input):
        self.input_cache = input
        self.output = np.dot(input, self.W.T) + self.b
        return self.output

    def updateGradInput(self, input, gradOutput):
        self.gradInput = np.dot(gradOutput, self.W)
        return self.gradInput

    def accGradParameters(self, input, gradOutput):
        # Градиенты по весам и смещению
        self.gradW += np.dot(gradOutput.T, self.input_cache)
        self.gradb += np.sum(gradOutput, axis=0)

    def zeroGradParameters(self):
        self.gradW.fill(0)
        self.gradb.fill(0)

    def getParameters(self):
        return [self.W, self.b]

    def getGradParameters(self):
        return [self.gradW, self.gradb]

    def __repr__(self):
        s = self.W.shape
        q = 'Linear %d -> %d' % (s[1], s[0])
        return q

## 2. (0.2) SoftMax
- input:   **`batch_size x n_feats`**
- output: **`batch_size x n_feats`**

$\text{softmax}(x)_i = \frac{\exp x_i} {\sum_j \exp x_j}$

Recall that $\text{softmax}(x) == \text{softmax}(x - \text{const})$. It makes possible to avoid computing exp() from large argument.

In [5]:
class SoftMax(Module):
    def __init__(self):
         super(SoftMax, self).__init__()
         self.output_cache = None

    def updateOutput(self, input):
        # Вычитаем максимум чтоб не было больших чисел в exp
        input_stable = input - np.max(input, axis=1, keepdims=True)
        exp_input = np.exp(input_stable)
        self.output = exp_input / np.sum(exp_input, axis=1, keepdims=True)
        self.output_cache = self.output
        return self.output

    def updateGradInput(self, input, gradOutput):
        local_output = self.output_cache
        sum_grad_output = np.sum(gradOutput * local_output, axis=1, keepdims=True)
        self.gradInput = local_output * (gradOutput - sum_grad_output)
        return self.gradInput

    def __repr__(self):
        return "SoftMax"

## 3. (0.2) LogSoftMax
- input:   **`batch_size x n_feats`**
- output: **`batch_size x n_feats`**

$\text{logsoftmax}(x)_i = \log\text{softmax}(x)_i = x_i - \log {\sum_j \exp x_j}$

The main goal of this layer is to be used in computation of log-likelihood loss.

In [6]:
class LogSoftMax(Module):
    def __init__(self):
         super(LogSoftMax, self).__init__()
         self.softmax_cache = None

    def updateOutput(self, input):
        # Аналогично софтмаксу
        input_stable = input - np.max(input, axis=1, keepdims=True)
        exp_input = np.exp(input_stable)
        sum_exp = np.sum(exp_input, axis=1, keepdims=True)
        log_sum_exp = np.log(sum_exp)

        self.output = input_stable - log_sum_exp
        self.softmax_cache = exp_input / sum_exp # сохраняем софтмакс для градиента
        return self.output

    def updateGradInput(self, input, gradOutput):
        softmax_output = self.softmax_cache
        sum_grad_output = np.sum(gradOutput, axis=1, keepdims=True)
        self.gradInput = gradOutput - softmax_output * sum_grad_output
        return self.gradInput

    def __repr__(self):
        return "LogSoftMax"

## 4. (0.3) Batch normalization
One of the most significant recent ideas that impacted NNs a lot is [**Batch normalization**](http://arxiv.org/abs/1502.03167). The idea is simple, yet effective: the features should be whitened ($mean = 0$, $std = 1$) all the way through NN. This improves the convergence for deep models letting it train them for days but not weeks. **You are** to implement the first part of the layer: features normalization. The second part (`ChannelwiseScaling` layer) is implemented below.

- input:   **`batch_size x n_feats`**
- output: **`batch_size x n_feats`**

The layer should work as follows. While training (`self.training == True`) it transforms input as $$y = \frac{x - \mu}  {\sqrt{\sigma + \epsilon}}$$
where $\mu$ and $\sigma$ - mean and variance of feature values in **batch** and $\epsilon$ is just a small number for numericall stability. Also during training, layer should maintain exponential moving average values for mean and variance:
```
    self.moving_mean = self.moving_mean * alpha + batch_mean * (1 - alpha)
    self.moving_variance = self.moving_variance * alpha + batch_variance * (1 - alpha)
```
During testing (`self.training == False`) the layer normalizes input using moving_mean and moving_variance.

Note that decomposition of batch normalization on normalization itself and channelwise scaling here is just a common **implementation** choice. In general "batch normalization" always assumes normalization + scaling.

In [7]:
class BatchNormalization(Module):
    EPS = 1e-5
    def __init__(self, num_features, momentum = 0.1):
        super(BatchNormalization, self).__init__()
        self.momentum = momentum
        self.moving_mean = np.zeros(num_features)
        self.moving_variance = np.ones(num_features)

        # кэш
        self.input_reshaped = None
        self.x_centered = None
        self.std_inv = None
        self.normalized = None
        self.input_shape = None
        self.num_features = num_features

    def _reshape_input(self, input):
         # Вспомогательная функция чтоб обработать и 2d и 4d входы
         self.input_shape = input.shape
         if len(self.input_shape) == 2: # (N, C)
             return input, self.input_shape[1]
         elif len(self.input_shape) == 4: # (N, C, H, W) -> (N*H*W, C)
             N, C, H, W = self.input_shape
             return input.transpose(0, 2, 3, 1).reshape(-1, C), C
         else:
             raise ValueError("Input shape not supported")

    def _reshape_output(self, output_reshaped):
         # Возвращаем к исходной форме если надо
         if len(self.input_shape) == 4:
             N, C, H, W = self.input_shape
             return output_reshaped.reshape(N, H, W, C).transpose(0, 3, 1, 2)
         return output_reshaped

    def updateOutput(self, input):
        input_reshaped, num_features = self._reshape_input(input)
        if num_features != self.num_features:
             raise ValueError(f"Expected {self.num_features} features, but got {num_features}")

        if self.training:
            batch_mean = np.mean(input_reshaped, axis=0)
            batch_variance = np.var(input_reshaped, axis=0, ddof=0)
            self.std_inv = 1. / np.sqrt(batch_variance + self.EPS)
            self.x_centered = input_reshaped - batch_mean
            self.normalized = self.x_centered * self.std_inv

            # Обновляем скользящие средние
            self.moving_mean = (1.0 - self.momentum) * self.moving_mean + self.momentum * batch_mean
            n = input_reshaped.shape[0]
            unbiased_batch_variance = batch_variance * n / (n - 1) if n > 1 else batch_variance
            self.moving_variance = (1.0 - self.momentum) * self.moving_variance + self.momentum * unbiased_batch_variance

            output_reshaped = self.normalized
        else:
            # На инференсе используем накопленные статистики
            output_reshaped = (input_reshaped - self.moving_mean) / np.sqrt(self.moving_variance + self.EPS)

        self.output = self._reshape_output(output_reshaped)
        return self.output

    def updateGradInput(self, input, gradOutput):
        gradOutput_reshaped, _ = self._reshape_input(gradOutput)
        input_reshaped, _ = self._reshape_input(input)

        if self.training:
            N = gradOutput_reshaped.shape[0]
            if N == 1:
                 # Упрощенный градиент для батча=1
                 dmu = np.sum(gradOutput_reshaped * -self.std_inv, axis=0)
                 gradInput_reshaped = gradOutput_reshaped * self.std_inv + dmu / N
            else:
                 # Стандартные формулы градиента для батчнорма
                 dvar = np.sum(gradOutput_reshaped * self.x_centered * -0.5 * self.std_inv**3, axis=0)
                 dmu = np.sum(gradOutput_reshaped * -self.std_inv, axis=0) + \
                       dvar * np.sum(-2. * self.x_centered, axis=0) / N
                 gradInput_reshaped = gradOutput_reshaped * self.std_inv + \
                                     dvar * 2. * self.x_centered / N + \
                                     dmu / N
        else:
            # На инференсе градиент просто масштабируется
            std_eval_inv = 1. / np.sqrt(self.moving_variance + self.EPS)
            gradInput_reshaped = gradOutput_reshaped * std_eval_inv

        self.gradInput = self._reshape_output(gradInput_reshaped)
        return self.gradInput

    def __repr__(self):
        return f"BatchNormalization(num_features={self.num_features}, momentum={self.momentum})"

In [8]:
class ChannelwiseScaling(Module):
    # Аффинное преобразование в BatchNorm (gamma * x + beta)
    def __init__(self, num_features):
        super(ChannelwiseScaling, self).__init__()

        self.gamma = np.ones(num_features) # Инициализаци  как в PyTorch
        self.beta = np.zeros(num_features)

        self.gradGamma = np.zeros_like(self.gamma)
        self.gradBeta = np.zeros_like(self.beta)

        self.input_cache = None
        self.input_shape = None
        self.num_features = num_features

    def _reshape_params_grads(self, tensor):
        if len(self.input_shape) == 4: # (N, C, H, W) -> (1, C, 1, 1)
             return tensor.reshape(1, -1, 1, 1)
        elif len(self.input_shape) == 2: # (N, C) -> (1, C)
             return tensor.reshape(1, -1)
        return tensor

    def _sum_grads(self, grad):
         # Суммируем градиенты по всем осям кроме каналов
         if len(self.input_shape) == 4: # (N, C, H, W) -> (C,)
             return np.sum(grad, axis=(0, 2, 3))
         elif len(self.input_shape) == 2: # (N, C) -> (C,)
             return np.sum(grad, axis=0)
         return grad

    def updateOutput(self, input):
        self.input_cache = input
        self.input_shape = input.shape
        if input.shape[1] != self.num_features:
             raise ValueError(f"Expected {self.num_features} channels, but got {input.shape[1]}")

        gamma_reshaped = self._reshape_params_grads(self.gamma)
        beta_reshaped = self._reshape_params_grads(self.beta)

        self.output = input * gamma_reshaped + beta_reshaped
        return self.output

    def updateGradInput(self, input, gradOutput):
        gamma_reshaped = self._reshape_params_grads(self.gamma)
        self.gradInput = gradOutput * gamma_reshaped
        return self.gradInput

    def accGradParameters(self, input, gradOutput):
        # Считаем градиенты по гамме и бете
        self.gradBeta += self._sum_grads(gradOutput)
        self.gradGamma += self._sum_grads(gradOutput * self.input_cache)

    def zeroGradParameters(self):
        self.gradGamma.fill(0)
        self.gradBeta.fill(0)

    def getParameters(self):
        return [self.gamma, self.beta]

    def getGradParameters(self):
        return [self.gradGamma, self.gradBeta]

    def __repr__(self):
        return f"ChannelwiseScaling(num_features={self.num_features})"

Practical notes. If BatchNormalization is placed after a linear transformation layer (including dense layer, convolutions, channelwise scaling) that implements function like `y = weight * x + bias`, than bias adding become useless and could be omitted since its effect will be discarded while batch mean subtraction. If BatchNormalization (followed by `ChannelwiseScaling`) is placed before a layer that propagates scale (including ReLU, LeakyReLU) followed by any linear transformation layer than parameter `gamma` in `ChannelwiseScaling` could be freezed since it could be absorbed into the linear transformation layer.

## 5. (0.3) Dropout
Implement [**dropout**](https://www.cs.toronto.edu/~hinton/absps/JMLRdropout.pdf). The idea and implementation is really simple: just multimply the input by $Bernoulli(p)$ mask. Here $p$ is probability of an element to be zeroed.

This has proven to be an effective technique for regularization and preventing the co-adaptation of neurons.

While training (`self.training == True`) it should sample a mask on each iteration (for every batch), zero out elements and multiply elements by $1 / (1 - p)$. The latter is needed for keeping mean values of features close to mean values which will be in test mode. When testing this module should implement identity transform i.e. `self.output = input`.

- input:   **`batch_size x n_feats`**
- output: **`batch_size x n_feats`**

In [9]:
class Dropout(Module):
    # Дропаут
    def __init__(self, p=0.5):
        super(Dropout, self).__init__()
        if not (0.0 <= p < 1.0):
             raise ValueError("p must be between 0 and 1")
        self.p = p
        self.mask = None
        self.scale = 1.0 / (1.0 - self.p) if self.p < 1.0 else 0.0 # Для inverted dropout

    def updateOutput(self, input):
        if self.training:
            self.mask = (np.random.rand(*input.shape) > self.p).astype(input.dtype)
            # Применяем маску и масштабируем
            self.output = input * self.mask * self.scale
        else:
            # На инференсе ничего не делаем
            self.output = input
            self.mask = None
        return self.output

    def updateGradInput(self, input, gradOutput):
        if self.training:
            # Градиент проходит только через активные нейроны
            if self.mask is None:
                 raise RuntimeError("backward called without forward in training mode")
            self.gradInput = gradOutput * self.mask * self.scale
        else:
            # На инференсе градиент проходит как есть
            self.gradInput = gradOutput
        return self.gradInput

    def __repr__(self):
        return f"Dropout(p={self.p})"

# 6. (2.0) Conv2d
Implement [**Conv2d**](https://pytorch.org/docs/stable/generated/torch.nn.Conv2d.html). Use only this list of parameters: (in_channels, out_channels, kernel_size, stride, padding, bias, padding_mode) and fix dilation=1 and groups=1.

In [10]:
# Реализация на numpy - не самая быстрая но зато без циклов

def im2col_indices(x, kernel_h, kernel_w, stride_h=1, stride_w=1, pad_h=0, pad_w=0):
    N, C, H, W = x.shape
    out_h = (H + 2 * pad_h - kernel_h) // stride_h + 1
    out_w = (W + 2 * pad_w - kernel_w) // stride_w + 1

    img = np.pad(x, [(0,0), (0,0), (pad_h, pad_h), (pad_w, pad_w)], mode='constant')

    i0 = np.repeat(np.arange(kernel_h), kernel_w)
    i0 = np.tile(i0, C)
    i1 = stride_h * np.repeat(np.arange(out_h), out_w)
    j0 = np.tile(np.arange(kernel_w), kernel_h * C)
    j1 = stride_w * np.tile(np.arange(out_w), out_h)
    i = i0.reshape(-1, 1) + i1.reshape(1, -1)
    j = j0.reshape(-1, 1) + j1.reshape(1, -1)
    k = np.repeat(np.arange(C), kernel_h * kernel_w).reshape(-1, 1)

    cols = img[:, k, i, j]
    cols = cols.transpose(1, 2, 0).reshape(C * kernel_h * kernel_w, -1)
    return cols

# Код col2im_indices (оставим как есть)
def col2im_indices(cols, x_shape, kernel_h, kernel_w, stride_h=1, stride_w=1, pad_h=0, pad_w=0):
    N, C, H, W = x_shape
    H_padded, W_padded = H + 2 * pad_h, W + 2 * pad_w
    out_h = (H + 2 * pad_h - kernel_h) // stride_h + 1
    out_w = (W + 2 * pad_w - kernel_w) // stride_w + 1

    img = np.zeros((N, C, H_padded, W_padded), dtype=cols.dtype)

    i0 = np.repeat(np.arange(kernel_h), kernel_w)
    i0 = np.tile(i0, C)
    i1 = stride_h * np.repeat(np.arange(out_h), out_w)
    j0 = np.tile(np.arange(kernel_w), kernel_h * C)
    j1 = stride_w * np.tile(np.arange(out_w), out_h)
    i = i0.reshape(-1, 1) + i1.reshape(1, -1)
    j = j0.reshape(-1, 1) + j1.reshape(1, -1)
    k = np.repeat(np.arange(C), kernel_h * kernel_w).reshape(-1, 1)

    cols_reshaped = cols.reshape(C * kernel_h * kernel_w, -1, N)
    cols_reshaped = cols_reshaped.transpose(2, 0, 1)

    np.add.at(img, (slice(None), k, i, j), cols_reshaped)

    if pad_h > 0 or pad_w > 0:
        return img[:, :, pad_h:H_padded-pad_h, pad_w:W_padded-pad_w]
    return img


class Conv2d(Module):
    # Свертка 2D
    def __init__(self, in_channels, out_channels, kernel_size, stride=1, padding=0, bias=True, padding_mode='zeros'):
        super(Conv2d, self).__init__()
        if padding_mode != 'zeros':
             # пока только zero padding поддерживается в im2col/col2im
             print(f"Warning: Conv2d padding_mode='{padding_mode}' not supported, using 'zeros'.")
             self.padding_mode = 'zeros'
        else:
            self.padding_mode = padding_mode

        self.in_channels = in_channels
        self.out_channels = out_channels
        self.kernel_size = (kernel_size, kernel_size) if isinstance(kernel_size, int) else kernel_size
        self.stride = (stride, stride) if isinstance(stride, int) else stride
        self.padding = (padding, padding) if isinstance(padding, int) else padding

        # Инициализйция весов (He)
        stdv = np.sqrt(2.0 / (self.in_channels * self.kernel_size[0] * self.kernel_size[1]))
        self.W = np.random.normal(0, stdv, size=(self.out_channels, self.in_channels, self.kernel_size[0], self.kernel_size[1]))
        self.gradW = np.zeros_like(self.W)

        self.bias = bias
        if bias:
            self.b = np.zeros(self.out_channels)
            self.gradb = np.zeros_like(self.b)
        else:
            self.b = None
            self.gradb = None

        self.input_shape = None
        self.input_col = None # кэш для backward

    def updateOutput(self, input):
        self.input_shape = input.shape
        N, C, H, W = self.input_shape
        kH, kW = self.kernel_size
        sH, sW = self.stride
        pH, pW = self.padding

        out_H = (H + 2 * pH - kH) // sH + 1
        out_W = (W + 2 * pW - kW) // sW + 1

        self.input_col = im2col_indices(input, kH, kW, sH, sW, pH, pW)
        W_rows = self.W.reshape(self.out_channels, -1)

        # Свертка как умножение матриц
        output_col = np.dot(W_rows, self.input_col)

        if self.bias:
            output_col += self.b.reshape(-1, 1) # broadcasting

        # reshape к (N, out_C, out_H, out_W)
        self.output = output_col.reshape(self.out_channels, out_H, out_W, N).transpose(3, 0, 1, 2)
        return self.output

    def updateGradInput(self, input, gradOutput):
        N, C, H, W = self.input_shape
        kH, kW = self.kernel_size
        sH, sW = self.stride
        pH, pW = self.padding

        # gradOutput в формат столбцов (out_C, N*out_H*out_W)
        gradOutput_col = gradOutput.transpose(1, 2, 3, 0).reshape(self.out_channels, -1)
        W_rows = self.W.reshape(self.out_channels, -1)

        # Градиент по входу (в виде столбцов)
        gradInput_col = np.dot(W_rows.T, gradOutput_col)

        # Преобразуем обратно в формат картинки
        self.gradInput = col2im_indices(gradInput_col, self.input_shape, kH, kW, sH, sW, pH, pW)
        return self.gradInput

    def accGradParameters(self, input, gradOutput):
         kH, kW = self.kernel_size
         sH, sW = self.stride
         pH, pW = self.padding

         gradOutput_col = gradOutput.transpose(1, 2, 3, 0).reshape(self.out_channels, -1)

         # Градиент по весам
         # Нужен input_col из forward pass
         current_input_col = self.input_col
         if current_input_col is None:
             current_input_col = im2col_indices(input, kH, kW, sH, sW, pH, pW)

         gradW_flat = np.dot(gradOutput_col, current_input_col.T)
         self.gradW += gradW_flat.reshape(self.W.shape)

         # Градиент по смещению
         if self.bias:
             self.gradb += np.sum(gradOutput_col, axis=1)

    def zeroGradParameters(self):
        self.gradW.fill(0)
        if self.bias:
            self.gradb.fill(0)

    def getParameters(self):
        return [self.W, self.b] if self.bias else [self.W]

    def getGradParameters(self):
        return [self.gradW, self.gradb] if self.bias else [self.gradW]

    def __repr__(self):
        return (f"Conv2d(in={self.in_channels}, out={self.out_channels}, k={self.kernel_size}, s={self.stride}, p={self.padding})")

# 7. (0.5) Implement [**MaxPool2d**](https://pytorch.org/docs/stable/generated/torch.nn.MaxPool2d.html) and [**AvgPool2d**](https://pytorch.org/docs/stable/generated/torch.nn.AvgPool2d.html). Use only parameters like kernel_size, stride, padding (negative infinity for maxpool and zero for avgpool) and other parameters fixed as in framework.

In [11]:
class MaxPool2d(Module):
    def __init__(self, kernel_size, stride=None, padding=0):
        super(MaxPool2d, self).__init__()
        self.kernel_size = (kernel_size, kernel_size) if isinstance(kernel_size, int) else kernel_size
        self.stride = (stride, stride) if isinstance(stride, int) else \
                      stride if stride is not None else self.kernel_size
        self.padding = (padding, padding) if isinstance(padding, int) else padding

        self.input_shape = None
        self.indices = None # кэш индексов максимумов

    def updateOutput(self, input):
        self.input_shape = input.shape
        N, C, H, W = self.input_shape
        kH, kW = self.kernel_size
        sH, sW = self.stride
        pH, pW = self.padding

        # Паддинг с -inf чтоб не мешал максимуму
        if pH > 0 or pW > 0:
            input_padded = np.pad(input, ((0, 0), (0, 0), (pH, pH), (pW, pW)),
                                  mode='constant', constant_values=-np.inf)
            H_pad, W_pad = H + 2*pH, W + 2*pW
        else:
            input_padded = input
            H_pad, W_pad = H, W

        out_H = (H_pad - kH) // sH + 1
        out_W = (W_pad - kW) // sW + 1

        self.output = np.zeros((N, C, out_H, out_W), dtype=input.dtype)
        self.indices = np.zeros((N, C, out_H, out_W), dtype=int) # плоские индексы в окне

        # Ищем максимумы и их индексы по окнам
        for n in range(N):
            for c in range(C):
                for h in range(out_H):
                    for w in range(out_W):
                        h_start, w_start = h * sH, w * sW
                        h_end, w_end = h_start + kH, w_start + kW
                        window = input_padded[n, c, h_start:h_end, w_start:w_end]

                        self.output[n, c, h, w] = np.max(window)
                        self.indices[n, c, h, w] = np.argmax(window)

        return self.output

    def updateGradInput(self, input, gradOutput):
        N, C, H, W = self.input_shape
        kH, kW = self.kernel_size
        sH, sW = self.stride
        pH, pW = self.padding
        _, _, out_H, out_W = gradOutput.shape

        H_pad, W_pad = H + 2*pH, W + 2*pW
        gradInput_padded = np.zeros((N, C, H_pad, W_pad), dtype=gradOutput.dtype)

        # Распределяем градиент по индексам максимумов
        for n in range(N):
            for c in range(C):
                for h in range(out_H):
                    for w in range(out_W):
                        flat_idx = self.indices[n, c, h, w]
                        h_idx = flat_idx // kW
                        w_idx = flat_idx % kW

                        h_start, w_start = h * sH, w * sW
                        abs_h_idx = h_start + h_idx
                        abs_w_idx = w_start + w_idx

                        gradInput_padded[n, c, abs_h_idx, abs_w_idx] += gradOutput[n, c, h, w]

        # Убираем паддинг
        if pH > 0 or pW > 0:
            self.gradInput = gradInput_padded[:, :, pH:-pH if pH > 0 else None, pW:-pW if pW > 0 else None]
        else:
            self.gradInput = gradInput_padded

        return self.gradInput

    def __repr__(self):
        return f"MaxPool2d(k={self.kernel_size}, s={self.stride}, p={self.padding})"

In [12]:
class AvgPool2d(Module):
    # Аналогично макспулу но усреднение
    def __init__(self, kernel_size, stride=None, padding=0):
        super(AvgPool2d, self).__init__()
        self.kernel_size = (kernel_size, kernel_size) if isinstance(kernel_size, int) else kernel_size
        self.stride = (stride, stride) if isinstance(stride, int) else \
                      stride if stride is not None else self.kernel_size
        self.padding = (padding, padding) if isinstance(padding, int) else padding

        self.input_shape = None
        self.window_size = self.kernel_size[0] * self.kernel_size[1]

    def updateOutput(self, input):
        self.input_shape = input.shape
        N, C, H, W = self.input_shape
        kH, kW = self.kernel_size
        sH, sW = self.stride
        pH, pW = self.padding

        # Паддинг нулями
        if pH > 0 or pW > 0:
            input_padded = np.pad(input, ((0, 0), (0, 0), (pH, pH), (pW, pW)), mode='constant')
            H_pad, W_pad = H + 2*pH, W + 2*pW
        else:
            input_padded = input
            H_pad, W_pad = H, W

        out_H = (H_pad - kH) // sH + 1
        out_W = (W_pad - kW) // sW + 1

        self.output = np.zeros((N, C, out_H, out_W), dtype=input.dtype)

        # Считаем среднее по окнам
        for n in range(N):
            for c in range(C):
                for h in range(out_H):
                    for w in range(out_W):
                        h_start, w_start = h * sH, w * sW
                        h_end, w_end = h_start + kH, w_start + kW
                        window = input_padded[n, c, h_start:h_end, w_start:w_end]
                        self.output[n, c, h, w] = np.mean(window)

        return self.output

    def updateGradInput(self, input, gradOutput):
        N, C, H, W = self.input_shape
        kH, kW = self.kernel_size
        sH, sW = self.stride
        pH, pW = self.padding
        _, _, out_H, out_W = gradOutput.shape

        H_pad, W_pad = H + 2*pH, W + 2*pW
        gradInput_padded = np.zeros((N, C, H_pad, W_pad), dtype=gradOutput.dtype)

        scale = 1.0 / self.window_size # Коэфф. для распределения градиента

        # Распределяем градиент равномерно по элементам окна
        for n in range(N):
            for c in range(C):
                for h in range(out_H):
                    for w in range(out_W):
                        h_start, w_start = h * sH, w * sW
                        h_end, w_end = h_start + kH, w_start + kW
                        gradInput_padded[n, c, h_start:h_end, w_start:w_end] += \
                            gradOutput[n, c, h, w] * scale

        # Убираем паддинг
        if pH > 0 or pW > 0:
             self.gradInput = gradInput_padded[:, :, pH:-pH if pH > 0 else None, pW:-pW if pW > 0 else None]
        else:
             self.gradInput = gradInput_padded

        return self.gradInput

    def __repr__(self):
        return f"AvgPool2d(k={self.kernel_size}, s={self.stride}, p={self.padding})"

# 8. (0.3) Implement **GlobalMaxPool2d** and **GlobalAvgPool2d**. They do not have testing and parameters are up to you but they must aggregate information within channels. Write test functions for these layers on your own.

In [13]:
class GlobalMaxPool2d(Module):
    # Макспулинг по всей карте HxW
    def __init__(self):
        super(GlobalMaxPool2d, self).__init__()
        self.input_shape = None
        self.max_mask = None # маска для backward

    def updateOutput(self, input):
        self.input_shape = input.shape
        N, C, H, W = self.input_shape

        # Ищем максимум по H, W осям
        self.output = np.max(input, axis=(2, 3), keepdims=True) # (N, C, 1, 1)
        # Маска для градиента - где были максимумы
        self.max_mask = (input == self.output)
        # Убираем лишние оси
        self.output = self.output.reshape(N, C)
        return self.output

    def updateGradInput(self, input, gradOutput):
        N, C = gradOutput.shape
        H, W = self.input_shape[2:]

        # Расширяем градиент для broadcasting
        gradOutput_expanded = gradOutput.reshape(N, C, 1, 1)

        # Градиент идет только туда где был максимум
        # Делим на кол-во максимумов если их несколько (редко но бывает)
        num_maxima = np.sum(self.max_mask, axis=(2, 3), keepdims=True)
        num_maxima[num_maxima == 0] = 1 # чтоб на 0 не делить

        self.gradInput = (self.max_mask * gradOutput_expanded) / num_maxima
        return self.gradInput

    def __repr__(self):
        return "GlobalMaxPool2d"

class GlobalAvgPool2d(Module):
    # Аналогично GlobalMaxPool2d но усреднение
    def __init__(self):
        super(GlobalAvgPool2d, self).__init__()
        self.input_shape = None
        self.spatial_size = None # H * W

    def updateOutput(self, input):
        self.input_shape = input.shape
        N, C, H, W = self.input_shape
        self.spatial_size = H * W

        # Среднее по H, W
        self.output = np.mean(input, axis=(2, 3)) # (N, C)
        return self.output

    def updateGradInput(self, input, gradOutput):
        N, C = gradOutput.shape
        # Расширяем gradOutput
        gradOutput_expanded = gradOutput.reshape(N, C, 1, 1)
        # Градиент распределяется равномерно
        self.gradInput = np.ones_like(input) * gradOutput_expanded / self.spatial_size
        return self.gradInput

    def __repr__(self):
        return "GlobalAvgPool2d"

# Тестовая функция
def test_global_pooling():
    batch_size = 2
    channels = 3
    height = 4
    width = 5 # Не квадратное для теста
    input_data = np.arange(batch_size * channels * height * width).reshape(batch_size, channels, height, width).astype(np.float32)
    input_data += np.random.randn(batch_size, channels, height, width) * 0.1 # Добавим шум

    print("--- GlobalMaxPool2d Test ---")
    gmax_pool = GlobalMaxPool2d()
    output_max = gmax_pool.forward(input_data)
    print(f"Input shape: {input_data.shape}")
    # Ожидаемый выход: максимум по последним двум осям
    expected_output_max = np.max(input_data, axis=(2, 3))
    print(f"Output shape: {output_max.shape}, Expected shape: {expected_output_max.shape}")
    print(f"Output matches expected: {np.allclose(output_max, expected_output_max)}")
    # Тест градиента
    grad_output_max = np.random.randn(*output_max.shape)
    grad_input_max = gmax_pool.backward(input_data, grad_output_max)
    print(f"Gradient input shape: {grad_input_max.shape}")
    # Проверка: градиент должен быть ненулевым только в позициях максимумов
    non_zero_grads = grad_input_max[gmax_pool.max_mask]
    zero_grads = grad_input_max[~gmax_pool.max_mask]
    print(f"All zero-masked grads are zero: {np.all(zero_grads == 0)}")
    print(f"Number of non-zero grads: {len(non_zero_grads)}")


    print("\n--- GlobalAvgPool2d Test ---")
    gavg_pool = GlobalAvgPool2d()
    output_avg = gavg_pool.forward(input_data)
    print(f"Input shape: {input_data.shape}")
    # Ожидаемый выход: среднее по последним двум осям
    expected_output_avg = np.mean(input_data, axis=(2, 3))
    print(f"Output shape: {output_avg.shape}, Expected shape: {expected_output_avg.shape}")
    print(f"Output matches expected: {np.allclose(output_avg, expected_output_avg)}")
    # Тест градиента
    grad_output_avg = np.random.randn(*output_avg.shape)
    grad_input_avg = gavg_pool.backward(input_data, grad_output_avg)
    print(f"Gradient input shape: {grad_input_avg.shape}")
    # Проверка: градиент должен быть равен gradOutput / (H*W) для всех элементов
    expected_grad_val = grad_output_avg.reshape(batch_size, channels, 1, 1) / (height * width)
    print(f"Gradient input matches expected: {np.allclose(grad_input_avg, expected_grad_val)}")


test_global_pooling()

--- GlobalMaxPool2d Test ---
Input shape: (2, 3, 4, 5)
Output shape: (2, 3), Expected shape: (2, 3)
Output matches expected: True
Gradient input shape: (2, 3, 4, 5)
All zero-masked grads are zero: True
Number of non-zero grads: 6

--- GlobalAvgPool2d Test ---
Input shape: (2, 3, 4, 5)
Output shape: (2, 3), Expected shape: (2, 3)
Output matches expected: True
Gradient input shape: (2, 3, 4, 5)
Gradient input matches expected: True


# 9. (0.2) Implement [**Flatten**](https://pytorch.org/docs/stable/generated/torch.flatten.html)

In [14]:
class Flatten(Module):
    # Выпрямляем тензор
    def __init__(self):
        super(Flatten, self).__init__()
        self.input_shape = None

    def updateOutput(self, input):
        self.input_shape = input.shape
        batch_size = self.input_shape[0]
        # Произведение всех размерностей кроме первой
        num_features = np.prod(self.input_shape[1:])
        self.output = input.reshape(batch_size, num_features)
        return self.output

    def updateGradInput(self, input, gradOutput):
        # Просто возвращаем исходную форму
        self.gradInput = gradOutput.reshape(self.input_shape)
        return self.gradInput

    def __repr__(self):
        return "Flatten"

# Activation functions

Here's the complete example for the **Rectified Linear Unit** non-linearity (aka **ReLU**):

In [15]:
class ReLU(Module):
    def __init__(self):
         super(ReLU, self).__init__()
         self.input_cache = None

    def updateOutput(self, input):
        self.input_cache = input
        self.output = np.maximum(input, 0)
        return self.output

    def updateGradInput(self, input, gradOutput):
        # Градиент = gradOutput там где input > 0
        self.gradInput = np.multiply(gradOutput, self.input_cache > 0)
        return self.gradInput

    def __repr__(self):
        return "ReLU"

## 10. (0.1) Leaky ReLU
Implement [**Leaky Rectified Linear Unit**](http://en.wikipedia.org/wiki%2FRectifier_%28neural_networks%29%23Leaky_ReLUs). Expriment with slope.

In [16]:
class LeakyReLU(Module):
    def __init__(self, slope=0.01):
        super(LeakyReLU, self).__init__()
        self.slope = slope
        self.input_cache = None

    def updateOutput(self, input):
        self.input_cache = input
        self.output = np.maximum(input, self.slope * input)
        return self.output

    def updateGradInput(self, input, gradOutput):
        # Градиент = 1 или slope
        grad_mask = np.where(self.input_cache > 0, 1.0, self.slope)
        self.gradInput = gradOutput * grad_mask
        return self.gradInput

    def __repr__(self):
        return f"LeakyReLU({self.slope})"

## 11. (0.1) ELU
Implement [**Exponential Linear Units**](http://arxiv.org/abs/1511.07289) activations.

In [17]:
class ELU(Module):
    def __init__(self, alpha=1.0):
        super(ELU, self).__init__()
        self.alpha = alpha
        self.input_cache = None
        self.exp_cache = None # кеш для exp(x) при x<0

    def updateOutput(self, input):
        self.input_cache = input
        positive_mask = input > 0
        negative_input = input[~positive_mask]
        self.exp_cache = np.exp(negative_input) # кешируем для backward

        self.output = np.empty_like(input)
        self.output[positive_mask] = input[positive_mask]
        self.output[~positive_mask] = self.alpha * (self.exp_cache - 1)
        return self.output

    def updateGradInput(self, input, gradOutput):
        positive_mask = self.input_cache > 0
        # Градиент = 1 или alpha * exp(x)
        grad_mask = np.ones_like(self.input_cache)
        if self.exp_cache is not None and np.any(~positive_mask):
             grad_mask[~positive_mask] = self.alpha * self.exp_cache
        elif np.any(~positive_mask): # на всякий случай если кеша нет
             grad_mask[~positive_mask] = self.alpha * np.exp(self.input_cache[~positive_mask])

        self.gradInput = gradOutput * grad_mask
        return self.gradInput

    def __repr__(self):
        return f"ELU({self.alpha})"

## 12. (0.1) SoftPlus
Implement [**SoftPlus**](https://en.wikipedia.org/wiki%2FRectifier_%28neural_networks%29) activations. Look, how they look a lot like ReLU.

In [18]:
class SoftPlus(Module):
    def __init__(self):
        super(SoftPlus, self).__init__()
        self.input_cache = None

    def updateOutput(self, input):
        self.input_cache = input
        # log(1 + exp(x))
        self.output = np.log1p(np.exp(input)) # log1p для точности
        return self.output

    def updateGradInput(self, input, gradOutput):
        # Градиент = sigmoid(x)
        sigmoid_input = 1.0 / (1.0 + np.exp(-self.input_cache))
        self.gradInput = gradOutput * sigmoid_input
        return self.gradInput

    def __repr__(self):
        return "SoftPlus"

# 13. (0.2) Gelu
Implement [**Gelu**](https://pytorch.org/docs/stable/generated/torch.nn.GELU.html) activations.

In [19]:
class Gelu(Module):
    # Гелу активация
    def __init__(self):
        super(Gelu, self).__init__()
        self.input_cache = None
        self.cdf_cache = None
        self.pdf_cache = None

    def updateOutput(self, input):
        self.input_cache = input
        # Используем erf из scipy
        arg = input / np.sqrt(2.0)
        self.cdf_cache = 0.5 * (1.0 + special.erf(arg)) # CDF норм распред
        self.output = 0.5 * input * (1.0 + special.erf(arg))
        self.pdf_cache = np.exp(-0.5 * input**2) / np.sqrt(2.0 * np.pi) # PDF норм распред
        return self.output

    def updateGradInput(self, input, gradOutput):
        # Используем кешированные cdf и pdf
        cdf = self.cdf_cache
        pdf = self.pdf_cache
        current_input = self.input_cache

        # Градиент гелу
        grad_gelu = cdf + current_input * pdf
        self.gradInput = gradOutput * grad_gelu
        return self.gradInput

    def __repr__(self):
        return "Gelu"

# Criterions

Criterions are used to score the models answers.

In [20]:
class Criterion(object):
    # Базовый класс для лоссов
    def __init__ (self):
        self.output = None
        self.gradInput = None

    def forward(self, input, target):
        return self.updateOutput(input, target)

    def backward(self, input, target):
        return self.updateGradInput(input, target)

    def updateOutput(self, input, target):
        # Вычисление лосса
        pass

    def updateGradInput(self, input, target):
        # Вычисление градиента лосса по input
        pass

    def __repr__(self):
        return "Criterion"

The **MSECriterion**, which is basic L2 norm usually used for regression, is implemented here for you.
- input:   **`batch_size x n_feats`**
- target: **`batch_size x n_feats`**
- output: **scalar**

In [21]:
class MSECriterion(Criterion):
    # Среднеквадратичная ошибка
    def __init__(self):
        super(MSECriterion, self).__init__()

    def updateOutput(self, input, target):
        batch_size = input.shape[0]
        self.output = np.sum(np.power(input - target, 2)) / batch_size
        return self.output

    def updateGradInput(self, input, target):
        batch_size = input.shape[0]
        self.gradInput = 2.0 * (input - target) / batch_size
        return self.gradInput

    def __repr__(self):
        return "MSECriterion"

## 14. (0.2) Negative LogLikelihood criterion (numerically unstable)
You task is to implement the **ClassNLLCriterion**. It should implement [multiclass log loss](http://scikit-learn.org/stable/modules/model_evaluation.html#log-loss). Nevertheless there is a sum over `y` (target) in that formula,
remember that targets are one-hot encoded. This fact simplifies the computations a lot. Note, that criterions are the only places, where you divide by batch size. Also there is a small hack with adding small number to probabilities to avoid computing log(0).
- input:   **`batch_size x n_feats`** - probabilities
- target: **`batch_size x n_feats`** - one-hot representation of ground truth
- output: **scalar**



In [22]:
class ClassNLLCriterionUnstable(Criterion):
    EPS = 1e-15
    def __init__(self):
        super(ClassNLLCriterionUnstable, self).__init__()

    def updateOutput(self, input, target):
        batch_size = input.shape[0]
        # Обрезаем чтоб избежать log(0)
        input_clamp = np.clip(input, self.EPS, 1.0 - self.EPS)

        # Для one-hot target просто берем log вероятности нужного класса
        log_likelihood = np.sum(target * np.log(input_clamp), axis=1)
        self.output = -np.mean(log_likelihood)
        return self.output

    def updateGradInput(self, input, target):
        batch_size = input.shape[0]
        input_clamp = np.clip(input, self.EPS, 1.0 - self.EPS)
        # Градиент NLL
        self.gradInput = - target / (input_clamp * batch_size)
        return self.gradInput

    def __repr__(self):
        return "ClassNLLCriterionUnstable"

## 15. (0.3) Negative LogLikelihood criterion (numerically stable)
- input:   **`batch_size x n_feats`** - log probabilities
- target: **`batch_size x n_feats`** - one-hot representation of ground truth
- output: **scalar**

Task is similar to the previous one, but now the criterion input is the output of log-softmax layer. This decomposition allows us to avoid problems with computation of forward and backward of log().

In [23]:
class ClassNLLCriterion(Criterion):
    # NLL лосс для log-вероятностей (выход LogSoftMax), стабильный
    def __init__(self):
        super(ClassNLLCriterion, self).__init__()

    def updateOutput(self, input, target):
        batch_size = input.shape[0]
        # Аналогично нестабильному но работаем с логарифмами
        log_likelihood = np.sum(target * input, axis=1)
        self.output = -np.mean(log_likelihood)
        return self.output

    def updateGradInput(self, input, target):
        batch_size = input.shape[0]
        # Градиент тут проще
        self.gradInput = - target / batch_size
        return self.gradInput

    def __repr__(self):
        return "ClassNLLCriterion"

1-я часть задания: реализация слоев, лосей и функций активации - 5 баллов. \\
2-я часть задания: реализация моделей на своих классах. Что должно быть:
  1. Выберите оптимизатор и реализуйте его, чтоб он работал с вами классами. - 1 балл.
  2. Модель для задачи мультирегрессии на выбраных вами данных. Использовать FCNN, dropout, batchnorm, MSE. Пробуйте различные фукнции активации. Для первой модели попробуйте большую, среднюю и маленькую модель. - 1 балл.
  3. Модель для задачи мультиклассификации на MNIST. Использовать свёртки, макспулы, флэттэны, софтмаксы - 1 балла.
  4. Автоэнкодер для выбранных вами данных. Должен быть на свёртках и полносвязных слоях, дропаутах, батчнормах и тд. - 2 балла. \\

Дополнительно в оценке каждой модели будет учитываться:
1. Наличие правильно выбранной метрики и лосс функции.
2. Отрисовка графиков лосей и метрик на трейне-валидации. Проверка качества модели на тесте.
3. Наличие шедулера для lr.
4. Наличие вормапа.
5. Наличие механизма ранней остановки и сохранение лучшей модели.
6. Свитч лося (метрики) и оптимайзера.