#Задание 1.

 Напишите функцию, которая моделирует один нейрон с сигмоидной активацией и реализует вычисление градиента для обновления весов и смещений нейрона. Функция должна принимать список векторов признаков, ассоциированные бинарные метки класса, начальные веса, начальное смещение, скорость обучения и количество эпох. Функция должна обновлять веса и смещение с помощью градиентного спуска (классической версии) на основе функции потерь NLL и возвращать обновленные веса, смещение и список значений NLL для каждой эпохи, округленное до четырех десятичных знаков. Проведите обучение на предоставленном наборе данных из задания 4 (для двух разных лет). Опционально сгенерируйте другие подходящие наборы данных. Опишите ваши результаты. Предоставленная функция будет также протестирована во время защиты ДЗ. Можно использовать только чистый torch (без использования autograd и torch.nn).

In [1]:
import torch

In [36]:
def activationf(z):
  return 1/(1 + torch.exp(-z))

def lossf(y, p):
  return -(y * torch.log(p + 1e-8) + (1 - y) * torch.log(1 - p + 1e-8))

def MLP(X, Y_vec, W_vec, b, lr, n_epochs):
  X = torch.tensor(X, dtype=torch.float32)
  Y_vec = torch.tensor(Y_vec, dtype=torch.float32)
  W_vec = torch.tensor(W_vec, dtype=torch.float32)
  b = torch.tensor(b, dtype=torch.float32)

  losses = []
  n_samples = len(X)

  for epoch in range(n_epochs):
    epoch_loss = 0.0
    grad_w = torch.zeros_like(W_vec)
    grad_b = torch.tensor(0.0)

    for i in range(n_samples):
      X_vec = X[i]
      y = Y_vec[i]

      # Прямое распространение
      z = torch.dot(X_vec, W_vec) + b
      p = activationf(z)

      # NLL лосс
      iter_loss = lossf(y, p)
      epoch_loss += iter_loss

      # Обратное распространение
      error = p - y
      grad_w += X_vec * error
      grad_b += error


    grad_w /= n_samples
    grad_b /= n_samples

    # Обновляем веса в конце э похи после того, как мы прошлись
    # по всем элементам выборки
    W_vec -= lr * grad_w
    b -= lr * grad_b

    losses.append(epoch_loss / n_samples)

  return W_vec, b, losses


Проверка на примере

In [37]:
features = [[1.0, 2.0], [2.0, 1.0], [-1.0, -2.0]]
labels = [1, 0, 0]
initial_weights = [0.1, -0.2]
initial_bias = 0.0
learning_rate = 0.1
epochs = 2

result = MLP(features, labels,
initial_weights, initial_bias,
             learning_rate, epochs)
print(result)

(tensor([ 0.1070, -0.0847]), tensor(-0.0335), [tensor(0.8006), tensor(0.7631)])


Обучение на данных из задания 4

In [38]:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split

In [39]:
# Загрузка данных
x = pd.read_csv('train_x.csv')
y = pd.read_csv('train_y.csv')
test_x = pd.read_csv('test_x.csv')

x = x.rename(columns={x.columns[0]: 'id'})
y = y.rename(columns={y.columns[0]: 'id'})

data = pd.merge(x, y, on='id')
test_ids = test_x['id'].values

data = data[data['year'].isin([2003, 2004])]

X_all = data.drop(['id', 'year'], axis=1).values.astype(np.float32)
y_all = data['year'].values.astype(np.float32)

y_all_binary = np.where(y_all == 2003, 0, 1).astype(np.float32)

print(f"Класс 0: {np.sum(y_all == 2003)} образцов")
print(f"Класс 1: {np.sum(y_all == 2004)} образцов")

# Разделение на train и test
X_train, X_test, y_train, y_test = train_test_split(
    X_all, y_all_binary, test_size=0.2, random_state=42, stratify=y_all_binary
)

Класс 0: 703 образцов
Класс 1: 792 образцов


In [40]:
# Параметры
n_features = X_train.shape[1]
W_initial = np.random.randn(n_features).astype(np.float32) * 0.01
b_initial = 0.0
learning_rate = 0.01
epochs = 500

In [41]:
W_trained, b_trained, losses = MLP(X_train, y_train, W_initial, b_initial, learning_rate, epochs)
print(f"Final loss: {losses[-1]:.4f}")

Final loss: 8.6559


In [42]:
# Предсказания на тренировочных данных
X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
train_predictions = activationf(torch.matmul(X_train_tensor, W_trained) + b_trained)
train_predicted_labels = (train_predictions > 0.5).float()

