In [None]:
import copy
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader
import matplotlib.pyplot as plt

In [None]:
class MultilayerPerceptron(nn.Module):
    """Fully connected feed-forward network built from a list of layer widths.

    Every hidden layer is ``Linear -> activation -> Dropout``. The last layer
    is a bare ``Linear`` with no activation or dropout after it.

    Args:
        layers_sizes: Widths of all layers, input first and output last,
            e.g. ``[1, 16, 16, 1]``.
        dropout_rate: Probability ``p`` given to every ``nn.Dropout``.
        activation: Zero-argument callable returning the activation module
            inserted after each hidden ``Linear``. Defaults to ``nn.ReLU``.
    """

    def __init__(self, layers_sizes, dropout_rate=0.1, activation=nn.ReLU):
        super().__init__()

        layers = []
        last_idx = len(layers_sizes) - 2  # index of the output Linear layer
        for idx, (in_features, out_features) in enumerate(
                zip(layers_sizes[:-1], layers_sizes[1:])):
            layers.append(nn.Linear(in_features, out_features))
            # Only hidden layers get a non-linearity and dropout.
            if idx != last_idx:
                layers.append(activation())
                layers.append(nn.Dropout(p=dropout_rate))

        self.network = nn.Sequential(*layers)

    def forward(self, x):
        """Apply the stacked layers to ``x`` and return the result."""
        return self.network(x)

In [None]:
def initialize_weights(model):
    """Re-initialise a single ``nn.Linear`` module in place.

    Intended as the callback for ``nn.Module.apply``: any module that is not
    an ``nn.Linear`` is left untouched. For linear layers the weight is drawn
    with ``kaiming_uniform_`` and the bias, when present, is set to zero.
    """
    if not isinstance(model, nn.Linear):
        return

    nn.init.kaiming_uniform_(model.weight)

    bias = model.bias
    if bias is not None:
        nn.init.constant_(bias, 0)

In [None]:
def count_parameters(model):
    """Return how many scalar values the trainable parameters of ``model`` hold.

    Parameters with ``requires_grad`` set to ``False`` are not counted.
    """
    total = 0
    for param in model.parameters():
        if param.requires_grad:
            total += param.numel()
    return total

# 1. Аппроксимация прямой

In [None]:
def foo(x, mean=0, std=1):
    """Evaluate the line ``2 * x + 4`` at ``x`` and add Gaussian noise.

    Args:
        x: NumPy array of input points (its ``shape`` sizes the noise).
        mean: Mean of the additive normal noise.
        std: Standard deviation of the noise; ``0`` gives noise-free targets.

    Returns:
        Array shaped like ``x`` holding the noisy line values.
    """
    noise = np.random.normal(mean, std, size=x.shape)
    line = 2 * x + 4
    return line + noise


# Seed NumPy so the randomly drawn training inputs are identical on every run.
np.random.seed(42)

train_size = 100
X_train = np.random.uniform(low=-2, high=2, size=train_size)
y_train = foo(X_train, 0, 0)  # std=0: targets lie exactly on the line 2x + 4

# Regular grid on [-3, 3), i.e. wider than the [-2, 2) range the inputs are drawn from.
X_test = np.arange(-3, 3, 0.01)
y_test = foo(X_test, 0, 0)

In [None]:
# Convert everything to float32 column tensors of shape (N, 1): one feature per row.
# as_tensor + reshape(-1, 1) keeps the cell idempotent: re-running it on the
# already-converted tensors neither warns nor adds another axis, unlike
# torch.tensor(...).unsqueeze(1).
X_train = torch.as_tensor(X_train, dtype=torch.float32).reshape(-1, 1)
X_test = torch.as_tensor(X_test, dtype=torch.float32).reshape(-1, 1)

y_train = torch.as_tensor(y_train, dtype=torch.float32).reshape(-1, 1)
y_test = torch.as_tensor(y_test, dtype=torch.float32).reshape(-1, 1)

In [None]:
# Sanity check: inputs and targets should have matching (N, 1) shapes.
print(*(tensor.shape for tensor in (X_train, y_train)))

In [None]:
# Pair inputs with targets; the test grid doubles as the validation set.
train_dataset, val_dataset = (
    TensorDataset(features, targets)
    for features, targets in ((X_train, y_train), (X_test, y_test))
)

In [None]:
# Mini-batch size for both loaders. Starting value filled in for the template
# placeholder; tune freely.
batch_size = 16

