In [1]:
import numpy as np

**Module** is an abstract class which defines the fundamental methods necessary for training a neural network. You do not need to change anything here, just read the comments.

In [3]:
class Module(object):
    """
    Abstract building block of a network: a black box that maps `input`
    data to `output` data.

    The forward pass applies the module like a function:

        output = module.forward(input)

    The backward pass differentiates that function. Because a module is
    usually one link of a longer chain, it also receives the gradient that
    arrives from the next link (`gradOutput`) and applies the chain rule:

        gradInput = module.backward(input, gradOutput)
    """
    def __init__(self):
        # New modules start in training mode with no cached results.
        self.training = True
        self.gradInput = None
        self.output = None

    def forward(self, input):
        """
        Run the module on `input` and return its output.

        Thin wrapper: the actual computation lives in `updateOutput`.
        """
        return self.updateOutput(input)

    def backward(self, input, gradOutput):
        """
        Run one backpropagation step through the module for the given `input`.

        Two things happen, in this order:
         - the gradient w.r.t. `input` is computed (needed by earlier modules),
         - the gradient w.r.t. the parameters is computed (needed by the optimizer).

        Returns whatever is stored in `self.gradInput` afterwards.
        """
        self.updateGradInput(input, gradOutput)
        self.accGradParameters(input, gradOutput)
        return self.gradInput

    def updateOutput(self, input):
        """
        Compute the output from `input` and the current parameters.

        Implementations must both store the result in `self.output` and
        return it. The simplest possible override is the identity:

            self.output = input
            return self.output

        The base implementation does nothing and returns None.
        """
        return None

    def updateGradInput(self, input, gradOutput):
        """
        Compute the gradient of the module with respect to its own input.

        `gradInput` always has the same shape as `input`. Implementations
        must both store the result in `self.gradInput` and return it. The
        simplest possible override is the identity:

            self.gradInput = gradOutput
            return self.gradInput

        The base implementation does nothing and returns None.
        """
        return None

    def accGradParameters(self, input, gradOutput):
        """
        Compute the gradient of the module with respect to its own parameters.

        Modules without parameters (e.g. ReLU) do not need to override this.
        """
        return None

    def zeroGradParameters(self):
        """
        Reset the parameter gradients to zero, if the module has parameters.
        """
        return None

    def getParameters(self):
        """
        Return the module's parameters as a list (empty when there are none).
        """
        return list()

    def getGradParameters(self):
        """
        Return the gradients w.r.t. the parameters as a list (empty when
        there are none).
        """
        return list()

    def train(self):
        """
        Switch the module to training mode.

        Dropout and BatchNorm behave differently in training and testing.
        """
        self.training = True

    def evaluate(self):
        """
        Switch the module to evaluation mode.

        Dropout and BatchNorm behave differently in training and testing.
        """
        self.training = False

    def __repr__(self):
        """
        Human-readable name. Override it in every module that should print
        a meaningful description.
        """
        return "Module"

# Sequential container

**Define** the forward and backward pass procedures.

In [5]:
class Sequential(Module):
    """
    Container that chains modules: the output of module `i` is the input of
    module `i + 1`.

    `backward` relies on the `output` fields the modules cached during the
    most recent forward pass, so call `forward` before `backward`.
    """
    def __init__(self):
        super(Sequential, self).__init__()
        self.modules = []

    def add(self, module):
        """
        Append `module` to the end of the chain.
        """
        self.modules.append(module)

    def updateOutput(self, input):
        """
        Forward pass: feed `input` through every module in insertion order.

        Returns the output of the last module (or `input` itself when the
        container is empty) and stores it in `self.output`.
        """
        output = input
        for module in self.modules:
            output = module.updateOutput(output)
        self.output = output
        return self.output

    def backward(self, input, gradOutput):
        """
        Backward pass: propagate `gradOutput` through the modules in reverse
        order, computing both input gradients and parameter gradients.

        Module `i` is differentiated at the input it actually received in
        the forward pass: the container's `input` for the first module and
        `modules[i - 1].output` for every later one.

        Returns the gradient w.r.t. the container's `input` and stores it in
        `self.gradInput`.
        """
        # Inputs seen by each module during forward, in module order.
        module_inputs = [input] + [module.output for module in self.modules[:-1]]

        grad = gradOutput
        for module, module_input in zip(reversed(self.modules), reversed(module_inputs)):
            # `backward` also accumulates parameter gradients. Layers that are
            # not Module subclasses may implement only `updateGradInput`, so
            # fall back to it rather than failing.
            step = getattr(module, 'backward', None)
            if step is None:
                step = module.updateGradInput
            grad = step(module_input, grad)

        self.gradInput = grad
        return self.gradInput

    def zeroGradParameters(self):
        """
        Reset the parameter gradients of every contained module.
        """
        for module in self.modules:
            module.zeroGradParameters()

    def getParameters(self):
        """
        Return the parameters of all contained modules as one flat list,
        in module order.
        """
        parameters = []
        for module in self.modules:
            parameters += module.getParameters()
        return parameters

    def getGradParameters(self):
        """
        Return the parameter gradients of all contained modules as one flat
        list, in the same order as `getParameters`.
        """
        grad_parameters = []
        for module in self.modules:
            grad_parameters += module.getGradParameters()
        return grad_parameters

    def __repr__(self):
        """
        One line per contained module, each terminated by a newline.
        """
        string = "".join([str(x) + '\n' for x in self.modules])
        return string

    def __getitem__(self, index):
        """
        Return the module (or slice of modules) at `index`.
        """
        return self.modules[index]

    def train(self):
        """
        Switch the container and every contained module to training mode.
        """
        self.training = True
        for module in self.modules:
            module.train()

    def evaluate(self):
        """
        Switch the container and every contained module to evaluation mode.
        """
        self.training = False
        for module in self.modules:
            module.evaluate()


# Layers

## 1 (0.2). Linear transform layer
Also known as dense layer, fully-connected layer, FC-layer, InnerProductLayer (in caffe), affine transform
- input:   **`batch_size x n_feats1`**
- output: **`batch_size x n_feats2`**

In [None]:
import numpy as np

class Linear(Module):
    """
    Affine layer `output = input @ W.T + b`, also known as a fully-connected
    layer (InnerProductLayer in caffe).

    Expects 2D input of shape (n_samples, n_feature).
    """
    def __init__(self, n_in, n_out):
        super(Linear, self).__init__()

        # Uniform initialisation in [-1/sqrt(n_in), 1/sqrt(n_in)].
        bound = 1. / np.sqrt(n_in)
        self.W = np.random.uniform(-bound, bound, size=(n_out, n_in))
        self.b = np.random.uniform(-bound, bound, size=n_out)

        # Gradient accumulators, same shapes as the parameters.
        self.gradW = np.zeros_like(self.W)
        self.gradb = np.zeros_like(self.b)

    def updateOutput(self, input):
        """Compute and cache `input @ W.T + b`."""
        projected = np.dot(input, self.W.T)
        self.output = projected + self.b
        return self.output

    def updateGradInput(self, input, gradOutput):
        """Compute and cache the gradient w.r.t. `input`: `gradOutput @ W`."""
        self.gradInput = np.dot(gradOutput, self.W)
        return self.gradInput

    def accGradParameters(self, input, gradOutput):
        """Add this batch's contribution to `gradW` and `gradb` in place."""
        weight_grad = np.dot(gradOutput.T, input)
        bias_grad = np.sum(gradOutput, axis=0)
        self.gradW += weight_grad
        self.gradb += bias_grad

    def zeroGradParameters(self):
        """Zero both gradient accumulators in place."""
        for accumulator in (self.gradW, self.gradb):
            accumulator.fill(0)

    def getParameters(self):
        """Return `[W, b]`."""
        return [self.W, self.b]

    def getGradParameters(self):
        """Return `[gradW, gradb]`, in the same order as `getParameters`."""
        return [self.gradW, self.gradb]

    def __repr__(self):
        n_out, n_in = self.W.shape
        return 'Linear %d -> %d' % (n_in, n_out)