# Точность на тренировочных данных
train_accuracy = (train_predicted_labels == torch.tensor(y_train)).float().mean()
print(f"Train accuracy: {train_accuracy.item() * 100:.2f}%")

Train accuracy: 47.41%


In [43]:
# Предсказания на тестовых данных
X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
test_predictions = activationf(torch.matmul(X_test_tensor, W_trained) + b_trained)
test_predicted_labels = (test_predictions > 0.5).float()

# Точность на тестовых данных
test_accuracy = (test_predicted_labels == torch.tensor(y_test)).float().mean()
print(f"Test accuracy: {test_accuracy.item() * 100:.2f}%")

Test accuracy: 47.49%


#Задание 2.

Реализуйте базовые функции autograd. Можете вдохновиться видео от Andrej Karpathy. Напишите класс, аналогичный предоставленному классу 'Element', который реализует основные операции autograd: сложение, умножение и активацию ReLU. Класс должен обрабатывать скалярные объекты и правильно вычислять градиенты для этих операций посредством автоматического дифференцирования. Плюсом будет набор предоставленных тестов, оценивающих правильность вычислений. Большим плюсом будет, если тесты будут написаны с помощью unittest. Можно использовать только чистый torch (без использования autograd и torch.nn). За каждую нереализованную операцию будет вычитаться 3 балла.

Вдохновился видеороликом Andrey Karpathy, как и было указано в задании. Предусмотрел возможность работы напрямую со скалярами

In [13]:
class Node:
	def __init__(self, data, _children=(), _op=''):
		self.data = data
		self.grad = 0
		self._backward = lambda: None
		self._prev = set(_children)
		self._op = _op

	def __repr__(self):
		return f" Element(data={self.data}, grad={self.grad})"

	def __add__(self, other):
		other = other if isinstance(other, Node) else Node(other)
		out = Node(self.data + other.data, (self, other), '+')

		def _backward():
			self.grad += out.grad
			other.grad += out.grad

		out._backward = _backward

		return out

	def __mul__(self, other):
		other = other if isinstance(other, Node) else Node(other)
		out = Node(self.data * other.data, (self, other), '*')

		def _backward():
			self.grad += other.data * out.grad
			other.grad += self.data * out.grad

		out._backward = _backward

		return out

	def relu(self):
		out = Node(0 if self.data < 0 else self.data, (self, ), 'relu')

		def _backward():
			self.grad += (0 if self.data < 0 else 1) * out.grad

		out._backward = _backward

		return out

	def backward(self):
		# Топологическая сортировка для прохода backward-ом по графу
		visited = set()
		sorted = []

		def top_sort(v):
			if v not in visited:
				visited.add(v)
				for child in v._prev:
					top_sort(child)
				sorted.append(v)

		top_sort(self)

		# Градиент последней вершины равен 1
		self.grad = 1
		# Берем reversed, т.к. в backward идем с конца
		for v in reversed(sorted):
			v._backward()


Юнит-тесты

In [15]:
import unittest
from io import StringIO

class TestNodeBackward(unittest.TestCase):
    def test_addition_gradient(self):
        # Проверка градиента для операции сложения
        a = Node(2.0)
        b = Node(3.0)
        c = a + b
        c.backward()

        self.assertEqual(a.grad, 1.0)
        self.assertEqual(b.grad, 1.0)
        self.assertEqual(c.data, 5.0)

    def test_multiplication_gradient(self):
        # Проверка градиента для операции умножения
        a = Node(2.0)
        b = Node(3.0)
        c = a * b
        c.backward()

        self.assertEqual(a.grad, 3.0)  # dc/da = b
        self.assertEqual(b.grad, 2.0)  # dc/db = a
        self.assertEqual(c.data, 6.0)

    def test_relu_positive(self):
        # Проверка ReLU для положительного числа
        a = Node(2.0)
        b = a.relu()
        b.backward()

        self.assertEqual(b.data, 2.0)
        self.assertEqual(a.grad, 1.0)

    def test_relu_negative(self):
        # Проверка ReLU для отрицательного числа
        a = Node(-2.0)
        b = a.relu()
        b.backward()

        self.assertEqual(b.data, 0.0)
        self.assertEqual(a.grad, 0.0)

    def test_expression_chain(self):
        # Проверка цепочки операций
        a = Node(2.0)
        b = Node(3.0)
        c = a * b  # 6.0
        d = c + Node(1.0)  # 7.0
        e = d.relu()  # 7.0
        e.backward()

        self.assertEqual(e.data, 7.0)
        self.assertEqual(a.grad, 3.0)  # de/da = de/dd * dd/dc * dc/da = 1 * 1 * b
        self.assertEqual(b.grad, 2.0)  # de/db = de/dd * dd/dc * dc/db = 1 * 1 * a

    def test_scalar_addition(self):
        # Проверка сложения со скаляром
        a = Node(2.0)
        b = a + 3.0  # 5.0
        b.backward()

        self.assertEqual(b.data, 5.0)
        self.assertEqual(a.grad, 1.0)

    def test_scalar_multiplication(self):
        # Проверка умножения на скаляр
        a = Node(2.0)
        b = a * 3.0  # 6.0
        b.backward()

        self.assertEqual(b.data, 6.0)
        self.assertEqual(a.grad, 3.0)

    def test_multiple_usage(self):
        # Проверка множественного использования узла
        a = Node(3.0)
        b = a * a  # 9.0
        b.backward()

        self.assertEqual(b.data, 9.0)
        self.assertEqual(a.grad, 6.0)  # db/da = 2*a

