# Практическое задание

## Данные о студенте

1. **ФИО**: Денисов Даниил Михайлович
2. **Факультет**: Механико-математический
3. **Курс**: 2 (магистратура)
4. **Группа**: М2

## Замечания

* Заполненный ноутбук необходимо сдать боту
* Соблюдаем кодекс чести (по нулям и списавшему, и давшему списать)
* Можно (и нужно!) применять для реализации только библиотеку **Numpy**
* Ничего, кроме Numpy, нельзя использовать для реализации
* **Keras** используется только для тестирования Вашей реализации
* Если какой-то из классов не проходит приведенные тесты, то соответствующее задание не оценивается
* Возможно использование дополнительных (приватных) тестов
 

## Реализация собственного нейросетевого пакета для запуска и обучения нейронных сетей

Задание состоит из трёх частей:
1. Реализация прямого вывода нейронной сети (5 баллов)
2. Реализация градиентов по входу и распространения градиента по сети (5 баллов)
3. Реализация градиентов по параметрам и метода обратного распространения ошибки с обновлением параметров сети (10 баллов)

Дополнительные баллы можно получить при реализации обучения сети со свёрточными слоями (10 баллов), с транспонированной свёрткой (10 баллов), дополнительного оптимизатора (5 баллов). 

###  1. Реализация вывода собственной нейронной сети

1.1 Внимательно ознакомьтесь с интерфейсом слоя. Любой слой должен содержать как минимум три метода:
- конструктор
- прямой вывод 
- обратный вывод, производные по входу и по параметрам

In [1]:
class Layer(object):
    """Common interface of every layer in the network.

    Concrete layers override ``forward`` and ``grad_x``. Layers that own
    trainable parameters also override ``grad_param`` and ``update_param``.
    """

    def __init__(self):
        self.name = 'Layer'

    def forward(self, input_data):
        """Compute the layer output (the base class returns nothing)."""
        return None

    def grad_x(self, input_data):
        """Derivative of the output w.r.t. the input (nothing in the base class)."""
        return None

    def grad_param(self, input_data):
        """Derivatives of the output w.r.t. each parameter; empty by default."""
        return list()

    def backward(self, input_data):
        """Return ``[grad_x, grad_param]`` evaluated at ``input_data``."""
        wrt_input = self.grad_x(input_data)
        wrt_params = self.grad_param(input_data)
        return [wrt_input, wrt_params]

    def update_param(self, grads, learning_rate):
        """Apply a parameter update; a no-op for parameter-free layers."""
        return None

1.2 Ниже представлен интерфейс класса Network. Обратите внимание на реализацию метода predict, который последовательно обрабатывает входные данные слой за слоем.

In [2]:
import numpy as np
from sklearn.model_selection import train_test_split
from tqdm import tqdm

class Network(object):
    """Sequential stack of layers with an optional loss on top.

    Gradients are propagated as per-sample Jacobians: every layer returns an
    array indexed ``[batch, output, input]`` and the chain rule is applied
    with ``einsum('bi,bij->bj', upstream, jacobian)``.
    """

    def __init__(self, layers, loss=None):
        self.name = 'Network'
        self.layers = layers
        self.loss = loss

    def predict(self, input_data):
        """Feed ``input_data`` through all layers in order."""
        output = input_data
        for layer in self.layers:
            output = layer.forward(output)
        return output

    def forward(self, input_data):
        """Alias of ``predict``."""
        return self.predict(input_data)

    def calculate_loss(self, input_data, labels):
        """Evaluate the loss on the network output for ``input_data``."""
        predictions = self.predict(input_data)
        return self.loss.forward(predictions, labels)

    def grad_x(self, input_data, labels):
        """Gradient of the loss with respect to the network input."""
        # Forward sweep: remember each layer's Jacobian at its own input.
        jacobians = []
        activation = input_data
        for layer in self.layers:
            jacobians.append(layer.grad_x(activation))
            activation = layer.forward(activation)

        # Backward sweep: chain the loss gradient through the Jacobians.
        upstream = self.loss.grad_x(activation, labels)
        for jacobian in jacobians[::-1]:
            upstream = np.einsum('bi,bij->bj', upstream, jacobian)

        return upstream

    def grad_param(self, input_data, labels):
        """Gradients of the loss w.r.t. the parameters of every layer.

        Returns a list with one entry per layer (in layer order); each entry
        is a list with one per-sample gradient array per parameter.
        """
        # Forward sweep: collect [grad_x, grad_param] for every layer.
        per_layer = []
        activation = input_data
        for layer in self.layers:
            per_layer.append(layer.backward(activation))
            activation = layer.forward(activation)

        # Backward sweep, from the last layer to the first.
        upstream = self.loss.grad_x(activation, labels)
        param_grads = []
        for jac_x, jac_params in per_layer[::-1]:
            layer_grads = []
            for jac_p in jac_params:
                layer_grads.append(np.einsum('bi,bij->bj', upstream, jac_p))
            param_grads.append(layer_grads)
            upstream = np.einsum('bi,bij->bj', upstream, jac_x)

        param_grads.reverse()
        return param_grads

    def update(self, grads, learning_rate):
        """Hand each layer its own gradients for a parameter update."""
        for position, layer in enumerate(self.layers):
            if position >= len(grads):
                break
            layer.update_param(grads[position], learning_rate)

    def train_step(self, input_data, labels, learning_rate=0.001):
        """Run one gradient step on a single batch."""
        self.update(self.grad_param(input_data, labels), learning_rate)

    def fit(self, trainX, trainY, validation_split=0.25, 
            batch_size=1, nb_epoch=1, learning_rate=0.01):
        """Train on mini-batches and print validation accuracy per epoch.

        The data is split once with a fixed ``random_state``; a trailing
        batch smaller than ``batch_size`` is not used.
        """
        train_x, val_x, train_y, val_y = train_test_split(
            trainX, trainY, test_size=validation_split, random_state=42)

        for epoch in range(nb_epoch):
            n_steps = int(len(train_x) / batch_size)
            for step in tqdm(range(n_steps)):
                start = step * batch_size
                stop = (step + 1) * batch_size
                self.train_step(train_x[start:stop], train_y[start:stop],
                                learning_rate)

            accuracy = self.evaluate(val_x, val_y)
            print('%d epoch: val %.2f' %(epoch + 1, accuracy))

    def evaluate(self, testX, testY):
        """Share of samples whose arg-max prediction equals the arg-max label."""
        predicted = np.argmax(self.predict(testX), axis=1)
        expected = np.argmax(testY, axis=1)
        hits = np.sum(predicted == expected)
        return hits / len(expected)

#### 1.1 Необходимо реализовать метод forward для вычисления следующих слоёв:

- DenseLayer
- ReLU
- Softmax
- FlattenLayer

In [3]:
import numpy as np