## 2. (0.2) SoftMax
- input:   **`batch_size x n_feats`**
- output: **`batch_size x n_feats`**

$\text{softmax}(x)_i = \frac{\exp x_i} {\sum_j \exp x_j}$

Recall that $\text{softmax}(x) = \text{softmax}(x - \text{const})$. This makes it possible to avoid computing `exp()` of a large argument.

In [None]:
class SoftMax(Module):
    """
    Row-wise softmax for 2D input of shape (batch_size, n_feats).
    """
    def __init__(self):
        super(SoftMax, self).__init__()

    def updateOutput(self, input):
        """
        Compute and cache softmax over axis 1.

        The row maximum is subtracted first; this leaves the result unchanged
        but keeps every exponent non-positive, so `exp` cannot overflow.
        """
        shifted = np.subtract(input, input.max(axis=1, keepdims=True))
        exponentials = np.exp(shifted)
        self.output = np.divide(exponentials, exponentials.sum(axis=1, keepdims=True))
        return self.output

    def updateGradInput(self, input, gradOutput):
        """
        Compute and cache the gradient w.r.t. `input`.

        Uses the softmax cached in `self.output` by the last forward pass.
        With Jacobian J_ij = s_i * (delta_ij - s_j), the product
        `gradOutput @ J` collapses to `s * (gradOutput - sum(gradOutput * s))`
        per row, so no per-sample Jacobian matrix is built and the result is
        floating point regardless of the dtype of `input`.
        """
        softmax = self.output
        weighted_sum = np.sum(gradOutput * softmax, axis=1, keepdims=True)
        self.gradInput = softmax * (gradOutput - weighted_sum)
        return self.gradInput

    def __repr__(self):
        return "SoftMax"

## 3. (0.2) LogSoftMax
- input:   **`batch_size x n_feats`**
- output: **`batch_size x n_feats`**

$\text{logsoftmax}(x)_i = \log\text{softmax}(x)_i = x_i - \log {\sum_j \exp x_j}$

The main goal of this layer is to be used in computation of log-likelihood loss.

In [None]:
class LogSoftMax(Module):
    """
    Row-wise log-softmax for 2D input of shape (batch_size, n_feats).

    NOTE(review): this notebook defines `LogSoftMax` again in the following
    cell; after running all cells the later definition is the one in effect.
    """
    def __init__(self):
        super(LogSoftMax, self).__init__()

    def updateOutput(self, input):
        """
        Compute and cache `x - log(sum(exp(x)))` over axis 1.

        The row maximum is subtracted first so that `exp` cannot overflow;
        log-softmax is invariant to that shift.
        """
        shifted_input = np.subtract(input, input.max(axis=1, keepdims=True))
        log_sum_exp = np.log(np.sum(np.exp(shifted_input), axis=1, keepdims=True))
        self.output = np.subtract(shifted_input, log_sum_exp)
        return self.output

    def updateGradInput(self, input, gradOutput):
        """
        Compute and cache the gradient w.r.t. `input`.

        d logsoftmax_i / d x_j = delta_ij - softmax_j, hence
        gradInput = gradOutput - softmax * sum(gradOutput) per row.
        The softmax is recovered from the cached log-softmax output.
        """
        softmax = np.exp(self.output)
        self.gradInput = gradOutput - softmax * np.sum(gradOutput, axis=1, keepdims=True)
        return self.gradInput

    def __repr__(self):
        return "LogSoftMax"

In [None]:
class LogSoftMax(Module):
    """
    Row-wise log-softmax for 2D input of shape (batch_size, n_feats).

    The forward pass also caches the plain softmax, which the backward pass
    needs.
    """
    def __init__(self):
        super(LogSoftMax, self).__init__()
        # Softmax of the last forward input; filled in by `updateOutput`.
        self.exp_normalized = None

    def updateOutput(self, input):
        """Compute and cache log-softmax over axis 1."""
        # Shifting by the row maximum keeps the exponentials from overflowing.
        row_max = input.max(axis=1, keepdims=True)
        centered = input - row_max

        exponentials = np.exp(centered)
        partition = np.sum(exponentials, axis=1, keepdims=True)

        self.output = centered - np.log(partition)
        self.exp_normalized = exponentials / partition
        return self.output

    def updateGradInput(self, input, gradOutput):
        """
        Compute and cache the gradient w.r.t. `input`:
        `gradOutput - softmax * sum(gradOutput)` per row.
        """
        row_grad_total = np.sum(gradOutput, axis=1, keepdims=True)
        self.gradInput = gradOutput - self.exp_normalized * row_grad_total
        return self.gradInput

    def __repr__(self):
        return "LogSoftMax"