# Создаем test suite и запускаем
def run_tests():
    suite = unittest.TestLoader().loadTestsFromTestCase(TestNodeBackward)
    runner = unittest.TextTestRunner(verbosity=2)
    return runner.run(suite)

# Запускаем тесты в Jupyter
run_tests()

test_addition_gradient (__main__.TestNodeBackward.test_addition_gradient) ... ok
test_expression_chain (__main__.TestNodeBackward.test_expression_chain) ... ok
test_multiple_usage (__main__.TestNodeBackward.test_multiple_usage) ... ok
test_multiplication_gradient (__main__.TestNodeBackward.test_multiplication_gradient) ... ok
test_relu_negative (__main__.TestNodeBackward.test_relu_negative) ... ok
test_relu_positive (__main__.TestNodeBackward.test_relu_positive) ... ok
test_scalar_addition (__main__.TestNodeBackward.test_scalar_addition) ... ok
test_scalar_multiplication (__main__.TestNodeBackward.test_scalar_multiplication) ... ok

----------------------------------------------------------------------
Ran 8 tests in 0.014s

OK


<unittest.runner.TextTestResult run=8 errors=0 failures=0>

#Задание 3.

Реализуйте один из оптимизаторов на выбор. Придумайте и напишите тесты для проверки выбранного оптимизатора. Проведите обучение нейрона из первого задания с использованием оптимизатора, а не ванильного градиентного спуска. Также опишите идею алгоритма (+1 балл). {*} Можете реализовать более 1 алгоритма. Каждый следующий даст 1 балл.

Варианты:

Momentum (3 балла)
Nesterov (3 балла)
Adagrad (4 балла)
Adadelta (4 балла)
RMSProp (5 баллов)
Adam (5 баллов)
Nadam (6 баллов)
NAG (6 баллов)
AdamW (6 баллов)


Формула взята из хендбука Яндекса

\begin{aligned}
v_{k+1} &= \beta_1 v_k + (1 - \beta_1) \nabla f(x_k) \\
G_{k+1} &= \beta_2 G_k + (1 - \beta_2) (\nabla f(x_k))^2 \\
x_{k+1} &= x_k - \left( \frac{\alpha}{\sqrt{G_{k+1} + \varepsilon}} v_{k+1} + \lambda x_k \right)
\end{aligned}

In [44]:
class AdamW:
  def __init__(self, params, beta_1, beta_2,
               lr=0.001, eps=1e-8, weight_decay=0.01):
    self.params = params
    self.beta_1 = beta_1
    self.beta_2 = beta_2
    self.lr = lr
    self.eps = eps
    self.weight_decay = weight_decay
    self.steps = 0

    self.v = [torch.zeros_like(param) for param in params]
    self.G = [torch.zeros_like(param) for param in params]

  def zero_grad(self):
    for param in self.params:
      if param.grad is not None:
        param.grad.zero_()

  def step(self):
    self.steps += 1
    for i, param in enumerate(self.params):
      if param.grad is None:
        continue

      grad = param.grad

      self.v[i] = self.beta_1 * self.v[i] + (1 - self.beta_1) * param.grad

      self.G[i] = self.beta_2 * self.G[i] + (1 - self.beta_2) * param.grad ** 2

      v_hat = (1 / (1 - self.beta_1 ** self.steps)) * self.v[i]

      G_hat = (1 / (1 - self.beta_2 ** self.steps)) * self.G[i]

      param.data -= self.lr * v_hat / torch.sqrt(G_hat + self.eps) + self.weight_decay * param.data


