## Лабораторная работа 1 "Полносвязные нейронные сети

В современных фреймворках, нейронные сети конструируются в виде разнообразных слоев. Каждый слой реализует два метода forward и backward. forward предназначен для расчета выхода слоя, backward для расчета градиента. В лабораторной лаботе вам необходимо рализовать оба метода для линейного, сигмоидального и Relu.
После чего необходимо сконструировать нейронную сеть для решения задачи классификации.

Функция forward будет вычислять по $x$ значение $y$, backward — по $\frac{\partial L}{\partial y}$ вычислять $\frac{\partial L}{\partial x}$ и обновлять внутри себя $\frac{\partial L}{\partial w}$.

Важным требованием к реализации является векторизация всех слоев: все операции должны быть сведены к матричным, не должно быть циклов. Это значительно уменьшает временные затраты.

In [1]:
from typing import Tuple, List, Union

import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.metrics import (accuracy_score, 
                             confusion_matrix,
                             ConfusionMatrixDisplay)

from tqdm.notebook import tqdm

### Часть 1: Линейный слой

Приведем пример вычисления градиентов для линейного слоя: $y = Wx$, $x \in \mathbb{R}^{K \times n}$, $y \in \mathbb{R}^{K \times n}$, $W \in \mathbb{R}^{n \times m}$, где $K$ — число объектов.

Рассмотрим $L$ как функцию от выходов нейронной сети: $L = L(y_{11}, y_{12}, \dots)$

$$y_{kt} = (Wx)_{kt} = \sum_{z=1}^{n} x_{kz}W_{zt}$$

$$\frac{\partial L}{\partial x_{ij}} = \sum_{kt} \frac{\partial L}{\partial y_{kt}}\frac{\partial y_{kt}}{\partial x_{ij}} = \sum_{kt} \frac{\partial L}{\partial y_{kt}}\frac{\partial \sum_z x_{kz}w_{zt}}{\partial x_{ij}}= \sum_{t} \frac{\partial L}{\partial y_{it}}\frac{\partial w_{jt}}{\partial x_{ij}}$$

$$\frac{\partial{L}}{\partial x} = \frac{\partial{L}}{\partial y}W^T$$

In [2]:
class Parameters:
    """
    Класс для хранения весов и градиентов.
    """
    def __init__(self, shape: Tuple[int, int]):
        # Weights
        self.weights = np.random.randn(*shape) * 0.1
        # Gradients
        self.grad = np.zeros_like(self.weights)

In [3]:
class Linear:
    """
    Линейный (полносвязный) слой.
    """
    def __init__(self, 
                 input_size: int, 
                 output_size: int):
        self.w = Parameters((input_size, output_size))
        self.bias = Parameters((1, output_size))
        self.output_size = output_size

    def forward(self, X: np.ndarray) -> np.ndarray:
        """
        Прямой проход через слой.
        
        Аргументы:
        X: np.ndarray
            Входной массив формы (N, input_size).
        
        Возвращает:
        y: np.ndarray
            Выходной массив формы (N, output_size).
        """
        self.X = X.copy()
        self.y = X @ self.w.weights + self.bias.weights  
              
        return self.y
        
    def backward(self, dLdy: np.ndarray) -> np.ndarray:
        """
        Обратное распространение ошибки.
        
        Вычисляет:
            1. Градиент по входу: dL/dx = dL/dy @ W.T.
            2. Градиент по весам: dL/dW = X.T @ dL/dy.
            3. Градиент по смещениям: сумма по батчу.
        
        Аргументы:
        dLdy: np.ndarray
            Градиент ошибки по выходу слоя.
        
        Возвращает:
        dLdx: np.ndarray
            Градиент ошибки по входу слоя.
        """
        # Градиент по входу: dL/dx = dL/dy @ W.T
        dLdx = dLdy @ self.w.weights.T
        # Градиент по весам: dL/dW = X.T @ dL/dy
        self.w.grad = self.X.T.dot(dLdy)
        # Градиент по смещениям
        self.bias.grad = dLdy.sum(0)
        
        return dLdx

In [4]:
def mse(y_pred: np.ndarray, y_true: np.ndarray) -> float:
    """
    Вычисляет сумму квадратов ошибок (MSE).
    
    Аргументы:
    y_pred: np.ndarray
        Предсказанные значения.
    y_true: np.ndarray
        Истинные значения.
    
    Возвращает:
    float
        Сумма квадратов ошибок.
    """
    return ((y_pred - y_true) ** 2).sum()