In [4]:
class DenseLayer(Layer):
    """Fully connected layer computing ``x @ W + b``.

    ``W`` has shape ``(input_dim, output_dim)`` and ``b`` has shape
    ``(output_dim,)``. Arrays passed as ``W_init``/``b_init`` are stored as
    they are (no copy) and are modified in place by the update methods.
    """

    def __init__(self, input_dim, output_dim, W_init=None, b_init=None):
        self.name = 'Dense'
        self.input_dim = input_dim
        self.output_dim = output_dim

        if W_init is not None:
            self.W = W_init
        else:
            # He initialisation: zero mean, variance 2 / fan-in.
            fan_in = input_dim
            self.W = np.random.normal(0, np.sqrt(2 / fan_in),
                                      size=(input_dim, output_dim))

        if b_init is not None:
            self.b = b_init
        else:
            self.b = np.zeros(output_dim, dtype='float32')

    def forward(self, input_data):
        """Affine map applied to every row of ``input_data``."""
        projected = np.einsum('bi,ij->bj', input_data, self.W)
        return projected + self.b

    def grad_x(self, input_data):
        """Jacobian dy/dx = W^T, repeated for every sample."""
        n_samples = len(input_data)
        return np.repeat(self.W.T[np.newaxis], n_samples, axis=0)

    def grad_W(self, input_data):
        """Jacobian dy/dW with W flattened row-major.

        The result has shape ``(batch, output_dim, input_dim * output_dim)``;
        entry ``[b, j, i * output_dim + j]`` equals ``x[b, i]`` and all other
        entries are zero.
        """
        n_samples = len(input_data)
        n_in, n_out = self.W.shape
        jacobian = np.zeros((n_samples, n_out, n_in, n_out))
        out_idx = np.arange(n_out)
        # Indexing axes 1 and 3 with the same index selects the "diagonal"
        # over the two output axes; the input values broadcast over it.
        jacobian[:, out_idx, :, out_idx] = input_data
        return jacobian.reshape(n_samples, n_out, -1)

    def grad_b(self, input_data):
        """Jacobian dy/db = I, repeated for every sample."""
        n_samples = len(input_data)
        identity = np.eye(len(self.b))
        return np.repeat(identity[np.newaxis], n_samples, axis=0)

    def grad_param(self, input_data):
        """Jacobians for ``[W, b]``, in that order."""
        return [self.grad_W(input_data), self.grad_b(input_data)]

    def update_W(self, grad, learning_rate):
        """Gradient step on W using the batch-averaged gradient."""
        step = np.mean(grad, axis=0).reshape(self.W.shape)
        self.W -= learning_rate * step

    def update_b(self, grad, learning_rate):
        """Gradient step on b using the batch-averaged gradient."""
        step = np.mean(grad, axis=0)
        self.b -= learning_rate * step

    def update_param(self, grads, learning_rate):
        """Update W from ``grads[0]`` and b from ``grads[1]``."""
        grad_W, grad_b = grads[0], grads[1]
        self.update_W(grad_W, learning_rate)
        self.update_b(grad_b, learning_rate)

In [5]:
class ReLU(Layer):
    """Element-wise rectified linear unit, ``max(x, 0)``."""

    def __init__(self):
        self.name = 'ReLU'

    def forward(self, input_data):
        """Replace negative entries with zero."""
        return np.clip(input_data, a_min=0, a_max=None)

    def grad_x(self, input_data):
        """Per-sample diagonal Jacobian over the flattened features.

        The diagonal holds 1 where the input is positive and 0 elsewhere.
        """
        n_samples = input_data.shape[0]
        n_features = np.prod(input_data.shape[1:])
        jacobian = np.zeros((n_samples, n_features, n_features))

        # Clipping to [0, 1] and rounding up turns any positive value into 1.
        active = np.ceil(np.clip(input_data, 0, 1)).reshape(n_samples, -1)

        feature_idx = np.arange(n_features)
        jacobian[:, feature_idx, feature_idx] = active
        return jacobian

In [6]:
class Softmax(Layer):
    """Row-wise softmax over axis 1 of a ``[batch, classes]`` input."""

    def __init__(self):
        self.name = 'Softmax'

    def forward(self, input_data):
        """Return ``exp(x) / sum(exp(x))`` for every row.

        The row maximum is subtracted before exponentiation. This leaves the
        result mathematically unchanged but keeps ``exp`` from overflowing to
        ``inf`` (and the ratio from becoming ``nan``) for large logits.
        """
        shifted = input_data - np.max(input_data, axis=1, keepdims=True)
        exps = np.exp(shifted)
        return exps / np.sum(exps, axis=1, keepdims=True)

    def grad_x(self, input_data):
        """Per-sample Jacobian ``diag(p) - p p^T`` with ``p = softmax(x)``.

        The result has shape ``(batch, classes, classes)``.
        """
        probs = self.forward(input_data)
        grad = -np.einsum('bi,bj->bij', probs, probs)
        # 'bii->bi' yields a writeable view of the diagonals of ``grad``,
        # so adding to it modifies ``grad`` in place.
        diag = np.einsum('bii->bi', grad)
        diag += probs
        return grad

In [7]:
class FlattenLayer(Layer):
    """Collapse every non-batch axis into a single feature axis."""

    def __init__(self):
        self.name = 'Flatten'

    def forward(self, input_data):
        """Reshape ``[batch, ...]`` into ``[batch, features]``."""
        n_samples = len(input_data)
        return input_data.reshape(n_samples, -1)

    def grad_x(self, input_data):
        """Identity Jacobian over the flattened features, one per sample."""
        n_samples = input_data.shape[0]
        n_features = np.prod(input_data.shape[1:])
        identity = np.eye(n_features)
        return np.repeat(identity[np.newaxis], n_samples, axis=0)

#### 1.2 Реализуйте теперь свёрточный слой и транспонированную свёртку  (опционально)