## 4. (0.3) Batch normalization
One of the most significant recent ideas that impacted NNs a lot is [**Batch normalization**](http://arxiv.org/abs/1502.03167). The idea is simple, yet effective: the features should be whitened ($mean = 0$, $std = 1$) all the way through NN. This improves the convergence for deep models letting it train them for days but not weeks. **You are** to implement the first part of the layer: features normalization. The second part (`ChannelwiseScaling` layer) is implemented below.

- input:   **`batch_size x n_feats`**
- output: **`batch_size x n_feats`**

The layer should work as follows. While training (`self.training == True`) it transforms input as $$y = \frac{x - \mu}  {\sqrt{\sigma^2 + \epsilon}}$$
where $\mu$ and $\sigma^2$ are the mean and variance of feature values in the **batch** and $\epsilon$ is just a small number for numerical stability. Also during training, the layer should maintain exponential moving average values for mean and variance:
```
    self.moving_mean = self.moving_mean * alpha + batch_mean * (1 - alpha)
    self.moving_variance = self.moving_variance * alpha + batch_variance * (1 - alpha)
```
During testing (`self.training == False`) the layer normalizes input using moving_mean and moving_variance.

Note that decomposition of batch normalization on normalization itself and channelwise scaling here is just a common **implementation** choice. In general "batch normalization" always assumes normalization + scaling.

In [9]:
class BatchNormalization(Module):
    """
    Feature-wise normalisation for 2D input of shape (batch_size, n_feats).

    In training mode the batch mean and (biased) variance are used and the
    exponential moving averages `moving_mean` / `moving_variance` are
    updated with factor `alpha`. In evaluation mode the moving averages are
    used instead.

    `gamma` and `beta` are a fixed scale and shift applied after the
    normalisation. They are initialised to ones and zeros and are not
    exposed through `getParameters`, so they are not trained.
    """
    EPS = 1e-3

    def __init__(self, alpha=0.9):
        super(BatchNormalization, self).__init__()
        self.alpha = alpha
        self.gamma = None
        self.beta = None

        self.moving_mean = None
        self.moving_variance = None

    def _ensure_state(self, n_in):
        """
        Create every piece of per-feature state that is still None.

        Each attribute is checked separately so that values assigned from
        outside before the first pass (e.g. preset moving statistics) are
        kept.
        """
        if self.gamma is None:
            self.gamma = np.ones((n_in,))
        if self.beta is None:
            self.beta = np.zeros((n_in,))
        if self.moving_mean is None:
            self.moving_mean = np.zeros((n_in,))
        if self.moving_variance is None:
            self.moving_variance = np.ones((n_in,))

    def updateOutput(self, input):
        """
        Normalise `input` per feature, then cache and return the result.

        Training mode also updates the moving averages as
        `moving = alpha * moving + (1 - alpha) * batch_statistic`.
        """
        self._ensure_state(input.shape[1])

        if self.training:
            mean = np.mean(input, axis=0)
            variance = np.var(input, axis=0)

            self.moving_mean = self.alpha * self.moving_mean + (1 - self.alpha) * mean
            self.moving_variance = self.alpha * self.moving_variance + (1 - self.alpha) * variance
        else:
            mean = self.moving_mean
            variance = self.moving_variance

        normalized_input = (input - mean) / np.sqrt(variance + self.EPS)
        self.output = self.gamma * normalized_input + self.beta
        return self.output

    def updateGradInput(self, input, gradOutput):
        """
        Compute and cache the gradient w.r.t. `input`.

        Training mode: mean and variance depend on `input`, which gives
            gradInput = inv_std * (g - mean(g) - x_hat * mean(g * x_hat))
        with g = gradOutput * gamma, x_hat the normalised input and the means
        taken over the batch axis.

        Evaluation mode: the moving statistics are constants, so the layer
        is a fixed per-feature affine map and the gradient is just
        `gradOutput * gamma / sqrt(moving_variance + EPS)`.
        """
        self._ensure_state(input.shape[1])
        grad_normalized = gradOutput * self.gamma

        if not self.training:
            self.gradInput = grad_normalized / np.sqrt(self.moving_variance + self.EPS)
            return self.gradInput

        # Batch statistics are recomputed from `input` (once each), so the
        # result does not depend on state cached by the forward pass.
        centered = input - np.mean(input, axis=0)
        inv_std = 1.0 / np.sqrt(np.var(input, axis=0) + self.EPS)
        normalized_input = centered * inv_std

        self.gradInput = inv_std * (
            grad_normalized
            - np.mean(grad_normalized, axis=0)
            - normalized_input * np.mean(grad_normalized * normalized_input, axis=0)
        )
        return self.gradInput

    def __repr__(self):
        return f"BatchNormalization(alpha={self.alpha})"


In [11]:
class ChannelwiseScaling(Module):
    r"""
       Implements linear transform of input y = \gamma * x + \beta
       where \gamma, \beta - learnable vectors of length x.shape[-1]
    """
    # The docstring is a raw string: in a normal string "\b" is a backspace
    # character and "\g" is an invalid escape sequence.
    def __init__(self, n_out):
        super(ChannelwiseScaling, self).__init__()

        # Uniform initialisation in [-1/sqrt(n_out), 1/sqrt(n_out)].
        stdv = 1./np.sqrt(n_out)
        self.gamma = np.random.uniform(-stdv, stdv, size=n_out)
        self.beta = np.random.uniform(-stdv, stdv, size=n_out)

        # Gradient accumulators, same shapes as the parameters.
        self.gradGamma = np.zeros_like(self.gamma)
        self.gradBeta = np.zeros_like(self.beta)

    def updateOutput(self, input):
        """Compute and cache `input * gamma + beta`."""
        self.output = input * self.gamma + self.beta
        return self.output

    def updateGradInput(self, input, gradOutput):
        """Compute and cache the gradient w.r.t. `input`: `gradOutput * gamma`."""
        self.gradInput = gradOutput * self.gamma
        return self.gradInput

    def accGradParameters(self, input, gradOutput):
        """
        Add this batch's contribution to `gradBeta` and `gradGamma`.

        The update is in place, so the arrays previously returned by
        `getGradParameters` stay valid, and gradients accumulate until
        `zeroGradParameters` is called. After a `zeroGradParameters` call
        the result equals the gradient of the current batch alone.
        """
        self.gradBeta += np.sum(gradOutput, axis=0)
        self.gradGamma += np.sum(gradOutput*input, axis=0)

    def zeroGradParameters(self):
        """Zero both gradient accumulators in place."""
        self.gradGamma.fill(0)
        self.gradBeta.fill(0)

    def getParameters(self):
        """Return `[gamma, beta]`."""
        return [self.gamma, self.beta]

    def getGradParameters(self):
        """Return `[gradGamma, gradBeta]`, in the same order as `getParameters`."""
        return [self.gradGamma, self.gradBeta]

    def __repr__(self):
        return "ChannelwiseScaling"

Practical notes. If BatchNormalization is placed after a linear transformation layer (including dense layer, convolutions, channelwise scaling) that implements a function like `y = weight * x + bias`, then adding the bias becomes useless and can be omitted, since its effect is discarded by the batch mean subtraction. If BatchNormalization (followed by `ChannelwiseScaling`) is placed before a layer that propagates scale (including ReLU, LeakyReLU) followed by any linear transformation layer, then the parameter `gamma` in `ChannelwiseScaling` can be frozen, since it can be absorbed into the linear transformation layer.

## 5. (0.3) Dropout
Implement [**dropout**](https://www.cs.toronto.edu/~hinton/absps/JMLRdropout.pdf). The idea and implementation are really simple: just multiply the input by a $Bernoulli(p)$ mask. Here $p$ is the probability of an element being zeroed.

This has proven to be an effective technique for regularization and preventing the co-adaptation of neurons.

While training (`self.training == True`) it should sample a mask on each iteration (for every batch), zero out elements and multiply elements by $1 / (1 - p)$. The latter is needed for keeping mean values of features close to mean values which will be in test mode. When testing this module should implement identity transform i.e. `self.output = input`.

- input:   **`batch_size x n_feats`**
- output: **`batch_size x n_feats`**

In [None]:
import numpy as np

class Dropout(Module):
    """
    Inverted dropout: in training mode each element is zeroed with
    probability `p` and the survivors are divided by `1 - p`; in evaluation
    mode the layer is the identity.
    """
    def __init__(self, p=0.5):
        super(Dropout, self).__init__()
        self.p = p
        # Boolean keep-mask of the last training forward pass.
        self.mask = None

    def updateOutput(self, input):
        """
        Forward pass.

        Training: draw a fresh keep-mask, apply it and rescale by 1 / (1 - p).
        Evaluation: return `input` unchanged (the mask is left untouched).
        """
        if not self.training:
            self.output = input
            return self.output

        # True marks the elements that are kept.
        self.mask = np.random.rand(*input.shape) > self.p
        self.output = input * self.mask / (1 - self.p)
        return self.output

    def updateGradInput(self, input, gradOutput):
        """
        Backward pass.

        Training: gradients flow only through the elements kept by the last
        forward pass, with the same 1 / (1 - p) rescaling.
        Evaluation: return `gradOutput` unchanged.
        """
        if not self.training:
            self.gradInput = gradOutput
            return self.gradInput

        self.gradInput = gradOutput * self.mask / (1 - self.p)
        return self.gradInput

    def __repr__(self):
        return "Dropout"


# 6. (2.0) Conv2d
Implement [**Conv2d**](https://pytorch.org/docs/stable/generated/torch.nn.Conv2d.html). Use only this list of parameters: (in_channels, out_channels, kernel_size, stride, padding, bias, padding_mode) and fix dilation=1 and groups=1.

In [17]:
class Conv2d:
    """
    2D convolution (cross-correlation) over input of shape
    (batch_size, in_channels, height, width), with dilation=1 and groups=1.

    `kernel_size`, `stride` and `padding` may each be an int or a
    (height, width) pair. Supported `padding_mode` values:

    - 'zeros': pad with zeros by `padding`;
    - 'replicate', 'reflect', 'circular': pad by `padding` using
      `numpy.pad` modes 'edge', 'reflect' and 'wrap';
    - 'same': ignore `padding` and zero-pad so that the output size is
      ceil(input_size / stride).

    The class does not derive from `Module`; it provides the same method
    names (`forward`, `backward`, `getParameters`, `train`, ...) itself so
    that it can be used wherever those methods are called.
    """

    # padding_mode name -> numpy.pad mode, for the non-constant modes.
    _NUMPY_PAD_MODES = {'replicate': 'edge', 'reflect': 'reflect', 'circular': 'wrap'}

    def __init__(self, in_channels, out_channels, kernel_size, stride=1, padding=0, bias=True, padding_mode='zeros'):
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.kernel_size = kernel_size
        self.stride = stride
        self.padding = padding
        self.bias = bias
        self.padding_mode = padding_mode

        self.output = None
        self.gradInput = None
        self.training = True

        # Weight initialisation (standard normal).
        kernel_height, kernel_width = self._pair(kernel_size)
        self.weights = np.random.randn(out_channels, in_channels, kernel_height, kernel_width)
        self.gradWeights = np.zeros_like(self.weights)
        if self.bias:
            self.bias_weights = np.random.randn(out_channels)
            self.gradBias = np.zeros_like(self.bias_weights)

    @staticmethod
    def _pair(value):
        """Return `value` as a (height, width) tuple; an int is duplicated."""
        if isinstance(value, (tuple, list)):
            return tuple(value)
        return (value, value)

    def _pad_amounts(self, height, width):
        """
        Return (top, bottom, left, right) pad widths for an input of the
        given spatial size. Raises ValueError for an unknown padding mode.
        """
        if self.padding_mode == 'same':
            kernel_height, kernel_width = self._pair(self.kernel_size)
            stride_height, stride_width = self._pair(self.stride)
            # Total padding needed for ceil(size / stride) output positions;
            # -(-a // b) is integer ceil division.
            total_h = max((-(-height // stride_height) - 1) * stride_height + kernel_height - height, 0)
            total_w = max((-(-width // stride_width) - 1) * stride_width + kernel_width - width, 0)
            top, left = total_h // 2, total_w // 2
            return top, total_h - top, left, total_w - left

        if self.padding_mode != 'zeros' and self.padding_mode not in self._NUMPY_PAD_MODES:
            raise ValueError(f"Unknown padding mode: {self.padding_mode}")

        pad_h, pad_w = self._pair(self.padding)
        return pad_h, pad_h, pad_w, pad_w

    def _windows(self, out_height, out_width):
        """Yield (i, j, h_slice, w_slice) for every output position."""
        kernel_height, kernel_width = self.weights.shape[2:]
        stride_height, stride_width = self._pair(self.stride)
        for i in range(out_height):
            h_start = i * stride_height
            for j in range(out_width):
                w_start = j * stride_width
                yield (i, j,
                       slice(h_start, h_start + kernel_height),
                       slice(w_start, w_start + kernel_width))

    def apply_padding(self, input):
        """
        Return a padded copy of `input` according to `padding_mode`.

        Raises ValueError for an unknown padding mode.
        """
        top, bottom, left, right = self._pad_amounts(input.shape[2], input.shape[3])
        widths = ((0, 0), (0, 0), (top, bottom), (left, right))
        numpy_mode = self._NUMPY_PAD_MODES.get(self.padding_mode)
        if numpy_mode is None:  # 'zeros' and 'same' both pad with zeros
            return np.pad(input, widths, mode='constant', constant_values=0)
        return np.pad(input, widths, mode=numpy_mode)

    def updateOutput(self, input):
        """
        Forward pass. Returns and caches an array of shape
        (batch_size, out_channels, out_height, out_width).
        """
        input_padded = self.apply_padding(input)

        batch_size, _, in_height, in_width = input_padded.shape
        kernel_height, kernel_width = self.weights.shape[2:]
        stride_height, stride_width = self._pair(self.stride)

        out_height = (in_height - kernel_height) // stride_height + 1
        out_width = (in_width - kernel_width) // stride_width + 1

        self.output = np.zeros((batch_size, self.out_channels, out_height, out_width))

        for i, j, rows, cols in self._windows(out_height, out_width):
            region = input_padded[:, :, rows, cols]  # (batch, in_ch, kh, kw)
            # Contract channel and kernel axes against the weights -> (batch, out_ch).
            self.output[:, :, i, j] = np.tensordot(region, self.weights, axes=([1, 2, 3], [1, 2, 3]))

        if self.bias:
            self.output += self.bias_weights[None, :, None, None]

        return self.output

    def updateGradInput(self, input, gradOutput):
        """
        Compute and cache the gradient w.r.t. `input` (same shape as `input`).

        The gradient is first accumulated on the padded grid and then folded
        back onto the unpadded input.
        """
        batch_size, in_channels, in_height, in_width = input.shape
        top, bottom, left, right = self._pad_amounts(in_height, in_width)

        grad_padded = np.zeros((batch_size, in_channels,
                                in_height + top + bottom, in_width + left + right))

        for i, j, rows, cols in self._windows(gradOutput.shape[2], gradOutput.shape[3]):
            # (batch, out_ch) x (out_ch, in_ch, kh, kw) -> (batch, in_ch, kh, kw)
            grad_padded[:, :, rows, cols] += np.tensordot(
                gradOutput[:, :, i, j], self.weights, axes=([1], [0]))

        numpy_mode = self._NUMPY_PAD_MODES.get(self.padding_mode)
        if numpy_mode is None:
            # Zero padding: padded cells are constants, so just crop them off.
            self.gradInput = grad_padded[:, :, top:top + in_height, left:left + in_width].copy()
        else:
            # Non-constant padding copies input cells into the border, so the
            # border gradient belongs to those source cells. Padding an index
            # range with the same mode tells which source row/column each
            # padded row/column came from.
            source_rows = np.pad(np.arange(in_height), (top, bottom), mode=numpy_mode)
            source_cols = np.pad(np.arange(in_width), (left, right), mode=numpy_mode)
            self.gradInput = np.zeros((batch_size, in_channels, in_height, in_width))
            np.add.at(self.gradInput,
                      (slice(None), slice(None), source_rows[:, None], source_cols[None, :]),
                      grad_padded)

        return self.gradInput

    def accGradParameters(self, input, gradOutput):
        """Add this batch's contribution to `gradWeights` (and `gradBias`)."""
        input_padded = self.apply_padding(input)

        for i, j, rows, cols in self._windows(gradOutput.shape[2], gradOutput.shape[3]):
            # (batch, out_ch) x (batch, in_ch, kh, kw) -> (out_ch, in_ch, kh, kw)
            self.gradWeights += np.tensordot(
                gradOutput[:, :, i, j], input_padded[:, :, rows, cols], axes=([0], [0]))

        if self.bias:
            self.gradBias += np.sum(gradOutput, axis=(0, 2, 3))

    def forward(self, input):
        """Compute the output for `input`; same as `updateOutput`."""
        return self.updateOutput(input)

    def backward(self, input, gradOutput):
        """
        Compute the input gradient and accumulate the parameter gradients.
        Returns the input gradient.
        """
        self.updateGradInput(input, gradOutput)
        self.accGradParameters(input, gradOutput)
        return self.gradInput

    def zeroGradParameters(self):
        """Zero the gradient accumulators in place."""
        self.gradWeights.fill(0)
        if self.bias:
            self.gradBias.fill(0)

    def getParameters(self):
        """Return `[weights]`, plus `bias_weights` when the layer has a bias."""
        return [self.weights, self.bias_weights] if self.bias else [self.weights]

    def getGradParameters(self):
        """Return the gradients in the same order as `getParameters`."""
        return [self.gradWeights, self.gradBias] if self.bias else [self.gradWeights]

    def train(self):
        """Switch to training mode (the computation itself is unaffected)."""
        self.training = True

    def evaluate(self):
        """Switch to evaluation mode (the computation itself is unaffected)."""
        self.training = False

    def __repr__(self):
        return f"Conv2d(in_channels={self.in_channels}, out_channels={self.out_channels}, kernel_size={self.kernel_size}, stride={self.stride}, padding={self.padding}, bias={self.bias}, padding_mode={self.padding_mode})"


# 7. (0.5) Implement [**MaxPool2d**](https://pytorch.org/docs/stable/generated/torch.nn.MaxPool2d.html) and [**AvgPool2d**](https://pytorch.org/docs/stable/generated/torch.nn.AvgPool2d.html). Use only parameters like kernel_size, stride, padding (negative infinity for maxpool and zero for avgpool) and keep the other parameters fixed at their framework defaults.

In [21]:
import numpy as np

class MaxPool2d(Module):
    """
    Max pooling over input of shape (batch_size, channels, height, width).

    `kernel_size`, `stride` and `padding` are ints applied to both spatial
    axes. `stride` defaults to `kernel_size`. Padding cells hold negative
    infinity, so they are never selected as a maximum.
    """
    def __init__(self, kernel_size, stride=None, padding=0):
        super(MaxPool2d, self).__init__()
        self.kernel_size = kernel_size
        self.stride = kernel_size if stride is None else stride
        self.padding = padding

    def _pad(self, input):
        """Return `input` padded with -inf on both spatial axes (if padding > 0)."""
        if self.padding > 0:
            p = self.padding
            # -inf is not representable in integer dtypes, hence the float cast.
            # A 4-D tensor needs four width pairs; batch and channel axes get none.
            return np.pad(np.asarray(input, dtype=float),
                          ((0, 0), (0, 0), (p, p), (p, p)),
                          mode='constant', constant_values=-np.inf)
        return input

    def updateOutput(self, input):
        """
        Forward pass. Returns and caches an array of shape
        (batch_size, channels, out_height, out_width).
        """
        padded = self._pad(input)
        batch_size, channels, height, width = padded.shape

        out_height = (height - self.kernel_size) // self.stride + 1
        out_width = (width - self.kernel_size) // self.stride + 1

        self.output = np.zeros((batch_size, channels, out_height, out_width))

        for i in range(out_height):
            h_start = i * self.stride
            for j in range(out_width):
                w_start = j * self.stride
                window = padded[:, :,
                                h_start:h_start + self.kernel_size,
                                w_start:w_start + self.kernel_size]
                self.output[:, :, i, j] = np.max(window, axis=(2, 3))

        return self.output

    def updateGradInput(self, input, gradOutput):
        """
        Compute and cache the gradient w.r.t. `input` (same shape as `input`).

        Each output gradient is routed to the single cell that produced the
        window maximum; when several cells tie, the first one in row-major
        order receives it, so the total gradient is conserved.
        """
        batch_size, channels, height, width = input.shape
        k = self.kernel_size
        padded = self._pad(input)
        grad_padded = np.zeros(padded.shape, dtype=float)

        # Broadcast to one (batch, channel) index pair per pooled window.
        batch_idx = np.arange(batch_size)[:, None]
        channel_idx = np.arange(channels)[None, :]

        for i in range(gradOutput.shape[2]):
            h_start = i * self.stride
            for j in range(gradOutput.shape[3]):
                w_start = j * self.stride
                window = padded[:, :, h_start:h_start + k, w_start:w_start + k]
                # Position of the maximum inside each flattened k x k window.
                flat_argmax = window.reshape(batch_size, channels, -1).argmax(axis=2)
                rows = h_start + flat_argmax // k
                cols = w_start + flat_argmax % k
                # Every (batch, channel) pair addresses a distinct cell, so a
                # plain fancy-indexed += is safe here.
                grad_padded[batch_idx, channel_idx, rows, cols] += gradOutput[:, :, i, j]

        # Drop the padding border again.
        p = max(self.padding, 0)
        self.gradInput = grad_padded[:, :, p:p + height, p:p + width]
        return self.gradInput

    def __repr__(self):
        return f"MaxPool2d(kernel_size={self.kernel_size}, stride={self.stride}, padding={self.padding})"


class AvgPool2d(Module):
    """
    Average pooling over input of shape (batch_size, channels, height, width).

    `kernel_size`, `stride` and `padding` may each be an int or a
    (height, width) pair; `stride` defaults to `kernel_size`. The input is
    zero-padded and the padding cells count towards the average, i.e. every
    window is divided by `k_h * k_w`.
    """
    def __init__(self, kernel_size, stride=None, padding=0):
        super(AvgPool2d, self).__init__()

        self.kernel_size = (kernel_size, kernel_size) if isinstance(kernel_size, int) else kernel_size
        self.stride = stride if stride is not None else kernel_size
        self.stride = (self.stride, self.stride) if isinstance(self.stride, int) else self.stride
        self.padding = (padding, padding) if isinstance(padding, int) else padding

        # Per-output-position divisor, filled in by `updateOutput`.
        self.actual_kernel_areas = None

    def updateOutput(self, input):
        """
        Forward pass. Returns and caches an array of shape
        (batch_size, channels, out_height, out_width).
        """
        batch_size, channels, height, width = input.shape
        k_h, k_w = self.kernel_size
        s_h, s_w = self.stride
        p_h, p_w = self.padding

        if p_h > 0 or p_w > 0:
            input_padded = np.pad(input,
                                  ((0, 0), (0, 0), (p_h, p_h), (p_w, p_w)),
                                  mode='constant', constant_values=0)
        else:
            input_padded = input

        out_height = (height + 2*p_h - k_h) // s_h + 1
        out_width = (width + 2*p_w - k_w) // s_w + 1

        self.output = np.zeros((batch_size, channels, out_height, out_width))
        # The floor-based output size guarantees that every window lies fully
        # inside the padded input, so the divisor is always the kernel area.
        self.actual_kernel_areas = np.ones((out_height, out_width)) * (k_h * k_w)

        for i in range(out_height):
            h_start = i * s_h
            for j in range(out_width):
                w_start = j * s_w
                region = input_padded[:, :, h_start:h_start + k_h, w_start:w_start + k_w]
                self.output[:, :, i, j] = np.sum(region, axis=(2, 3)) / (k_h * k_w)

        return self.output

    def updateGradInput(self, input, gradOutput):
        """
        Compute and cache the gradient w.r.t. `input` (same shape as `input`).

        Every cell of a window receives `gradOutput / (k_h * k_w)`; the
        gradient that lands on padding cells is discarded.
        """
        batch_size, channels, height, width = input.shape
        k_h, k_w = self.kernel_size
        s_h, s_w = self.stride
        p_h, p_w = self.padding

        # Float buffer on the padded grid, independent of the dtype of `input`.
        grad_padded = np.zeros((batch_size, channels, height + 2*p_h, width + 2*p_w), dtype=float)
        norm = 1.0 / (k_h * k_w)

        for i in range(gradOutput.shape[2]):
            h_start = i * s_h
            for j in range(gradOutput.shape[3]):
                w_start = j * s_w
                grad_padded[:, :, h_start:h_start + k_h, w_start:w_start + k_w] += (
                    gradOutput[:, :, i, j][:, :, None, None] * norm
                )

        # Explicit end indices: a slice like `p:-p` would be empty when p == 0,
        # which happens whenever only one of the two axes is padded.
        self.gradInput = grad_padded[:, :, p_h:p_h + height, p_w:p_w + width]
        return self.gradInput

    def __repr__(self):
        return f"AvgPool2d(kernel_size={self.kernel_size}, stride={self.stride}, padding={self.padding})"

# 8. (0.3) Implement **GlobalMaxPool2d** and **GlobalAvgPool2d**. They have no provided tests and their parameters are up to you, but they must aggregate information within channels. Write test functions for these layers on your own.

# 9. (0.2) Implement [**Flatten**](https://pytorch.org/docs/stable/generated/torch.flatten.html)

In [23]:
class Flatten(Module):
    """
    Merge the axes `start_dim` .. `end_dim` (inclusive) of the input into a
    single axis. Negative values count from the last axis.
    """
    def __init__(self, start_dim=0, end_dim=-1):
        super(Flatten, self).__init__()

        self.start_dim = start_dim
        self.end_dim = end_dim

        # Shape of the last forward input, needed to undo the reshape.
        self.original_shape = None

    def updateOutput(self, input):
        """
        Reshape `input` with the configured axis range merged; caches and
        returns the result.

        Raises ValueError when the range does not fit the rank of `input`
        or when `start_dim` comes after `end_dim`.
        """
        self.original_shape = input.shape
        ndim = len(input.shape)

        # Resolve negative axes locally. The configured values must not be
        # overwritten, otherwise the layer would stop working for inputs of
        # a different rank after the first call.
        start = self.start_dim + ndim if self.start_dim < 0 else self.start_dim
        end = self.end_dim + ndim if self.end_dim < 0 else self.end_dim
        if not 0 <= start <= end < ndim:
            raise ValueError(
                f"Cannot flatten dims {self.start_dim}..{self.end_dim} of a {ndim}-dimensional input"
            )

        merged = int(np.prod(input.shape[start:end + 1], dtype=int))
        new_shape = tuple(input.shape[:start]) + (merged,) + tuple(input.shape[end + 1:])

        self.output = input.reshape(new_shape)
        return self.output

    def updateGradInput(self, input, gradOutput):
        """Reshape `gradOutput` back to the shape of the last forward input."""
        self.gradInput = gradOutput.reshape(self.original_shape)
        return self.gradInput

    def __repr__(self):
        return "Flatten"

# Activation functions

Here's the complete example for the **Rectified Linear Unit** non-linearity (aka **ReLU**):

In [25]:
class ReLU(Module):
    """
    Rectified Linear Unit: `max(input, 0)` element-wise.
    """
    def __init__(self):
        super(ReLU, self).__init__()

    def updateOutput(self, input):
        """Compute and cache the element-wise maximum of `input` and 0."""
        rectified = np.maximum(input, 0)
        self.output = rectified
        return self.output

    def updateGradInput(self, input, gradOutput):
        """
        Compute and cache the gradient w.r.t. `input`: `gradOutput` where
        `input` is strictly positive, zero elsewhere.
        """
        positive = input > 0
        self.gradInput = np.multiply(gradOutput, positive)
        return self.gradInput

    def __repr__(self):
        return "ReLU"

## 10. (0.1) Leaky ReLU
Implement [**Leaky Rectified Linear Unit**](http://en.wikipedia.org/wiki%2FRectifier_%28neural_networks%29%23Leaky_ReLUs). Experiment with the slope.

In [27]:
class LeakyReLU(Module):
    """
    Leaky ReLU: identity for positive inputs, `slope * input` otherwise.
    """
    def __init__(self, slope = 0.03):
        super(LeakyReLU, self).__init__()

        # Boolean mask of strictly positive inputs from the last forward pass.
        self.mask = None
        self.slope = slope

    def updateOutput(self, input):
        """Compute and cache the activation; also remembers the positive mask."""
        self.mask = (input > 0)
        scaled = input * self.slope
        self.output = np.where(self.mask, input, scaled)
        return self.output

    def updateGradInput(self, input, gradOutput):
        """
        Compute and cache the gradient w.r.t. `input`, using the mask stored
        by the last forward pass: `gradOutput` where the input was positive,
        `slope * gradOutput` elsewhere.
        """
        scaled_grad = gradOutput * self.slope
        self.gradInput = np.where(self.mask, gradOutput, scaled_grad)
        return self.gradInput

    def __repr__(self):
        return "LeakyReLU"

## 11. (0.1) ELU
Implement [**Exponential Linear Units**](http://arxiv.org/abs/1511.07289) activations.

In [29]:
class ELU(Module):
    """
    Exponential Linear Unit: identity for positive inputs,
    `alpha * (exp(input) - 1)` otherwise.
    """
    def __init__(self, alpha = 1.0):
        super(ELU, self).__init__()

        self.alpha = alpha
        # Boolean mask of non-positive inputs from the last forward pass.
        self.activated = None

    def updateOutput(self, input):
        """Compute and cache the activation."""
        self.activated = (input <= 0)
        # `np.where` evaluates both branches for every element. Clamping the
        # exponent at 0 changes nothing for the elements that use this branch
        # (input <= 0) and stops exp from overflowing on large positive
        # inputs, whose result is discarded anyway.
        negative_branch = self.alpha * (np.exp(np.minimum(input, 0)) - 1)
        self.output = np.where(input > 0, input, negative_branch)
        return self.output

    def updateGradInput(self, input, gradOutput):
        """
        Compute and cache the gradient w.r.t. `input`.

        For non-positive inputs the derivative is `alpha * exp(input)`,
        which equals `output + alpha`, so the cached forward output is reused.
        """
        self.gradInput = np.where(input > 0,
                                  gradOutput,
                                  gradOutput * (self.output + self.alpha))
        return self.gradInput

    def __repr__(self):
        return "ELU"

## 12. (0.1) SoftPlus
Implement [**SoftPlus**](https://en.wikipedia.org/wiki%2FRectifier_%28neural_networks%29) activations. Look, how they look a lot like ReLU.

In [31]:
import numpy as np

class SoftPlus(Module):
    """
    SoftPlus activation.

    Forward:  f(x)  = log(1 + exp(x))
    Backward: f'(x) = 1 / (1 + exp(-x))   (the logistic sigmoid)
    """
    def __init__(self):
        super(SoftPlus, self).__init__()

    def updateOutput(self, input):
        """
        Apply the activation elementwise, store the result in `output` and return it.
        """
        # logaddexp(0, x) == log(exp(0) + exp(x)) == log(1 + exp(x)), but it
        # does not overflow to inf for large positive x.
        self.output = np.logaddexp(0, input)
        return self.output

    def updateGradInput(self, input, gradOutput):
        """
        Compute the gradient w.r.t. `input`, store it in `gradInput` and return it.
        """
        # sigmoid(x) == exp(-log(1 + exp(-x))); written this way exp(-x) is
        # never materialised, so very negative inputs do not overflow.
        sigmoid = np.exp(-np.logaddexp(0, -input))
        self.gradInput = gradOutput * sigmoid
        return self.gradInput

    def __repr__(self):
        return "SoftPlus"


## 13. (0.2) GELU
Implement [**Gelu**](https://pytorch.org/docs/stable/generated/torch.nn.GELU.html) activations.

In [39]:
class Gelu(Module):
    """
    Gaussian error linear unit, tanh approximation.

    Forward:  f(x) = 0.5 * x * (1 + tanh(u)),
              u    = sqrt(2 / pi) * (x + 0.044715 * x^3)
    Backward: f'(x) = 0.5 * (1 + tanh(u))
                      + 0.5 * x * (1 - tanh(u)^2) * du/dx,
              du/dx = sqrt(2 / pi) * (1 + 3 * 0.044715 * x^2)
    """
    # Coefficient of the cubic term in the tanh approximation.
    _CUBIC_COEFF = 0.044715

    def __init__(self):
        super(Gelu, self).__init__()

    def updateOutput(self, input):
        """
        Apply the activation elementwise, store the result in `output` and return it.
        """
        scale = np.sqrt(2 / np.pi)
        tanh_term = np.tanh(scale * (input + self._CUBIC_COEFF * input ** 3))
        self.output = 0.5 * input * (1 + tanh_term)
        return self.output

    def updateGradInput(self, input, gradOutput):
        """
        Compute the gradient w.r.t. `input`, store it in `gradInput` and return it.
        """
        scale = np.sqrt(2 / np.pi)
        tanh_term = np.tanh(scale * (input + self._CUBIC_COEFF * input ** 3))
        # d/dx (x + c * x^3) = 1 + 3 * c * x^2; the factor 3 * c must match the
        # forward coefficient exactly, otherwise the analytic gradient drifts
        # away from the function being differentiated.
        inner_grad = scale * (1 + 3 * self._CUBIC_COEFF * input ** 2)
        grad = 0.5 * (1 + tanh_term) + 0.5 * input * (1 - tanh_term ** 2) * inner_grad

        self.gradInput = grad * gradOutput
        return self.gradInput

    def __repr__(self):
        return "Gelu"


# Criterions

Criterions are used to score the model's answers.

In [None]:
class Criterion:
    """
    Base class for loss functions.

    Subclasses put their logic in `updateOutput` / `updateGradInput`;
    `forward` / `backward` are thin public entry points that delegate to them.
    """
    def __init__(self):
        # Last computed gradient w.r.t. the input and last computed loss value.
        self.gradInput = None
        self.output = None

    def forward(self, input, target):
        """
            Compute the loss for the given `input` and `target`.

            Do not override this method: it only dispatches to `updateOutput`,
            which is where the actual computation belongs.
        """
        loss = self.updateOutput(input, target)
        return loss

    def backward(self, input, target):
        """
            Compute the gradient of the loss w.r.t. `input`.

            Do not override this method: it only dispatches to `updateGradInput`,
            which is where the actual computation belongs.
        """
        grad = self.updateGradInput(input, target)
        return grad

    def updateOutput(self, input, target):
        """
        Override in subclasses. The base version returns the stored `output`.
        """
        return self.output

    def updateGradInput(self, input, target):
        """
        Override in subclasses. The base version returns the stored `gradInput`.
        """
        return self.gradInput

    def __repr__(self):
        """
        Human-readable name; override it in every subclass to get a
        meaningful description when printing.
        """
        return "Criterion"

The **MSECriterion**, which is basic L2 norm usually used for regression, is implemented here for you.
- input:   **`batch_size x n_feats`**
- target: **`batch_size x n_feats`**
- output: **scalar**

In [None]:
class MSECriterion(Criterion):
    """
    Squared-error loss: the sum of squared differences between `input` and
    `target`, divided by the size of the first axis of `input`.
    """
    def __init__(self):
        super().__init__()

    def updateOutput(self, input, target):
        """
        Compute the loss, store it in `output` and return it.
        """
        residual = input - target
        n_rows = input.shape[0]
        self.output = np.power(residual, 2).sum() / n_rows
        return self.output

    def updateGradInput(self, input, target):
        """
        Compute the gradient of the loss w.r.t. `input`, store it in
        `gradInput` and return it.
        """
        residual = input - target
        n_rows = input.shape[0]
        self.gradInput = residual * 2 / n_rows
        return self.gradInput

    def __repr__(self):
        return "MSECriterion"

## 14. (0.2) Negative LogLikelihood criterion (numerically unstable)
Your task is to implement the **ClassNLLCriterion**. It should implement [multiclass log loss](http://scikit-learn.org/stable/modules/model_evaluation.html#log-loss). Although there is a sum over `y` (target) in that formula,
remember that targets are one-hot encoded, which simplifies the computation a lot. Note that criterions are the only place where you divide by the batch size. There is also a small hack of adding a small number to the probabilities to avoid computing log(0).
- input:   **`batch_size x n_feats`** - probabilities
- target: **`batch_size x n_feats`** - one-hot representation of ground truth
- output: **scalar**



In [None]:
class ClassNLLCriterionUnstable(Criterion):
    """
    Negative log-likelihood over class probabilities.

    `input` holds probabilities with one row per sample. `target` is either a
    one-hot matrix shaped like `input`, or a 1-D vector of integer class
    indices (one per row). Probabilities are clipped to [EPS, 1 - EPS] before
    the logarithm is taken.

    NOTE(review): this class name is defined a second time in a later cell of
    the notebook; when cells run top to bottom the later definition replaces
    this one.
    """
    EPS = 1e-15

    def __init__(self):
        super(ClassNLLCriterionUnstable, self).__init__()
        # Clipped copy of the most recent `input`.
        self.input_clamp = None

    @staticmethod
    def _class_indices(target):
        """Return one class index per row for one-hot or index-vector targets."""
        target = np.asarray(target)
        if target.ndim == 1:
            # Already a vector of class indices.
            return target
        return target.argmax(axis=1)

    def updateOutput(self, input, target):
        """
        Compute the mean negative log-probability of the true classes,
        store it in `output` and return it.
        """
        input_clamp = np.clip(input, self.EPS, 1 - self.EPS)
        self.input_clamp = input_clamp

        labels = self._class_indices(target)
        rows = np.arange(len(labels))
        self.output = -np.mean(np.log(input_clamp[rows, labels]))
        return self.output

    def updateGradInput(self, input, target):
        """
        Compute the gradient of the loss w.r.t. `input`, store it in
        `gradInput` and return it.
        """
        # Re-clip here instead of reusing the array cached by updateOutput, so
        # the gradient always corresponds to the `input` passed to this call.
        input_clamp = np.clip(input, self.EPS, 1 - self.EPS)
        self.input_clamp = input_clamp

        labels = self._class_indices(target)
        rows = np.arange(len(labels))

        # Only the probability of the true class contributes: d(-log p)/dp = -1/p.
        self.gradInput = np.zeros_like(input)
        self.gradInput[rows, labels] = -1.0 / input_clamp[rows, labels]
        self.gradInput /= len(labels)

        return self.gradInput

    def __repr__(self):
        return "ClassNLLCriterionUnstable"

In [None]:
class ClassNLLCriterionUnstable(Criterion):
    """
    Negative log-likelihood over class probabilities with one-hot targets.

    `input` holds probabilities with one row per sample and `target` is a
    matrix of the same shape whose per-row argmax marks the true class.
    Probabilities are clipped to [EPS, 1 - EPS] before the logarithm is taken.
    """
    EPS = 1e-15

    def __init__(self):
        super(ClassNLLCriterionUnstable, self).__init__()
        # Clipped copy of the most recent `input`.
        self.input_clamp = None

    def updateOutput(self, input, target):
        """
        Compute the mean negative log-probability of the true classes,
        store it in `output` and return it.
        """
        input_clamp = np.clip(input, self.EPS, 1 - self.EPS)
        self.input_clamp = input_clamp

        # argmax is taken on the target as given: casting to an integer type
        # first would truncate fractional entries to 0 and could make every
        # row resolve to class 0.
        labels = np.asarray(target).argmax(axis=1)
        rows = np.arange(len(labels))

        self.output = -np.mean(np.log(input_clamp[rows, labels]))

        return self.output

    def updateGradInput(self, input, target):
        """
        Compute the gradient of the loss w.r.t. `input`, store it in
        `gradInput` and return it.
        """
        # Re-clip here instead of reusing the array cached by updateOutput, so
        # the gradient always corresponds to the `input` passed to this call.
        input_clamp = np.clip(input, self.EPS, 1 - self.EPS)
        self.input_clamp = input_clamp

        labels = np.asarray(target).argmax(axis=1)
        rows = np.arange(len(labels))

        # Only the probability of the true class contributes: d(-log p)/dp = -1/p.
        self.gradInput = np.zeros_like(input)
        self.gradInput[rows, labels] = -1.0 / input_clamp[rows, labels]
        self.gradInput /= len(labels)

        return self.gradInput

    def __repr__(self):
        return "ClassNLLCriterionUnstable"

## 15. (0.3) Negative LogLikelihood criterion (numerically stable)
- input:   **`batch_size x n_feats`** - log probabilities
- target: **`batch_size x n_feats`** - one-hot representation of ground truth
- output: **scalar**

Task is similar to the previous one, but now the criterion input is the output of log-softmax layer. This decomposition allows us to avoid problems with computation of forward and backward of log().

In [None]:
class ClassNLLCriterion(Criterion):
    """
    Negative log-likelihood over log-probabilities.

    `input` holds log-probabilities (e.g. the output of a log-softmax layer)
    with one row per sample; `target` is a matrix of the same shape.

        loss      = -sum(input * target) / batch_size
        gradInput = -target / batch_size

    NOTE(review): this class name is defined a second time in a later cell of
    the notebook; when cells run top to bottom the later definition replaces
    this one.
    """
    # Kept so existing references to the attribute keep working; it is not
    # used, because no logarithm is taken of log-probabilities here.
    EPS = 1e-10

    def __init__(self):
        super(ClassNLLCriterion, self).__init__()
        # Kept for backward compatibility; never populated, since the input
        # is already in log space and must not be clipped to [EPS, 1 - EPS].
        self.input_clamp = None

    def updateOutput(self, input, target):
        """
        Compute the loss, store it in `output` and return it.
        """
        self.output = -np.sum(input * target) / input.shape[0]
        return self.output

    def updateGradInput(self, input, target):
        """
        Compute the gradient of the loss w.r.t. `input`, store it in
        `gradInput` and return it.
        """
        # The loss is linear in `input`, so its derivative is simply
        # -target / batch_size; there is no 1 / p factor in log space.
        self.gradInput = -np.asarray(target) / input.shape[0]
        return self.gradInput

    def __repr__(self):
        return "ClassNLLCriterion"

In [None]:
class ClassNLLCriterion(Criterion):
    """
    Negative log-likelihood over log-probabilities.

    `input` holds log-probabilities with one row per sample; `target` is a
    matrix of the same shape.

        loss      = -mean over rows of sum(input * target, axis=1)
        gradInput = -target / batch_size
    """
    def __init__(self):
        # Initialise by calling the parent class constructor.
        super(ClassNLLCriterion, self).__init__()

    def updateOutput(self, input, target):
        """
        Compute the loss, store it in `output` and return it.
        """
        log_probs = np.sum(input * target, axis=1)
        self.output = -np.mean(log_probs)

        return self.output

    def updateGradInput(self, input, target):
        """
        Compute the gradient of the loss w.r.t. `input`, store it in
        `gradInput` and return it.
        """
        # The loss is linear in `input`, so its exact derivative is
        # -target / batch_size. Using `target` itself (not its argmax) keeps
        # the backward pass consistent with updateOutput for any target.
        grad_input = np.zeros_like(input)
        grad_input -= target
        grad_input /= input.shape[0]  # average over the batch

        self.gradInput = grad_input
        return self.gradInput

    def __repr__(self):
        return "ClassNLLCriterion"

1-я часть задания: реализация слоев, лосей и функций активации - 5 баллов. \\
2-я часть задания: реализация моделей на своих классах. Что должно быть:
  1. Выберите оптимизатор и реализуйте его, чтобы он работал с вашими классами. - 1 балл.
  2. Модель для задачи мультирегрессии на выбранных вами данных. Использовать FCNN, dropout, batchnorm, MSE. Пробуйте различные функции активации. Для первой модели попробуйте большую, среднюю и маленькую модель. - 1 балл.
  3. Модель для задачи мультиклассификации на MNIST. Использовать свёртки, макспулы, флэттэны, софтмаксы - 1 балл.
  4. Автоэнкодер для выбранных вами данных. Должен быть на свёртках и полносвязных слоях, дропаутах, батчнормах и тд. - 2 балла. \\

Дополнительно в оценке каждой модели будет учитываться:
1. Наличие правильно выбранной метрики и лосс функции.
2. Отрисовка графиков лосей и метрик на трейне-валидации. Проверка качества модели на тесте.
3. Наличие шедулера для lr.
4. Наличие вормапа.
5. Наличие механизма ранней остановки и сохранение лучшей модели.
6. Свитч лося (метрики) и оптимайзера.

### Оптимизатор

In [None]:
class SGD:
    """
    Stochastic gradient descent with momentum.

    For every parameter `p` with gradient `g`:

        velocity = momentum * velocity + g
        p       -= lr * velocity          (in place)

    Arguments:
        parameters:      sequence of arrays that are updated in place.
        lr:              learning rate.
        momentum:        momentum coefficient.
        grad_parameters: where to read gradients from. Either a sequence of
                         arrays parallel to `parameters`, or a zero-argument
                         callable returning such a sequence (useful when the
                         gradient arrays are re-created between steps). When
                         omitted, each parameter's own `.grad` attribute is
                         used, which plain NumPy arrays do not provide.
    """
    def __init__(self, parameters, lr=0.01, momentum=0.9, grad_parameters=None):
        self.parameters = parameters
        self.lr = lr
        self.momentum = momentum
        self.grad_parameters = grad_parameters
        self.velocity = [np.zeros_like(param) for param in self.parameters]

    def _current_grads(self):
        """Return the list of gradients matching `self.parameters`."""
        source = self.grad_parameters
        if source is None:
            # Original behaviour: gradients live on the parameter objects.
            return [param.grad for param in self.parameters]
        if callable(source):
            source = source()
        return list(source)

    def step(self):
        """
        Perform one update of every parameter in place.

        Raises ValueError when the number of gradients does not match the
        number of parameters.
        """
        grads = self._current_grads()
        if len(grads) != len(self.velocity):
            raise ValueError(
                "expected %d gradients, got %d" % (len(self.velocity), len(grads))
            )
        for i, (param, grad) in enumerate(zip(self.parameters, grads)):
            self.velocity[i] = self.momentum * self.velocity[i] + grad
            # In-place update so the owner of the array sees the new values.
            param -= self.lr * self.velocity[i]


### 2. Модель для задачи мультирегрессии

In [None]:
class SimpleModel(Module):
    """
    Fully connected regression network with two hidden blocks:

        Linear -> ReLU -> BatchNorm -> Dropout   (x2)   -> Linear

    Arguments:
        input_dim:   number of input features.
        hidden_dims: pair with the widths of the two hidden blocks.
        output_dim:  number of regression targets.
        dropout_p:   value passed to both Dropout layers.

    The forward pass remembers the input of every layer, and the backward
    pass hands each layer its own input and its own incoming gradient.

    NOTE(review): this class does not forward train/evaluate mode switches to
    its dropout and batchnorm layers, and does not expose the Linear layers'
    parameters to an optimizer; confirm how the base Module is expected to
    handle both.
    """
    def __init__(self, input_dim=10, hidden_dims=(64, 32), output_dim=3, dropout_p=0.5):
        super(SimpleModel, self).__init__()

        # Exactly two hidden widths are supported; unpacking raises otherwise.
        hidden1, hidden2 = hidden_dims
        self.input_dim = input_dim
        self.hidden_dims = (hidden1, hidden2)
        self.output_dim = output_dim

        self.fc1 = Linear(input_dim, hidden1)
        self.relu1 = ReLU()
        self.batchnorm1 = BatchNorm(hidden1)
        self.dropout1 = Dropout(dropout_p)

        self.fc2 = Linear(hidden1, hidden2)
        self.relu2 = ReLU()
        self.batchnorm2 = BatchNorm(hidden2)
        self.dropout2 = Dropout(dropout_p)

        self.fc3 = Linear(hidden2, output_dim)

        # Per-layer caches filled by the forward / backward passes.
        self._layer_inputs = None
        self._layer_grad_outputs = None

    def _layers(self):
        """Layers in forward order (read from attributes so swaps are honoured)."""
        return [
            self.fc1, self.relu1, self.batchnorm1, self.dropout1,
            self.fc2, self.relu2, self.batchnorm2, self.dropout2,
            self.fc3,
        ]

    def _trainable_layers(self):
        """Layers whose parameter gradients this model accumulates and resets."""
        return [self.fc3, self.fc2, self.fc1]

    def updateOutput(self, input):
        """
        Run the forward pass, store the result in `output` and return it.
        """
        layer_inputs = []
        x = input
        for layer in self._layers():
            layer_inputs.append(x)
            x = layer.updateOutput(x)

        self._layer_inputs = layer_inputs
        # Gradients cached by an earlier backward pass no longer match.
        self._layer_grad_outputs = None
        self.output = x
        return self.output

    def updateGradInput(self, input, gradOutput):
        """
        Back-propagate `gradOutput` through all layers, store the gradient
        w.r.t. the model input in `gradInput` and return it.

        Relies on the activations cached by the preceding `updateOutput`
        call; if there is none, a forward pass on `input` is run first.
        """
        if self._layer_inputs is None:
            self.updateOutput(input)

        layers = self._layers()
        grad_outputs = [None] * len(layers)
        grad = gradOutput
        for idx in range(len(layers) - 1, -1, -1):
            grad_outputs[idx] = grad
            # Each layer differentiates w.r.t. the input IT received in the
            # forward pass, not the input of the whole model.
            grad = layers[idx].updateGradInput(self._layer_inputs[idx], grad)

        self._layer_grad_outputs = grad_outputs
        self.gradInput = grad
        return self.gradInput

    def accGradParameters(self, input, gradOutput):
        """
        Accumulate parameter gradients of the Linear layers, using each
        layer's own input and incoming gradient from the last backward pass.
        If no backward pass has been cached, `updateGradInput` is run first.
        """
        if self._layer_grad_outputs is None:
            self.updateGradInput(input, gradOutput)

        trainable_ids = {id(layer) for layer in self._trainable_layers()}
        per_layer = zip(self._layers(), self._layer_inputs, self._layer_grad_outputs)
        for layer, layer_input, layer_grad_output in per_layer:
            if id(layer) in trainable_ids:
                layer.accGradParameters(layer_input, layer_grad_output)

    def zeroGradParameters(self):
        """Reset the accumulated parameter gradients of the Linear layers."""
        for layer in self._trainable_layers():
            layer.zeroGradParameters()

    def computeMSELoss(self, output, target):
        """Return the mean of squared differences over all elements."""
        return np.mean((output - target) ** 2)

    def updateLoss(self, input, target):
        """Run a forward pass on `input` and return its MSE against `target`."""
        output = self.updateOutput(input)
        return self.computeMSELoss(output, target)

    def __repr__(self):
        return (
            f"SimpleModel(input_dim={self.input_dim}, "
            f"hidden_dims={list(self.hidden_dims)}, "
            f"output_dim={self.output_dim})"
        )