In [45]:
def MLP_with_AdamW(X, Y_vec, W_vec, b, lr=0.01, n_epochs=1000, weight_decay=0.01):
    X = torch.tensor(X, dtype=torch.float32)
    Y_vec = torch.tensor(Y_vec, dtype=torch.float32)
    W_vec = torch.tensor(W_vec, dtype=torch.float32, requires_grad=True)
    b = torch.tensor(b, dtype=torch.float32, requires_grad=True)

    # Инициализация оптимизатора
    optimizer = AdamW([W_vec, b], 0.9, 0.9, lr=lr, weight_decay=weight_decay)

    losses = []
    n_samples = len(X)

    for epoch in range(n_epochs):
        epoch_loss = 0.0
        grad_w = torch.zeros_like(W_vec)
        grad_b = torch.tensor(0.0)

        for i in range(n_samples):
            X_vec = X[i]
            y = Y_vec[i]

            # Прямое распространение
            z = torch.dot(X_vec, W_vec) + b
            p = activationf(z)

            # Вычисление потерь
            iter_loss = lossf(y, p)
            epoch_loss += iter_loss.item()

            # Обратное распространение
            error = p - y
            grad_w += X_vec * error
            grad_b += error

        grad_w /= n_samples
        grad_b /= n_samples

        W_vec.grad = grad_w
        b.grad = grad_b

        # Обновление весов с AdamW
        optimizer.step()
        optimizer.zero_grad()

        losses.append(epoch_loss / n_samples)

    return W_vec.detach(), b.detach(), losses

In [46]:
result = MLP_with_AdamW(features, labels, initial_weights, initial_bias,learning_rate, epochs)
print(result)