In [8]:
class Conv2DLayer(Layer):
    """2-D convolution over inputs shaped [batch, channels, height, width].

    The kernel has shape [kernel_size, kernel_size, input_channels,
    output_channels] and is applied as written (no flipping) with a square
    window and one stride shared by both spatial axes.

    Only the kernel is exposed through ``grad_param``/``update_param``; the
    bias is added in ``forward`` but is never updated by this class.
    """

    def __init__(self, kernel_size=3, input_channels=2, output_channels=3, 
                 padding='same', stride=1, K_init=None, b_init=None):
        # padding: 'same' or 'valid'
        # Kernels are square, so kernel_size is a single number
        # The stride is uniform, so stride is a single number
        # Filter of shape [kernel_size, kernel_size, input_channels, output_channels]
        self.name = 'Conv2D'
        self.kernel_size = kernel_size
        self.input_channels = input_channels
        self.output_channels = output_channels

        # Use He initialization by default
        inputs = input_channels * kernel_size ** 2
        self.kernel = K_init if K_init is not None \
                      else np.random.normal(0, np.sqrt(2 / inputs), 
                                            size=(kernel_size, kernel_size, 
                                                  input_channels, output_channels))
        self.bias = b_init if b_init is not None \
                    else np.zeros(output_channels, dtype='float32')
        self.padding = padding        
        self.stride = stride
    
    def forward(self, input_data):
        """Convolve a [batch, input_channels, height, width] tensor.

        Returns an array of shape [batch, output_channels, h_out, w_out].
        Raises ValueError when the input does not match the layer.
        """
        # Consistency check
        self.check_input(input_data)

        # Apply padding if needed
        if self.padding == 'same':
            input_data = self.pad(input_data, self.get_pads())

        h_in, w_in = input_data.shape[-2:]
        h_out, w_out = self.get_output_shape((h_in, w_in))
        
        # Get patches: [batch, h_out, w_out, k, k, c_in]
        patches = self.get_patches(input_data, (h_out, w_out))

        # Contract the (k, k, c_in) axes with the kernel and add the bias
        conv = np.tensordot(patches, self.kernel, axes=([3, 4, 5], [0, 1, 2])) + self.bias

        # Move the output channels in front of the spatial axes
        return conv.transpose(0, 3, 1, 2)

    def grad_x(self, input_data):
        """Jacobian of the flattened output w.r.t. the flattened input.

        Returns an array of shape
        [batch, c_out * h_out * w_out, c_in * h_in * w_in], where h_in and
        w_in are the sizes of the original (unpadded) input.
        """
        # Consistency check
        self.check_input(input_data)

        # Apply padding if needed
        if self.padding == 'same':
            pad_l, pad_r = self.get_pads()  # used later
            input_data = self.pad(input_data, (pad_l, pad_r))

        k, s = self.kernel_size, self.stride
        b, c_in, h_in, w_in = input_data.shape
        c_out, h_out, w_out = self.output_channels, *self.get_output_shape((h_in, w_in))

        # Input height indexer: row s * i + di for output row i, kernel row di
        indh_in = np.arange(k) + s * np.arange(h_out).reshape(h_out, 1).repeat(k, axis=1)
        indh_in = indh_in.reshape(h_out, 1, k, 1)

        # Input width indexer: column s * j + dj for output column j, kernel
        # column dj. It must be built from the width indices; reusing the
        # height indexer here only works for square feature maps.
        indw_in = np.arange(k) + s * np.arange(w_out).reshape(w_out, 1).repeat(k, axis=1)
        indw_in = indw_in.reshape(1, w_out, 1, k)

        # Output height/width indexers
        indh_out = np.arange(h_out).reshape(h_out, 1, 1, 1)
        indw_out = np.arange(w_out).reshape(1, w_out, 1, 1)

        # Get the gradient: every output pixel depends on its k x k input
        # window through the corresponding kernel entries
        grad = np.zeros((b, h_out, w_out, h_in, w_in, c_in, c_out))
        grad[..., indh_out, indw_out, indh_in, indw_in, :, :] = self.kernel

        # Remove padding if needed: drop the columns that belong to the
        # zero border so that the Jacobian refers to the original input
        if self.padding == 'same':
            pad = pad_l + pad_r
            h_in -= pad
            w_in -= pad
            grad = grad[..., pad_l:h_in+pad_l, pad_l:w_in+pad_l, :, :]

        # [b, c_out, h_out, w_out, c_in, h_in, w_in] -> 2-D Jacobian per sample
        grad = grad.transpose(0, 6, 1, 2, 5, 3, 4)
        grad = grad.reshape(b, c_out * h_out * w_out, c_in * h_in * w_in)

        return grad
    
    def grad_kernel(self, input_data):
        """Jacobian of the flattened output w.r.t. the flattened kernel.

        Returns an array of shape
        [batch, c_out * h_out * w_out, k * k * c_in * c_out].
        """
        # Consistency check
        self.check_input(input_data)

        # Apply padding if needed
        if self.padding == 'same':
            input_data = self.pad(input_data, self.get_pads())

        k = self.kernel_size
        b, c_in, h_in, w_in = input_data.shape
        c_out, h_out, w_out = self.output_channels, *self.get_output_shape((h_in, w_in))
        
        # Get patches
        patches = self.get_patches(input_data, (h_out, w_out))

        # Get the gradient: an output channel depends only on the kernel
        # slice of the same channel, hence the diagonal over the last two axes
        grad = np.tile(np.eye(c_out), reps=(b, h_out, w_out, k, k, c_in, 1, 1))
        diag = np.einsum('bijklmnn->bijklmn', grad)
        diag[:] = patches[..., None]

        grad = grad.transpose(0, 6, 1, 2, 3, 4, 5, 7)
        grad = grad.reshape(b, c_out * h_out * w_out, k * k * c_in * c_out)

        return grad

    def grad_param(self, input_data):
        """Parameter Jacobians; only the kernel Jacobian is provided."""
        return [self.grad_kernel(input_data)]

    def update_kernel(self, grad, learning_rate):
        """Gradient step on the kernel using the batch-averaged gradient."""
        self.kernel -= learning_rate * np.mean(grad, axis=0).reshape(self.kernel.shape)
        
    def update_param(self, grads, learning_rate):
        """Update the kernel from ``grads[0]``."""
        self.update_kernel(grads[0], learning_rate)

    def check_input(self, input_data):
        """Raise ValueError if the input does not fit the layer.

        The input must be 4-D, have ``input_channels`` channels and be at
        least ``kernel_size`` in both spatial dimensions.
        """
        b, c, h, w = input_data.shape
        if c != self.input_channels:
            raise ValueError(f"Input channels mismatch: "
                             f"got {c}, expected {self.input_channels}")
        if h < self.kernel_size or w < self.kernel_size:
            raise ValueError(f"Dimensions mismatch: "
                             f"got {h, w}, expected at least {self.kernel_size, self.kernel_size}")

    def get_output_shape(self, input_shape):
        """Spatial output size for an (already padded) spatial input size."""
        return (np.array(input_shape) - self.kernel_size) // self.stride + 1

    def get_pads(self):
        """Left/right padding amounts used for 'same' padding.

        The total is ``kernel_size - 1``; when it is odd the extra element
        goes to the right side.
        """
        pad = self.kernel_size - 1
        pad_l = pad // 2
        pad_r = pad - pad_l
        return pad_l, pad_r

    def pad(self, data, pads):
        """Zero-pad the two spatial axes with the same (left, right) pair."""
        return np.pad(data, ((0, 0), (0, 0), pads, pads))

    def get_patches(self, input_data, output_shape):
        """Gather the k x k window of every output position.

        Returns an array of shape [batch, h_out, w_out, k, k, channels].
        """
        k, s = self.kernel_size, self.stride
        h_out, w_out = output_shape

        # Height indexer
        indh = np.arange(k) + s * np.arange(h_out).reshape(h_out, 1).repeat(k, axis=1)
        indh = indh.reshape(h_out, 1, k, 1).repeat(w_out, axis=1)

        # Width indexer
        indw = np.arange(k) + s * np.arange(w_out).reshape(w_out, 1).repeat(k, axis=1)
        indw = np.tile(indw.reshape(1, w_out, 1, k), reps=(h_out, 1, 1, 1))

        # Extract patches; keep batch and channels
        patches = input_data[..., indh, indw]

        return patches.transpose(0, 2, 3, 4, 5, 1)