def grad_mse(y_pred: np.ndarray, y_true: np.ndarray) -> np.ndarray:
    """
    Вычисляет градиент функции потерь MSE по предсказаниям.
    
    Аргументы:
    y_pred: np.ndarray
        Предсказанные значения.
    y_true: np.ndarray
        Истинные значения.
    
    Возвращает:
    np.ndarray
        Градиент MSE, 2 * (y_pred - y_true).
    """
    return 2 * (y_pred - y_true)

#### Тесты

In [None]:
layer = Linear(input_size=4, output_size=3)

X = np.random.randn(5, 4)
output = layer.forward(X)
assert output.shape == (5, 3), "Неверная форма выхода прямого прохода"

dLdy = np.ones_like(output)
dl_dx = layer.backward(dLdy)
assert layer.w.grad.shape == (4, 3), "Неверная форма градиента весов"
assert layer.bias.grad.shape == (3,), "Неверная форма градиента смещений"
assert dl_dx.shape == (5, 4), "Неверная форма градиента по входу"

print("Тест forward/backward для Linear слоя пройден.")

In [None]:
y_pred = np.array([1.0, 2.0, 3.0])
y_true = np.array([1.5, 2.5, 3.5])

loss = mse(y_pred, y_true)
expected_loss = ((y_pred - y_true) ** 2).sum()
assert np.isclose(loss, expected_loss), "Неверное значение MSE"

print("Тест для mse пройден.")

In [None]:
grad = grad_mse(y_pred, y_true)
expected_grad = 2 * (y_pred - y_true)
assert np.allclose(grad, expected_grad), "Неверное значение градиента MSE"

print("Тест для grad_mse пройден.")

### Часть 2: Relu

In [8]:
class Relu:
    """
    Функция активации ReLu
    """
    def __init__(self):
        pass

    def forward(self, X: np.ndarray) -> np.ndarray:
        """
        Прямой проход через ReLU.
        
        Аргументы:
        X: np.ndarray
            Входной массив.
            
        Возвращает:
        np.ndarray
            Результат применения ReLU.
        """
        self.X = X

        return np.maximum(0, X)
    
    def backward(self, dLdy: np.ndarray) -> np.ndarray:
        """
        Обратное распространение через ReLU.
        
        Аргументы:
        dLdy: np.ndarray
            Градиент по выходу.
            
        Возвращает:
        np.ndarray
            Градиент по входу.
        """
        dLdx = dLdy * (self.X > 0)
        
        return dLdx

#### Тесты

In [None]:
X_relu = np.array([[-1, 2], [3, -4]])
relu = Relu()
forward_relu = relu.forward(X_relu)
backward_relu = relu.backward(np.ones_like(X_relu))
print("ReLU forward:\n", forward_relu)
print("ReLU backward:\n", backward_relu)

### Часть 3: Sigmoid


In [10]:
class Sigmoid:
    """
    Функция активации сигмоида
    """
    def __init__(self):
        pass

    def forward(self, X: np.ndarray) -> np.ndarray:
        """
        Прямой проход через сигмоиду.
        
        Аргументы:
        X: np.ndarray
            Входной массив формы (N, d).
            
        Возвращает:
        np.ndarray
            Результат применения сигмоиды той же формы.
        """
        self.output = 1 / (1 + np.exp(-X))

        return self.output

    def backward(self, dLdy: np.ndarray) -> np.ndarray:
        """
        Обратное распространение через сигмоиду.
        
        Вычисляет градиент: dLdx = dLdy * (output * (1 - output)).
        
        Аргументы:
        dLdy: np.ndarray
            Градиент по выходу.
            
        Возвращает:
        np.ndarray
            Градиент по входу.
        """
        dSdx = self.output * (1 - self.output)
        dLdx = dLdy * dSdx
        
        return dLdx

#### Тесты

In [None]:
sigmoid = Sigmoid()
forward_sigmoid = sigmoid.forward(X_relu)
backward_sigmoid = sigmoid.backward(np.ones_like(X_relu))
print("Sigmoid forward:\n", forward_sigmoid)
print("Sigmoid backward:\n", backward_sigmoid)