In [None]:
# The data already sits in memory as tensors, so batches are built in the main
# process (num_workers=0); worker processes would only add start-up overhead on
# every pass over the loader.
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=0)
# The validation loss is averaged over the whole set, so sample order is irrelevant.
val_dataloader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=0)

In [None]:
# Phase name -> loader, keyed by the phase labels 'train' and 'val'.
dataloaders_dict = dict(train=train_dataloader, val=val_dataloader)

In [None]:
# Starting values filled in for the template placeholders; all are meant to be tuned.

# Architecture: widths of the hidden layers between the 1-D input and 1-D output.
hidden_sizes = [16, 16]
input_size = 1
output_size = 1

# 0.0 disables dropout; raise it to experiment with regularisation.
dropout_rate = 0.0
device = 'cpu'

LR = 1e-2         # initial SGD learning rate
STEP_SIZE = 100   # StepLR period: the rate is multiplied by gamma every STEP_SIZE scheduler steps
num_epochs = 200  # full passes over the training set

In [None]:
# Full list of layer widths: input, then every hidden layer, then output.
layers_sizes = [input_size] + list(hidden_sizes) + [output_size]

In [None]:
# Build the network, re-initialise every Linear layer, then move it to the target device.
model = MultilayerPerceptron(layers_sizes, dropout_rate=dropout_rate)
model = model.apply(initialize_weights).to(device)

In [None]:
# Report how many trainable scalars the model has.
n = count_parameters(model)
message = f"Number of parameters: {n}"
print(message)

In [None]:
optimizer = optim.SGD(model.parameters(), lr=LR)
# NOTE: StepLR only changes the learning rate when scheduler.step() is called;
# creating it here has no effect on training by itself.
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=STEP_SIZE, gamma=0.1)

# Regression onto real-valued targets: mean squared error.
criterion = nn.MSELoss()

In [None]:
def train_model(model, dataloaders, criterion, optimizer, num_epochs=25, epoch_vis=10,
                scheduler=None, plot=True):
    """Train ``model`` and evaluate it after every epoch, recording both losses.

    Args:
        model: Network to optimise; it is modified in place.
        dataloaders: Mapping with ``'train'`` and ``'val'`` entries. Each value
            yields ``(inputs, labels)`` batches and exposes a sized ``.dataset``.
        criterion: Callable ``criterion(outputs, labels)`` returning a scalar
            loss tensor. The epoch average assumes it is a per-batch mean.
        optimizer: Optimizer holding ``model``'s parameters.
        num_epochs: Number of passes over the training data.
        epoch_vis: Progress is printed every ``epoch_vis`` epochs.
        scheduler: Optional learning-rate scheduler. When given,
            ``scheduler.step()`` is called once after each training phase.
        plot: When true, draw the loss curves with matplotlib at the end.

    Returns:
        Tuple ``(model, train_loss_history, val_loss_history)`` where the
        histories are lists with one average loss per epoch.
    """
    # Take the device from the model itself instead of relying on a notebook global.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device('cpu')

    train_loss_history = []
    val_loss_history = []

    for epoch in range(num_epochs):
        verbose = epoch % epoch_vis == 0

        if verbose:
            print('Epoch {}/{}'.format(epoch, num_epochs - 1))
            print('-' * 10)

        # Each epoch has a training and validation phase
        for phase in ['train', 'val']:
            training = phase == 'train'
            if training:
                model.train()  # enables dropout
            else:
                model.eval()   # disables dropout

            running_loss = 0.0

            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(device)
                labels = labels.to(device)

                # Zero the parameter gradients
                optimizer.zero_grad()

                # Autograd is tracked only while training.
                with torch.set_grad_enabled(training):
                    outputs = model(inputs)
                    loss = criterion(outputs, labels)

                    # Backward + optimize only if in training phase
                    if training:
                        loss.backward()
                        # Rescale gradients so their total norm is at most 1.0.
                        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
                        optimizer.step()

                # Weight the batch loss by batch size so the epoch value is a per-sample average.
                running_loss += loss.item() * inputs.size(0)

            # Average loss for this epoch
            epoch_loss = running_loss / len(dataloaders[phase].dataset)
            if verbose:
                print('{} Loss: {:.4f}'.format(phase, epoch_loss))

            # Save loss history for plotting
            if training:
                train_loss_history.append(epoch_loss)
                # Advance the LR schedule once per epoch, after the optimizer steps.
                if scheduler is not None:
                    scheduler.step()
            else:
                val_loss_history.append(epoch_loss)

    if plot:
        _plot_loss_history(train_loss_history, val_loss_history)

    return model, train_loss_history, val_loss_history


