In [1]:
import numpy as np

In [2]:
#pip install torch

**Module** is an abstract class which defines the fundamental methods necessary for training a neural network. You do not need to change anything here, just read the comments.

In [3]:
class Module(object):
    """
    Basically, you can think of a module as of a something (black box)
    which can process input data and produce output data.
    This is like applying a function which is called forward:

        output = module.forward(input)

    The module should be able to perform a backward pass: to differentiate the forward function.
    Moreover, it should be able to differentiate it if it is a part of a chain (chain rule).
    The latter implies there is a gradient from the previous step of the chain rule.

        gradInput = module.backward(input, gradOutput)
    """
    def __init__(self):
        # State shared by every module: last forward result, last input
        # gradient, and the train/eval switch read by mode-dependent layers.
        self.output = None
        self.gradInput = None
        self.training = True

    # The constructor used to be (mis)spelled ``init`` and therefore never ran
    # on instantiation; the old name is kept as an alias for existing callers.
    init = __init__

    def forward(self, input):
        """
        Takes an input object, and computes the corresponding output of the module.
        """
        return self.updateOutput(input)

    def backward(self, input, gradOutput):
        """
        Performs a backpropagation step through the module, with respect to the given input.

        This includes
         - computing a gradient w.r.t. input (is needed for further backprop),
         - computing a gradient w.r.t. parameters (to update parameters while optimizing).

        Returns the value stored in `self.gradInput`.
        """
        self.updateGradInput(input, gradOutput)
        self.accGradParameters(input, gradOutput)
        return self.gradInput

    def updateOutput(self, input):
        """
        Computes the output using the current parameter set of the class and input.
        This function returns the result which is stored in the output field.

        Make sure to both store the data in output field and return it.
        """

        # The easiest case:

        # self.output = input
        # return self.output

        pass

    def updateGradInput(self, input, gradOutput):
        """
        Computing the gradient of the module with respect to its own input.
        This is returned in gradInput. Also, the gradInput state variable is updated accordingly.

        The shape of gradInput is always the same as the shape of input.

        Make sure to both store the gradients in gradInput field and return it.
        """

        # The easiest case:

        # self.gradInput = gradOutput
        # return self.gradInput

        pass

    def accGradParameters(self, input, gradOutput):
        """
        Computing the gradient of the module with respect to its own parameters.
        No need to override if module has no parameters (e.g. ReLU).
        """
        pass

    def zeroGradParameters(self):
        """
        Zeroes gradParams variable if the module has params.
        """
        pass

    def getParameters(self):
        """
        Returns a list with its parameters.
        If the module does not have parameters return empty list.
        """
        return []

    def getGradParameters(self):
        """
        Returns a list with gradients with respect to its parameters.
        If the module does not have parameters return empty list.
        """
        return []

    def train(self):
        """
        Sets training mode for the module.
        Training and testing behaviour differs for Dropout, BatchNorm.
        """
        self.training = True

    def evaluate(self):
        """
        Sets evaluation mode for the module.
        Training and testing behaviour differs for Dropout, BatchNorm.
        """
        self.training = False

    def __repr__(self):
        """
        Pretty printing. Should be overridden in every module if you want
        to have readable description.
        """
        return "Module"

# Sequential container

**Define** the forward and backward pass procedures.

In [4]:
class Sequential(Module):
    """
    Container that applies its modules one after another.

    The forward pass feeds the output of each module into the next one and
    remembers every intermediate value, so the backward pass can hand each
    module the exact input it saw.
    """
    def __init__(self):
        super(Sequential, self).__init__()
        self.modules = []
        # _cache[i] is the input of modules[i]; _cache[-1] is the final output.
        self._cache = []
        self.output = None
        self.gradInput = None

    def add(self, module):
        """
        Appends a module to the end of the chain and returns the container,
        so calls can be chained.

        Raises TypeError if `module` is not an instance of Module.
        """
        if not isinstance(module, Module):
            raise TypeError("Expected module to be instance of Module")
        self.modules.append(module)
        return self

    def updateOutput(self, input):
        """
        Runs `input` through all modules in order, stores the result in
        `self.output` and returns it.
        """
        self._cache = [input]
        current = input
        for module in self.modules:
            current = module.updateOutput(current)
            self._cache.append(current)

        self.output = current
        return self.output

    def backward(self, input, gradOutput):
        """
        Back-propagates `gradOutput` through the chain in reverse order.

        Each module's own `backward` is called exactly once; per the Module
        base class that call already accumulates the parameter gradients, so
        they must not be accumulated a second time here.

        If no forward pass has been cached yet, one is run on `input` first.
        Raises RuntimeError when the cache does not match the module list
        (e.g. a module was added after the last forward pass).
        """
        if not self._cache:
            self.updateOutput(input)

        if len(self._cache) != len(self.modules) + 1:
            raise RuntimeError("Invalid cache state. Forward pass not executed properly.")

        # Work on a copy so the caller's gradient is never modified.
        # Duck-typed on purpose: objects with `.clone()` are cloned, anything
        # else is copied with `.copy()`; no framework import is needed here.
        if hasattr(gradOutput, "clone"):
            grad = gradOutput.clone()
        else:
            grad = gradOutput.copy()

        layer_inputs = self._cache[:-1]
        for module, layer_input in zip(reversed(self.modules), reversed(layer_inputs)):
            grad = module.backward(layer_input, grad)

        self.gradInput = grad
        return self.gradInput

    def zeroGradParameters(self):
        """Resets the parameter gradients of every contained module."""
        for module in self.modules:
            module.zeroGradParameters()

    @staticmethod
    def _flatten(chunks):
        """Merges per-module results into one flat list, skipping None."""
        flat = []
        for chunk in chunks:
            if chunk is None:
                continue
            if isinstance(chunk, (list, tuple)):
                flat.extend(chunk)
            else:
                flat.append(chunk)
        return flat

    def getParameters(self):
        """Returns a flat list with the parameters of all contained modules."""
        return self._flatten(
            module.getParameters()
            for module in self.modules
            if hasattr(module, 'getParameters')
        )

    def getGradParameters(self):
        """Returns a flat list with the parameter gradients of all contained modules."""
        return self._flatten(
            module.getGradParameters()
            for module in self.modules
            if hasattr(module, 'getGradParameters')
        )

    def __repr__(self):
        return f"Sequential(\n  " + "\n  ".join(str(m) for m in self.modules) + "\n)"

    def __getitem__(self, idx):
        return self.modules[idx]

    def train(self):
        """Switches the container and every contained module to training mode."""
        self.training = True
        for module in self.modules:
            module.train()

    def evaluate(self):
        """Switches the container and every contained module to evaluation mode."""
        self.training = False
        for module in self.modules:
            module.evaluate()

# Layers

## 1 (0.2). Linear transform layer
Also known as dense layer, fully-connected layer, FC-layer, InnerProductLayer (in caffe), affine transform
- input:   **`batch_size x n_feats1`**
- output: **`batch_size x n_feats2`**

In [5]:

class Linear(Module):
    """
    A fully-connected (linear) layer that applies y = xW^T + b

    Inherits from Module so that it can be added to a Sequential container
    and gets forward/backward/train/evaluate from the base class.

    - W has shape (n_out, n_in), b has shape (n_out,)
    - input:  batch_size x n_in
    - output: batch_size x n_out
    """
    def __init__(self, n_in, n_out):
        super(Linear, self).__init__()

        # Xavier/Glorot uniform initialization
        stdv = np.sqrt(6.0 / (n_in + n_out))
        self.W = np.random.uniform(-stdv, stdv, size=(n_out, n_in))
        self.b = np.random.uniform(-stdv, stdv, size=n_out)

        # Accumulated gradients w.r.t. W and b
        self.gradW = np.zeros_like(self.W)
        self.gradb = np.zeros_like(self.b)

        # Cache
        self.input = None
        self.output = None
        self.gradInput = None

    def updateOutput(self, input):
        """Computes input @ W^T + b, stores it in `self.output` and returns it."""
        self.input = input
        self.output = np.dot(input, self.W.T) + self.b
        return self.output

    def updateGradInput(self, input, gradOutput):
        """Gradient w.r.t. the input: gradOutput @ W."""
        self.gradInput = np.dot(gradOutput, self.W)
        return self.gradInput

    def accGradParameters(self, input, gradOutput):
        """Adds this batch's gradients to `gradW` and `gradb`."""
        # dL/dW = gradOutput^T @ input, shape (n_out, n_in)
        self.gradW += np.dot(gradOutput.T, input)
        # dL/db = column sums of gradOutput, shape (n_out,)
        self.gradb += np.sum(gradOutput, axis=0)

    def zeroGradParameters(self):
        """Resets the accumulated gradients in place."""
        self.gradW.fill(0)
        self.gradb.fill(0)

    def getParameters(self):
        return [self.W, self.b]

    def getGradParameters(self):
        return [self.gradW, self.gradb]

    def __repr__(self):
        return f'Linear {self.W.shape[1]} -> {self.W.shape[0]}'

## 2. (0.2) SoftMax
- input:   **`batch_size x n_feats`**
- output: **`batch_size x n_feats`**

$\text{softmax}(x)_i = \frac{\exp x_i} {\sum_j \exp x_j}$

Recall that $\text{softmax}(x) == \text{softmax}(x - \text{const})$. This makes it possible to avoid computing exp() of a large argument.

In [6]:
class SoftMax(Module):
    """
    Row-wise softmax.

    - input:  batch_size x n_feats
    - output: batch_size x n_feats
    """
    def __init__(self):
        super(SoftMax, self).__init__()
        self.output = None
        self.gradInput = None

    def updateOutput(self, input):
        """
        Forward pass: softmax(x_i) = exp(x_i) / sum(exp(x_j))
        with numerical stability optimization
        """
        # Subtract the row maximum for numerical stability (doesn't change output)
        shifted_input = input - np.max(input, axis=1, keepdims=True)
        exp_values = np.exp(shifted_input)
        self.output = exp_values / np.sum(exp_values, axis=1, keepdims=True)
        return self.output

    def updateGradInput(self, input, gradOutput):
        """
        Backward pass:
        gradInput_i = sum_j (gradOutput_j * (delta_ij * softmax_j - softmax_i * softmax_j))
                    = softmax_i * (gradOutput_i - sum_j gradOutput_j * softmax_j)
        where delta_ij is Kronecker delta.

        The closed form avoids building an explicit Jacobian per sample and
        produces a fresh array on every call, so a change of batch size
        between calls cannot leave a stale, wrongly-shaped buffer behind.
        """
        # Recompute the forward result if it is missing or belongs to a
        # differently shaped input.
        if self.output is None or self.output.shape != np.shape(input):
            self.updateOutput(input)

        probs = self.output
        weighted_sum = np.sum(gradOutput * probs, axis=1, keepdims=True)
        self.gradInput = probs * (gradOutput - weighted_sum)
        return self.gradInput

    def __repr__(self):
        return "SoftMax"

## 3. (0.2) LogSoftMax
- input:   **`batch_size x n_feats`**
- output: **`batch_size x n_feats`**

$\text{logsoftmax}(x)_i = \log\text{softmax}(x)_i = x_i - \log {\sum_j \exp x_j}$

The main goal of this layer is to be used in computation of log-likelihood loss.

In [7]:
class LogSoftMax(Module):
    """
    Row-wise log-softmax: output_i = x_i - log(sum_j exp(x_j)).

    - input:  batch_size x n_feats
    - output: batch_size x n_feats
    """
    def __init__(self):
        super(LogSoftMax, self).__init__()
        self.output = None
        self.gradInput = None
        # exp(output) of the last forward pass, reused by the backward pass
        self.softmax = None

    def updateOutput(self, input):
        """Computes the numerically stable log-softmax of every row."""
        # log-sum-exp trick: factor the row maximum out of the exponentials
        max_val = np.max(input, axis=1, keepdims=True)
        log_sum_exp = np.log(np.sum(np.exp(input - max_val), axis=1, keepdims=True)) + max_val
        self.output = input - log_sum_exp

        # Save softmax for backward pass
        self.softmax = np.exp(self.output)
        return self.output

    def updateGradInput(self, input, gradOutput):
        """
        Backward pass:
        gradInput_i = gradOutput_i - softmax_i * sum_j gradOutput_j
        """
        # Without a preceding forward pass there is no cached softmax yet.
        if self.softmax is None:
            self.updateOutput(input)

        grad_sum = np.sum(gradOutput, axis=1, keepdims=True)
        self.gradInput = gradOutput - self.softmax * grad_sum
        return self.gradInput

    def __repr__(self):
        return "LogSoftMax"