### Часть 4: Функция потерь

In [12]:
class NLLLoss:
    """
    Класс вычисляет функцию потерь
    Применяет softmax к входам и вычисляет Negative Likelihood Loss.
    """
    def __init__(self):
        pass

    def forward(self, X: np.ndarray, y: np.ndarray) -> float:
        """
        Прямой проход через слой NLLLoss.
        
        Аргументы:
        X: np.ndarray
            Входной массив формы (N, C), где C число классов.
        y: np.ndarray
            Вектор меток формы (N,).
            
        Возвращает:
        float
            Значение NLL loss.
        """
        self.X = X
        self.y = y 

        exp_X = np.exp(X)
        self.probs = exp_X / np.sum(exp_X, axis=1, keepdims=True)

        eps = 1e-10
        logprobs = -np.log(self.probs[np.arange(X.shape[0]), y] + eps)
        loss = np.mean(logprobs)
        return loss

    def backward(self):
        """
        Обратное распространение через NLLLoss.
        Вычисляет градиент по входу:
            dLdx = (probs - one_hot(y)) / N.
            
        Возвращает:
        np.ndarray
            Градиент по входу.
        """
        N = self.probs.shape[0]
        dLdx = self.probs.copy()
        dLdx[np.arange(N), self.y] -= 1
        dLdx /= N
        return dLdx

#### Тесты

In [None]:
X_nll = np.array([[2.0, 1.0, 0.1],
                  [0.5, 1.5, 1.0]])
y_nll = np.array([0, 2])
nll_loss = NLLLoss()
loss_value = nll_loss.forward(X_nll, y_nll)
grad_nll = nll_loss.backward()
print("NLLLoss loss:", loss_value)
print("NLLLoss gradient:\n", grad_nll)

### Часть 5: Численный градиент

Реализуем функцию проверки численного градиента. Для этого для каждой переменной, по которой считается градиент, надо вычислить численный градиент: $f'(x) \approx \frac{f(x+\epsilon)-f(x-\epsilon)}{2\epsilon}$. Функция возвращает максимальное абсолютное отклонение аналитического градиента от численного. В качестве $\epsilon$ рекомендуется взять $10^{-6}$. При правильной реализации максимальное отличие будет иметь порядок $10^{-8}-10^{-6}$.

In [14]:
def check_gradient(func: callable, 
                   X: np.ndarray,
                   return_max_deviation: bool=False) -> Union[np.ndarray, float]:
    """
    Вычисляет численный градиент и сравнивает его с аналитическим.

    Параметры:
    func: callable
        Функция, которая принимает X и возвращает кортеж (loss, grad).
    X: np.ndarray
        Тензор размера (n, m), по которому считается градиент.
    return_max_deviation: bool
        Если True, функция возвращает кортеж (численный градиент, максимальное отклонение),
        иначе — только численный градиент.
    
    Возвращает:
        Численный градиент, либо кортеж (численный градиент, максимальное отклонение).
    """
    eps = 10**(-6)
    numerical_grad = np.zeros_like(X)

    for i in range(X.shape[0]):
        for j in range(X.shape[1]):
            orig = X[i, j]
            X[i, j] = orig + eps
            plus_eps, _ = func(X)
            X[i, j] = orig - eps
            minus_eps, _ = func(X)
            numerical_grad[i, j] = (plus_eps - minus_eps) / (2 * eps)
            X[i, j] = orig

    _, analytical_grad = func(X)
    max_deviation = np.max(np.abs(numerical_grad - analytical_grad))
    
    if return_max_deviation:
        return numerical_grad, max_deviation
    else :
        return numerical_grad

#### Тесты

In [None]:
def quad_func(X):
    loss = np.sum(X ** 2)
    grad = 2 * X
    return loss, grad

X = np.random.randn(3, 4)

numerical_grad, max_dev = check_gradient(quad_func, X.copy(), return_max_deviation=True)
assert max_dev < 1e-6, f"Максимальное отклонение слишком велико: {max_dev}"

_, analytical_grad = quad_func(X)
assert np.allclose(numerical_grad, analytical_grad, atol=1e-6), "Градиенты не сходятся"

print("Тесты check_gradient пройдены.")