In [9]:
class Conv2DTrLayer(Layer):
    """Transposed 2-D convolution on [batch, channels, height, width] inputs.

    The input is first "inflated": ``stride - 1`` zeros are inserted between
    neighbouring pixels and a zero border of ``kernel_size - padding - 1``
    is added. A stride-1 convolution with the kernel is then applied to it.

    Only the kernel is exposed through ``grad_param``/``update_param``; the
    bias is added in ``forward`` but is never updated by this class.
    """

    def __init__(self, kernel_size=3, input_channels=2, output_channels=3, 
                 padding=0, stride=1, K_init=None, b_init=None):      
        # padding: a number (how much to cut off the modified input map)
        # Kernels are square, so kernel_size is a single number
        # stride is a single number (the expansion factor)
        # Filter of shape [kernel_size, kernel_size, input_channels, output_channels]
        self.name = 'Conv2DTr'
        self.kernel_size = kernel_size
        self.input_channels = input_channels
        self.output_channels = output_channels
        
        # Use He initialization by default
        inputs = input_channels * kernel_size ** 2
        self.kernel = K_init if K_init is not None \
                      else np.random.normal(0, np.sqrt(2 / inputs), 
                                            size=(kernel_size, kernel_size, 
                                                  input_channels, output_channels))
        self.bias = b_init if b_init is not None \
                    else np.zeros(output_channels, dtype='float32')
        self.padding = padding
        self.stride = stride
    
    def forward(self, input_data):
        """Apply the transposed convolution.

        Takes a [batch, input_channels, height, width] tensor and returns
        an array of shape [batch, output_channels, h_out, w_out].
        Raises ValueError when the input does not match the layer.
        """
        # Consistency check
        self.check_input(input_data)

        # Inflate and pad with zeros
        input_data = self.inflate(input_data)

        h_in, w_in = input_data.shape[-2:]
        h_out, w_out = self.get_output_shape((h_in, w_in))
        
        # Get patches: [batch, h_out, w_out, k, k, c_in]
        patches = self.get_patches(input_data, (h_out, w_out))

        # Contract the (k, k, c_in) axes with the kernel and add the bias
        conv = np.tensordot(patches, self.kernel, axes=([3, 4, 5], [0, 1, 2])) + self.bias

        # Move the output channels in front of the spatial axes
        return conv.transpose(0, 3, 1, 2)
    
    def grad_x(self, input_data):
        """Jacobian of the flattened output w.r.t. the flattened input.

        Returns an array of shape
        [batch, c_out * h_out * w_out, c_in * h * w], where h and w are the
        spatial sizes of the original (not inflated) input.
        """
        # Consistency check
        self.check_input(input_data)

        # Inflate and pad with zeros
        input_data = self.inflate(input_data)

        k = self.kernel_size
        b, c_in, h_in, w_in = input_data.shape
        c_out, h_out, w_out = self.output_channels, *self.get_output_shape((h_in, w_in))

        # Input height indexer: row i + di for output row i, kernel row di
        indh_in = np.arange(k) + np.arange(h_out).reshape(h_out, 1).repeat(k, axis=1)
        indh_in = indh_in.reshape(h_out, 1, k, 1)

        # Input width indexer: column j + dj for output column j, kernel
        # column dj. It must be built from the width indices; reusing the
        # height indexer here only works for square feature maps.
        indw_in = np.arange(k) + np.arange(w_out).reshape(w_out, 1).repeat(k, axis=1)
        indw_in = indw_in.reshape(1, w_out, 1, k)

        # Output height/width indexers
        indh_out = np.arange(h_out).reshape(h_out, 1, 1, 1)
        indw_out = np.arange(w_out).reshape(1, w_out, 1, 1)

        # Get the gradient w.r.t. the inflated input
        grad = np.zeros((b, h_out, w_out, h_in, w_in, c_in, c_out))
        grad[..., indh_out, indw_out, indh_in, indw_in, :, :] = self.kernel

        # [b, c_out, h_out, w_out, c_in, h_in, w_in]
        grad = grad.transpose(0, 6, 1, 2, 5, 3, 4)

        # Deflate to the original input: keep only the columns that
        # correspond to real input pixels, not to inserted zeros
        grad = self.deflate(grad)
        
        grad = grad.reshape(b, c_out * h_out * w_out, -1)

        return grad
    
    def grad_kernel(self, input_data):
        """Jacobian of the flattened output w.r.t. the flattened kernel.

        Returns an array of shape
        [batch, c_out * h_out * w_out, k * k * c_in * c_out].
        """
        # Consistency check
        self.check_input(input_data)

        # Inflate and pad with zeros
        input_data = self.inflate(input_data)

        k = self.kernel_size
        b, c_in, h_in, w_in = input_data.shape
        c_out, h_out, w_out = self.output_channels, *self.get_output_shape((h_in, w_in))
        
        # Get patches
        patches = self.get_patches(input_data, (h_out, w_out))

        # Get the gradient: an output channel depends only on the kernel
        # slice of the same channel, hence the diagonal over the last two axes
        grad = np.tile(np.eye(c_out), reps=(b, h_out, w_out, k, k, c_in, 1, 1))
        diag = np.einsum('bijklmnn->bijklmn', grad)
        diag[:] = patches[..., None]

        grad = grad.transpose(0, 6, 1, 2, 3, 4, 5, 7)
        grad = grad.reshape(b, c_out * h_out * w_out, k * k * c_in * c_out)

        return grad

    def grad_param(self, input_data):
        """Parameter Jacobians; only the kernel Jacobian is provided."""
        return [self.grad_kernel(input_data)]

    def update_kernel(self, grad, learning_rate):
        """Gradient step on the kernel using the batch-averaged gradient."""
        self.kernel -= learning_rate * np.mean(grad, axis=0).reshape(self.kernel.shape)
        
    def update_param(self, grads, learning_rate):
        """Update the kernel from ``grads[0]``."""
        self.update_kernel(grads[0], learning_rate)

    def check_input(self, input_data):
        """Raise ValueError if the input does not fit the layer.

        The input must be 4-D, have ``input_channels`` channels and be at
        least ``kernel_size`` in both spatial dimensions.
        """
        # NOTE(review): the size check is applied to the input before it is
        # inflated; confirm whether smaller inputs should be accepted.
        b, c, h, w = input_data.shape
        if c != self.input_channels:
            raise ValueError(f"Input channels mismatch: "
                             f"got {c}, expected {self.input_channels}")
        if h < self.kernel_size or w < self.kernel_size:
            raise ValueError(f"Dimensions mismatch: "
                             f"got {h, w}, expected at least {self.kernel_size, self.kernel_size}")

    def inflate(self, data):
        """Spread the last two axes by the stride and add a zero border.

        Element (i, j) of the input is written to position
        (p + s * i, p + s * j) of a zero array, where s is the stride and
        p = kernel_size - padding - 1.
        """
        s, p = self.stride, self.kernel_size - self.padding - 1
        h, w = data.shape[-2:]
        
        # Determine the target data dimensions
        h_target = h + (h - 1) * (s - 1) + 2 * p
        w_target = w + (w - 1) * (s - 1) + 2 * p

        # Define the inflated array
        inflated = np.zeros(data.shape[:-2] + (h_target, w_target))

        # Height/width indexers
        indh = p + s * np.arange(h).reshape(h, 1)
        indw = p + s * np.arange(w).reshape(1, w)

        # Write data to the inflated array
        inflated[..., indh, indw] = data

        return inflated

    def deflate(self, data):
        """Inverse of ``inflate`` on the last two axes.

        Picks the positions (p + s * i, p + s * j) that ``inflate`` writes to.
        """
        s, p = self.stride, self.kernel_size - self.padding - 1
        h, w = data.shape[-2:]

        # Determine the source data dimensions
        h_source = (h - 2 * p - 1) // s + 1
        w_source = (w - 2 * p - 1) // s + 1

        # Height/width indexers
        indh = p + s * np.arange(h_source).reshape(h_source, 1)
        indw = p + s * np.arange(w_source).reshape(1, w_source)

        return data[..., indh, indw]

    def get_output_shape(self, input_shape):
        """Spatial output size of the stride-1 convolution on the inflated map."""
        return np.array(input_shape) - self.kernel_size + 1

    def get_patches(self, input_data, output_shape):
        """Gather the k x k window of every output position (stride 1).

        Returns an array of shape [batch, h_out, w_out, k, k, channels].
        """
        k = self.kernel_size
        h_out, w_out = output_shape

        # Height indexer
        indh = np.arange(k) + np.arange(h_out).reshape(h_out, 1).repeat(k, axis=1)
        indh = indh.reshape(h_out, 1, k, 1).repeat(w_out, axis=1)

        # Width indexer
        indw = np.arange(k) + np.arange(w_out).reshape(w_out, 1).repeat(k, axis=1)
        indw = np.tile(indw.reshape(1, w_out, 1, k), reps=(h_out, 1, 1, 1))

        # Extract patches; keep batch and channels
        patches = input_data[..., indh, indw]

        return patches.transpose(0, 2, 3, 4, 5, 1)

#### 1.4 Теперь настало время теста. 
#### Если вы всё сделали правильно, то после запуска следующих ячеек должна появиться надпись: Test PASSED

Переходить к дальнейшим заданиям не имеет никакого смысла, пока вы не добьётесь прохождения теста
    

#### Чтение данных

In [10]:
import numpy as np
from keras.utils import np_utils
from keras.datasets import mnist

# Seed NumPy's global random generator.
np.random.seed(42)
 
# Load the MNIST train/test split shipped with Keras.
(X_train, y_train), (X_test, y_test) = mnist.load_data()

# Reshape images to [batch, 1, 28, 28], cast to float32 and divide by 255.
X_train = X_train.reshape(X_train.shape[0], 1, 28, 28).astype('float32') / 255
X_test = X_test.reshape(X_test.shape[0], 1, 28, 28).astype('float32') / 255

# One-hot encode the labels into 10 classes.
Y_train = np_utils.to_categorical(y_train, 10)
Y_test = np_utils.to_categorical(y_test, 10)

