# Практическое задание 2



## Замечания

* Задание необходимо сдать боту до 06.12.2021
* Соблюдаем кодекс чести (по нулям и списавшему, и давшему списать)
* Можно (и нужно!) применять для реализации только библиотеку **Numpy**
* Ничего, крому Numpy, нельзя использовать для реализации 
* **Keras** используется только для тестирования Вашей реализации
* Если какой-то из классов не проходит приведенные тесты, то соответствующее задание не оценивается
* Возможно использование дополнительных (приватных) тестов
 

## Реализация собственного нейросетевого пакета для запуска и обучения нейронных сетей

Задание состоит из трёх частей:
1. Реализация прямого вывода нейронной сети (первое практическое задание)
2. Реализация градиентов по входу и распространения градиента по сети (back propagation)
3. Реализация градиентов по параметрам и метода обратного распространения ошибки с обновлением парметров сети

###  1. Реализация вывода собственной нейронной сети

1.1 Внимательно ознакомьтесь с интерфейсом слоя. Любой слой должен содержать как минимум три метода:
- конструктор
- прямой вывод 
- обратный вывод, производные по входу и по параметрам

In [122]:
class Layer(object):
    def __init__(self):
        self.name = 'Layer'       
    def forward(self, input_data):
        pass
    def backward(self, input_data):
        return [self.grad_x(input_data), self.grad_param(input_data)]
    
    def grad_x(self, input_data):
        pass
    def grad_param(self, input_data):
        return []
    
    def update_param(self, grads, learning_rate):
        pass


1.2 Ниже предствален интерфейс класса  Network. Обратите внимание на реализацию метода predict, который последовательно обрабатывает входные данные слой за слоем.

In [123]:
import numpy as np
from sklearn.model_selection import train_test_split
from tqdm import tqdm

class Network(object):
    def __init__(self, layers, loss=None):
        self.name = 'Network'
        self.layers = layers
        self.loss = loss
    
    def forward(self, input_data):
        return self.predict(input_data)

    def grad_x(self, input_data, labels):
        grads_x = []
        input = input_data
        for layer in self.layers:
            grads_x.append(layer.grad_x(input))
            input = layer.forward(input) 
        out = np.reshape(self.loss.grad_x(input, labels), (self.loss.grad_x(input, labels).shape[0], 1, self.loss.grad_x(input, labels).shape[1]))
        for out_layer in reversed(grads_x):
            out = np.matmul(out, out_layer)
        return np.reshape(out, (out.shape[0], out.shape[2]))

    def grad_param(self, input_data, labels):
        out = []
        input = input_data
        for layer in self.layers:
            out.append(layer.backward(input))
            input = layer.forward(input) 
        out.append([self.loss.grad_x(input, labels), []])
        return out

    def update(self, grad_list, learning_rate):
        grad_x = grad_list[-1][0]
        grad_x = np.reshape(grad_x, (grad_x.shape[0], 1, grad_x.shape[1]))
        for i in range(len(grad_list)-2, -1, -1):
            if len(grad_list[i][1]) != 0:
                self.layers[i].update_param([np.matmul(grad_x, grad_list[i][1][0]), np.matmul(grad_x, grad_list[i][1][1])], learning_rate)
            grad_x = np.matmul(grad_x, grad_list[i][0])
    
    
    def predict(self, input_data):
        current_input = input_data
        for layer in self.layers:
            current_input = layer.forward(current_input)     
        return current_input
    
    def calculate_loss(self, input_data, labels):
        return self.loss.forward(self.predict(input_data), labels)
    
    def train_step(self, input_data, labels, learning_rate=0.001):
        grad_list = self.grad_param(input_data, labels)
        self.update(grad_list, learning_rate)
    
    def fit(self, trainX, trainY, validation_split=0.25, 
            batch_size=1, nb_epoch=1, learning_rate=0.01):
        
        train_x, val_x, train_y, val_y = train_test_split(trainX, trainY, 
                                                          test_size=validation_split,
                                                          random_state=42)
        for epoch in range(nb_epoch):
            #train one epoch
            for i in tqdm(range(int(len(train_x)/batch_size))):
                batch_x = train_x[i*batch_size: (i+1)*batch_size]
                batch_y = train_y[i*batch_size: (i+1)*batch_size]
                self.train_step(batch_x, batch_y, learning_rate)
            #validate
            val_accuracy = self.evaluate(val_x, val_y)
            print("\n",'%d epoch: val %.2f' %(epoch+1, val_accuracy), "\n")
            
    def evaluate(self, testX, testY):
        y_pred = np.argmax(self.predict(testX), axis=1)            
        y_true = np.argmax(testY, axis=1)
        val_accuracy = np.sum((y_pred == y_true))/(len(y_true))
        return val_accuracy