In [None]:
def sigmoid_loss(X):
    s = Sigmoid()
    out = s.forward(X)
    loss = np.sum(out)
    grad = s.backward(np.ones_like(out))
    return loss, grad

X = np.random.randn(3, 4)

numerical_grad, max_dev = check_gradient(sigmoid_loss, X.copy(), return_max_deviation=True)
assert max_dev < 1e-5, f"Максимальное отклонение для сигмоиды слишком велико: {max_dev}"

_, analytical_grad = sigmoid_loss(X)
assert np.allclose(numerical_grad, analytical_grad, atol=1e-5), "Градиенты не сходятся для сигмоиды"

print("Тесты Sigmoid пройдены.")

### Часть 6: Нейронная сеть

Мы можем написать класс, который будет собирать всю сеть вместе.

In [17]:
class NeuralNetwork:
    def __init__(self, 
                 input_size: int, 
                 hidden_size: int, 
                 output_size: int):
        self.layers = [Linear(input_size, hidden_size),
                       Relu(),
                       Linear(hidden_size, hidden_size),
                       Relu(),
                       Linear(hidden_size, output_size)]

    def forward(self, X: np.ndarray) -> np.ndarray:
        """
        Прямой проход через сеть.

        Аргументы:
        X: np.ndarray
            Входной массив формы (N, input_size).

        Возвращает:
        np.ndarray
            Выход сети формы (N, output_size).
        """
        out = X.copy()
        for layer in self.layers:
            out = layer.forward(out)
        return out

    def backward(self, dLdy: np.ndarray) -> np.ndarray:
        """
        Обратное распространение ошибки через сеть.

        Аргументы:
        dLdy: np.ndarray
            Градиент функции потерь по выходу сети.

        Возвращает:
        np.ndarray
            Градиент функции потерь по входу сети.
        """
        grad = dLdy
        for layer in reversed(self.layers):
            grad = layer.backward(grad)
        return grad
    
    def get_parameters(self) -> List[Parameters]:
        """
        Извлекает параметры (веса и смещения) всех линейных слоев сети.

        Возвращает:
        params: List
            Список параметров для линейных слоев.
        """
        params = []
        for layer in self.layers:
            if isinstance(layer, Linear):
                params.append(layer.w)
                params.append(layer.bias)
        return params

#### Тесты

In [None]:
input_size = 4
hidden_size = 5
output_size = 3
n_samples = 10

nn = NeuralNetwork(input_size, hidden_size, output_size)
X = np.random.randn(n_samples, input_size)

out = nn.forward(X)
assert out.shape == (n_samples, output_size), (
    "Неверная форма выхода нейронной сети на прямом проходе."
)

dLdy = np.ones_like(out)
grad = nn.backward(dLdy)
assert grad.shape == (n_samples, input_size), (
    "Неверная форма градиента обратного прохода."
)

params = nn.get_parameters()
assert len(params) == 6, "Неверное количество параметров."

print("Тесты NeuralNetwork пройдены.")

#### Часть 7: Оптимизатор

Реализуем оптимизатор, похожий по функционалу на тот, которые используется во фреймворке Pytorch. 

Он состоит из трех методов (включая `__init__`):
- `__init__` инициализирует параметры,
- `step` - обновляет веса,
- `zero_grad` - обнуляет градиенты.

In [19]:
class Optimizer:
    """
    Класс оптимизатора.
    """
    def __init__(self, 
                 parameters: List[Parameters],
                 lr: float):
        self.parameters = parameters
        self.lr = lr

    def step(self) -> None:
        """
        Выполняет один шаг оптимизации: обновляет веса параметров
        по правилу: W = W - lr * grad.
        """
        for param in self.parameters:
            param.weights -= self.lr * param.grad

    def zero_grad(self) -> None:
        """
        Обнуляет градиенты всех параметров.
        """
        for param in self.parameters:
            param.grad = np.zeros_like(param.weights)

#### Тесты

In [None]:
param = Parameters((2, 2))
param.weights = np.array([[1.0, 2.0], [3.0, 4.0]])
param.grad = np.array([[0.1, 0.2], [0.3, 0.4]])

lr = 0.1
optimizer = Optimizer([param], lr)

weights_old = param.weights.copy()
optimizer.step()
expected_weights = weights_old - lr * param.grad
assert np.allclose(param.weights, expected_weights, atol=1e-7), "Метод step() обновляет веса неверно."