print(X_train.shape, Y_train.shape, X_test.shape, Y_test.shape)

Using TensorFlow backend.
  _np_qint8 = np.dtype([("qint8", np.int8, 1)])
  _np_quint8 = np.dtype([("quint8", np.uint8, 1)])
  _np_qint16 = np.dtype([("qint16", np.int16, 1)])
  _np_quint16 = np.dtype([("quint16", np.uint16, 1)])
  _np_qint32 = np.dtype([("qint32", np.int32, 1)])
  np_resource = np.dtype([("resource", np.ubyte, 1)])


(60000, 1, 28, 28) (60000, 10) (10000, 1, 28, 28) (10000, 10)


#### Подготовка моделей

In [11]:
import keras
from keras.models import Model
from keras.layers import Input, Conv2D, Flatten, Dense
from keras.optimizers import SGD

# Show which Keras version is installed.
print(keras.__version__)

def get_keras_model():
    """Build, compile and train the reference Keras model.

    The model is Conv2D (1 filter, 3x3, 'same', channels first, ReLU) ->
    Flatten -> Dense(10, softmax). It is trained for two epochs on the
    notebook-level ``X_train``/``Y_train`` arrays with Nesterov SGD.

    Returns the trained ``keras.models.Model``.
    """
    input_image = Input(shape=(1, 28, 28))
    conv = Conv2D(1, 3, padding='same', data_format='channels_first', activation='relu')(input_image)
    flatten = Flatten()(conv)
    dense = Dense(10, activation='softmax')(flatten)
    model = Model(inputs=input_image, outputs=dense)

    sgd = SGD(lr=0.01, momentum=0.9, nesterov=True)
    model.compile(loss='categorical_crossentropy',
                  optimizer=sgd,
                  metrics=['accuracy'])

    # ``epochs`` is the Keras 2 name of the deprecated ``nb_epoch`` argument.
    model.fit(X_train, Y_train, validation_split=0.25,
              batch_size=32, epochs=2, verbose=1)
    return model

2.2.4


In [12]:
# Train the reference Keras model (this runs the two training epochs).
keras_model = get_keras_model()

Instructions for updating:
Colocations handled automatically by placer.
Instructions for updating:
Use tf.cast instead.




Train on 45000 samples, validate on 15000 samples
Epoch 1/2
Epoch 2/2


In [13]:
# Print the shape of every weight array of the trained Keras model.
for weights in keras_model.get_weights():
    print(weights.shape)

(3, 3, 1, 1)
(1,)
(784, 10)
(10,)


In [14]:
def get_our_model(keras_model):
    """Mirror the trained Keras model with the NumPy layers of this file.

    The four weight arrays of ``keras_model`` (conv kernel, conv bias, dense
    weights, dense bias) are handed to the corresponding layers; the
    returned ``Network`` has no loss attached.
    """
    conv_kernel, conv_bias, dense_weights, dense_bias = keras_model.get_weights()
    layers = [
        Conv2DLayer(3, 1, 1, padding='same', K_init=conv_kernel, b_init=conv_bias),
        ReLU(),
        FlattenLayer(),
        DenseLayer(784, 10, W_init=dense_weights, b_init=dense_bias),
        Softmax(),
    ]
    return Network(layers)

In [15]:
# Build the NumPy network that reuses the Keras weights.
our_model = get_our_model(keras_model)

In [16]:
# Compare the predictions of both models on the test images; the check
# passes when the summed absolute difference is below 0.01.
keras_pred = keras_model.predict(X_test)
our_pred = our_model.predict(X_test)
if np.sum(np.abs(keras_pred - our_pred)) < 0.01:
    print('Test PASSED')
else:
    print('Something went wrong!')

Test PASSED


### 2. Вычисление производных по входу для слоёв нейронной сети

В данном задании запрещено использовать численные формулы для вычисления производных.

#### 2.1  Реализуйте метод forward для класса CrossEntropy
Напоминание: $$ \mathrm{crossentropy} = L(p, y) = -\sum\limits_i y_i \log p_i, $$
где вектор $(p_1, ..., p_k) $ -  выход классификационного алгоритма, а $(y_1,..., y_k)$ - правильные метки класса в унарной кодировке (one-hot encoding)

In [17]:
class CrossEntropy(object):
    """Cross-entropy loss ``L(p, y) = -sum_i y_i * log(p_i)`` per sample.

    Probabilities are floored at the smallest positive normal float of their
    dtype before ``log`` or division. A probability that is exactly 0 (for
    example after an underflow) therefore yields a finite value instead of
    ``nan``/``inf``; all other probabilities are left untouched.
    """

    def __init__(self):
        self.name = 'CrossEntropy'

    @staticmethod
    def _safe_probs(input_data):
        """Return ``input_data`` as a float array with zeros floored."""
        probs = np.asarray(input_data)
        if not np.issubdtype(probs.dtype, np.floating):
            probs = probs.astype(float)
        return np.maximum(probs, np.finfo(probs.dtype).tiny)

    def forward(self, input_data, labels):
        """Loss for every row of ``input_data``.

        ``input_data`` holds predicted probabilities and ``labels`` the
        target weights, both indexed ``[batch, classes]``. Returns an array
        of shape ``(batch,)``.
        """
        probs = self._safe_probs(input_data)
        return -np.sum(labels * np.log(probs), axis=1)

    def grad_x(self, input_data, labels):
        """Gradient ``dL/dp = -y / p`` with the shape of ``input_data``."""
        return -labels / self._safe_probs(input_data)

#### 2.2  Реализуйте метод grad_x класса CrossEntropy, который возвращает $\frac{\partial L}{\partial p}$

Проверить работоспособность кода поможет следующий тест:

In [18]:
def check_grad(source, target):
    """Report whether two gradient arrays agree.

    Prints 'Test PASSED' when the summed absolute difference is below 0.01;
    otherwise prints a failure notice followed by both arrays.
    """
    total_error = np.abs(target - source).sum()
    if total_error < 0.01:
        print('Test PASSED')
        return

    print('Something went wrong!')
    for title, values in (('Target grad is', target),
                          ('Source grad is', source)):
        print(title)
        print(values)

In [19]:
def numerical_diff_loss(loss, x, labels):
    """Central-difference estimate of d loss / d x.

    Every feature of ``x`` is shifted by +/- 1e-5 in turn (for all samples
    at once). The result has one row per sample and one column per feature.
    """
    eps = 0.00001
    n_features = len(x[0])

    columns = []
    for unit in np.eye(n_features):
        delta = eps * unit
        upper = loss.forward(x + delta, labels)
        lower = loss.forward(x - delta, labels)
        columns.append((upper - lower) / (2 * eps))

    return np.array(columns).T

def test_loss(loss, x, labels):    
    """Check ``loss.grad_x`` against its numerical estimate."""
    expected = numerical_diff_loss(loss, x, labels)
    check_grad(loss.grad_x(x, labels), expected)

In [20]:
# Check the analytic CrossEntropy gradient against the numerical one on a
# batch of two identical probability rows with different label weights.
loss = CrossEntropy()
x = np.array([[0.3, 0.2, 0.5], [0.3, 0.2, 0.5]])
labels = np.array([[1, 2, 3], [2, 3, 4]])
test_loss(loss, x, labels)

Test PASSED


#### 2.3  Реализуйте метод grad_x класса Softmax, который возвращает $\frac{\partial Softmax}{\partial x}$

Проверить работоспособность кода поможет следующий тест:

In [21]:
def numerical_diff_layer(layer, x):
    """Central-difference estimate of a layer's input Jacobian.

    Every input element is shifted by +/- 1e-5 in turn (for all samples at
    once). The result is indexed ``[sample, output, input]`` with outputs
    and inputs flattened.
    """
    eps = 0.00001
    sample = x[0]
    n_samples = len(x)

    slices = []
    for unit in np.eye(sample.size):
        delta = (eps * unit).reshape(sample.shape)
        change = layer.forward(x + delta) - layer.forward(x - delta)
        slope = change / (2 * eps)
        slices.append(slope.reshape(n_samples, -1).T)

    return np.array(slices).T

def test_layer(layer, x):
    """Check ``layer.grad_x`` against its numerical estimate."""
    expected = numerical_diff_layer(layer, x)
    check_grad(layer.grad_x(x), expected)

In [22]:
# Check the Softmax input Jacobian on a small integer batch.
layer = Softmax()
x = np.array([[1, 2, 3], [2, -3, 4]])
test_layer(layer, x)

Test PASSED


#### 2.4  Реализуйте метод grad_x для классов ReLU и DenseLayer

In [23]:
# Check the ReLU input Jacobian; the batch contains a negative entry.
layer = ReLU()
x = np.array([[1, 2, 3], [2, -3, 4]])
test_layer(layer, x)

Test PASSED


In [24]:
# Check the DenseLayer input Jacobian with randomly initialised weights.
layer = DenseLayer(3, 4)
x = np.array([[1, 2, 3], [2, -3, 4]])
test_layer(layer, x)

Test PASSED


#### Conv2D test

In [25]:
# Check the Conv2DLayer input Jacobian: 5x5 maps, 3x3 kernel, stride 2,
# 'same' padding, randomly initialised kernel.
b, c_in, h_in, w_in, c_out = 5, 5, 5, 5, 5
k, s = 3, 2
layer = Conv2DLayer(k, c_in, c_out, padding='same', stride=s)
x = np.arange(b * c_in * h_in * w_in).reshape(b, c_in, h_in, w_in)
test_layer(layer, x)

Test PASSED


#### Conv2DTr test

In [26]:
# Check the Conv2DTrLayer input Jacobian: 5x5 maps, 3x3 kernel, padding 1,
# stride 2, randomly initialised kernel.
b, c_in, h_in, w_in, c_out = 5, 5, 5, 5, 5
k, p, s = 3, 1, 2
layer = Conv2DTrLayer(k, c_in, c_out, padding=p, stride=s)
x = np.arange(b * c_in * h_in * w_in).reshape(b, c_in, h_in, w_in)
test_layer(layer, x)

Test PASSED


#### 2.5 (4 балла) Для класса Network реализуйте метод grad_x, который должен реализовывать взятие производной от лосса по входу

In [27]:
def numerical_diff_net(net, x, labels):
    """Central-difference estimate of d loss / d input for a network.

    Every input element is shifted by +/- 1e-5 in turn (for all samples at
    once) and ``net.calculate_loss`` is re-evaluated. The result has one row
    per sample and one column per flattened input element.
    """
    eps = 0.00001
    sample = x[0]

    columns = []
    for unit in np.eye(sample.size):
        delta = (eps * unit).reshape(sample.shape)
        upper = net.calculate_loss(x + delta, labels)
        lower = net.calculate_loss(x - delta, labels)
        columns.append((upper - lower) / (2 * eps))

    return np.array(columns).T

def test_net(net, x, labels):
    """Check ``net.grad_x`` against its numerical estimate."""
    expected = numerical_diff_net(net, x, labels)
    check_grad(net.grad_x(x, labels), expected)

In [28]:
# Check the end-to-end input gradient of a small network:
# Conv2D -> ReLU -> Flatten -> Dense -> Softmax with a CrossEntropy loss.
b, c_in, h_in, w_in, c_out = 2, 5, 5, 5, 5
k, s = 3, 2
# 45 = c_out * 3 * 3: 'same' padding with stride 2 turns 5x5 maps into 3x3.
net = Network([Conv2DLayer(k, c_in, c_out, padding='same', stride=s), ReLU(), 
               FlattenLayer(), DenseLayer(45, 3), Softmax()], loss=CrossEntropy())
x = np.arange(b * c_in * h_in * w_in).reshape(b, c_in, h_in, w_in)
labels = np.array([[1, 2, 3], [2, 3, 4]])
test_net(net, x, labels)

Test PASSED


### 3. Реализация градиентов по параметрам и метода обратного распространения ошибки с обновлением параметров сети

#### 3.1  Реализуйте функции grad_b и grad_W. При подготовке теста grad_W предполагается, что W является одномерным вектором.

In [29]:
def numerical_grad_b(input_size, output_size, W_init, b_init, x):
    """Central-difference estimate of a dense layer's bias Jacobian.

    Every bias entry is shifted by +/- 1e-5 in turn and two fresh
    ``DenseLayer`` instances are evaluated on ``x``.
    """
    eps = 0.00001
    slices = []
    for position, _ in enumerate(b_init):
        delta = np.zeros(b_init.shape)
        delta[position] = eps
        shifted_up = DenseLayer(input_size, output_size, W_init, b_init+delta)
        shifted_down = DenseLayer(input_size, output_size, W_init, b_init-delta)
        slope = (shifted_up.forward(x) - shifted_down.forward(x)) / (2 * eps)
        slices.append(slope.T)

    return np.array(slices).T

def test_grad_b():
    """Check ``DenseLayer.grad_b`` on random weights and inputs."""
    batch, n_in, n_out = 2, 3, 4
    # Keep the order of the random draws: weights, bias, inputs.
    weights = np.random.random((n_in, n_out))
    bias = np.random.random(n_out)
    inputs = np.random.random((batch, n_in))

    expected = numerical_grad_b(n_in, n_out, weights, bias, inputs)
    dense = DenseLayer(n_in, n_out, weights, bias)
    check_grad(dense.grad_b(inputs), expected)

In [30]:
# Run the bias-gradient check for DenseLayer.
test_grad_b()

Test PASSED


In [31]:
def numerical_grad_W(input_size, output_size, W_init, b_init, x):
    """Central-difference estimate of a dense layer's weight Jacobian.

    Every weight (in row-major order) is shifted by +/- 1e-5 in turn and two
    fresh ``DenseLayer`` instances are evaluated on ``x``.
    """
    eps = 0.00001
    n_weights = W_init.size
    slices = []
    for position in range(n_weights):
        delta = np.zeros(n_weights)
        delta[position] = eps
        delta = delta.reshape(W_init.shape)
        shifted_up = DenseLayer(input_size, output_size, W_init+delta, b_init)
        shifted_down = DenseLayer(input_size, output_size, W_init-delta, b_init)
        slope = (shifted_up.forward(x) - shifted_down.forward(x)) / (2 * eps)
        slices.append(slope.T)

    return np.array(slices).T

def test_grad_W():
    """Check ``DenseLayer.grad_W`` on random weights and inputs."""
    batch, n_in, n_out = 2, 3, 4
    # Keep the order of the random draws: weights, bias, inputs.
    weights = np.random.random((n_in, n_out))
    bias = np.random.random(n_out)
    inputs = np.random.random((batch, n_in))

    expected = numerical_grad_W(n_in, n_out, weights, bias, inputs)
    dense = DenseLayer(n_in, n_out, weights, bias)
    check_grad(dense.grad_W(inputs), expected)

In [32]:
# Run the weight-gradient check for DenseLayer.
test_grad_W()

Test PASSED


#### Conv2D grad_kernel test