#### 1.1 (6 баллов) Необходимо реализовать метод forward для вычисления следующих слоёв:

- DenseLayer
- ReLU
- Softmax
- FlattenLayer
- MaxPooling

In [124]:
#импорты
import numpy as np

In [125]:
class DenseLayer(Layer):
    def __init__(self, input_dim, output_dim, W_init=None, b_init=None):
        self.name = 'Dense'
        self.input_dim = input_dim
        self.output_dim = output_dim
        if W_init is None or b_init is None:
            self.W = np.random.normal(0, np.sqrt(2. / input_dim), size = (input_dim, output_dim))
            self.b = np.zeros(output_dim, 'float32')
        else:
            self.W = W_init
            self.b = b_init
    def forward(self, input_data):
        return np.matmul(input_data, self.W) + self.b
    def grad_x(self, input_data):
        out = np.zeros(shape = (input_data.shape[0], self.output_dim, self.input_dim))
        for o in out:
          for i in range(self.output_dim):
            for j in range(self.input_dim):
                o[i, j] = self.W[j, i]
        return out
    def grad_b(self, input_data):
        out = np.zeros(shape = (input_data.shape[0], self.output_dim, self.output_dim))
        for o in out:
            for i in range(self.output_dim):
                o[i, i] = 1
        return out 
    def grad_W(self, input_data):
        out = np.zeros(shape = (input_data.shape[0], self.output_dim, self.input_dim * self.output_dim))
        k=0
        for o in out:
          for i in range(self.output_dim):
            for j in range(self.input_dim):
                o[i, j*self.output_dim + i] = input_data[k, j]
          k +=1
        return out
    
    def update_W(self, grad, learning_rate):
        self.W -= learning_rate * np.mean(grad, axis=0).reshape(self.W.shape)
    
    def update_b(self, grad,  learning_rate):
        self.b -= learning_rate * np.mean(grad, axis=0).reshape(self.b.shape)
        
    def update_param(self, params_grad, learning_rate):
        self.update_W(params_grad[0], learning_rate)
        self.update_b(params_grad[1], learning_rate)
    
    def grad_param(self, input_data):
        return [self.grad_W(input_data), self.grad_b(input_data)]
    

class ReLU(Layer):
    def __init__(self):
        self.name = 'ReLU'
    def forward(self, input_data):
      out = np.zeros(input_data.shape)
      for (index, x) in np.ndenumerate(input_data):
        out[index] = x if x > 0 else 0
      return out
    def grad_x(self, input_data):
        out = np.zeros((input_data.shape[0], input_data.shape[1], input_data.shape[1]))
        for b in range(input_data.shape[0]):
            for i in range(input_data.shape[1]):
              out[b][i][i] =  1 if (input_data[b][i] > 0) else 0;
        return out 
    
    
class Softmax(Layer):
    def __init__(self):
        self.name = 'Softmax'
    def forward(self, input_data):
      out = np.zeros(input_data.shape)
      for batch in range(input_data.shape[0]):
        out[batch] = np.exp(input_data[batch]) / np.sum(np.exp(input_data[batch]))
      return out
    def grad_x(self, input_data):
        out = np.zeros(shape = (input_data.shape[0], input_data.shape[1], input_data.shape[1]))
        for batch in range(input_data.shape[0]):
            expon = np.exp(input_data[batch])
            for i in range(input_data.shape[1]):
                for j in range(input_data.shape[1]):
                    out[batch, i, j] = - (expon[i] * expon[j]) / np.sum(expon)**2 + (expon[i] / np.sum(expon) if i == j else 0) 
        return out
    