def _plot_loss_history(train_loss_history, val_loss_history):
    """Draw the per-epoch train and validation loss curves on one figure."""
    plt.figure(figsize=(10, 6))
    plt.plot(train_loss_history, label='Train Loss')
    plt.plot(val_loss_history, label='Validation Loss')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.title('Training and Validation Loss')
    plt.legend()
    plt.grid(True)
    plt.show()

# Run the training loop; the returned model is the same object that was passed in.
model_ft, train_loss, val_loss = train_model(
    model,
    dataloaders_dict,
    criterion,
    optimizer,
    num_epochs=num_epochs,
)

In [None]:
# Predict on the test grid. Eval mode is set explicitly so dropout is off no
# matter what state earlier cells left the model in, and no_grad() replaces
# the legacy `.data` access.
model_ft.eval()
with torch.no_grad():
    y_pred = model_ft(X_test).squeeze()
plt.plot(X_test.squeeze(), y_pred)
plt.grid()

# 2. Автодифференцирование

In [None]:
# Two leaf tensors that track gradients, and an element-wise function of them.
a, b = (
    torch.tensor(values, requires_grad=True)
    for values in ([2., 3.], [6., 4.])
)

Q = 3 * a.pow(3) - b.pow(2)
Q

In [None]:
# Q is a vector, so backward() is given an explicit gradient of the same shape.
# With all ones this yields dQ/da = 9 * a**2 and dQ/db = -2 * b.
external_grad = torch.ones(2)
Q.backward(gradient=external_grad)

print(a.grad, b.grad, sep="\n")

In [None]:
# Small 2 -> 4 -> 1 network for the autograd examples; rebinds the name `model`.
model = MultilayerPerceptron([2, 4, 1], dropout_rate=dropout_rate)
model = model.apply(initialize_weights).to(device)

In [None]:
# Gradient of the first Linear layer's weight. On a freshly built model no
# backward pass has run yet, so this is expected to be None.
model.network[0].weight.grad

In [None]:
# Forward pass on an all-ones input, then backpropagate from the single output.
# The input is created on `device` so it sits on the same device as the model.
x = torch.ones(2, device=device)
y = model(x)
y.backward()

In [None]:
# Derivative with respect to the parameters: gradient of the first Linear
# layer's weight.
model.network[0].weight.grad

In [None]:
# Derivative with respect to the input.
# The input is created on `device` with gradient tracking enabled from the
# start, so it matches the model's device.
x = torch.ones(2, device=device, requires_grad=True)
y = model(x)
# create_graph=True keeps the result differentiable, so dV itself can be differentiated again.
dV = torch.autograd.grad(y, x, grad_outputs=torch.ones_like(y), create_graph=True, retain_graph=True)[0]
dV

# 3. Взрыв и затухание градиентов

In [None]:
def sigmoid(x):
    """Element-wise logistic function ``1 / (1 + exp(-x))``."""
    return 1 / (1 + np.exp(-x))


def relu(x):
    """Element-wise ReLU: keeps positive entries and zeroes the rest."""
    return (x > 0).astype(float) * x


weights = np.array([[1, 4], [4, 1]])
activation = sigmoid(np.array([1, 0.01]))

# Forward pass: push the activation through the same weights ten times.
print("Activations")
activations = list()
for _ in range(10):  # `_` instead of `iter`, which would shadow the builtin
    activation = sigmoid(activation.dot(weights))
    activations.append(activation)
    print(activation)

# Backward pass: chain rule through the layers, last to first.
print("\nGradients")
gradient = np.ones_like(activation)
# NOTE: this loop rebinds `activation`; after it finishes the name refers to activations[0].
for activation in reversed(activations):
    # sigmoid'(z) written via its output: s * (1 - s)
    gradient = (activation * (1 - activation) * gradient)
    gradient = gradient.dot(weights.transpose())
    print(gradient)

In [None]:
# Start from an explicit input rather than whatever value `activation` was left
# holding by an earlier cell, so this cell gives the same output on every run.
activation = relu(np.array([1, 0.01]))

# Forward pass: push the activation through the same weights ten times.
print("Relu Activations")
activations = list()
for _ in range(10):  # `_` instead of `iter`, which would shadow the builtin
    activation = relu(activation.dot(weights))
    activations.append(activation)
    print(activation)

# Backward pass: ReLU's derivative is 1 where the activation is positive, else 0.
print("\nRelu Gradients")
gradient = np.ones_like(activation)
for layer_activation in reversed(activations):
    gradient = ((layer_activation > 0) * gradient).dot(weights.transpose())
    print(gradient)