In [33]:
def numerical_grad_kernel_conv2d(kernel_size, input_channels, output_channels, 
                                 padding, stride, K_init, b_init, x):
    """Central-difference estimate of the Conv2DLayer kernel Jacobian.

    Every kernel entry (in row-major order) is shifted by +/- 1e-5 in turn
    and two fresh ``Conv2DLayer`` instances are evaluated on ``x``.
    """
    eps = 0.00001
    n_samples = len(x)
    n_entries = K_init.size

    slices = []
    for position in range(n_entries):
        delta = np.zeros(n_entries)
        delta[position] = eps
        delta = delta.reshape(K_init.shape)
        shifted_up = Conv2DLayer(kernel_size, input_channels, output_channels,
                                 padding, stride, K_init+delta, b_init)
        shifted_down = Conv2DLayer(kernel_size, input_channels, output_channels,
                                   padding, stride, K_init-delta, b_init)
        slope = (shifted_up.forward(x) - shifted_down.forward(x)) / (2 * eps)
        slices.append(slope.reshape(n_samples, -1).T)

    return np.array(slices).T

def test_grad_kernel_conv2d():
    """Compare Conv2DLayer.grad_kernel with its finite-difference estimate.

    Random kernel, bias and input are drawn for a 3x3 convolution with
    ``'same'`` padding and stride 2 on a batch of five 5-channel 5x5 inputs,
    and both gradients are handed to ``check_grad``.
    """
    b, c_in, h_in, w_in, c_out = 5, 5, 5, 5, 5
    k, p, s = 3, 'same', 2
    K_init = np.random.random((k, k, c_in, c_out))
    b_init = np.random.random(c_out)
    x = np.random.random((b, c_in, h_in, w_in))

    num_grad = numerical_grad_kernel_conv2d(k, c_in, c_out, p, s, K_init, b_init, x)
    # Use the same padding variable as the numerical reference so the two
    # sides of the comparison cannot drift apart when `p` is edited.
    layer = Conv2DLayer(k, c_in, c_out, p, s, K_init, b_init)
    grad = layer.grad_kernel(x)
    check_grad(grad, num_grad)

In [34]:
# Run the Conv2DLayer.grad_kernel gradient check defined in the previous cell.
test_grad_kernel_conv2d()

Test PASSED


#### Conv2DTr grad_kernel test

In [35]:
def numerical_grad_kernel_conv2dtr(kernel_size, input_channels, output_channels,
                                   padding, stride, K_init, b_init, x):
    """Approximate the derivative of Conv2DTrLayer.forward w.r.t. the kernel.

    Each kernel entry is shifted by ``+step`` and ``-step`` in turn; the
    central difference of the two forward outputs is flattened per sample
    (``reshape(len(x), -1)``) and transposed before being collected.

    Args:
        kernel_size, input_channels, output_channels, padding, stride:
            forwarded unchanged to the ``Conv2DTrLayer`` constructor.
        K_init: kernel array whose entries are perturbed one at a time.
        b_init: bias passed unchanged to both perturbed layers.
        x: input batch fed to ``forward``.

    Returns:
        The collected differences stacked along a new leading axis in flat
        (row-major) order of ``K_init`` and then transposed as a whole.
    """
    step = 0.00001
    n_samples = len(x)
    per_entry = []
    for flat_idx in range(K_init.size):
        # Perturbation that is zero everywhere except a single kernel entry.
        bump = np.zeros(K_init.shape)
        bump.flat[flat_idx] = step
        layer_plus = Conv2DTrLayer(kernel_size, input_channels, output_channels,
                                   padding, stride, K_init + bump, b_init)
        layer_minus = Conv2DTrLayer(kernel_size, input_channels, output_channels,
                                    padding, stride, K_init - bump, b_init)
        central = (layer_plus.forward(x) - layer_minus.forward(x)) / (2 * step)
        per_entry.append(central.reshape(n_samples, -1).T)

    return np.array(per_entry).T

def test_grad_kernel_conv2dtr():
    """Compare Conv2DTrLayer.grad_kernel with its finite-difference estimate.

    Random kernel, bias and input are drawn for a 3x3 transposed convolution
    with padding 1 and stride 2 on a batch of five 5-channel 5x5 inputs, and
    both gradients are handed to ``check_grad``.
    """
    batch, ch_in, height, width, ch_out = 5, 5, 5, 5, 5
    ksize, pad, step = 3, 1, 2
    # Draw order (kernel, bias, input) is kept so a seeded RNG yields the same data.
    kernel = np.random.random((ksize, ksize, ch_in, ch_out))
    bias = np.random.random(ch_out)
    sample = np.random.random((batch, ch_in, height, width))

    numeric = numerical_grad_kernel_conv2dtr(ksize, ch_in, ch_out, pad, step,
                                             kernel, bias, sample)
    analytic = Conv2DTrLayer(ksize, ch_in, ch_out, pad, step,
                             kernel, bias).grad_kernel(sample)
    check_grad(analytic, numeric)

In [36]:
# Run the Conv2DTrLayer.grad_kernel gradient check defined in the previous cell.
test_grad_kernel_conv2dtr()

Test PASSED


#### 3.2 Полностью реализуйте метод обратного распространения ошибки в функции train_step класса Network


Рекомендуем сначала реализовать функцию Network.grad_param(), которая возвращает список, длина которого равна количеству слоёв; каждый элемент этого списка — список градиентов по параметрам соответствующего слоя.
После этого, имея список градиентов, напишите функцию обновления параметров для каждого слоя.

Совет: рекомендуем написать тест для кода подсчета градиента по параметрам, чтобы быть уверенным в том, что градиент через всю сеть считается правильно
    

In [37]:
def numerical_grad_param(net, x, labels):
    """Approximate the loss derivative w.r.t. the first layer's kernel.

    The first layer of ``net`` is rebuilt as a ``Conv2DLayer`` twice per
    kernel entry (entry shifted by ``+step`` and ``-step``); each copy is
    combined with the remaining layers of ``net`` into a fresh ``Network``
    and the central difference of ``calculate_loss`` is recorded.

    Args:
        net: network whose ``layers[0]`` exposes ``kernel_size``,
            ``input_channels``, ``output_channels``, ``padding``, ``stride``,
            ``kernel`` and ``bias``.
        x: input passed to ``calculate_loss``.
        labels: targets passed to ``calculate_loss``.

    Returns:
        ``np.array`` of the recorded differences (one per kernel entry, in
        flat row-major order), transposed.
    """
    step = 0.00001
    first = net.layers[0]
    kernel, bias = first.kernel, first.bias
    # Constructor arguments shared by every perturbed copy of the first layer.
    ctor_args = (first.kernel_size, first.input_channels,
                 first.output_channels, first.padding, first.stride)

    estimates = []
    for flat_idx in range(kernel.size):
        # Perturbation that is zero everywhere except a single kernel entry.
        bump = np.zeros(kernel.shape)
        bump.flat[flat_idx] = step
        layer_plus = Conv2DLayer(*ctor_args, kernel + bump, bias)
        layer_minus = Conv2DLayer(*ctor_args, kernel - bump, bias)
        net_plus = Network([layer_plus] + net.layers[1:], loss=net.loss)
        net_minus = Network([layer_minus] + net.layers[1:], loss=net.loss)
        loss_gap = net_plus.calculate_loss(x, labels) - net_minus.calculate_loss(x, labels)
        estimates.append(loss_gap / (2 * step))

    return np.array(estimates).T

def test_grad_param():
    """Check Network.grad_param against a finite-difference estimate.

    Builds a Conv2D -> ReLU -> Flatten -> Dense -> Softmax network with a
    cross-entropy loss, then compares the first gradient of the first layer
    (``grad_param(...)[0][0]``) with ``numerical_grad_param`` via
    ``check_grad``.
    """
    batch, ch_in, height, width, ch_out = 2, 5, 5, 5, 5
    ksize, pad, stride = 3, 'same', 2
    # The network is built before the input is drawn, as any random layer
    # initialisation must consume the RNG in the same order.
    stack = [
        Conv2DLayer(ksize, ch_in, ch_out, pad, stride),
        ReLU(),
        FlattenLayer(),
        DenseLayer(45, 3),
        Softmax(),
    ]
    net = Network(stack, loss=CrossEntropy())
    sample = np.random.random((batch, ch_in, height, width))
    targets = np.array([[1, 2, 3], [2, 3, 4]])

    numeric = numerical_grad_param(net, sample, targets)
    analytic = net.grad_param(sample, targets)[0][0]
    check_grad(analytic, numeric)

