# Practical Assignment

## Student information

1. **Full name**: Зуев Кирилл Петрович
2. **Faculty**: Faculty of Space Research
3. **Year**: 005
4. **Group**: 502

## Notes

* Submit the completed notebook to the bot
* Follow the honor code (both the student who copied and the student who shared their work get zero points)
* Use only the **NumPy** library for the implementation (and do use it!)
* Nothing other than NumPy may be used for the implementation
* **Keras** is used only to test your implementation
* If a class fails the provided tests, the corresponding task is not graded
* Additional (private) tests may be used


## Implementing your own neural network package for running and training neural networks

The assignment consists of three parts:
1. Implementing the forward pass (inference) of a neural network (5 points)
2. Implementing gradients with respect to the input and propagating the gradient through the network (5 points)
3. Implementing gradients with respect to the parameters and the backpropagation method with updates of the network parameters (10 points)

Extra points are available for implementing training of a network with convolutional layers (10 points), with transposed convolution (10 points), and for an additional optimizer (5 points).

### 1. Implementing inference for your own neural network

1.1 Study the layer interface carefully. Every layer must contain at least three methods:
- a constructor
- a forward pass
- a backward pass: derivatives with respect to the input and the parameters

In [69]:
class Layer(object):
    def __init__(self):
        self.name = 'Layer'
    def forward(self, input_data):
        pass
    def backward(self, input_data):
        return [self.grad_x(input_data), self.grad_param(input_data)]

    def grad_x(self, input_data):
        pass
    def grad_param(self, input_data):
        return []

    def update_param(self, grads, learning_rate):
        pass
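
To make the interface concrete, here is a minimal sketch of a layer that fills in every method. `ScaleLayer` and its parameter `alpha` are hypothetical and not part of the assignment; the `Layer` base class is repeated only so that the snippet runs on its own. The Jacobian shapes follow the convention used by the rest of the notebook: `(batch, outputs, inputs)` for `grad_x` and `(batch, outputs, parameters)` for each entry of `grad_param`.

```python
import numpy as np

class Layer(object):
    """Interface repeated from the assignment so the sketch is self-contained."""
    def forward(self, input_data):
        pass
    def grad_x(self, input_data):
        pass
    def grad_param(self, input_data):
        return []
    def update_param(self, grads, learning_rate):
        pass

class ScaleLayer(Layer):
    """Hypothetical layer y = alpha * x with a single scalar parameter."""
    def __init__(self, alpha=2.0):
        self.name = 'Scale'
        self.alpha = float(alpha)

    def forward(self, input_data):
        return self.alpha * input_data

    def grad_x(self, input_data):
        # Per-sample Jacobian dy/dx = alpha * I, shape (batch, n, n).
        batch_size, n = input_data.shape
        return np.tile(self.alpha * np.eye(n), (batch_size, 1, 1))

    def grad_param(self, input_data):
        # dy/dalpha = x, stored as a (batch, n, 1) Jacobian.
        return [input_data[:, :, np.newaxis].astype(float)]

    def update_param(self, grads, learning_rate):
        # After the chain rule grads[0] has shape (batch, 1); average over the batch.
        self.alpha -= learning_rate * float(np.mean(grads[0]))

layer = ScaleLayer(alpha=2.0)
x = np.array([[1.0, -1.0, 3.0]])
print(layer.forward(x))
print(layer.grad_x(x).shape)         # (1, 3, 3)
print(layer.grad_param(x)[0].shape)  # (1, 3, 1)
```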


1.2 The interface of the Network class is shown below. Note the implementation of the predict method, which processes the input data sequentially, layer by layer.

In [70]:
import numpy as np
from sklearn.model_selection import train_test_split
from tqdm import tqdm

class Network(object):
    def __init__(self, layers, loss=None):
        self.name = 'Network'
        self.layers = layers
        self.loss = loss

    def forward(self, input_data):
        return self.predict(input_data)

    def grad_x(self, input_data, labels):
        input_mas = []
        current_input = input_data
        for layer in self.layers:
            input_mas.append(layer.grad_x(current_input))
            current_input = layer.forward(current_input)
        L_y = self.loss.grad_x(current_input, labels)
        batch_size = L_y.shape[0]
        for layer in reversed(input_mas):
            grad = np.array([np.dot(L_y[b], layer[b]) for b in range(batch_size)])
            L_y = grad.copy()
        return L_y

    def grad_param(self, input_data, labels):
        input_mas = []
        current_input = input_data
        for layer in self.layers:
            input_mas.append(layer.backward(current_input))
            current_input = layer.forward(current_input)
        L_y = self.loss.grad_x(current_input, labels)
        batch_size = L_y.shape[0]
        grad_list =  []
        for layer_x, layer_param in reversed(input_mas):
            grad0 = []
            for grad_param in layer_param:
                grad0.append(np.array([np.dot(L_y[b], grad_param[b]) for b in range(batch_size)]))
            grad_list.append(grad0)
            grad = np.array([np.dot(L_y[b], layer_x[b]) for b in range(batch_size)])
            L_y = grad.copy()

        return grad_list[::-1]

    def update(self, grad_list, learning_rate):
        for layer, grad in zip(self.layers, grad_list):
            layer.update_param(grad, learning_rate)

    def predict(self, input_data):
        current_input = input_data
        self.input_mas = []
        self.input_mas.append(current_input)
        for layer in self.layers:
            current_input = layer.forward(current_input)
            self.input_mas.append(current_input)
        return current_input

    def calculate_loss(self, input_data, labels):
        return self.loss.forward(self.predict(input_data), labels)

    def train_step(self, input_data, labels, learning_rate=0.001):
        grad_list = self.grad_param(input_data, labels)
        self.update(grad_list, learning_rate)


    def fit(self, trainX, trainY, validation_split=0.25,
            batch_size=1, nb_epoch=1, learning_rate=0.01):

        train_x, val_x, train_y, val_y = train_test_split(trainX, trainY,
                                                          test_size=validation_split,
                                                          random_state=42)
        for epoch in range(nb_epoch):
            #train one epoch
            for i in tqdm(range(int(len(train_x)/batch_size))):
                batch_x = train_x[i*batch_size: (i+1)*batch_size]
                batch_y = train_y[i*batch_size: (i+1)*batch_size]
                self.train_step(batch_x, batch_y, learning_rate)
            #validate
            val_accuracy = self.evaluate(val_x, val_y)
            print('%d epoch: val %.2f' %(epoch+1, val_accuracy))

    def evaluate(self, testX, testY):
        y_pred = np.argmax(self.predict(testX), axis=1)
        y_true = np.argmax(testY, axis=1)
        val_accuracy = np.sum((y_pred == y_true))/(len(y_true))
        return val_accuracy
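
The per-sample loops in `Network.grad_x` and `Network.grad_param` multiply an upstream gradient row by a per-sample Jacobian. As a sketch, the same vector-Jacobian product can be written as one batched `np.einsum` call. The array names and sizes below are illustrative assumptions, not values from the notebook.

```python
import numpy as np

rng = np.random.default_rng(0)
batch_size, n_out, n_in = 4, 3, 5
upstream = rng.normal(size=(batch_size, n_out))        # dL/dy for each sample
jacobian = rng.normal(size=(batch_size, n_out, n_in))  # dy/dx for each sample

# Loop form, matching the list comprehension used in Network.grad_x.
looped = np.array([np.dot(upstream[b], jacobian[b]) for b in range(batch_size)])

# Batched form: contract the output axis 'o' separately for every sample 'b'.
batched = np.einsum('bo,boi->bi', upstream, jacobian)

print(np.allclose(looped, batched))  # True
```

Both forms give a `(batch, n_in)` array; the batched one avoids a Python-level loop over samples.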

#### 1.1 Implement the forward method for the following layers:

- DenseLayer
- ReLU
- Softmax
- FlattenLayer
- MaxPooling

In [71]:
# imports
import numpy as np

In [72]:
class DenseLayer(Layer):
    def __init__(self, input_dim, output_dim, W_init=None, b_init=None):
        self.name = 'Dense'
        self.input_dim = input_dim
        self.output_dim = output_dim
        if W_init is None or b_init is None:
            self.W = np.random.normal(0, np.sqrt(2/self.input_dim), (self.input_dim, self.output_dim))
            self.b = np.zeros(self.output_dim, 'float32')
        else:
            self.W = W_init
            self.b = b_init
    def forward(self, input_data):
        return np.dot(input_data, self.W) + self.b
    def grad_x(self, input_data):
        batch_size = input_data.shape[0]
        grad = []
        for i in range(batch_size):
            grad.append(self.W.T)
        return np.array(grad)
    def grad_b(self, input_data):
        bsize = input_data.shape[0]
        gradb = np.array([np.eye(self.output_dim, dtype=np.float32) for _ in range(bsize)])
        return gradb
    def grad_W(self, input_data):
        bsize = input_data.shape[0]
        gradw = np.zeros((bsize, self.output_dim, self.input_dim*self.output_dim), dtype=np.float32)
        for i in range(bsize):
          for j in range(self.output_dim):
            gradw[i, j, j::self.output_dim] = input_data[i]
        return np.array(gradw)

    def update_W(self, grad, learning_rate):
        self.W -= learning_rate * np.mean(grad, axis=0).reshape(self.W.shape)

    def update_b(self, grad,  learning_rate):
        self.b -= learning_rate * np.mean(grad, axis=0)

    def update_param(self, params_grad, learning_rate):
        self.update_W(params_grad[0], learning_rate)
        self.update_b(params_grad[1], learning_rate)

    def grad_param(self, input_data):
        return [self.grad_W(input_data), self.grad_b(input_data)]


class ReLU(Layer):
    def __init__(self):
        self.name = 'ReLU'
    def forward(self, input_data):
        return np.maximum(0, input_data)
    def grad_x(self, input_data):
        batch_size, num = input_data.shape
        grad = np.zeros((batch_size, num, num))
        for i in range(batch_size):
            grad[i] = np.diag((input_data[i] > 0).astype(float))
        return grad


class Softmax(Layer):
    def __init__(self):
        self.name = 'Softmax'
    def forward(self, input_data):
        exps = np.exp(input_data - np.max(input_data, axis=-1, keepdims=True))
        output = exps / np.sum(exps, axis=-1, keepdims=True)
        return output
    def grad_x(self, input_data):
        softmax_out = self.forward(input_data)
        batch_size, num = softmax_out.shape
        grad = np.zeros((batch_size, num, num))
        for i in range(batch_size):
            s = softmax_out[i].reshape(-1, 1)
            grad[i] = np.diagflat(s) - np.dot(s, s.T)
        return grad


class FlattenLayer(Layer):
    def __init__(self):
        self.name = 'Flatten'

    def forward(self, input_data):
        return input_data.reshape(input_data.shape[0], -1)
    def grad_x(self, input_data):
        batch = input_data.shape[0]
        size = 1
        for i in input_data.shape:
            size *= i
        size = int(size//batch)
        batch_eye = np.array([np.eye(size) for _ in range(batch)])
        return batch_eye


class MaxPooling(Layer):
    def __init__(self, pool_size=2, stride=2):
        self.name = 'MaxPooling'
        self.pool_size = pool_size
        self.stride = stride

    def forward(self, input_data):
        batch_size, channels, height, width = input_data.shape
        out_height = (height - self.pool_size) // self.stride + 1
        out_width = (width - self.pool_size) // self.stride + 1

        output = np.zeros((batch_size, channels, out_height, out_width))
        self.max_indices = np.zeros_like(input_data, dtype=int)

        for b in range(batch_size):
            for c in range(channels):
                for i in range(out_height):
                    for j in range(out_width):
                        h_start, h_end = i * self.stride, i * self.stride + self.pool_size
                        w_start, w_end = j * self.stride, j * self.stride + self.pool_size
                        window = input_data[b, c, h_start:h_end, w_start:w_end]
                        output[b, c, i, j] = np.max(window)
        return output

    def grad_x(self, input_data):
        batch_size, channels, height, width = input_data.shape
        out_height = (height - self.pool_size) // self.stride + 1
        out_width = (width - self.pool_size) // self.stride + 1
        max_indices = np.zeros_like(input_data, dtype=int)
        jacobian = np.zeros((batch_size, channels*out_height*out_width, channels*height*width))

        for b in range(batch_size):
            for c in range(channels):
                for i in range(out_height):
                    for j in range(out_width):
                        h_start, h_end = i * self.stride, i * self.stride + self.pool_size
                        w_start, w_end = j * self.stride, j * self.stride + self.pool_size
                        window = input_data[b, c, h_start:h_end, w_start:w_end]
                        max_pos = np.unravel_index(np.argmax(window), window.shape)
                        max_indices[b, c, h_start + max_pos[0], w_start + max_pos[1]] = 1
                        max_h, max_w = max_pos
                        input_idx = c * height * width + (h_start + max_h) * width + (w_start + max_w)
                        output_idx = c * out_height * out_width + i * out_width + j
                        jacobian[b, output_idx, input_idx] = 1
        return jacobian
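
The nested loops in `MaxPooling.forward` are easy to follow but slow on full MNIST batches. For the default case where `stride == pool_size`, one possible alternative is a reshape-based sketch like the one below. The function name `max_pool_reshape` is hypothetical, and the approach does not cover overlapping windows.

```python
import numpy as np

def max_pool_reshape(x, pool_size=2):
    """Max pooling for non-overlapping windows (stride == pool_size).

    Trailing rows and columns that do not fill a whole window are cropped,
    which matches (height - pool_size) // stride + 1 when stride == pool_size.
    """
    batch, channels, height, width = x.shape
    out_h, out_w = height // pool_size, width // pool_size
    x = x[:, :, :out_h * pool_size, :out_w * pool_size]
    # Split each spatial axis into (window index, position inside the window).
    windows = x.reshape(batch, channels, out_h, pool_size, out_w, pool_size)
    return windows.max(axis=(3, 5))

x = np.arange(16, dtype=float).reshape(1, 1, 4, 4)
print(max_pool_reshape(x)[0, 0])
```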

Test for MaxPooling

In [73]:
def numerical_diff_layer(layer, x):
    eps = 0.00001
    right_answer = []
    for i in range(x[0].size):
        delta = np.zeros(x[0].size)
        delta[i] = eps
        delta = delta.reshape(x[0].shape)
        diff = (layer.forward(x + delta) - layer.forward(x - delta)) / (2 * eps)
        right_answer.append(diff.reshape(x.shape[0], -1).T)
    return np.array(right_answer).T

def test_layer(layer):
    x = np.arange(625).reshape(5, 5, 5, 5)
    num_grad = numerical_diff_layer(layer, x)
    grad = layer.grad_x(x)
    if np.sum(np.abs(num_grad - grad)) < 0.01:
        print('Test PASSED')
    else:
        print('Something went wrong!')
        print('Numerical grad is')
        print(num_grad)
        print('Your gradient is')
        print(grad)

layer = MaxPooling()
test_layer(layer)

Test PASSED


#### 1.2 Now implement the convolutional layer and the transposed convolution (optional)

In [74]:
class Conv2DLayer(Layer):
    def __init__(self, kernel_size=3, input_channels=2, output_channels=3,
                 padding='same', stride=1, K_init=None, b_init=None):
        # padding: 'same' or 'valid'
        # We work with square kernels, so kernel_size is a single number
        # We work with a uniform stride, so stride is a single number
        # The filter has shape [kernel_size, kernel_size, input_channels, output_channels]
        self.name = 'Conv2D'
        self.kernel_size = kernel_size
        self.input_channels = input_channels
        self.output_channels = output_channels
        if K_init is None or b_init is None:
            self.kernel = np.random.normal(0, np.sqrt(2/self.input_channels), (self.kernel_size, self.kernel_size, self.input_channels, self.output_channels))
            self.bias = np.zeros(self.output_channels, 'float32')
        else:
            self.kernel = K_init
            self.bias = b_init
        self.padding = padding
        self.stride = stride

    def forward(self, input_data):
        # The input is a four-dimensional tensor of shape [batch, input_channels, height, width]
        # First, check that the dimensions of the input data and the kernel are consistent!
        # Fill the NumPy tensor out
        if input_data.shape[2] < self.kernel_size or input_data.shape[3] < self.kernel_size:
            raise ValueError("h or w < kernel_size")
        if self.padding == 'same':
            pad = (self.kernel_size - 1) // 2
        else:
            pad = 0
        if self.padding == 'same':
            input_data = np.pad(input_data, ((0,0), (0,0), (pad,pad), (pad,pad)), mode='constant')
        batch, c_in, h_in, w_in = input_data.shape
        H_out = 1 + (h_in - self.kernel_size) // self.stride
        W_out = 1 + (w_in - self.kernel_size) // self.stride
        out = np.zeros((batch, self.output_channels, H_out, W_out))
        for b in range(batch):
            for i in range(H_out):
                for j in range(W_out):
                    region = input_data[b, :, i * self.stride:i * self.stride + self.kernel_size, j * self.stride:j * self.stride + self.kernel_size]
                    out[b, :, i, j] = np.tensordot(region, self.kernel, axes=([1, 2, 0], [0, 1, 2])) + self.bias
        return out

    def grad_x(self, input_data):
        k, s = self.kernel_size, self.stride
        if self.padding == 'same':
            pad = (self.kernel_size - 1) // 2
        else:
            pad = 0
        if self.padding == 'same':
            input_data = np.pad(input_data, ((0,0), (0,0), (pad,pad), (pad,pad)), mode='constant')
        batch_size, c_in, h_in, w_in = input_data.shape
        h_out = 1 + (h_in - self.kernel_size) // self.stride
        w_out = 1 + (w_in - self.kernel_size) // self.stride
        c_out = self.output_channels

        indh_in = self.get_indin(k, s, h_out)
        indh_in = indh_in.reshape(h_out, 1, k, 1)
        indw_in = self.get_indin(k, s, w_out)
        indw_in = indw_in.reshape(1, w_out, 1, k)
        indh_out = np.zeros((h_out, 1, 1, 1), dtype=int)
        indw_out = np.zeros((1, w_out, 1, 1), dtype=int)
        for i in range(h_out):
            indh_out[i, 0, 0, 0] = i
        for j in range(w_out):
            indw_out[0, j, 0, 0] = j

        grad = np.zeros((batch_size, h_out, w_out, h_in, w_in, c_in, c_out))
        grad[:, indh_out, indw_out, indh_in, indw_in, :, :] = self.kernel

        if self.padding == 'same':
            h_in -= 2*pad
            w_in -= 2*pad
        grad = grad[:, :, :, pad:h_in+pad, pad:w_in+pad, :, :]

        grad = grad.transpose(0, 6, 1, 2, 5, 3, 4).reshape(batch_size, c_out * h_out * w_out, c_in * h_in * w_in)
        return grad

    def grad_kernel(self, input_data):
        k, s = self.kernel_size, self.stride
        if self.padding == 'same':
            pad = (self.kernel_size - 1) // 2
        else:
            pad = 0
        if self.padding == 'same':
            input_data = np.pad(input_data, ((0,0), (0,0), (pad,pad), (pad,pad)), mode='constant')
        batch_size, c_in, h_in, w_in = input_data.shape
        h_out = 1 + (h_in - self.kernel_size) // self.stride
        w_out = 1 + (w_in - self.kernel_size) // self.stride
        c_out = self.output_channels

        indh_in = self.get_indin(k, s, h_out)
        h = indh_in.reshape(h_out, 1, k, 1).repeat(w_out, axis=1)
        indw_in = self.get_indin(k, s, w_out)
        w = indw_in.reshape(1, w_out, 1, k).repeat(h_out, axis=0)
        piece = input_data[:, :, h, w].transpose(0, 2, 3, 4, 5, 1)

        # d out[b, o, i, j] / d K[p, q, c, o2] = x[b, c, i*s + p, j*s + q] * delta(o, o2)
        grad = np.eye(c_out) * piece[..., np.newaxis, np.newaxis]

        grad = grad.transpose(0, 6, 1, 2, 3, 4, 5, 7)
        grad = grad.reshape(batch_size, c_out * h_out * w_out, k * k * c_in * c_out)

        return grad

    def get_indin(self, k, s, i_out):
        ind_in = []
        for i in range(i_out):
            base_index = s * i
            row = []
            for j in range(k):
                row.append(base_index + j)
            ind_in.append(row)
        return np.array(ind_in)

    def grad_param(self, input_data):
        return [self.grad_kernel(input_data)]

    def update_param(self, grads, learning_rate):
        self.kernel -= learning_rate * np.mean(grads[0], axis=0).reshape(self.kernel.shape)

Test for Conv2DLayer

In [75]:
layer = Conv2DLayer(4, 5, 5, 'same', 1)
test_layer(layer)

Test PASSED
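
The output size used in `Conv2DLayer.forward` can be isolated into a small helper to see what the `'same'` mode does for different kernels. This is a sketch that mirrors the padding rule `pad = (kernel_size - 1) // 2` from the class above; the helper name `conv_output_size` is hypothetical.

```python
def conv_output_size(size_in, kernel_size, stride=1, padding='same'):
    """Spatial output size following the formulas in Conv2DLayer.forward."""
    pad = (kernel_size - 1) // 2 if padding == 'same' else 0
    return 1 + (size_in + 2 * pad - kernel_size) // stride

print(conv_output_size(5, 3))                   # 5: odd kernel keeps the size
print(conv_output_size(5, 4))                   # 4: even kernel loses one pixel
print(conv_output_size(5, 3, padding='valid'))  # 3
```

With this padding rule an even kernel, such as the `kernel_size=4` used in the test above, produces an output one pixel smaller than the input even in `'same'` mode.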


In [76]:
class Conv2DTrLayer(Layer):
    def __init__(self, kernel_size=3, input_channels=2, output_channels=3,
                 padding=0, stride=1, K_init=None, b_init=None):
        # padding: a number (how much to crop from the modified input map)
        # We work with square kernels, so kernel_size is a single number
        # stride is a single number (the dilation factor)
        # The filter has shape [kernel_size, kernel_size, input_channels, output_channels]
        self.name = 'Conv2DTr'
        self.kernel_size = kernel_size
        self.input_channels = input_channels
        self.output_channels = output_channels
        if K_init is None or b_init is None:
            self.kernel = np.random.normal(0, np.sqrt(2/self.input_channels), (self.kernel_size, self.kernel_size, self.input_channels, self.output_channels))
            self.bias = np.zeros(self.output_channels, 'float32')
        else:
            self.kernel = K_init
            self.bias = b_init
        self.padding = padding
        self.stride = stride

    def forward(self, input_data):
        # The input is a four-dimensional tensor of shape [batch, input_channels, height, width]
        # First, check that the dimensions of the input data and the kernel are consistent!
        # Fill the NumPy tensor out
        c_in = self.input_channels
        c_out = self.output_channels
        k = self.kernel_size
        s = self.stride
        p = self.padding
        if input_data.shape[2] < k or input_data.shape[3] < k:
            raise ValueError("h or w < kernel_size")
        batch_size, _ , H_in, W_in = input_data.shape
        H_new = H_in + (H_in - 1) * (s - 1) + 2 * (k - p - 1)
        W_new = W_in + (W_in - 1) * (s - 1) + 2 * (k - p - 1)

        data = np.zeros((batch_size, c_in, H_new, W_new))
        h = (k - p - 1) + s * np.arange(H_in).reshape(H_in, 1)
        w = (k - p - 1) + s * np.arange(W_in).reshape(1, W_in)
        data[:, :, h, w] = input_data
        input_data = data
        batch_size, _ , H_in, W_in = input_data.shape
        H_out, W_out = H_in - k + 1, W_in - k + 1

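        # The stride is already applied by the zero insertion above,
        # so the sliding window itself moves with step 1.
        indh_in = self.get_indin(k, 1, H_out)
        h = indh_in.reshape(H_out, 1, k, 1).repeat(W_out, axis=1)
        indw_in = self.get_indin(k, 1, W_out)
        w = indw_in.reshape(1, W_out, 1, k).repeat(H_out, axis=0)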
        piece = input_data[:, :, h, w].transpose(0, 2, 3, 4, 5, 1)
        output = (np.tensordot(piece, self.kernel, axes=([3, 4, 5], [0, 1, 2])) + self.bias).transpose(0, 3, 1, 2)
        return output

    def grad_x(self, input_data):
        c_in = self.input_channels
        c_out = self.output_channels
        k = self.kernel_size
        s = self.stride
        p = self.padding
        if input_data.shape[2] < k or input_data.shape[3] < k:
            raise ValueError("h or w < kernel_size")
        batch_size, _ , H_in, W_in = input_data.shape
        H_new = H_in + (H_in - 1) * (s - 1) + 2 * (k - p - 1)
        W_new = W_in + (W_in - 1) * (s - 1) + 2 * (k - p - 1)

        data = np.zeros((batch_size, c_in, H_new, W_new))
        h = (k - p - 1) + s * np.arange(H_in).reshape(H_in, 1)
        w = (k - p - 1) + s * np.arange(W_in).reshape(1, W_in)
        data[:, :, h, w] = input_data
        input_data = data
        batch_size, _ , H_in, W_in = input_data.shape
        H_out, W_out = H_in - k + 1, W_in - k + 1

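        # The stride is already applied by the zero insertion above,
        # so the sliding window itself moves with step 1.
        indh_in = self.get_indin(k, 1, H_out)
        h = indh_in.reshape(H_out, 1, k, 1)
        indw_in = self.get_indin(k, 1, W_out)
        w = indw_in.reshape(1, W_out, 1, k)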

        indh_out = np.zeros((H_out, 1, 1, 1), dtype=int)
        indw_out = np.zeros((1, W_out, 1, 1), dtype=int)
        for i in range(H_out):
            indh_out[i, 0, 0, 0] = i
        for j in range(W_out):
            indw_out[0, j, 0, 0] = j

        grad = np.zeros((batch_size, H_out, W_out, H_in, W_in, c_in, c_out))
        grad[:, indh_out, indw_out, h, w, :, :] = self.kernel

        grad = grad.transpose(0, 6, 1, 2, 5, 3, 4)
        h_new = (H_in-2*(k - p - 1)-1)//s+1
        w_new = (W_in-2*(k - p - 1)-1)//s+1

        indh_out = np.zeros((h_new, 1), dtype=int)
        indw_out = np.zeros((1, w_new), dtype=int)
        for i in range(h_new):
            indh_out[i, 0] = i
        for j in range(w_new):
            indw_out[0, j] = j
        indh_out = (k - p - 1) + s * indh_out
        indw_out = (k - p - 1) + s * indw_out
        grad = grad[:, :, :, :, :, indh_out, indw_out]
        grad = grad.reshape(batch_size, c_out * H_out * W_out, -1)
        return grad

    def grad_kernel(self, input_data):
        c_in = self.input_channels
        c_out = self.output_channels
        k = self.kernel_size
        s = self.stride
        p = self.padding
        if input_data.shape[2] < k or input_data.shape[3] < k:
            raise ValueError("h or w < kernel_size")
        batch_size, _ , H_in, W_in = input_data.shape
        H_new = H_in + (H_in - 1) * (s - 1) + 2 * (k - p - 1)
        W_new = W_in + (W_in - 1) * (s - 1) + 2 * (k - p - 1)

        data = np.zeros((batch_size, c_in, H_new, W_new))
        h = (k - p - 1) + s * np.arange(H_in).reshape(H_in, 1)
        w = (k - p - 1) + s * np.arange(W_in).reshape(1, W_in)
        data[:, :, h, w] = input_data
        input_data = data
        batch_size, _ , H_in, W_in = input_data.shape
        H_out, W_out = H_in - k + 1, W_in - k + 1

        # The stride is already applied by the zero insertion above,
        # so the sliding window itself moves with step 1.
        indh_in = self.get_indin(k, 1, H_out)
        h = indh_in.reshape(H_out, 1, k, 1).repeat(W_out, axis=1)
        indw_in = self.get_indin(k, 1, W_out)
        w = indw_in.reshape(1, W_out, 1, k).repeat(H_out, axis=0)
        piece = input_data[:, :, h, w].transpose(0, 2, 3, 4, 5, 1)

        # d out[b, o, i, j] / d K[p, q, c, o2] = piece[b, i, j, p, q, c] * delta(o, o2)
        grad = np.eye(c_out) * piece[..., np.newaxis, np.newaxis]

        grad = grad.transpose(0, 6, 1, 2, 3, 4, 5, 7)
        grad = grad.reshape(batch_size, c_out * H_out * W_out, k * k * c_in * c_out)

        return grad

    def get_indin(self, k, s, i_out):
        ind_in = []
        for i in range(i_out):
            base_index = s * i
            row = []
            for j in range(k):
                row.append(base_index + j)
            ind_in.append(row)
        return np.array(ind_in)

    def grad_param(self, input_data):
        return [self.grad_kernel(input_data)]

    def update_param(self, grads, learning_rate):
        self.kernel -= learning_rate * np.mean(grads[0], axis=0).reshape(self.kernel.shape)

Test for Conv2DTrLayer

In [77]:
layer = Conv2DTrLayer(4, 5, 5, 1, 1)
test_layer(layer)

Test PASSED
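
The first half of `Conv2DTrLayer.forward` builds an enlarged input map: `stride - 1` zeros are inserted between neighbouring pixels and a border of `kernel_size - padding - 1` zeros is added on every side. The sketch below isolates that step with slicing; the helper name `dilate_and_pad` is hypothetical.

```python
import numpy as np

def dilate_and_pad(x, kernel_size, stride, padding):
    """Insert zeros between pixels and add the zero border of a transposed convolution."""
    batch, channels, h_in, w_in = x.shape
    border = kernel_size - padding - 1
    h_new = h_in + (h_in - 1) * (stride - 1) + 2 * border
    w_new = w_in + (w_in - 1) * (stride - 1) + 2 * border
    out = np.zeros((batch, channels, h_new, w_new), dtype=x.dtype)
    # Original pixels land on a grid with step `stride`, shifted by the border.
    out[:, :, border:border + stride * h_in:stride,
              border:border + stride * w_in:stride] = x
    return out

x = np.ones((1, 1, 2, 2))
enlarged = dilate_and_pad(x, kernel_size=3, stride=2, padding=0)
print(enlarged.shape)  # (1, 1, 7, 7)
```

A stride-1 convolution with a 3x3 kernel over this 7x7 map yields a 5x5 output, which agrees with the closed form `(h_in - 1) * stride + kernel_size - 2 * padding`.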


#### 1.4 Now it is time for the test.
#### If you did everything correctly, running the following cells should print: Test PASSED

There is no point in moving on to the next tasks until the test passes
    

#### Reading the data

In [78]:
import numpy as np
np.random.seed(123)  # for reproducibility
from keras.utils import to_categorical
from keras.datasets import mnist

(X_train, y_train), (X_test, y_test) = mnist.load_data()

X_train = X_train.reshape(X_train.shape[0], 1, 28, 28)
X_test = X_test.reshape(X_test.shape[0], 1, 28, 28)
X_train = X_train.astype('float32')
X_test = X_test.astype('float32')
X_train /= 255
X_test /= 255


Y_train = to_categorical(y_train, 10)
Y_test = to_categorical(y_test, 10)
print(X_train.shape, Y_train.shape, X_test.shape, Y_test.shape)

Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
11490434/11490434 ━━━━━━━━━━━━━━━━━━━━ 0s 0us/step
(60000, 1, 28, 28) (60000, 10) (10000, 1, 28, 28) (10000, 10)
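
Since Keras is only allowed for testing, it may help to see how one-hot label rows like those in `Y_train` can be built with NumPy alone. The helper `one_hot` below is a hypothetical sketch and assumes integer class labels in the range `0..num_classes-1`.

```python
import numpy as np

def one_hot(labels, num_classes):
    """Return a (len(labels), num_classes) matrix with a single 1 per row."""
    labels = np.asarray(labels, dtype=int)
    # Row i of the identity matrix is the one-hot vector for class i.
    return np.eye(num_classes, dtype='float32')[labels]

print(one_hot([0, 2, 1], 3))
```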


#### Preparing the models

In [79]:
import keras
from keras.models import Sequential, Model
from keras.layers import Dense, Dropout, Activation, Flatten, Input
from keras.layers import Convolution2D, Conv2D, MaxPooling2D

print(keras.__version__)

def get_keras_model():
    input_image = Input(shape=(1, 28, 28))
    pool1 = MaxPooling2D(pool_size=(2,2), data_format='channels_first')(input_image)
    flatten = Flatten()(pool1)
    dense1 = Dense(10, activation='softmax')(flatten)
    model = Model(inputs=input_image, outputs=dense1)

    from keras.optimizers import Adam, SGD
    sgd = SGD(learning_rate=0.01, momentum=0.9, nesterov=True)
    model.compile(loss='categorical_crossentropy',
                  optimizer=sgd,
                  metrics=['accuracy'])

    history = model.fit(X_train, Y_train, validation_split=0.25,
                        batch_size=32, epochs=2, verbose=1)
    return model

3.8.0


In [80]:
def get_our_model(keras_model):
    maxpool = MaxPooling()
    flatten = FlattenLayer()
    dense = DenseLayer(196, 10, W_init=keras_model.get_weights()[0],
                       b_init=keras_model.get_weights()[1])
    softmax = Softmax()
    net = Network([maxpool, flatten, dense, softmax])
    return net

In [81]:
keras_model = get_keras_model()
our_model = get_our_model(keras_model)

Epoch 1/2
1407/1407 ━━━━━━━━━━━━━━━━━━━━ 7s 4ms/step - accuracy: 0.7659 - loss: 0.8546 - val_accuracy: 0.8910 - val_loss: 0.3825
Epoch 2/2
1407/1407 ━━━━━━━━━━━━━━━━━━━━ 5s 3ms/step - accuracy: 0.8894 - loss: 0.3914 - val_accuracy: 0.9011 - val_loss: 0.3436


In [82]:
keras_prediction = keras_model.predict(X_test)
our_model_prediction = our_model.predict(X_test)

313/313 ━━━━━━━━━━━━━━━━━━━━ 1s 2ms/step


In [83]:
if np.sum(np.abs(keras_prediction - our_model_prediction)) < 0.01:
    print('Test PASSED')
else:
    print('Something went wrong!')

Test PASSED


### 2. Computing derivatives with respect to the input for neural network layers

Numerical differentiation formulas must not be used to compute derivatives in this task.

#### 2.1 Implement the forward method for the CrossEntropy class
Reminder: $$ \mathrm{crossentropy} = L(p, y) = -\sum\limits_i y_i \log p_i, $$
where the vector $(p_1, ..., p_k)$ is the output of the classification algorithm and $(y_1, ..., y_k)$ are the correct class labels in one-hot encoding

In [84]:
class CrossEntropy(object):
    def __init__(self, eps=0.00001):
        self.name = 'CrossEntropy'
        self.eps = eps

    def forward(self, input_data, labels):
        return -np.sum(labels*np.log(input_data+self.eps), axis=1)

    def calculate_loss(self,input_data, labels):
        return self.forward(input_data, labels)

    def grad_x(self, input_data, labels):
        return -labels / (input_data+self.eps)
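
A useful sanity check for the loss gradient is the well-known identity for softmax followed by cross-entropy: when the labels sum to 1, chaining $\frac{\partial L}{\partial p}$ with the softmax Jacobian gives $p - y$. The sketch below verifies this for one sample; it sets the smoothing term `eps` to zero (an assumption made here for clarity), so the identity holds exactly up to rounding.

```python
import numpy as np

def softmax(x):
    exps = np.exp(x - np.max(x, axis=-1, keepdims=True))
    return exps / np.sum(exps, axis=-1, keepdims=True)

x = np.array([1.0, 2.0, 3.0])   # logits for one sample
y = np.array([0.0, 0.0, 1.0])   # one-hot label
p = softmax(x)

dL_dp = -y / p                          # gradient of cross-entropy w.r.t. p (eps = 0)
dp_dx = np.diag(p) - np.outer(p, p)     # softmax Jacobian
dL_dx = dL_dp @ dp_dx                   # chain rule

print(np.allclose(dL_dx, p - y))  # True
```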

#### 2.2 Implement the grad_x method of the CrossEntropy class, which returns $\frac{\partial L}{\partial p}$

The following test helps check that the code works:

In [85]:
def numerical_diff_net(net, x, labels):
    eps = 0.00001
    right_answer = []
    for i in range(len(x[0])):
        delta = np.zeros(len(x[0]))
        delta[i] = eps
        diff = (net.calculate_loss(x + delta, labels) - net.calculate_loss(x-delta, labels)) / (2*eps)
        right_answer.append(diff)
    return np.array(right_answer).T

def test_net(net):
    x = np.array([[1, 2, 3], [2, 3, 4]])
    labels = np.array([[0.3, 0.2, 0.5], [0.3, 0.2, 0.5]])
    num_grad = numerical_diff_net(net, x, labels)
    grad = net.grad_x(x, labels)
    if np.sum(np.abs(num_grad - grad)) < 0.01:
        print('Test PASSED')
    else:
        print('Something went wrong!')
        print('Numerical grad is')
        print(num_grad)
        print('Your gradient is')
        print(grad)

loss = CrossEntropy()
test_net(loss)

Test PASSED


#### 2.3 Implement the grad_x method of the Softmax class, which returns $\frac{\partial Softmax}{\partial x}$

The following test helps check that the code works:

In [86]:
def numerical_diff_layer(layer, x):
    eps = 0.00001
    right_answer = []
    for i in range(len(x[0])):
        delta = np.zeros(len(x[0]))
        delta[i] = eps
        diff = (layer.forward(x + delta) - layer.forward(x-delta)) / (2*eps)
        right_answer.append(diff.T)
    return np.array(right_answer).T

def test_layer(layer):
    x = np.array([[1, 2, 3], [2, -3, 4]])
    num_grad = numerical_diff_layer(layer, x)
    grad = layer.grad_x(x)
    if np.sum(np.abs(num_grad - grad)) < 0.01:
        print('Test PASSED')
    else:
        print('Something went wrong!')
        print('Numerical grad is')
        print(num_grad)
        print('Your gradient is')
        print(grad)

layer = Softmax()
test_layer(layer)

Test PASSED
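
The softmax Jacobian for one sample is $\mathrm{diag}(s) - s s^T$. As a sketch, the per-sample loop can be replaced by broadcasting so that the whole batch is handled at once; the function names below are illustrative and not part of the assignment.

```python
import numpy as np

def softmax(x):
    exps = np.exp(x - np.max(x, axis=-1, keepdims=True))
    return exps / np.sum(exps, axis=-1, keepdims=True)

def softmax_jacobian(x):
    """Batched Jacobian of softmax, shape (batch, n, n)."""
    s = softmax(x)                              # (batch, n)
    diag = s[:, :, None] * np.eye(s.shape[1])   # diag(s) for every sample
    outer = s[:, :, None] * s[:, None, :]       # s s^T for every sample
    return diag - outer

x = np.array([[1.0, 2.0, 3.0], [2.0, -3.0, 4.0]])
jac = softmax_jacobian(x)
print(jac.shape)  # (2, 3, 3)
```

Because the softmax outputs always sum to 1, every row and column of each Jacobian sums to zero, which is a quick property to check in addition to the numerical test.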


#### 2.4 Implement the grad_x method for the ReLU and DenseLayer classes

In [87]:
layer = ReLU()
test_layer(layer)

Test PASSED


In [88]:
layer = DenseLayer(3,4)
test_layer(layer)

Test PASSED


#### 2.5 (4 points) For the Network class, implement the grad_x method, which must compute the derivative of the loss with respect to the input

In [89]:
net = Network([DenseLayer(3, 10), ReLU(), DenseLayer(10, 3), Softmax()], loss=CrossEntropy())
test_net(net)

Test PASSED


### 3. Implementing gradients with respect to the parameters and the backpropagation method with updates of the network parameters

#### 3.1 Implement the grad_b and grad_W functions. The grad_W test assumes that W is a one-dimensional vector.

In [90]:
def numerical_grad_b(input_size, output_size, b, W, x):
    eps = 0.00001
    right_answer = []
    for i in range(len(b)):
        delta = np.zeros(b.shape)
        delta[i] = eps
        dense1 = DenseLayer(input_size, output_size, W_init=W, b_init=b+delta)
        dense2 = DenseLayer(input_size, output_size, W_init=W, b_init=b-delta)
        diff = (dense1.forward(x) - dense2.forward(x)) / (2*eps)
        right_answer.append(diff.T)
    return np.array(right_answer).T

def test_grad_b():
    input_size = 3
    output_size = 4
    W_init = np.random.random((input_size, output_size))
    b_init = np.random.random((output_size,))
    x = np.random.random((2, input_size))

    dense = DenseLayer(input_size, output_size, W_init, b_init)
    grad = dense.grad_b(x)

    num_grad = numerical_grad_b(input_size, output_size, b_init, W_init, x)
    if np.sum(np.abs(num_grad - grad)) < 0.01:
        print('Test PASSED')
    else:
        print('Something went wrong!')
        print('Numerical grad is')
        print(num_grad)
        print('Your gradient is')
        print(grad)

test_grad_b()

Test PASSED


In [91]:
def numerical_grad_W(input_size, output_size, b, W, x):
    eps = 0.00001
    right_answer = []
    for i in range(W.shape[0]):
        for j in range(W.shape[1]):
            delta = np.zeros(W.shape)
            delta[i, j] = eps
            dense1 = DenseLayer(input_size, output_size, W_init=W+delta, b_init=b)
            dense2 = DenseLayer(input_size, output_size, W_init=W-delta, b_init=b)
            diff = (dense1.forward(x) - dense2.forward(x)) / (2*eps)
            right_answer.append(diff.T)
    return np.array(right_answer).T

def test_grad_W():
    input_size = 3
    output_size = 4
    W_init = np.random.random((input_size, output_size))
    b_init = np.random.random((output_size,))
    x = np.random.random((2, input_size))

    dense = DenseLayer(input_size, output_size, W_init, b_init)
    grad = dense.grad_W(x)

    num_grad = numerical_grad_W(input_size, output_size, b_init, W_init, x)
    if np.sum(np.abs(num_grad - grad)) < 0.01:
        print('Test PASSED')
    else:
        print('Something went wrong!')
        print('Numerical grad is')
        print(num_grad)
        print('Your gradient is')
        print(grad)

test_grad_W()

Test PASSED
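The shapes these two tests expect can be read off the numerical code: `(batch, output_size, output_size)` for grad_b and `(batch, output_size, input_size * output_size)` for grad_W. A minimal sketch of analytic Jacobians in that layout follows. It assumes the layer computes `x @ W + b`; the standalone functions are illustrative and are not the notebook's DenseLayer methods.

```python
import numpy as np

def dense_grad_b(x, output_size):
    """Jacobian of y = x @ W + b w.r.t. b: the identity matrix for every sample."""
    return np.tile(np.eye(output_size), (x.shape[0], 1, 1))

def dense_grad_W(x, output_size):
    """Jacobian of y = x @ W + b w.r.t. W, with W flattened row by row.

    dy_k / dW[i, j] = x_i if j == k else 0, stored at flat index i * output_size + j.
    """
    batch, input_size = x.shape
    # jac[n, k, i, j] = x[n, i] * (k == j)
    jac = np.einsum('ni,kj->nkij', x, np.eye(output_size))
    return jac.reshape(batch, output_size, input_size * output_size)
```

Neither Jacobian depends on the values of `W` or `b`, because the layer is linear in its parameters.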


#### 3.2 Implement the full backpropagation method in the train_step function of the Network class


We recommend first implementing the function Network.grad_param(), which returns a list with one element per layer; each element is the list of gradients with respect to that layer's parameters.
Then, given this list of gradients, write a function that updates the parameters of each layer.

Tip: write a test for the parameter-gradient code to make sure the gradient through the whole network is computed correctly.
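One way to write such a test is to compare backpropagated gradients with central finite differences of the loss. The sketch below does this for a small Dense, ReLU, Dense, Softmax network with cross-entropy loss. It is self-contained and uses plain functions (`loss_fn`, `backprop_grads`, `numerical_grads`, `sgd_step`) that are hypothetical names, not the notebook's Network API.

```python
import numpy as np

def softmax(z):
    z = z - z.max(axis=1, keepdims=True)  # shift for numerical stability
    e = np.exp(z)
    return e / e.sum(axis=1, keepdims=True)

def loss_fn(params, x, y):
    """Mean cross-entropy of Dense -> ReLU -> Dense -> Softmax."""
    W1, b1, W2, b2 = params
    h = np.maximum(x @ W1 + b1, 0)
    p = softmax(h @ W2 + b2)
    return -np.mean(np.sum(y * np.log(p), axis=1))

def backprop_grads(params, x, y):
    """Gradients of loss_fn w.r.t. each parameter, via the chain rule."""
    W1, b1, W2, b2 = params
    z1 = x @ W1 + b1
    h = np.maximum(z1, 0)
    p = softmax(h @ W2 + b2)
    dz2 = (p - y) / len(x)       # softmax + cross-entropy, averaged over the batch
    dW2, db2 = h.T @ dz2, dz2.sum(axis=0)
    dz1 = (dz2 @ W2.T) * (z1 > 0)  # back through the ReLU
    dW1, db1 = x.T @ dz1, dz1.sum(axis=0)
    return [dW1, db1, dW2, db2]

def numerical_grads(params, x, y, eps=1e-5):
    """Central finite differences, one parameter entry at a time."""
    grads = []
    for p in params:
        g = np.zeros_like(p)
        for idx in np.ndindex(p.shape):
            old = p[idx]
            p[idx] = old + eps
            plus = loss_fn(params, x, y)
            p[idx] = old - eps
            minus = loss_fn(params, x, y)
            p[idx] = old  # restore the original value
            g[idx] = (plus - minus) / (2 * eps)
        grads.append(g)
    return grads

def sgd_step(params, grads, learning_rate):
    """Plain gradient-descent update; returns new parameter arrays."""
    return [p - learning_rate * g for p, g in zip(params, grads)]

rng = np.random.default_rng(0)
x = rng.random((5, 3))
y = np.eye(4)[rng.integers(0, 4, size=5)]  # one-hot labels
params = [rng.normal(size=(3, 6)), np.zeros(6), rng.normal(size=(6, 4)), np.zeros(4)]
analytic = backprop_grads(params, x, y)
numeric = numerical_grads(params, x, y)
max_err = max(np.max(np.abs(a - n)) for a, n in zip(analytic, numeric))
print('max abs difference:', max_err)
```

A difference far above the finite-difference error (roughly the square of `eps`) usually points to a bug in one layer's backward pass.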
    

#### 3.3 Study the implementation of the fit function of the Network class. Run model training. If everything works correctly, validation accuracy should increase

In [92]:
net = Network([DenseLayer(784, 10), Softmax()], loss=CrossEntropy())
trainX = X_train.reshape(len(X_train), -1)
net.fit(trainX[::3], Y_train[::3], validation_split=0.25,
            batch_size=16, nb_epoch=5, learning_rate=0.01)

100%|██████████| 937/937 [00:05<00:00, 177.48it/s]


1 epoch: val 0.85


100%|██████████| 937/937 [00:04<00:00, 231.78it/s]


2 epoch: val 0.87


100%|██████████| 937/937 [00:04<00:00, 228.07it/s]


3 epoch: val 0.88


100%|██████████| 937/937 [00:05<00:00, 187.27it/s]


4 epoch: val 0.89


100%|██████████| 937/937 [00:04<00:00, 227.58it/s]

5 epoch: val 0.89





In [93]:
net = Network([DenseLayer(784, 20), ReLU(), DenseLayer(20, 10), Softmax()], loss=CrossEntropy())
trainX = X_train.reshape(len(X_train), -1)
net.fit(trainX[::6], Y_train[::6], validation_split=0.25,
            batch_size=16, nb_epoch=5, learning_rate=0.001)

100%|██████████| 468/468 [00:09<00:00, 49.09it/s]


1 epoch: val 0.28


100%|██████████| 468/468 [00:09<00:00, 50.37it/s]


2 epoch: val 0.43


100%|██████████| 468/468 [00:09<00:00, 50.52it/s]


3 epoch: val 0.53


100%|██████████| 468/468 [00:09<00:00, 49.22it/s]


4 epoch: val 0.62


100%|██████████| 468/468 [00:09<00:00, 49.24it/s]

5 epoch: val 0.68
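The iteration counts in the progress bars above (937 and 468) are consistent with simple batch arithmetic. The sketch below assumes that X_train holds 60000 samples (the standard MNIST training set, consistent with the 784 input features) and that fit drops the last incomplete batch; both are assumptions, and `batches_per_epoch` is a hypothetical helper.

```python
def batches_per_epoch(n_total, step, validation_split, batch_size):
    """Number of full mini-batches per epoch after subsampling and splitting."""
    n_used = len(range(0, n_total, step))            # size of X[::step]
    n_train = int(n_used * (1 - validation_split))   # samples left for training
    return n_train // batch_size                     # incomplete last batch dropped

print(batches_per_epoch(60000, 3, 0.25, 16))  # 937
print(batches_per_epoch(60000, 6, 0.25, 16))  # 468
```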





#### 3.5 Demonstrate that your implementation can train deeper neural networks

The neural network consists of the following layers:

1. Conv2DLayer
2. Conv2DLayer
3. Conv2DLayer
4. MaxPooling
5. Conv2DTrLayer
6. FlattenLayer
7. DenseLayer
8. ReLU
9. DenseLayer
10. Softmax

In [97]:
net = Network([
    Conv2DLayer(kernel_size=3, input_channels=1, output_channels=1, padding='same', stride=1),
    Conv2DLayer(kernel_size=3, input_channels=1, output_channels=1, padding='valid', stride=1),
    Conv2DLayer(kernel_size=3, input_channels=1, output_channels=1, padding='same', stride=1),
    MaxPooling(),
    Conv2DTrLayer(3, 1, 1, 0, 1),
    FlattenLayer(),
    DenseLayer(15 * 15, 100),
    ReLU(),
    DenseLayer(100, 10),
    Softmax()
], loss=CrossEntropy())

net.fit(X_train[::6], Y_train[::6], validation_split=0.25,
            batch_size=16, nb_epoch=5, learning_rate=0.001)

100%|██████████| 468/468 [08:24<00:00,  1.08s/it]


1 epoch: val 0.28


100%|██████████| 468/468 [08:27<00:00,  1.08s/it]


2 epoch: val 0.37


100%|██████████| 468/468 [08:27<00:00,  1.08s/it]


3 epoch: val 0.43


100%|██████████| 468/468 [08:23<00:00,  1.08s/it]


4 epoch: val 0.43


100%|██████████| 468/468 [08:18<00:00,  1.07s/it]


5 epoch: val 0.44
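The input size `15 * 15` of the first DenseLayer can be checked by tracing the spatial size through the layers. The sketch below assumes 28x28 single-channel inputs, a MaxPooling layer with a 2x2 window and stride 2, and that the positional arguments of `Conv2DTrLayer(3, 1, 1, 0, 1)` mean kernel size, input channels, output channels, padding and stride. None of this is confirmed by the notebook excerpt, and the helper functions are hypothetical.

```python
def conv_out(size, kernel, padding, stride=1):
    """Spatial output size of a convolution with 'same' or 'valid' padding."""
    if padding == 'same':
        return -(-size // stride)  # ceil(size / stride)
    return (size - kernel) // stride + 1

def pool_out(size, pool=2):
    """Spatial output size of non-overlapping max pooling."""
    return size // pool

def conv_transpose_out(size, kernel, padding=0, stride=1):
    """Spatial output size of a transposed convolution."""
    return (size - 1) * stride - 2 * padding + kernel

size = 28
size = conv_out(size, 3, 'same')         # 28
size = conv_out(size, 3, 'valid')        # 26
size = conv_out(size, 3, 'same')         # 26
size = pool_out(size)                    # 13
size = conv_transpose_out(size, 3, 0, 1) # 15
flat_features = size * size              # 225 == 15 * 15
print(size, flat_features)
```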