(tensor([-0.7116,  0.9921]), tensor(-0.7098), [0.8006192048390707, 0.7914285858472189, 0.7825563351313273, 0.7739961942036947, 0.7657400369644165, 0.7577776710192362, 0.7500940958658854, 0.7426670789718628, 0.7354649305343628, 0.7284481922785441, 0.72157750527064, 0.7148226102193197, 0.7081662813822428, 0.7016016244888306, 0.6951272090276083, 0.6887446045875549, 0.6824559768040975, 0.676263431708018, 0.6701686580975851, 0.6641728679339091, 0.6582770148913065, 0.6524815758069357, 0.6467864712079366, 0.6411916216214498, 0.6356966892878214, 0.6303008596102396, 0.6250034769376119, 0.6198034683863322, 0.6146997312704722, 0.6096911032994589, 0.6047762632369995, 0.5999538401762644, 0.5952223539352417, 0.5905802647272745, 0.5860260725021362, 0.5815580089886984, 0.5771743257840475, 0.5728734234968821, 0.5686534345149994, 0.564512570699056, 0.5604488253593445, 0.5564603606859843, 0.5525452593962351, 0.548701673746109, 0.5449275573094686, 0.5412210822105408, 0.5375805199146271, 0.5340040822823843

In [47]:
W_trained, b_trained, losses = MLP_with_AdamW(X_train, y_train, W_initial, b_initial, learning_rate, epochs)
print(f"Final loss: {losses[-1]:.4f}")

Final loss: 1.4785


In [48]:
# Предсказания на тренировочных данных
X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
train_predictions = activationf(torch.matmul(X_train_tensor, W_trained) + b_trained)
train_predicted_labels = (train_predictions > 0.5).float()

# Точность на тренировочных данных
train_accuracy = (train_predicted_labels == torch.tensor(y_train)).float().mean()
print(f"Train accuracy: {train_accuracy.item() * 100:.2f}%")

Train accuracy: 48.24%


In [49]:
# Предсказания на тестовых данных (наша валидационная выборка)
X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
test_predictions = activationf(torch.matmul(X_test_tensor, W_trained) + b_trained)
test_predicted_labels = (test_predictions > 0.5).float()

# Точность на тестовых данных
test_accuracy = (test_predicted_labels == torch.tensor(y_test)).float().mean()
print(f"Test accuracy: {test_accuracy.item() * 100:.2f}%")

Test accuracy: 47.83%


In [22]:
from sklearn.model_selection import KFold
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

#Задание 4.



In [23]:
# Загрузка данных
train_x = pd.read_csv('train_x.csv')
train_y = pd.read_csv('train_y.csv')
test_x = pd.read_csv('test_x.csv')

train_x = train_x.rename(columns={train_x.columns[0]: 'id'})
train_y = train_y.rename(columns={train_y.columns[0]: 'id'})

data = pd.merge(train_x, train_y, on='id')
test_ids = test_x['id'].values

X_all = data.drop(['id', 'year'], axis=1).values.astype(np.float32)
y_all = data['year'].values.astype(np.float32)
X_test = test_x.drop('id', axis=1).values.astype(np.float32)

Выбрал ResNext, которую вы упомянули на практике. Пробовал TabresNet, разницы по MSE не было

In [None]:
class TabDataset(Dataset):
    def __init__(self, features, targets=None):
        self.features = features.astype(np.float32)
        self.targets = targets.astype(np.float32) if targets is not None else None

    def __len__(self):
        return len(self.features)

    def __getitem__(self, idx):
        x = torch.from_numpy(self.features[idx]).float()
        if self.targets is not None:
            y = torch.tensor(self.targets[idx]).float()
            return x, y
        return x

class ResNeXtTabular(nn.Module):
    def __init__(self, input_size, hidden_dim=512, cardinality=4, dropout=0.3):
        super().__init__()
        self.input_ln = nn.LayerNorm(input_size)
        self.fc_in = nn.Linear(input_size, hidden_dim)
        self.act = nn.Mish()

        self.block1 = nn.Sequential(
            nn.Linear(hidden_dim, hidden_dim),
            nn.LayerNorm(hidden_dim),
            nn.Mish(),
            nn.Dropout(dropout)
        )

        self.block2 = nn.Sequential(
            nn.Linear(hidden_dim, hidden_dim),
            nn.LayerNorm(hidden_dim),
            nn.Mish(),
            nn.Dropout(dropout*0.8)
        )

        self.fc_mid = nn.Linear(hidden_dim, hidden_dim//2)
        self.mid_ln = nn.LayerNorm(hidden_dim//2)
        self.dropout_final = nn.Dropout(dropout*0.5)
        self.fc_out = nn.Linear(hidden_dim//2, 1)

    def forward(self, x):
        x = self.input_ln(x)
        x = self.fc_in(x)
        x = self.act(x)
        x = x + self.block1(x)
        x = x + self.block2(x)
        x = self.fc_mid(x)
        x = self.act(self.mid_ln(x))
        x = self.dropout_final(x)
        x = self.fc_out(x)
        return x.view(-1)

In [None]:
# K-Fold кросс-валидация
n_splits = 5  # Количество фолдов
kf = KFold(n_splits=n_splits, shuffle=True, random_state=42)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
test_preds_list = []  # Список для предсказаний с каждого фолда

for fold, (train_idx, val_idx) in enumerate(kf.split(X_all)):
    print(f'Fold {fold + 1}/{n_splits}')

    # Разделение данных
    X_train, X_val = X_all[train_idx], X_all[val_idx]
    y_train, y_val = y_all[train_idx], y_all[val_idx]

    # Масштабирование для каждого фолда
    scaler = StandardScaler()
    X_train = scaler.fit_transform(X_train)
    X_val = scaler.transform(X_val)
    X_test_scaled = scaler.transform(X_test)

    # DataLoader
    train_dataset = TabDataset(X_train, y_train)
    val_dataset = TabDataset(X_val, y_val)
    test_dataset = TabDataset(X_test_scaled)

    train_loader = DataLoader(train_dataset, batch_size=512, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=512, shuffle=False)
    test_loader = DataLoader(test_dataset, batch_size=512, shuffle=False)

    # Инициализация модели для фолда
    model = ResNeXtTabular(input_size=X_train.shape[1]).to(device)
    optimizer = optim.AdamW(model.parameters(), lr=1e-3, weight_decay=1e-4)
    criterion = nn.MSELoss()

    # Обучение
    for epoch in range(500):
        model.train()
        for xb, yb in train_loader:
            xb, yb = xb.to(device), yb.to(device)
            optimizer.zero_grad()
            preds = model(xb)
            loss = criterion(preds, yb)
            loss.backward()
            optimizer.step()

    # Валидация
    model.eval()
    val_preds = []
    with torch.no_grad():
        for xb, yb in val_loader:
            xb = xb.to(device)
            preds = model(xb).cpu().numpy()
            val_preds.append(preds)

    val_preds = np.concatenate(val_preds).ravel()
    val_mse = mean_squared_error(y_val, val_preds)
    print(f"Fold {fold + 1} Validation MSE: {val_mse:.6f}")

    # Предсказание на тесте
    test_preds = []
    with torch.no_grad():
        for xb in test_loader:
            xb = xb.to(device)
            preds = model(xb).cpu().numpy()
            test_preds.append(preds)

    test_preds_list.append(np.concatenate(test_preds).ravel())

# Усреднение предсказаний по фолдам
test_preds_avg = np.mean(test_preds_list, axis=0)
final_test_preds = np.rint(test_preds_avg).astype(int)

# Создание submission файла
submission = pd.DataFrame({
    'id': test_ids,
    'year': final_test_preds
})
submission.to_csv('submission.csv', index=False)
print("Submission saved to submission.csv")