## 4. (0.3) Batch normalization
One of the most significant recent ideas that impacted NNs a lot is [**Batch normalization**](http://arxiv.org/abs/1502.03167). The idea is simple, yet effective: the features should be whitened ($mean = 0$, $std = 1$) all the way through NN. This improves the convergence for deep models letting it train them for days but not weeks. **You are** to implement the first part of the layer: features normalization. The second part (`ChannelwiseScaling` layer) is implemented below.

- input:   **`batch_size x n_feats`**
- output: **`batch_size x n_feats`**

The layer should work as follows. While training (`self.training == True`) it transforms input as $$y = \frac{x - \mu}  {\sqrt{\sigma^2 + \epsilon}}$$
where $\mu$ and $\sigma^2$ are the mean and variance of feature values in the **batch** and $\epsilon$ is just a small number for numerical stability. Also during training, the layer should maintain exponential moving average values for mean and variance:
```
    self.moving_mean = self.moving_mean * alpha + batch_mean * (1 - alpha)
    self.moving_variance = self.moving_variance * alpha + batch_variance * (1 - alpha)
```
During testing (`self.training == False`) the layer normalizes input using moving_mean and moving_variance.

Note that decomposition of batch normalization on normalization itself and channelwise scaling here is just a common **implementation** choice. In general "batch normalization" always assumes normalization + scaling.

In [8]:
import numpy as np

class BatchNormalization(Module):
    """
    Feature-wise normalization without learnable scale/shift.

    In training mode every feature is normalized with the statistics of the
    current batch and the moving statistics are refreshed; in evaluation mode
    the moving statistics are used instead.

    - input:  batch_size x n_feats
    - output: batch_size x n_feats
    """
    EPS = 1e-3

    def __init__(self, alpha=0.):
        super(BatchNormalization, self).__init__()
        self.alpha = alpha
        self.output = None
        self.gradInput = None
        # Running statistics, created lazily on the first forward pass
        self.moving_mean = None
        self.moving_variance = None
        # Statistics and result of the most recent training batch
        self.batch_mean = None
        self.batch_var = None
        self.normalized = None

    def updateOutput(self, input):
        """Normalizes `input` per feature and returns the result."""
        if self.moving_mean is None:
            n_feats = input.shape[1]
            self.moving_mean = np.zeros(n_feats)
            self.moving_variance = np.ones(n_feats)

        if not self.training:
            # Evaluation: rely on the accumulated moving statistics
            self.normalized = (input - self.moving_mean) / np.sqrt(self.moving_variance + self.EPS)
        else:
            # Training: statistics of the current batch
            self.batch_mean = np.mean(input, axis=0)
            self.batch_var = np.var(input, axis=0)

            # Exponential moving averages
            keep = self.alpha
            self.moving_mean = keep * self.moving_mean + (1 - keep) * self.batch_mean
            self.moving_variance = keep * self.moving_variance + (1 - keep) * self.batch_var

            self.normalized = (input - self.batch_mean) / np.sqrt(self.batch_var + self.EPS)

        self.output = self.normalized
        return self.output

    def updateGradInput(self, input, gradOutput):
        """Gradient w.r.t. the input for the current mode."""
        if not self.training:
            # The moving statistics are constants here, so only the scale remains
            std_inv = 1.0 / np.sqrt(self.moving_variance + self.EPS)
            self.gradInput = gradOutput * std_inv
            return self.gradInput

        N = input.shape[0]
        centered = input - self.batch_mean
        std_inv = 1.0 / np.sqrt(self.batch_var + self.EPS)

        # Chain rule through the batch variance and the batch mean
        dvar = np.sum(gradOutput * centered * (-0.5) * (self.batch_var + self.EPS)**(-1.5), axis=0)
        dmean = np.sum(gradOutput * (-std_inv), axis=0) + dvar * np.mean(-2.0 * centered, axis=0)

        self.gradInput = (gradOutput * std_inv) + (dvar * 2 * centered / N) + (dmean / N)
        return self.gradInput

    def __repr__(self):
        return "BatchNormalization"

In [9]:
class ChannelwiseScaling(Module):
    r"""
       Implements linear transform of input y = \gamma * x + \beta
       where \gamma, \beta - learnable vectors of length x.shape[-1]

       (Raw docstring: otherwise ``\b`` would be read as a backspace
       character and ``\g`` as an invalid escape sequence.)
    """
    def __init__(self, n_out):
        super(ChannelwiseScaling, self).__init__()

        stdv = 1./np.sqrt(n_out)
        self.gamma = np.random.uniform(-stdv, stdv, size=n_out)
        self.beta = np.random.uniform(-stdv, stdv, size=n_out)

        self.gradGamma = np.zeros_like(self.gamma)
        self.gradBeta = np.zeros_like(self.beta)

    def updateOutput(self, input):
        """Scales and shifts every feature: input * gamma + beta."""
        self.output = input * self.gamma + self.beta
        return self.output

    def updateGradInput(self, input, gradOutput):
        """Gradient w.r.t. the input: gradOutput * gamma."""
        self.gradInput = gradOutput * self.gamma
        return self.gradInput

    def accGradParameters(self, input, gradOutput):
        """
        Adds this batch's gradients to `gradBeta` and `gradGamma`.

        Gradients are accumulated in place (as the other parametrized layers
        in this file do) until `zeroGradParameters` resets them.
        """
        self.gradBeta += np.sum(gradOutput, axis=0)
        self.gradGamma += np.sum(gradOutput*input, axis=0)

    def zeroGradParameters(self):
        """Resets the accumulated gradients in place."""
        self.gradGamma.fill(0)
        self.gradBeta.fill(0)

    def getParameters(self):
        return [self.gamma, self.beta]

    def getGradParameters(self):
        return [self.gradGamma, self.gradBeta]

    def __repr__(self):
        return "ChannelwiseScaling"

Practical notes. If BatchNormalization is placed after a linear transformation layer (including dense layer, convolutions, channelwise scaling) that implements a function like `y = weight * x + bias`, then adding the bias becomes useless and can be omitted, since its effect is discarded by the batch mean subtraction. If BatchNormalization (followed by `ChannelwiseScaling`) is placed before a layer that propagates scale (including ReLU, LeakyReLU) followed by any linear transformation layer, then the parameter `gamma` in `ChannelwiseScaling` can be frozen, since it can be absorbed into the linear transformation layer.

## 5. (0.3) Dropout
Implement [**dropout**](https://www.cs.toronto.edu/~hinton/absps/JMLRdropout.pdf). The idea and implementation are really simple: just multiply the input by a $Bernoulli(1 - p)$ mask. Here $p$ is the probability of an element being zeroed.

This has proven to be an effective technique for regularization and preventing the co-adaptation of neurons.

While training (`self.training == True`) it should sample a mask on each iteration (for every batch), zero out elements and multiply elements by $1 / (1 - p)$. The latter is needed for keeping mean values of features close to mean values which will be in test mode. When testing this module should implement identity transform i.e. `self.output = input`.

- input:   **`batch_size x n_feats`**
- output: **`batch_size x n_feats`**

In [10]:
class Dropout(Module):
    """
    Dropout layer.

    In training mode a fresh random mask is drawn on every forward pass; the
    surviving elements are scaled by 1 / (1 - p). In evaluation mode the layer
    is the identity.

    - input:  batch_size x n_feats
    - output: batch_size x n_feats
    """
    def __init__(self, p=0.5):
        super(Dropout, self).__init__()
        self.p = p
        self.mask = None
        self.output = None
        self.gradInput = None

    def updateOutput(self, input):
        """Applies a freshly sampled mask in training mode, identity otherwise."""
        if not self.training:
            self.output = input
            return self.output

        # True where the element survives; dividing folds the rescaling
        # factor into the mask itself.
        survivors = np.random.rand(*input.shape) > self.p
        self.mask = survivors / (1 - self.p)
        self.output = input * self.mask
        return self.output

    def updateGradInput(self, input, gradOutput):
        """Routes the gradient through the mask of the last training forward pass."""
        if not self.training:
            self.gradInput = gradOutput
            return self.gradInput

        self.gradInput = gradOutput * self.mask
        return self.gradInput

    def __repr__(self):
        return f"Dropout(p={self.p})"

## 6. (2.0) Conv2d
Implement [**Conv2d**](https://pytorch.org/docs/stable/generated/torch.nn.Conv2d.html). Use only this list of parameters: (in_channels, out_channels, kernel_size, stride, padding, bias, padding_mode) and fix dilation=1 and groups=1.

In [11]:
from scipy.signal import correlate2d, convolve2d

class Conv2d(Module):
    """
    2D cross-correlation layer (dilation=1, groups=1).

    - input:  batch_size x in_channels x height x width
    - output: batch_size x out_channels x out_height x out_width
      with out = (in + 2 * padding - kernel) // stride + 1 along each axis

    `kernel_size`, `stride` and `padding` accept a scalar (used for both
    axes) or a (height, width) pair. Only padding_mode='zeros' is implemented.
    """
    def __init__(self, in_channels, out_channels, kernel_size,
                 stride=1, padding=0, bias=True, padding_mode='zeros'):
        super(Conv2d, self).__init__()

        self.in_channels = in_channels
        self.out_channels = out_channels
        self.kernel_size = self._pair(kernel_size)
        self.stride = self._pair(stride)
        self.padding = self._pair(padding)
        self.padding_mode = padding_mode

        # Uniform initialization scaled by the fan-in
        kh, kw = self.kernel_size
        stdv = 1. / np.sqrt(self.in_channels * kh * kw)
        self.weight = np.random.uniform(-stdv, stdv,
                                        (self.out_channels, self.in_channels, kh, kw))
        # After construction `self.bias` is either the bias vector or None.
        self.bias = np.random.uniform(-stdv, stdv, self.out_channels) if bias else None

        # Gradients
        self.gradWeight = np.zeros_like(self.weight)
        if self.bias is not None:
            self.gradBias = np.zeros_like(self.bias)

        # Values remembered by the last forward pass
        self.input_shape = None
        self.padded_input = None

    @staticmethod
    def _pair(value):
        """Expands a scalar to a (value, value) pair; sequences become tuples."""
        return (value, value) if np.isscalar(value) else tuple(value)

    def _pad_input(self, input):
        """Apply padding to input according to padding_mode"""
        if self.padding_mode == 'zeros':
            pad_h, pad_w = self.padding
            return np.pad(input,
                          ((0, 0), (0, 0), (pad_h, pad_h), (pad_w, pad_w)),
                          mode='constant')
        else:
            raise NotImplementedError(f"Padding mode {self.padding_mode} not implemented")

    def _dilate_grad(self, gradOutput):
        """
        Spreads gradOutput onto a stride-spaced grid (zeros in between), which
        turns a strided correlation into an equivalent stride-1 one for the
        backward computations.
        """
        sh, sw = self.stride
        if sh == 1 and sw == 1:
            return gradOutput
        batch_size, channels, out_h, out_w = gradOutput.shape
        dilated = np.zeros(
            (batch_size, channels, (out_h - 1) * sh + 1, (out_w - 1) * sw + 1),
            dtype=gradOutput.dtype)
        dilated[:, :, ::sh, ::sw] = gradOutput
        return dilated

    def updateOutput(self, input):
        """Computes the strided cross-correlation of `input` with the kernels."""
        batch_size, _, in_height, in_width = input.shape
        kh, kw = self.kernel_size
        sh, sw = self.stride
        out_height = (in_height + 2 * self.padding[0] - kh) // sh + 1
        out_width = (in_width + 2 * self.padding[1] - kw) // sw + 1

        padded_input = self._pad_input(input)
        self.padded_input = padded_input
        self.input_shape = input.shape

        self.output = np.zeros((batch_size, self.out_channels, out_height, out_width))

        for b in range(batch_size):
            for oc in range(self.out_channels):
                for ic in range(self.in_channels):
                    # Dense 'valid' correlation, then keep every stride-th position
                    self.output[b, oc] += correlate2d(
                        padded_input[b, ic],
                        self.weight[oc, ic],
                        mode='valid'
                    )[::sh, ::sw]

                if self.bias is not None:
                    self.output[b, oc] += self.bias[oc]

        return self.output

    def updateGradInput(self, input, gradOutput):
        """
        Gradient w.r.t. the input: full convolution of the stride-dilated
        gradOutput with the kernels, with the padding border cropped away.
        """
        batch_size, in_channels, in_height, in_width = input.shape
        kh, kw = self.kernel_size
        pad_h, pad_w = self.padding

        dilated = self._dilate_grad(gradOutput)
        # Extent of the padded input actually touched by the sliding windows.
        # It can be smaller than the padded input when the strided windows do
        # not tile it exactly; the untouched rows/columns get zero gradient.
        span_h = dilated.shape[2] + kh - 1
        span_w = dilated.shape[3] + kw - 1

        grad_padded = np.zeros(
            (batch_size, in_channels, in_height + 2 * pad_h, in_width + 2 * pad_w),
            dtype=np.result_type(gradOutput, self.weight))

        for b in range(batch_size):
            for oc in range(self.out_channels):
                for ic in range(self.in_channels):
                    grad_padded[b, ic, :span_h, :span_w] += convolve2d(
                        dilated[b, oc],
                        self.weight[oc, ic],
                        mode='full'
                    )

        # Crop the padding so gradInput has the same shape as input
        self.gradInput = grad_padded[:, :, pad_h:pad_h + in_height, pad_w:pad_w + in_width]
        return self.gradInput

    def accGradParameters(self, input, gradOutput):
        """
        Adds this batch's gradients to `gradWeight` (and `gradBias` if the
        layer has a bias).
        """
        batch_size = input.shape[0]
        kh, kw = self.kernel_size

        padded_input = self._pad_input(input)
        dilated = self._dilate_grad(gradOutput)
        # Restrict the input to the region covered by the windows so that the
        # 'valid' correlation below yields exactly a kernel-sized result.
        span_h = dilated.shape[2] + kh - 1
        span_w = dilated.shape[3] + kw - 1
        covered = padded_input[:, :, :span_h, :span_w]

        for oc in range(self.out_channels):
            for ic in range(self.in_channels):
                for b in range(batch_size):
                    self.gradWeight[oc, ic] += correlate2d(
                        covered[b, ic],
                        dilated[b, oc],
                        mode='valid'
                    )

        if self.bias is not None:
            # Sum over the whole batch and all spatial positions per channel
            self.gradBias += np.sum(gradOutput, axis=(0, 2, 3))

    def zeroGradParameters(self):
        """Resets the accumulated gradients in place."""
        self.gradWeight.fill(0)
        if self.bias is not None:
            self.gradBias.fill(0)

    def getParameters(self):
        return [self.weight, self.bias] if self.bias is not None else [self.weight]

    def getGradParameters(self):
        return [self.gradWeight, self.gradBias] if self.bias is not None else [self.gradWeight]

    def __repr__(self):
        return (f"Conv2d({self.in_channels}, {self.out_channels}, kernel_size={self.kernel_size}, "
                f"stride={self.stride}, padding={self.padding}, bias={self.bias is not None})")
## 7. (0.5) MaxPool2d and AvgPool2d
Implement [**MaxPool2d**](https://pytorch.org/docs/stable/generated/torch.nn.MaxPool2d.html) and [**AvgPool2d**](https://pytorch.org/docs/stable/generated/torch.nn.AvgPool2d.html). Use only the parameters kernel_size, stride and padding (negative infinity for maxpool and zero for avgpool); keep all other parameters fixed at the framework defaults.

In [12]:
import torch
import torch.nn.functional as F
import numpy as np

class MaxPool2d(Module):
    """
    2D max pooling.

    - input:  batch_size x channels x height x width
    - output: batch_size x channels x out_height x out_width

    `stride` defaults to `kernel_size`. Padded positions are filled with
    negative infinity. float32/float64 inputs keep their dtype; anything else
    is converted to float32.
    """
    def __init__(self, kernel_size, stride=None, padding=0):
        super(MaxPool2d, self).__init__()
        self.kernel_size = (kernel_size, kernel_size) if isinstance(kernel_size, int) else kernel_size
        self.stride = stride if stride is not None else kernel_size
        self.stride = (self.stride, self.stride) if isinstance(self.stride, int) else self.stride
        self.padding = (padding, padding) if isinstance(padding, int) else padding
        self.indices = None
        self.input_shape = None

    @staticmethod
    def _to_float_tensor(array):
        """Wraps a numpy array as a float tensor, keeping float32/float64 as is."""
        array = np.ascontiguousarray(array)
        if array.dtype not in (np.float32, np.float64):
            array = array.astype(np.float32)
        return torch.from_numpy(array)

    def updateOutput(self, input):
        """Computes the pooled maxima and remembers where each one came from."""
        # Remember the original input shape for the backward pass
        self.input_shape = input.shape

        input_tensor = self._to_float_tensor(input)

        # Pad with -inf so padded positions can never win the maximum
        pad_h, pad_w = self.padding
        if pad_h > 0 or pad_w > 0:
            input_tensor = F.pad(input_tensor,
                                 (pad_w, pad_w,   # left, right
                                  pad_h, pad_h),  # top, bottom
                                 mode='constant', value=-float('inf'))

        # Max pooling that also returns the index of every maximum
        output, self.indices = F.max_pool2d_with_indices(
            input_tensor,
            kernel_size=self.kernel_size,
            stride=self.stride,
            padding=0
        )

        # Convert the result back to numpy
        self.output = output.numpy()
        return self.output

    def updateGradInput(self, input, gradOutput):
        """
        Routes every output gradient back to the position of its maximum.

        The stored indices are offsets inside each (batch, channel) plane of
        the padded input, so the scatter is done per plane. `scatter_add_`
        accumulates when overlapping windows select the same element.
        """
        batch_size, channels, height, width = self.input_shape
        pad_h, pad_w = self.padding
        padded_h = height + 2 * pad_h
        padded_w = width + 2 * pad_w

        grad_output = self._to_float_tensor(gradOutput).reshape(batch_size, channels, -1)
        plane_indices = self.indices.reshape(batch_size, channels, -1)

        # One flattened padded plane per (batch, channel) pair
        grad_planes = torch.zeros((batch_size, channels, padded_h * padded_w),
                                  dtype=grad_output.dtype)
        grad_planes.scatter_add_(2, plane_indices, grad_output)
        grad_input_padded = grad_planes.reshape(batch_size, channels, padded_h, padded_w)

        # Drop the padding border (a no-op when there is no padding)
        self.gradInput = grad_input_padded[
            :, :,
            pad_h:pad_h + height,
            pad_w:pad_w + width
        ].numpy()

        return self.gradInput

    def __repr__(self):
        return f"MaxPool2d(kernel_size={self.kernel_size}, stride={self.stride}, padding={self.padding})"

class AvgPool2d(Module):
    """
    2D average pooling.

    - input:  batch_size x channels x height x width
    - output: batch_size x channels x out_height x out_width

    `stride` defaults to `kernel_size`. The input is zero-padded explicitly
    and every window is divided by the full kernel area, so padded zeros
    count towards the average.
    """
    def __init__(self, kernel_size, stride=None, padding=0):
        super(AvgPool2d, self).__init__()
        self.kernel_size = (kernel_size, kernel_size) if isinstance(kernel_size, int) else kernel_size
        self.stride = stride if stride is not None else kernel_size
        self.stride = (self.stride, self.stride) if isinstance(self.stride, int) else self.stride
        self.padding = (padding, padding) if isinstance(padding, int) else padding
        self.input_shape = None

    def updateOutput(self, input):
        """Averages every pooling window of the zero-padded input."""
        self.input_shape = input.shape

        # float32/float64 keep their dtype; other dtypes are converted to
        # float32 before pooling.
        array = np.ascontiguousarray(input)
        if array.dtype not in (np.float32, np.float64):
            array = array.astype(np.float32)
        input_tensor = torch.from_numpy(array)

        # Apply zero padding if needed
        if any(p > 0 for p in self.padding):
            input_tensor = F.pad(input_tensor,
                                 (self.padding[1], self.padding[1],
                                  self.padding[0], self.padding[0]),
                                 mode='constant', value=0)

        # The padding has already been applied above, so the pooling call
        # itself runs without padding.
        self.output = F.avg_pool2d(
            input_tensor,
            kernel_size=self.kernel_size,
            stride=self.stride,
            padding=0
        ).numpy()

        return self.output

    def updateGradInput(self, input, gradOutput):
        """
        Spreads every output gradient evenly over its pooling window.

        The result follows the floating dtype of `gradOutput` instead of
        being forced to float32; non-float gradients are treated as float64.
        """
        grad_output = np.asarray(gradOutput)
        if not np.issubdtype(grad_output.dtype, np.floating):
            grad_output = grad_output.astype(np.float64)

        batch_size, channels, height, width = self.input_shape
        pad_h, pad_w = self.padding
        kh, kw = self.kernel_size
        sh, sw = self.stride

        # Gradient w.r.t. the padded input
        grad_input_padded = np.zeros(
            (batch_size, channels, height + 2 * pad_h, width + 2 * pad_w),
            dtype=grad_output.dtype)

        # Every element of a window receives 1 / (kh * kw) of its gradient
        share = grad_output / (kh * kw)
        for i in range(grad_output.shape[2]):
            for j in range(grad_output.shape[3]):
                h_start = i * sh
                w_start = j * sw
                grad_input_padded[:, :, h_start:h_start + kh, w_start:w_start + kw] += \
                    share[:, :, i:i + 1, j:j + 1]

        # Drop the padding border (a no-op when there is no padding)
        self.gradInput = grad_input_padded[
            :, :,
            pad_h:pad_h + height,
            pad_w:pad_w + width
        ]

        return self.gradInput

    def __repr__(self):
        return f"AvgPool2d(kernel_size={self.kernel_size}, stride={self.stride}, padding={self.padding})"

## 8. (0.3) GlobalMaxPool2d and GlobalAvgPool2d
Implement **GlobalMaxPool2d** and **GlobalAvgPool2d**. They have no provided tests and their parameters are up to you, but they must aggregate information within channels. Write test functions for these layers on your own.

In [13]:
import torch
import torch.nn as nn

class GlobalMaxPool2d(nn.Module):
    """
    Global max pooling: collapses the spatial extent of every channel to the
    single largest value found in it.

    Input shape:  (batch_size, channels, height, width)
    Output shape: (batch_size, channels, 1, 1)
    """
    def __init__(self):
        super().__init__()

    def forward(self, x):
        # Reduce the height axis first, then the width axis; keepdim leaves
        # both as singleton dimensions.
        over_height = torch.max(x, dim=2, keepdim=True).values
        over_width = over_height.max(dim=3, keepdim=True).values
        return over_width

class GlobalAvgPool2d(nn.Module):
    """
    Collapse every channel of a 4-D tensor to the mean of its spatial values.

    Height (dim 2) and width (dim 3) are averaged in a single reduction and
    kept as size-1 dimensions.

    Input shape:  (batch_size, channels, height, width)
    Output shape: (batch_size, channels, 1, 1)
    """

    def __init__(self):
        super().__init__()

    def forward(self, x):
        """Return the per-channel spatial mean of ``x``."""
        spatial_axes = (2, 3)
        return x.mean(dim=spatial_axes, keepdim=True)
    
def test_global_max_pool2d():
    """Compare GlobalMaxPool2d with a reference computed a different way."""
    batch, channels, height, width = 2, 3, 4, 4
    input_tensor = torch.randn(batch, channels, height, width)

    # Reference: reduce width, then height, then restore the trailing axis.
    width_max = input_tensor.max(dim=3).values
    height_max = width_max.max(dim=2, keepdim=True).values
    expected_output = height_max.unsqueeze(-1)

    output = GlobalMaxPool2d()(input_tensor)

    # Shape first, values second, so a shape bug gets the clearer message.
    assert output.shape == (2, 3, 1, 1), f"Shape mismatch: {output.shape} != (2, 3, 1, 1)"
    assert torch.allclose(output, expected_output), "Output values don't match expected"

    print("GlobalMaxPool2d test passed!")

def test_global_avg_pool2d():
    """Compare GlobalAvgPool2d with the tensor's own spatial mean."""
    batch, channels, height, width = 2, 3, 4, 4
    input_tensor = torch.randn(batch, channels, height, width)

    # Reference value: mean over both spatial axes, keeping them as size 1.
    expected_output = torch.mean(input_tensor, dim=(2, 3), keepdim=True)

    output = GlobalAvgPool2d()(input_tensor)

    # Shape first, values second, so a shape bug gets the clearer message.
    assert output.shape == (2, 3, 1, 1), f"Shape mismatch: {output.shape} != (2, 3, 1, 1)"
    assert torch.allclose(output, expected_output), "Output values don't match expected"

    print("GlobalAvgPool2d test passed!")

# Run both global-pooling tests when this code is executed directly.
# Each test prints a confirmation line on success and raises
# AssertionError on a shape or value mismatch.
if __name__ == "__main__":
    test_global_max_pool2d()
    test_global_avg_pool2d()

GlobalMaxPool2d test passed!
GlobalAvgPool2d test passed!


#9. (0.2) Implement [**Flatten**](https://pytorch.org/docs/stable/generated/torch.flatten.html)

In [14]:
import torch
import torch.nn as nn

class Flatten(nn.Module):
    """
    Merge a contiguous range of tensor dimensions into a single dimension.

    Args:
        start_dim: first dimension to flatten (default: 0). May be negative.
        end_dim: last dimension to flatten, inclusive (default: -1). May be
            negative.
    """

    def __init__(self, start_dim=0, end_dim=-1):
        super(Flatten, self).__init__()
        self.start_dim = start_dim
        self.end_dim = end_dim

    def forward(self, input):
        """
        Return ``input`` with dimensions ``start_dim..end_dim`` collapsed.

        If, after resolving negative indices, ``start_dim`` is greater than
        ``end_dim`` the input is returned unchanged.
        """
        # Resolve negative indices relative to the number of dimensions.
        ndim = input.dim()
        start_dim = self.start_dim if self.start_dim >= 0 else ndim + self.start_dim
        end_dim = self.end_dim if self.end_dim >= 0 else ndim + self.end_dim

        if start_dim > end_dim:
            return input

        # Product of the extents being merged.
        input_size = list(input.size())
        flattened_size = 1
        for i in range(start_dim, end_dim + 1):
            flattened_size *= input_size[i]

        new_shape = (
            input_size[:start_dim]
            + [flattened_size]
            + input_size[end_dim + 1:]
        )

        # reshape instead of view: view raises on non-contiguous tensors
        # (e.g. the result of a transpose), reshape copies when it has to.
        return input.reshape(new_shape)

    def __repr__(self):
        return f"Flatten(start_dim={self.start_dim}, end_dim={self.end_dim})"


class Flatten:
    """
    Numpy flatten layer: merges dimensions ``start_dim..end_dim`` of the
    input array into one dimension.

    Args:
        start_dim: first dimension to merge (default: 0). May be negative.
        end_dim: last dimension to merge, inclusive (default: -1). May be
            negative.

    NOTE(review): this class does not derive from the notebook's Module
    base, so it only offers updateOutput/updateGradInput - confirm that is
    intended.
    """

    def __init__(self, start_dim=0, end_dim=-1):
        self.start_dim = start_dim
        self.end_dim = end_dim
        self.output = None
        self.gradInput = None

    def updateOutput(self, input):
        """
        Reshape ``input`` so that the configured dimension range is merged.

        Out-of-range indices are clamped to valid axes. If the resolved
        ``start_dim`` exceeds ``end_dim`` a copy of the input is returned.
        The input shape is remembered in ``self.original_shape``.
        """
        shape = tuple(input.shape)
        ndims = len(shape)

        # Resolve negative indices.
        start_dim = self.start_dim if self.start_dim >= 0 else ndims + self.start_dim
        end_dim = self.end_dim if self.end_dim >= 0 else ndims + self.end_dim

        # Clamp both bounds into the valid axis range.
        start_dim = max(0, min(start_dim, ndims - 1))
        end_dim = max(0, min(end_dim, ndims - 1))

        self.original_shape = input.shape

        if start_dim > end_dim:
            self.output = input.copy()
            return self.output

        # Compute the new shape: prefix + merged extent + suffix.
        flattened_size = int(np.prod(shape[start_dim:end_dim + 1]))
        new_shape = shape[:start_dim] + (flattened_size,) + shape[end_dim + 1:]

        self.output = input.reshape(new_shape)
        return self.output

    def updateGradInput(self, input, gradOutput):
        """
        Reshape ``gradOutput`` back to the shape of ``input``.

        The shape is taken from the ``input`` argument rather than from state
        cached by updateOutput, so the result is correct even when no forward
        pass (or one with a different input) preceded this call.
        """
        self.gradInput = gradOutput.reshape(input.shape)
        return self.gradInput

    def __repr__(self):
        return f"Flatten(start_dim={self.start_dim}, end_dim={self.end_dim})"

# Activation functions

Here's the complete example for the **Rectified Linear Unit** non-linearity (aka **ReLU**):

In [15]:
class ReLU(Module):
    """Rectified Linear Unit: keeps positive values and zeroes the rest."""

    def __init__(self):
        super().__init__()

    def updateOutput(self, input):
        """Store and return the element-wise maximum of ``input`` and 0."""
        rectified = np.maximum(input, 0)
        self.output = rectified
        return rectified

    def updateGradInput(self, input, gradOutput):
        """Let ``gradOutput`` through only where ``input`` is strictly positive."""
        active = input > 0
        self.gradInput = np.multiply(gradOutput, active)
        return self.gradInput

    def __repr__(self):
        return "ReLU"

## 10. (0.1) Leaky ReLU
Implement [**Leaky Rectified Linear Unit**](https://en.wikipedia.org/wiki/Rectifier_%28neural_networks%29#Leaky_ReLUs). Experiment with the slope.

In [16]:
class LeakyReLU:
    """
    Leaky ReLU activation: identity for positive inputs, ``slope * x`` for
    the rest.

    NOTE(review): unlike ReLU this class does not derive from the notebook's
    Module base, so only updateOutput/updateGradInput are available - confirm
    that is intended.
    """

    def __init__(self, slope=0.03):
        super().__init__()
        self.slope = slope

    def updateOutput(self, input):
        """Apply the activation element-wise; the input is kept in ``self.input``."""
        is_positive = input > 0
        self.output = np.where(is_positive, input, input * self.slope)
        self.input = input  # kept on the instance after the forward pass
        return self.output

    def updateGradInput(self, input, gradOutput):
        """Scale ``gradOutput`` by 1 where ``input`` > 0 and by ``slope`` elsewhere."""
        local_derivative = np.where(input > 0, 1.0, self.slope)
        self.gradInput = gradOutput * local_derivative
        return self.gradInput

    def __repr__(self):
        return "LeakyReLU"

## 11. (0.1) ELU
Implement [**Exponential Linear Units**](http://arxiv.org/abs/1511.07289) activations.

In [17]:
import numpy as np

class ELU:
    """
    Exponential Linear Unit: ``x`` for positive inputs and
    ``alpha * (exp(x) - 1)`` for the rest.

    Args:
        alpha: scale of the negative branch (default: 1.0).

    NOTE(review): this class does not derive from the notebook's Module
    base, so only updateOutput/updateGradInput are available - confirm that
    is intended.
    """

    def __init__(self, alpha=1.0):
        super(ELU, self).__init__()
        self.alpha = alpha
        self.output = None
        self.gradInput = None

    def updateOutput(self, input):
        """Apply ELU element-wise; the input is kept in ``self.input``."""
        # Clamp the exp argument at 0: np.where evaluates both branches, and
        # exp of a large positive value would overflow although unused.
        negative_branch = self.alpha * (np.exp(np.minimum(input, 0)) - 1)
        self.output = np.where(input > 0, input, negative_branch)
        self.input = input
        return self.output

    def updateGradInput(self, input, gradOutput):
        """
        Multiply ``gradOutput`` by the ELU derivative at ``input``.

        The derivative is 1 for positive inputs and ``alpha * exp(x)``
        otherwise. It is computed from ``input`` directly, so the result does
        not depend on which forward pass (if any) ran before this call.
        """
        negative_slope = self.alpha * np.exp(np.minimum(input, 0))
        grad = np.where(input > 0, 1.0, negative_slope)
        self.gradInput = gradOutput * grad
        return self.gradInput

    def __repr__(self):
        return "ELU"

## 12. (0.1) SoftPlus
Implement [**SoftPlus**](https://en.wikipedia.org/wiki/Rectifier_%28neural_networks%29) activations. Notice how closely their shape resembles ReLU.

In [18]:
class SoftPlus:
    """
    SoftPlus activation ``log(1 + exp(x))``, a smooth approximation of ReLU.

    Inputs above 20 are passed through unchanged, where the function is
    numerically indistinguishable from the identity.

    NOTE(review): this class does not derive from the notebook's Module
    base, so only updateOutput/updateGradInput are available - confirm that
    is intended.
    """

    def __init__(self):
        self.output = None
        self.gradInput = None
        self.input = None

    def updateOutput(self, input):
        """Apply SoftPlus element-wise; a copy of the input is kept in ``self.input``."""
        self.input = input.copy()
        # Overflow-free form: log(1 + exp(x)) == max(x, 0) + log1p(exp(-|x|)).
        # exp only ever sees non-positive arguments.
        stable = np.maximum(input, 0) + np.log1p(np.exp(-np.abs(input)))
        self.output = np.where(input > 20, input, stable)
        return self.output

    def updateGradInput(self, input, gradOutput):
        """
        Multiply ``gradOutput`` by the SoftPlus derivative, ``sigmoid(input)``.

        The sigmoid is evaluated piecewise so exp never receives a positive
        argument; the naive ``exp(x) / (1 + exp(x))`` turns into inf/inf = NaN
        for large ``x``.
        """
        decay = np.exp(-np.abs(input))
        sigmoid = np.where(input >= 0, 1.0 / (1.0 + decay), decay / (1.0 + decay))
        self.gradInput = gradOutput * sigmoid
        return self.gradInput

    def __repr__(self):
        return "SoftPlus"

#13. (0.2) Gelu
Implement [**Gelu**](https://pytorch.org/docs/stable/generated/torch.nn.GELU.html) activations.

In [19]:
import numpy as np
import torch
from torch.nn import Module

class Gelu(Module):
    """
    Gaussian Error Linear Unit, ``x * Phi(x)``, where ``Phi`` is the standard
    normal CDF evaluated exactly through ``torch.erf`` (not the tanh
    approximation).

    Numpy arrays and torch tensors are both accepted. The result is returned,
    and cached in ``self.output`` / ``self.gradInput``, in the same container
    type as ``input``.
    """

    def __init__(self):
        super(Gelu, self).__init__()
        self.output = None
        self.gradInput = None

    def updateOutput(self, input):
        """Apply GELU element-wise to ``input``."""
        from_numpy = isinstance(input, np.ndarray)
        x = torch.from_numpy(input) if from_numpy else input

        # Exact GELU: x * 0.5 * (1 + erf(x / sqrt(2))).
        result = x * 0.5 * (1.0 + torch.erf(x / np.sqrt(2.0)))

        # Cache exactly what is returned, so self.output matches the return value.
        self.output = result.numpy() if from_numpy else result
        return self.output

    def updateGradInput(self, input, gradOutput):
        """
        Multiply ``gradOutput`` by the GELU derivative at ``input``.

        The derivative is ``Phi(x) + x * phi(x)``, with ``phi`` the standard
        normal density. A numpy array is returned when ``input`` is one.
        """
        from_numpy = isinstance(input, np.ndarray)
        x = torch.from_numpy(input) if from_numpy else input

        if isinstance(gradOutput, np.ndarray):
            upstream = torch.from_numpy(gradOutput)
        else:
            upstream = gradOutput

        cdf = 0.5 * (1.0 + torch.erf(x / np.sqrt(2.0)))
        pdf = torch.exp(-0.5 * x**2) / np.sqrt(2.0 * np.pi)
        result = upstream * (cdf + x * pdf)

        self.gradInput = result.numpy() if from_numpy else result
        return self.gradInput

    def __repr__(self):
        return "Gelu"

# Criterions

Criterions are used to score the model's answers.

In [20]:
class Criterion(object):
    """
    Base class for loss functions.

    Subclasses put their logic in `updateOutput` and `updateGradInput`;
    `forward` and `backward` only delegate to them.
    """

    def __init__ (self):
        self.gradInput = None
        self.output = None

    def forward(self, input, target):
        """
            Compute the loss for `input` against `target` and return it.

            Do not override this method; implement `updateOutput` instead.
        """
        loss = self.updateOutput(input, target)
        return loss

    def backward(self, input, target):
        """
            Compute the gradient of the loss with respect to `input` and
            return it.

            Do not override this method; implement `updateGradInput` instead.
        """
        grad = self.updateGradInput(input, target)
        return grad

    def updateOutput(self, input, target):
        """
        Hook for subclasses; the base version returns the stored output.
        """
        return self.output

    def updateGradInput(self, input, target):
        """
        Hook for subclasses; the base version returns the stored gradient.
        """
        return self.gradInput

    def __repr__(self):
        """
        Pretty printing. Override in every subclass that should have a
        readable description.
        """
        return "Criterion"

The **MSECriterion**, which is basic L2 norm usually used for regression, is implemented here for you.
- input:   **`batch_size x n_feats`**
- target: **`batch_size x n_feats`**
- output: **scalar**

In [21]:
class MSECriterion(Criterion):
    """Squared-error loss: sum of squared differences divided by ``input.shape[0]``."""

    def __init__(self):
        super().__init__()

    def updateOutput(self, input, target):
        """Store and return the summed squared error divided by the first-axis length."""
        residual = input - target
        self.output = np.sum(residual ** 2) / input.shape[0]
        return self.output

    def updateGradInput(self, input, target):
        """Store and return the loss gradient w.r.t. ``input``: ``2 * (input - target) / input.shape[0]``."""
        residual = input - target
        self.gradInput = residual * 2 / input.shape[0]
        return self.gradInput

    def __repr__(self):
        return "MSECriterion"

## 14. (0.2) Negative LogLikelihood criterion (numerically unstable)
Your task is to implement the **ClassNLLCriterion**. It should implement [multiclass log loss](http://scikit-learn.org/stable/modules/model_evaluation.html#log-loss). Although there is a sum over `y` (target) in that formula,
remember that targets are one-hot encoded. This fact simplifies the computations a lot. Note that criterions are the only places where you divide by the batch size. Also, there is a small hack: a small number is added to the probabilities to avoid computing log(0).
- input:   **`batch_size x n_feats`** - probabilities
- target: **`batch_size x n_feats`** - one-hot representation of ground truth
- output: **scalar**



In [22]:
 class ClassNLLCriterionUnstable(Criterion):
     """
     Negative log-likelihood loss on probabilities (not log-probabilities).

     Probabilities are clipped to ``[EPS, 1 - EPS]`` before the logarithm so
     that log(0) is never evaluated. Targets are one-hot rows; the true class
     of each row is found with argmax.
     """
     EPS = 1e-15

     def __init__(self):
         super(ClassNLLCriterionUnstable, self).__init__()

     def _true_class_probs(self, input, target):
         """Return (clipped probabilities, row indices, true-class indices)."""
         clamped = np.clip(input, self.EPS, 1 - self.EPS)
         rows = np.arange(input.shape[0])
         target_indices = np.argmax(target, axis=1)
         return clamped, rows, target_indices

     def updateOutput(self, input, target):
         """Store and return the mean of ``-log p`` over the true classes."""
         input_clamp, rows, target_indices = self._true_class_probs(input, target)

         # Only the true-class probability of each row contributes to the loss.
         self.output = -np.mean(np.log(input_clamp[rows, target_indices]))

         # Kept on the instance for callers that read these attributes.
         self.input_clamp = input_clamp
         self.target_indices = target_indices
         self.batch_size = input.shape[0]

         return self.output

     def updateGradInput(self, input, target):
         """
         Store and return the gradient of the loss w.r.t. ``input``.

         The gradient is ``-1 / (p * batch_size)`` at the true class of each
         row and 0 elsewhere. Everything is recomputed from the arguments, so
         the result does not depend on a preceding updateOutput call.
         """
         input_clamp, rows, target_indices = self._true_class_probs(input, target)
         batch_size = input.shape[0]

         self.gradInput = np.zeros_like(input)
         self.gradInput[rows, target_indices] = (
             -1.0 / (input_clamp[rows, target_indices] * batch_size))

         return self.gradInput

     def __repr__(self):
         return "ClassNLLCriterionUnstable"

## 15. (0.3) Negative LogLikelihood criterion (numerically stable)
- input:   **`batch_size x n_feats`** - log probabilities
- target: **`batch_size x n_feats`** - one-hot representation of ground truth
- output: **scalar**

Task is similar to the previous one, but now the criterion input is the output of log-softmax layer. This decomposition allows us to avoid problems with computation of forward and backward of log().

In [23]:
class ClassNLLCriterion(Criterion):
    """
    Negative log-likelihood loss on log-probabilities with one-hot targets.
    """

    def __init__(self):
        super().__init__()

    def updateOutput(self, input, target):
        """Store and return ``-sum(input * target) / input.shape[0]``."""
        # With one-hot targets the element-wise product keeps only the
        # log-probability of the true class in every row.
        selected = input * target
        self.output = -np.sum(selected) / input.shape[0]

        # Keep the targets on the instance.
        self.target = target
        return self.output

    def updateGradInput(self, input, target):
        """Store and return the gradient w.r.t. ``input``: ``-target / input.shape[0]``."""
        batch_size = input.shape[0]
        self.gradInput = -target / batch_size
        return self.gradInput

    def __repr__(self):
        return "ClassNLLCriterion"

1-я часть задания: реализация слоев, лосей и функций активации - 5 баллов. \\
2-я часть задания: реализация моделей на своих классах. Что должно быть:
  1. Выберите оптимизатор и реализуйте его, чтобы он работал с вашими классами. - 1 балл.
  2. Модель для задачи мультирегрессии на выбранных вами данных. Использовать FCNN, dropout, batchnorm, MSE. Пробуйте различные функции активации. Для первой модели попробуйте большую, среднюю и маленькую модель. - 1 балл.
  3. Модель для задачи мультиклассификации на MNIST. Использовать свёртки, макспулы, флэттэны, софтмаксы - 1 балл.
  4. Автоэнкодер для выбранных вами данных. Должен быть на свёртках и полносвязных слоях, дропаутах, батчнормах и тд. - 2 балла. \\

Дополнительно в оценке каждой модели будет учитываться:
1. Наличие правильно выбранной метрики и лосс функции.
2. Отрисовка графиков лосей и метрик на трейне-валидации. Проверка качества модели на тесте.
3. Наличие шедулера для lr.
4. Наличие вормапа.
5. Наличие механизма ранней остановки и сохранение лучшей модели.
6. Свитч лося (метрики) и оптимайзера.