In [0]:
import numpy as np
import random

# активационная функция
def sigmoid(z):
    return 1.0 / (1.0 + np.exp(-z))

# производная активационной функции
def sigmoid_prime(z):
    return sigmoid(z) * (1 - sigmoid(z))

# квадратичная целевая функция
# test_data - кортеж из входных значений и списка значений классов
def cost_function(network, test_data, classes_count):
    c = 0
    for example, y in test_data:
        y_hat = network.feedforward(example)
        c += np.sum((y_hat - y) ** 2)
    return c /( classes_count * len(test_data))


In [0]:
class Network:
    # shape - топология сети: [кол-во нейронов во входном слое, 1 скрытом, ..., в выходном слое]
    # смещение и веса инициализируются случайными значениями (для всех слоев, кроме первого)
    def __init__(self, shape, activation_function, activation_function_derivative, debug= True):
        self.layers = len(shape)
        self.shape = shape
        self.biases = [np.random.randn(y, 1) for y in shape[1:]]
        self.weights = [np.random.randn(y, x) for x, y in zip(shape[:-1], shape[1:])]
        self.activation_function = activation_function
        self.activation_function_derivative = activation_function_derivative
        self.debug = debug

    # прогнать до конца пример из input_matrix - вектора входов размерностью (m, 1)
    # n - количество примеров, m - количество атрибутов
    # Возвращает матрицу (classes_count, 1)
    def feedforward(self, input_matrix):
        for b, w in zip(self.biases, self.weights):
            # weigts - (массив матриц)
            input_matrix = self.activation_function(np.dot(w, input_matrix) + b)
        return input_matrix

    # обновление параметров нейронной сети (веса, смещения), сделав шаг градиентного спуска
    # на основе алгоритма обратного распространения ошибки, примененного к одному mini batch
    # alpha - learning rate
    def update_mini_batch(self, mini_batch, alpha):
        # значения dJ/db для каждого слоя
        nabla_b = [np.zeros(b.shape) for b in self.biases]
        # значения dJ/dw (ошибки) для каждого слоя
        nabla_w = [np.zeros(w.shape) for w in self.weights]

        # для каждого примера из батча применяем бек пропогейшн
        for x, y in mini_batch:
            delta_nabla_b, delta_nabla_w = self.backprop(x, y)
            nabla_b = [nb + dnb for nb, dnb in zip(nabla_b, delta_nabla_b)]
            nabla_w = [nw + dnw for nw, dnw in zip(nabla_w, delta_nabla_w)]
            
        eps = alpha / len(mini_batch)

        # обновляем параметры сети
        self.weights = [w - eps * nw for w, nw in zip(self.weights, nabla_w)]
        self.biases  = [b - eps * nb for b, nb in zip(self.biases,  nabla_b)]

    # прогоняем тестовую выборку и возвращаем количество правильных выдач
    def evaluate(self, test_data):
        test_results = [(self.feedforward(x), self.activation_function(y)) for (x, y) in test_data]
        return sum([np.sum(abs(x - y) < 1e-3) // 2 for (x, y) in test_results])

    # алгоритм стохастического (mini-batch) градиентного спуска
    # Если предоставлен опциональный аргумент ``test_data``, 
    # то после каждой эпохи обучения сеть будет протестирована на этих данных 
    # и промежуточный результат обучения будет выведен в консоль.
    def SGD(self, training_data, epochs, mini_batch_size, alpha, test_data= None):
        if test_data is not None: 
            tests_count = len(test_data)

        n = len(training_data)
        success_tests = 0
        for j in range(epochs):
            random.shuffle(training_data)
            # выбираем несолько батчей, покрывающих всю тренировочную выборку
            mini_batches = [training_data[k:k+mini_batch_size] for k in range(0, n, mini_batch_size)]
            # обновляем параметры сети (веса, биасы) бекпропогейшном для каждого из батчей
            for mini_batch in mini_batches:
                self.update_mini_batch(mini_batch, alpha)

            if test_data is not None and self.debug:
                success_tests = self.evaluate(test_data)
                print(f'Эпоха {j}: {success_tests} / {tests_count}')
            elif self.debug:
                print(f'Эпоха {j} завершена')

        # в случае наличия тестовой выборки возвращаем количество правильных активаций сети
        if test_data is not None:
            return success_tests / tests_count
    
    # возвращает вектор частных производных квадратичной целевой функции по активациям выходного слоя
    def cost_derivative(self, output_activations, y):
        return output_activations - self.activation_function(y)

    # алгоритм обратного распространения ошибки для одного примера из тренировочной выборки
    # возвращает кортеж (nabla_b, nabla_w) - градиентов по смещениям и весам соответственно
    # nabla_b, nabla_w - списки массивов ndarray для каждого слоя
    def backprop(self, x, y):
        nabla_b = [np.zeros(b.shape) for b in self.biases]
        nabla_w = [np.zeros(w.shape) for w in self.weights]

        # прямое распространение (forward pass)
        activations = [x]
        summatories = []
        for b, w in zip(self.biases, self.weights):
            summatories.append(np.dot(w, activations[-1]) + b)
            activation = self.activation_function(summatories[-1])
            activations.append(activation)

        # обратное распространение (backward pass)
        
        # ошибка для выходного слоя
        delta = self.cost_derivative(activations[-1], y) * self.activation_function_derivative(summatories[-1])
        # производная J по биасам выходного слоя
        nabla_b[-1] = delta
        # производная J по весам выходного слоя
        nabla_w[-1] = delta.dot(activations[-2].T)

        # Здесь l = 1 означает последний слой, l = 2 - предпоследний и так далее.  
        for l in range(2, self.layers):
            derivative = self.activation_function_derivative(summatories[-l])
            # суммарные ошибки следующих слоев
            total_deltas = self.weights[-l + 1].T.dot(delta)
            # ошибка на слое L-l
            delta = derivative * total_deltas
            # производная J по смещениям L-l-го слоя
            nabla_b[-l] = delta
            # производная J по весам L-l-го слоя
            nabla_w[-l] = delta.dot(activations[-l - 1].T) 
        return nabla_b, nabla_w

    def test(self, test_data):
        for (x, y) in test_data:
            result = self.feedforward(x)
            print(f'result for {list(x)} is {result} should be: {sigmoid(y)}')

In [46]:
test_data = [(np.array([[1.], [1.]]), np.array([[1.], [1.]])),
             (np.array([[2.], [1.]]), np.array([[2.], [1.]])),
             (np.array([[3.], [2.]]), np.array([[3.], [2.]]))]

network = Network([2,3,2], sigmoid, sigmoid_prime, True)
network.SGD(test_data, 100000, 3, 0.01, test_data)

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
 [0.00988533]]
[[0.05091527]
 [0.00559603]]
[[0.12904235]
 [0.10998418]]
Эпоха 2239: 0 / 3
[[0.0508851 ]
 [0.00557819]]
[[0.1289985]
 [0.1099729]]
[[0.14469058]
 [0.00986485]]
Эпоха 2240: 0 / 3
[[0.05085496]
 [0.00556037]]
[[0.12895471]
 [0.10996161]]
[[0.14470663]
 [0.00984437]]
Эпоха 2241: 0 / 3
[[0.14472267]
 [0.00982391]]
[[0.12891098]
 [0.10995032]]
[[0.05082486]
 [0.00554254]]
Эпоха 2242: 0 / 3
[[0.12886729]
 [0.10993904]]
[[0.14473868]
 [0.00980345]]
[[0.0507948 ]
 [0.00552472]]
Эпоха 2243: 0 / 3
[[0.05076477]
 [0.00550691]]
[[0.14475467]
 [0.009783  ]]
[[0.12882365]
 [0.10992775]]
Эпоха 2244: 0 / 3
[[0.05073478]
 [0.0054891 ]]
[[0.12878007]
 [0.10991647]]
[[0.14477064]
 [0.00976255]]
Эпоха 2245: 0 / 3
[[0.14478658]
 [0.00974212]]
[[0.05070483]
 [0.0054713 ]]
[[0.12873653]
 [0.10990518]]
Эпоха 2246: 0 / 3
[[0.05067491]
 [0.00545351]]
[[0.12869305]
 [0.1098939 ]]
[[0.14480251]
 [0.00972169]]
Эпоха 2247: 0 / 3
[[0.05

KeyboardInterrupt: ignored

In [7]:
test_data = [(np.array([[1], [1]]), np.array([[1], [1]])),
             (np.array([[2], [1]]), np.array([[2], [1]]))]
class test_network:
    def feedforward(self, example):
        return np.array([[0], [0]])

cost_function(test_network(), test_data, 2)

1.75