In [38]:
# Run the whole-network parameter-gradient check defined in the previous cell.
test_grad_param()

Test PASSED


#### 3.3 Ознакомьтесь с реализацией функции fit класса Network. Запустите обучение модели. Если всё работает правильно, то точность на валидации должна будет возрастать

In [39]:
# Seed NumPy's RNG before the layers are constructed.
np.random.seed(42)

# One dense layer (784 -> 10) followed by softmax, trained with cross-entropy.
net = Network([DenseLayer(784, 10), Softmax()], loss=CrossEntropy())

# Flatten every sample to a vector and train on every third one.
trainX = X_train.reshape(len(X_train), -1)
subset_x, subset_y = trainX[::3], Y_train[::3]
net.fit(subset_x, subset_y,
        validation_split=0.25,
        batch_size=16,
        nb_epoch=5,
        learning_rate=0.01)

100%|██████████| 937/937 [00:08<00:00, 115.78it/s]


1 epoch: val 0.84


100%|██████████| 937/937 [00:08<00:00, 115.38it/s]


2 epoch: val 0.87


100%|██████████| 937/937 [00:08<00:00, 116.40it/s]


3 epoch: val 0.88


100%|██████████| 937/937 [00:08<00:00, 114.52it/s]


4 epoch: val 0.88


100%|██████████| 937/937 [00:08<00:00, 115.79it/s]


5 epoch: val 0.89


In [40]:
# Seed NumPy's RNG before the layers are constructed.
np.random.seed(42)

# Dense(784 -> 20) -> ReLU -> Dense(20 -> 10) -> Softmax with cross-entropy.
net = Network(
    [DenseLayer(784, 20), ReLU(), DenseLayer(20, 10), Softmax()],
    loss=CrossEntropy(),
)

# Flatten every sample to a vector and train on every sixth one.
trainX = X_train.reshape(len(X_train), -1)
subset_x, subset_y = trainX[::6], Y_train[::6]
net.fit(subset_x, subset_y,
        validation_split=0.25,
        batch_size=16,
        nb_epoch=5,
        learning_rate=0.001)

100%|██████████| 468/468 [00:15<00:00, 30.14it/s]


1 epoch: val 0.27


100%|██████████| 468/468 [00:15<00:00, 30.00it/s]


2 epoch: val 0.46


100%|██████████| 468/468 [00:15<00:00, 30.05it/s]


3 epoch: val 0.56


100%|██████████| 468/468 [00:15<00:00, 30.34it/s]


4 epoch: val 0.63


100%|██████████| 468/468 [00:15<00:00, 30.03it/s]

5 epoch: val 0.70





In [41]:
# Seed NumPy's RNG before the layers are constructed.
np.random.seed(42)

# One 5x5 'valid' convolution + ReLU, flattened into a dense classifier.
# NOTE(review): 576 presumably equals 24*24 for 28x28 single-channel input - confirm.
convs = [
    Conv2DLayer(kernel_size=5, input_channels=1, output_channels=1,
                padding='valid', stride=1),
    ReLU(),
]
denses = [DenseLayer(576, 10)]
net = Network(convs + [FlattenLayer()] + denses + [Softmax()],
              loss=CrossEntropy())

# Train on every sixth sample of the unflattened data.
subset_x, subset_y = X_train[::6], Y_train[::6]
net.fit(subset_x, subset_y,
        validation_split=0.25,
        batch_size=16,
        nb_epoch=5,
        learning_rate=0.001)

100%|██████████| 468/468 [00:41<00:00, 11.41it/s]


1 epoch: val 0.15


100%|██████████| 468/468 [00:40<00:00, 11.67it/s]


2 epoch: val 0.29


100%|██████████| 468/468 [00:40<00:00, 11.50it/s]


3 epoch: val 0.49


100%|██████████| 468/468 [00:41<00:00, 11.37it/s]


4 epoch: val 0.62


100%|██████████| 468/468 [00:41<00:00, 11.32it/s]


5 epoch: val 0.70


#### 3.5 Продемонстрируйте, что ваша реализация позволяет обучать более глубокие нейронные сети 

In [42]:
# Seed NumPy's RNG before the layers are constructed.
np.random.seed(42)

# A 3x3 'same' convolution followed by four identical 5x5 'valid'
# convolutions and a single ReLU; layers are created in the listed order.
convs = (
    [Conv2DLayer(kernel_size=3, input_channels=1, output_channels=1,
                 padding='same', stride=1)]
    + [Conv2DLayer(kernel_size=5, input_channels=1, output_channels=1,
                   padding='valid', stride=1)
       for _ in range(4)]
    + [ReLU()]
)
# NOTE(review): 144 presumably equals 12*12 after four 5x5 'valid' convs on 28x28 input - confirm.
denses = [DenseLayer(144, 10)]
net = Network(convs + [FlattenLayer()] + denses + [Softmax()],
              loss=CrossEntropy())

# Train on every tenth sample.
subset_x, subset_y = X_train[::10], Y_train[::10]
net.fit(subset_x, subset_y,
        validation_split=0.25,
        batch_size=16,
        nb_epoch=5,
        learning_rate=0.001)

100%|██████████| 281/281 [00:43<00:00,  6.42it/s]


1 epoch: val 0.16


100%|██████████| 281/281 [00:43<00:00,  6.48it/s]


2 epoch: val 0.25


100%|██████████| 281/281 [00:43<00:00,  6.43it/s]


3 epoch: val 0.35


100%|██████████| 281/281 [00:43<00:00,  6.52it/s]


4 epoch: val 0.46


100%|██████████| 281/281 [00:43<00:00,  6.49it/s]


5 epoch: val 0.56


In [48]:
# Seed NumPy's RNG before the layers are constructed.
np.random.seed(42)

# A 3x3 'same' convolution, five 5x5 'valid' convolutions, one stride-2
# transposed convolution, two more 5x5 'valid' convolutions and a ReLU;
# layers are created in the listed order.
convs = (
    [Conv2DLayer(kernel_size=3, input_channels=1, output_channels=1,
                 padding='same', stride=1)]
    + [Conv2DLayer(kernel_size=5, input_channels=1, output_channels=1,
                   padding='valid', stride=1)
       for _ in range(5)]
    + [Conv2DTrLayer(kernel_size=3, input_channels=1, output_channels=1,
                     padding=0, stride=2)]
    + [Conv2DLayer(kernel_size=5, input_channels=1, output_channels=1,
                   padding='valid', stride=1)
       for _ in range(2)]
    + [ReLU()]
)
# NOTE(review): 81 presumably equals 9*9, the spatial size left after the stack above - confirm.
denses = [DenseLayer(81, 10)]
net = Network(convs + [FlattenLayer()] + denses + [Softmax()],
              loss=CrossEntropy())

# Train on every tenth sample.
subset_x, subset_y = X_train[::10], Y_train[::10]
net.fit(subset_x, subset_y,
        validation_split=0.25,
        batch_size=16,
        nb_epoch=5,
        learning_rate=0.001)

100%|██████████| 281/281 [00:46<00:00,  6.03it/s]


1 epoch: val 0.15


100%|██████████| 281/281 [00:46<00:00,  6.06it/s]


2 epoch: val 0.21


100%|██████████| 281/281 [00:46<00:00,  6.07it/s]


3 epoch: val 0.24


100%|██████████| 281/281 [00:46<00:00,  6.08it/s]


4 epoch: val 0.33


100%|██████████| 281/281 [00:46<00:00,  6.05it/s]


5 epoch: val 0.41