class FlattenLayer(Layer):
    def __init__(self):
        self.name = 'Flatten'
    def forward(self, input_data):
      out = np.zeros([input_data.shape[0], input_data.shape[1] * input_data.shape[2] * input_data.shape[3]])
      for batch in range(input_data.shape[0]):
          counter = 0
          for j in range(input_data.shape[2]):
              for k in range(input_data.shape[3]):
                  for i in range(input_data.shape[1]):
                      out[batch, counter] = input_data[batch, i, j, k]
                      counter += 1
      return out
    def grad_x(self, input_data):
      out = np.zeros((input_data.shape[0], input_data.shape[1] * input_data.shape[2] * input_data.shape[3], input_data.shape[1] * input_data.shape[2] * input_data.shape[3]))
      for batch in range(input_data.shape[0]):
        for i in range(input_data.shape[1] * input_data.shape[2] * input_data.shape[3]):
          out[batch, i, i] = 1
      return out


class MaxPooling(Layer):
    def __init__(self, pool_size=(2, 2), strides=2):
        self.name = 'MaxPooling'
        self.pool_size = pool_size
        self.strides = 2
    def forward(self, input_data):
      out = np.zeros([input_data.shape[0], input_data.shape[1],  (input_data.shape[2] - self.pool_size + self.stride)//self.stride, (input_data.shape[3] - self.pool_size + self.stride)//self.stride])
      for batch in range(input_data.shape[0]):
          for ch in range(input_data.shape[1]):
              for x in range(0, input_data.shape[2] - self.pool_size + 1, self.stride):
                  for y in range(0, input_data.shape[3] - self.pool_size + 1, self.stride):
                      out[batch, ch, x // self.strides, y // self.strides] = np.amax(input_data[batch, ch, x:x + self.pool_size, y:y + self.pool_size])
      return out    
    def grad_x(self, input_data):
        (p1, p2) = self.pool_size
        out = np.zeros([input_data.shape[0], input_data.shape[1]*(input_data.shape[2] - p1 +self.stride) //self.stride*(input_data.shape[3] - p2 +self.stride) //self.stride, input_data.shape[1]*input_data.shape[2]*input_data.shape[3]])
        for batch in range(input_data.shape[0]):
          for channel in range(input_data.shape[1]):
            for x in range(0, input_data.shape[2] - p1 + 1,self.stride):
              for y in range(0, input_data.shape[3] - p2 + 1,self.stride):
                for i in range(x,x+p1):
                  for j in range(y,y+p2):
                    out[batch, channel*(input_data.shape[2] - p1 +self.stride) //self.stride*(input_data.shape[3] - p2 +self.stride) //self.stride + (x//s)*(input_data.shape[3] - p2 +self.stride) //self.stride + (y//s), channel*input_data.shape[2]*input_data.shape[3] + i*input_data.shape[3] + j] = 1 if input_data[batch, channel, i, j] == np.amax(input_data[batch, channel, x:x+p1, y:y+p2]) else 0
        return out    

#### 1.2 (3 балла) Реализуйте теперь свёртночный слой   (опционально)

In [126]:
class Conv2D(Layer):
    def __init__(self, kernel_size, input_channels, output_channels, 
                 kernels_init=None, bias_init=None):
        self.name = 'Conv2D'
        self.kernel_size = kernel_size
        self.input_channels = input_channels
        self.input_channels = input_channels
        self.output_channels = output_channels
        if kernels_init is None or bias_init is None:
          self.kernel = np.random.normal(0, np.sqrt(2. / (kernel_size*kernel_size*output_channels)), size =  (kernel_size,kernel_size,input_channels,output_channels))
          self.bias = np.zeros(output_channels, 'float32')
        else:
          self.kernels = kernels_init
          self.bias = bias_init
            
    def forward(self, input_data):
      def point(x,y):
        return (np.moveaxis(x, 0, -1)*y).sum()

      input = input_data.copy()
      if self.padding == 'same':
          input = np.pad(input, pad_width=((0, 0), (0, 0), (max(self.kernel_size - (input.shape[2] - 1) % self.stride - 1, 0) // 2, max(self.kernel_size - (input.shape[2] - 1) % self.stride - 1, 0)-(max(self.kernel_size - (input.shape[2] - 1) % self.stride - 1, 0) // 2)),
                        (max(self.kernel_size - (input.shape[3] - 1) % self.stride - 1, 0) // 2,  max(self.kernel_size - (input.shape[3] - 1) % self.stride - 1, 0) - max(self.kernel_size - (input.shape[3] - 1) % self.stride - 1, 0) // 2)), mode='constant',
                        constant_values=0)
      out = np.zeros([input.shape[0], self.output_channels, (input.shape[2] - self.kernel_size + 1 + self.stride - 1) // self.stride, (input.shape[3] - self.kernel_size + 1 + self.stride - 1) // self.stride])
      for batch in range(input.shape[0]):
          for channel in range(self.output_channels):
              for x in range(0, input.shape[2] - self.kernel_size + 1, self.stride):
                  for y in range(0, input.shape[3] - self.kernel_size + 1, self.stride):
                      a = input[batch, :, x:x + self.kernel_size, y:y + self.kernel_size]
                      b = self.kernel[:, :, :, channel]
                      out[batch, channel, x // self.stride, y // self.stride] = point(a,b) \
                          + self.bias[channel]
      return out
    def grad_x(self):
        pass
    def grad_kernel(self):
        pass

#### 1.4 Теперь настало время теста. 
#### Если вы всё сделали правильно, то запустив следующие ячейки у вас должна появиться надпись: Test PASSED

Переходить к дальнейшим заданиям не имеем никакого смысла, пока вы не добьётесь прохождение теста
    

#### Чтение данных

In [127]:
import numpy as np
np.random.seed(123)  # for reproducibility
from keras.utils import np_utils
from keras.datasets import mnist
 
(X_train, y_train), (X_test, y_test) = mnist.load_data()

X_train = X_train.reshape(X_train.shape[0], 1, 28, 28)
X_test = X_test.reshape(X_test.shape[0], 1, 28, 28)
X_train = X_train.astype('float32')
X_test = X_test.astype('float32')
X_train /= 255
X_test /= 255
 

Y_train = np_utils.to_categorical(y_train, 10)
Y_test = np_utils.to_categorical(y_test, 10)
print(X_train.shape, Y_train.shape, X_test.shape, Y_test.shape)

(60000, 1, 28, 28) (60000, 10) (10000, 1, 28, 28) (10000, 10)


#### Подготовка моделей

### 2. Вычисление производных по входу для слоёв нейронной сети

#### 2.1 (1 балл) Реализуйте метод forward для класса CrossEntropy
Напоминание: $$ crossentropy = L(p, y) =  - \sum\limits_i y_i log p_i, $$
где вектор $(p_1, ..., p_k) $ -  выход классификационного алгоритма, а $(y_1,..., y_k)$ - правильные метки класса в унарной кодировке (one-hot encoding)

In [128]:
class CrossEntropy(object):
    def __init__(self, eps=0.00001):
        self.name = 'CrossEntropy'
        self.eps = eps
    
    def forward(self, input_data, labels):
        log = np.log(input_data + self.eps)
        out = - np.sum(labels * log, axis = 1)
        return out
    
    def calculate_loss(self,input_data, labels):
        return self.forward(input_data, labels)
    
    def grad_x(self, input_data, labels):
        out = np.zeros_like(input_data, dtype = np.float64)
        for batch in range(input_data.shape[0]):
          out[batch] = (-labels[batch] / (input_data[batch]+self.eps))
        return out

#### 2.2 (2 баллa) Реализуйте метод grad_x класса CrossEntropy, который возвращает $\frac{\partial L}{\partial p}$

Проверить работоспособность кода поможет следующий тест:

In [129]:
def numerical_diff_net(net, x, labels):
    eps = 0.00001
    right_answer = []
    for i in range(len(x[0])):
        delta = np.zeros(len(x[0]))
        delta[i] = eps
        diff = (net.calculate_loss(x + delta, labels) - net.calculate_loss(x-delta, labels)) / (2*eps)
        right_answer.append(diff)
    return np.array(right_answer).T

def test_net(net):
    x = np.array([[1, 2, 3], [2, 3, 4]])
    labels = np.array([[0.3, 0.2, 0.5], [0.3, 0.2, 0.5]])
    num_grad = numerical_diff_net(net, x, labels)
    grad = net.grad_x(x, labels)
    if np.sum(np.abs(num_grad - grad)) < 0.01:
        print('Test PASSED')
    else:
        print('Something went wrong!')
        print('Numerical grad is')
        print(num_grad)
        print('Your gradiend is ')
        print(grad)
        
loss = CrossEntropy()
test_net(loss)

Test PASSED


#### 2.3 (2 балла)   Реализуйте метод grad_x класса Softmax, который возвращает $\frac{\partial Softmax}{\partial x}$

Проверить работоспособность кода поможет следующий тест:

In [130]:
def numerical_diff_layer(layer, x):
    eps = 0.00001
    right_answer = []
    for i in range(len(x[0])):
        delta = np.zeros(len(x[0]))
        delta[i] = eps
        diff = (layer.forward(x + delta) - layer.forward(x-delta)) / (2*eps)
        right_answer.append(diff.T)
    return np.array(right_answer).T

def test_layer(layer):
    x = np.array([[1, 2, 3], [2, -3, 4]])
    num_grad = numerical_diff_layer(layer, x)
    grad = layer.grad_x(x)
    if np.sum(np.abs(num_grad - grad)) < 0.01:
        print('Test PASSED')
    else:
        print('Something went wrong!')
        print('Numerical grad is')
        print(num_grad)
        print('Your gradiend is ')
        print(grad)
        
layer = Softmax()
test_layer(layer)

Test PASSED


#### 2.4 (5 баллов) Реализуйте метод grad_x для классов ReLU и DenseLayer

In [131]:
layer = ReLU()
test_layer(layer)

Test PASSED


In [132]:
layer = DenseLayer(3,4)
test_layer(layer)

Test PASSED


#### 2.5 (4 балла) Для класса Network реализуйте метод grad_x, который должен реализовывать взятие производной от лосса по входу

In [133]:
net = Network([DenseLayer(3, 10), ReLU(), DenseLayer(10, 3), Softmax()], loss=CrossEntropy())
test_net(net)

Test PASSED


### 3. Реализация градиентов по параметрам и метода обратного распространения ошибки с обновлением парметров сети

#### 3.1 (4 балла) Реализуйте функции grad_b и grad_W. При подготовке теста grad_W предполагается, что W является отномерным вектором.

In [134]:
def numerical_grad_b(input_size, output_size, b, W, x):
    eps = 0.00001
    right_answer = []
    for i in range(len(b)):
        delta = np.zeros(b.shape)
        delta[i] = eps
        dense1 = DenseLayer(input_size, output_size, W_init=W, b_init=b+delta)
        dense2 = DenseLayer(input_size, output_size, W_init=W, b_init=b-delta)
        diff = (dense1.forward(x) - dense2.forward(x)) / (2*eps)
        right_answer.append(diff.T)
    return np.array(right_answer).T

def test_grad_b():
    input_size = 3
    output_size = 4 
    W_init = np.random.random((input_size, output_size))
    b_init = np.random.random((output_size,))
    x = np.random.random((2, input_size))
    
    dense = DenseLayer(input_size, output_size, W_init, b_init)
    grad = dense.grad_b(x)

    num_grad = numerical_grad_b(input_size, output_size, b_init, W_init, x)
    if np.sum(np.abs(num_grad - grad)) < 0.01:
        print('Test PASSED')
    else:
        print('Something went wrong!')
        print('Numerical grad is')
        print(num_grad)
        print('Your gradiend is ')
        print(grad)

test_grad_b()

Test PASSED


In [135]:
def numerical_grad_W(input_size, output_size, b, W, x):
    eps = 0.00001
    right_answer = []
    for i in range(W.shape[0]):
        for j in range(W.shape[1]):
            delta = np.zeros(W.shape)
            delta[i, j] = eps
            dense1 = DenseLayer(input_size, output_size, W_init=W+delta, b_init=b)
            dense2 = DenseLayer(input_size, output_size, W_init=W-delta, b_init=b)
            diff = (dense1.forward(x) - dense2.forward(x)) / (2*eps)
            right_answer.append(diff.T)
    return np.array(right_answer).T

def test_grad_W():
    input_size = 3
    output_size = 4 
    W_init = np.random.random((input_size, output_size))
    b_init = np.random.random((4,))
    x = np.random.random((2, input_size))
        
    dense = DenseLayer(input_size, output_size, W_init, b_init)
    grad = dense.grad_W(x)

    num_grad = numerical_grad_W(input_size, output_size, b_init, W_init, x)
    if np.sum(np.abs(num_grad - grad)) < 0.01:
        print('Test PASSED')
    else:
        print('Something went wrong!')
        print('Numerical grad is')
        print(num_grad)
        print('Your gradiend is ')
        print(grad)

test_grad_W()

Test PASSED


#### 3.2 (4 балла) Полностью реализуйте метод обратного распространения ошибки в функции train_step класса Network


Рекомендуем реализовать сначала функцию Network.grad_param(), которая возвращает список длиной в количество слоёв и элементом которого является список градиентов по параметрам.
После чего, имея список градиентов, написать функцию обновления параметров для каждого слоя. 

Совет: рекомендуем написать тест для кода подсчета градиента по параметрам, чтобы быть уверенным в том, что градиент через всю сеть считается правильно
    

#### 3.3 Ознакомьтесь с реализацией функции fit класса Network. Запустите обучение модели. Если всё работает правильно, то точность на валидации должна будет возрастать

In [138]:
net = Network([DenseLayer(784, 10), Softmax()], loss=CrossEntropy())
trainX = X_train.reshape(len(X_train), -1)
net.fit(trainX[::3], Y_train[::3], validation_split=0.25, 
            batch_size=16, nb_epoch=10, learning_rate=0.01)

100%|██████████| 937/937 [02:47<00:00,  5.59it/s]



 1 epoch: val 0.85 



100%|██████████| 937/937 [02:50<00:00,  5.50it/s]



 2 epoch: val 0.87 



100%|██████████| 937/937 [02:46<00:00,  5.61it/s]



 3 epoch: val 0.88 



100%|██████████| 937/937 [02:48<00:00,  5.55it/s]



 4 epoch: val 0.89 



100%|██████████| 937/937 [02:46<00:00,  5.62it/s]



 5 epoch: val 0.89 



100%|██████████| 937/937 [02:48<00:00,  5.55it/s]



 6 epoch: val 0.89 



100%|██████████| 937/937 [02:46<00:00,  5.62it/s]



 7 epoch: val 0.90 



100%|██████████| 937/937 [02:48<00:00,  5.56it/s]



 8 epoch: val 0.90 



100%|██████████| 937/937 [02:46<00:00,  5.64it/s]



 9 epoch: val 0.90 



100%|██████████| 937/937 [02:46<00:00,  5.63it/s]



 10 epoch: val 0.90 



#### 3.5 (2 балла) Продемонстрируйте, что ваша реализация позволяет обучать более глубокие нейронные сети 

In [None]:
net = Network([DenseLayer(784, 200), ReLU(), DenseLayer(200, 100), ReLU(), DenseLayer(100, 20), ReLU(), DenseLayer(20, 10), Softmax()], loss=CrossEntropy())
trainX = X_train.reshape(len(X_train), -1)
net.fit(trainX[::6], Y_train[::6], validation_split=0.25, 
            batch_size=16, nb_epoch=5, learning_rate=0.01)    

100%|██████████| 468/468 [31:50<00:00,  4.08s/it]



 1 epoch: val 0.83 



100%|██████████| 468/468 [31:37<00:00,  4.05s/it]



 2 epoch: val 0.87 



100%|██████████| 468/468 [31:39<00:00,  4.06s/it]



 3 epoch: val 0.89 



  8%|▊         | 39/468 [02:38<29:41,  4.15s/it]