optimizer.zero_grad()
assert np.all(param.grad == 0), "Метод zero_grad() не обнуляет градиенты корректно."

print("Тесты Optimizer пройдены.")

In [None]:
target = np.array([[3.0, 5.0]])
param = Parameters((1, 2))
param.weights = np.array([[0.0, 0.0]])

optimizer = Optimizer([param], 0.1)

num_iterations = 50
trajectory = []

for _ in range(num_iterations):
    trajectory.append(param.weights.copy())
    param.grad = 2 * (param.weights - target)
    optimizer.step()
    optimizer.zero_grad()

trajectory = np.concatenate(trajectory, axis=0)

x = np.linspace(-1, 5, 200)
y = np.linspace(-1, 7, 200)
X, Y = np.meshgrid(x, y)
Z = (X - 3) ** 2 + (Y - 5) ** 2

plt.figure(figsize=(6, 6))
contours = plt.contour(X, Y, Z, 
                       levels=np.logspace(0, 3, 20), 
                       cmap='viridis')
plt.clabel(contours, inline=True, fontsize=8)
plt.plot(trajectory[:, 0], trajectory[:, 1], marker='o', color='red')
plt.scatter(target[0, 0], target[0, 1], marker='x')
plt.xlabel('x')
plt.ylabel('y')
plt.show()

### Часть 8: Обучение на простых данных

Создадим архитектуру вида 4 -> 100 -> 100 -> 3:
* Linear(4, 100)
* Relu()
* Linear(100, 100)
* Relu()
* Linear(100, 3)

* В качестве функции потерь будем использовать NLLLoss.

* Для обновления весов воспользуемся написанным ранее оптимизатор.
* Построим график сходимости (величина NLL после каждого обновления).

In [None]:
import pandas as pd
from sklearn.datasets import load_iris

iris = load_iris(as_frame=True)
print(iris.data.shape)
iris.data.head()

In [None]:
print(iris.target.shape)
print(iris.target.value_counts())

classes = iris.target.unique()
print(classes)

iris.target.head()

In [24]:
X_train, X_test, y_train, y_test = train_test_split(iris.data.to_numpy(), 
                                                    iris.target.to_numpy(), 
                                                    stratify=iris.target, 
                                                    test_size=0.2)

In [25]:
input_size = X_train.shape[1]
hidden_size = 100
output_size = np.unique(y_train).shape[0]

In [26]:
nn = NeuralNetwork(input_size=input_size,
                   hidden_size=hidden_size,
                   output_size=output_size)

sigmoid = Sigmoid()
criterion = NLLLoss()

optimizer = Optimizer(nn.get_parameters(), lr=0.01)

In [None]:
num_epochs = 2000
batch_size = 32

loss_history = []
num_samples = X_train.shape[0]

for epoch in range(num_epochs):
    indices = np.random.permutation(num_samples)
    epoch_loss = 0
    num_batches = 0

    for start in range(0, num_samples, batch_size):
        optimizer.zero_grad()
        
        batch_indices = indices[start:start+batch_size]
        X_batch = X_train[batch_indices]
        y_batch = y_train[batch_indices]
        
        logits = nn.forward(X_batch)
        loss = criterion.forward(logits, y_batch)
        epoch_loss += loss
        num_batches += 1
        
        dLdx = criterion.backward()
        nn.backward(dLdx)
        
        optimizer.step()
    
    if epoch % 10 == 0:
        avg_epoch_loss = epoch_loss / num_batches
        loss_history.append(avg_epoch_loss)
    
    if epoch % 100 == 0:
        print(f"Epoch {epoch}, Avg Loss: {avg_epoch_loss}")

In [None]:
plt.plot(np.linspace(0, num_epochs, len(loss_history)), loss_history)
plt.xlabel('Epoch')
plt.ylabel('Loss')
# plt.yscale('log')
plt.title('Loss History')
plt.show()

In [None]:
from sklearn.metrics import accuracy_score, confusion_matrix

logits = nn.forward(X_test)
print(round(accuracy_score(np.argmax(logits, axis=1), y_test), 3))

cm = confusion_matrix(np.argmax(logits, axis=1), y_test)
disp = ConfusionMatrixDisplay(confusion_matrix=cm,
                              display_labels=classes)
disp.plot()
plt.show()