In [339]:
from typing import List, Tuple

from numpy import ndarray, dot, transpose, ones_like, sum, exp, mean, power, sqrt
from numpy.random import seed, randn, permutation

In [340]:
def assert_same_shape(array:ndarray,
                      array_grad: ndarray):
    assert array.shape == array_grad.shape, \
    "Two ndarrays should have the same shape:" \
    "instead, first ndarray's shae is {0}" \
    "and second ndarray's shape is {1}".format(tuple(array_grad.shape),
                                               tuple(array.shape))

In [341]:
# В Python 3 нет разницы: с object или без него. в Python 2 была, можно оставить для совместимости
class Operation(object):
    """
    Базовый класс операции в нейросети
    """
    def __init__(self):
        pass

    def forward(self, input_: ndarray):
        """
        Хранение ввода в атрибуте экземпляра self.input_
        :param input_:
        :return:
        """
        self.input_ = input_

        self.output = self._output()

        return self.output

    def backward(self, output_grad: ndarray) -> ndarray:
        """
        Вызов функции self._input_grad().
        Проверка совпадения размерностей.
        :param output_grad:
        :return:
        """
        assert_same_shape(self.output, output_grad)

        self.input_grad = self._input_grad(output_grad)

        assert_same_shape(self.input_, self.input_grad)

        return self.input_grad

    def _output(self) -> ndarray:
        """
        Метод _output определяется для каждой операции
        :return:
        """
        raise  NotImplementedError()

    def _input_grad(self, output_grad: ndarray) -> ndarray:
        """
        Метод _input_grad определяется для каждой операции
        :param output_grad:
        :return:
        """
        raise NotImplementedError()

In [342]:
class ParamOperation(Operation):
    """
    Операция с параметрами
    """
    def __init__(self, param:ndarray):
        super().__init__()
        self.param = param

    def backward(self, output_grad: ndarray) -> ndarray:
        """
        Вызов self._input_grad и self._param_grad
        Проверка размерностей
        :param output_grad:
        :return:
        """
        assert_same_shape(self.output, output_grad)

        self.input_grad = self._input_grad(output_grad)
        self.param_grad = self._param_grad(output_grad)

        assert_same_shape(self.input_, self.input_grad)
        assert_same_shape(self.param, self.param_grad)

        return self.input_grad

    def _param_grad(self, output_grad: ndarray) -> ndarray:
        """
        Во всех подклассах ParamOperation должна быть реализация метода _param_grad
        :param output_grad:
        :return:
        """
        raise NotImplementedError()

In [343]:
class WeightMultiply(ParamOperation):
    """
    Умножение весов в нейронной сети.
    """
    def __init__(self, W: ndarray):
        super().__init__(W)

    def _output(self) -> ndarray:
        """
        Вычисление выхода
        :return:
        """
        return dot(self.input_, self.param)
    def _input_grad(self, output_grad: ndarray) -> ndarray:
        """
        Вычисление градиента
        :param output_grad:
        :return:
        """
        return dot(output_grad, transpose(self.param, (1, 0)))
    def _param_grad(self, output_grad: ndarray) -> ndarray:
        """
        Вычисление градиента параметров
        :param output_grad:
        :return:
        """
        return dot(transpose(self.input_, (1,0)), output_grad)

In [344]:
class BiassAdd(ParamOperation):
    """
    Прибавление отклонений
    """
    def __init__(self, B:ndarray):
        assert B.shape[0] == 1
        super().__init__(B)

    def _output(self) -> ndarray:
        """
        Вычисление выхода
        :return:
        """
        return self.input_ + self.param
    def _input_grad(self, output_grad: ndarray) -> ndarray:
        return ones_like(self.input_) * output_grad

    def _param_grad(self, output_grad: ndarray) -> ndarray:
        """
        Вычисление градиента параметров
        :param output_grad:
        :return:
        """
        param_grad = ones_like(self.param) * output_grad

        return sum(param_grad, axis=0).reshape(1, param_grad.shape[1])

In [345]:
class Sigmoid(Operation):
    """
    Сигмоидная функция активации
    """
    def __init__(self) -> None:
        super().__init__()

    def _output(self) -> ndarray:
        """
        Вычисление выхода
        :return:
        """
        return 1.0/(1.0+exp(-1.0*self.input_))

    def _input_grad(self, output_grad: ndarray) -> ndarray:
        """Вычисление градиента"""
        sigmoid_backward = self.output * (1.0 - self.output)
        input_grad = sigmoid_backward * output_grad
        return input_grad

In [346]:
class Linear(Operation):
    """
    Уникальная функция активации
    """
    def __init__(self) -> None:
        super().__init__()

    def _output(self) -> ndarray:
        return self.input_

    def _input_grad(self, output_grad: ndarray) -> ndarray:
        return output_grad

Метод _setup_layer нельзя вызывать прямо в __init__, потмоу что еще нет входных параметров на данном этапе
Он будет выпонлятся при тренировке, если self.first == True

In [347]:
class Layer(object):
    """
    Слой нейронов в нейросети
    """
    def __init__(self, neurons: int):
        """
        :param neurons: Число нейронов (~ ширина слоя)
        """
        self.neurons = neurons
        self.first = True # Первый ли проход через этот слой
        self.params: List[ndarray] = []
        self.param_grads: List[ndarray] = []
        self.operations: List[Operation] = []


    def _setup_layer(self, input_: ndarray) -> None:
        """
        Функция реализуется в каждом слое
        :param self:
        :param input_:
        :return:
        """
        raise NotImplementedError()

    def _forward(self, input_:ndarray) -> ndarray:
        """
        Передача входа вперед через серию операций
        :param input_:
        :return:
        """
        if self.first:
            # Эти настройки производить, если первый проход через слой
            self._setup_layer(input_)
            self.first = False

        self.input_ = input_

        for operation in self.operations:
            input_ = operation.forward(input_)

        self.output = input_

        self._params() # Не было в книге
        return self.output

    def _backward(self, output_grad: ndarray) -> ndarray:
        """
        Передача градиента назад через серию операций
        Проверка размерностей
        :param output_grad:
        :return:
        """
        assert_same_shape(self.output, output_grad)

        for operation in reversed(self.operations):
            output_grad = operation.backward(output_grad)

        input_grad = output_grad

        self._param_grads()
        return  input_grad

    def _param_grads(self) -> ndarray:
        """
        Извлечение param_grads из операций слоя
        :return:
        """
        self.param_grads = []
        for operation in self.operations:
            # является ли ParamOperation подклассом operation.__class__ (класс - подкласс самого себя)
            if issubclass(operation.__class__, ParamOperation):
                self.param_grads.append(operation.param_grad)
        return self.param_grads # Не было в книге

    def _params(self) -> ndarray:
        """
        Извлечение _params из операций слоя
        :return:
        """
        self.params = []
        for operation in self.operations:
            if issubclass(operation.__class__, ParamOperation):
                self.params.append(operation.param)
        return self.params # Не было в книге

В _setup_layer тип параметров не совпадает с типом параметров _setup_layer родителя

In [348]:
class Dense(Layer):
    """
    Полносвязаный слой, наследующий от Layer
    """
    def __init__(self,
                 neurons: int,
                 activation: Operation = Sigmoid()) -> None:
        """
        Для инициализации нужна (нелинейная) функция активации
        :param neurons:
        :param activation:
        """
        super().__init__(neurons)
        # seed_ потом передасться с помощью setattr
        self.seed_ = None
        self.activation = activation

    def _setup_layer(self, input_:ndarray) -> None:
        """
        Определение операций для полносвязного
        :param input_:
        :return:
        """
        if self.seed_:
            seed(self.seed_)

        self.params = []

        # веса
        self.params.append(randn(input_.shape[1], self.neurons))

        # отклонения
        self.params.append(randn(1, self.neurons))

        self.operations = [WeightMultiply(self.params[0]),
                           BiassAdd(self.params[1]),
                           self.activation]
        return None

In [349]:
class Loss(object):
    """
    Потери нейросети
    """
    def __init__(self):
        pass
    def _forward(self, prediction: ndarray, target: ndarray) -> float:
        """
        Вычисление значения потери
        :param prediction:
        :param target:
        :return:
        """
        assert_same_shape(prediction, target)

        self.prediction = prediction
        self.tarrget = target

        loss_value = self._output()

        return loss_value

    def _backward(self) -> ndarray:
        """
        Вычисление градиента потерь по входам функции потерь
        :return:
        """
        self.input_grad = self._input_grad()

        assert_same_shape(self.prediction, self.input_grad)

        return self.input_grad

    def _output(self) -> float:
        """
        Функция _output должна реализовываться всем подклассом класса Loss
        :return:
        """
        raise NotImplementedError()

    def _input_grad(self) -> ndarray:
        """
        Функция должна реализовыватсья всеми подклассами класса Loss
        :return:
        """
        raise NotImplementedError()

In [350]:
class MeanSquaredError(Loss):

    def __init__(self):
        super().__init__()

    def _output(self) -> float:
        """
        Вычисление среднего квадрата ошибки
        :return:
        """
        loss = mean(power(self.prediction - self.tarrget, 2))

        return loss

    def _input_grad(self) -> ndarray:
        """
        Вычисление градиента ошибки по входу MSE
        :return:
        """
        return 2.0 * (self.prediction - self.tarrget) / self.prediction.shape[0]

In [351]:
class NeuralNetwork(object):
    """
    Класс нейронной сети
    """
    def __init__(self, layers: List[Layer],
                 loss: Loss,
                 seed_: float = 1):
        """
        Нейросети нужны слои и поетри
        :param layers:
        :param loss:
        :param seed_:
        """
        self.layers = layers
        self.loss = loss
        self.seed_ = seed_
        if seed_:
            for layer in self.layers:
                # в объекте layer добавить значение атрибуту seed_ (или добавить такой атрибут)
                setattr(layer, "seed_", self.seed_)

    def _forward(self, x_batch: ndarray) -> ndarray:
        """
        Передача данных через последовательность слоев
        :param x_batch:
        :return:
        """
        x_out = x_batch
        for layer in self.layers:
            x_out = layer._forward(x_out)
        return x_out

    def _backward(self, loss_grad: ndarray) -> None:
        """
        Передача данных назад через последовательность слоев
        :param loss_grad:
        :return:
        """
        grad = loss_grad
        for layer in reversed(self.layers):
            grad = layer._backward(grad)

        return None

    def train_batch(self,
                    x_batch: ndarray,
                    y_batch: ndarray) -> float:
        """
        Передача данных вперед через последовательность слоев.
        Вычисление потерь.
        Передача данных назад через последовательность слоев.
        :param x_batch:
        :param y_batch:
        :return:
        """
        predictions = self._forward(x_batch)

        loss = self.loss._forward(predictions, y_batch)

        self._backward(self.loss._backward())

        return loss

    def params(self):
        """
        Получение параметров нейросети
        :return:
        """
        for layer in self.layers:
            yield from layer.params

    def param_grads(self):
        """
        Получение градиента потерь по отношению к параметрам нейросети
        :return:
        """
        for layer in self.layers:
            # yield добавляет в генератор итераторы
            # from из
            yield from layer.param_grads
            # == for lp in layer.param_grads: yield lp

In [352]:
class Optimizer(object):
    """
    Базовый класс отмизатора нейросети (Способ обновления весов)
    """
    def __init__(self, learning_rate: float = 0.01):
        self.learning_rate = learning_rate

    def step(self) -> None:
        """
        Обновить веса на основе текущих градиентов
        :return:
        """
        ...

In [353]:
class SGD(Optimizer):
    """
    Стохастический градиентный оптимизатор
    """
    def __init__(self,
                 learning_rate: float = 0.01):
        super().__init__(learning_rate)
        # Сеть потом передасться с помощью setattr
        self.net = None

    def step(self):
        """
        Для каждого параметра настраивается направление.
        Амплитуда регулировки зависит от скорости обучения
        :return:
        """
        for (param, param_grad) in zip(self.net.params(),
                                       self.net.param_grads()):
            param -= self.learning_rate * param_grad


In [354]:
def permute_data(X, y):
    assert X.shape[0] == len(y), \
        "Размерности x и y не совпдают"
    perm = permutation(X.shape[0])
    return X[perm], y[perm]

 Ответ на вопрос в конце этого файла
 Не выйдет ли за пределы массива/списка X[ii:ii+size], y[ii:ii+size]

In [355]:
class Trainer(object):
    """
    Обучение нейросети
    """
    def __init__(self,
                 net: NeuralNetwork,
                 optim: Optimizer):
        """
        Для обучения нужны нейросеть и оптимизатор. Нейросеть
        назначается атрибутом экземпляра оптимизатора.
        :param net:
        :param optim:
        """
        self.net = net
        self.optim = optim
        setattr(self.optim, 'net', self.net)

    def generate_batches(self, X: ndarray, y: ndarray, size: int=32) -> Tuple[ndarray]:
        """
        Генерация партий для тренировки
        :param X:
        :param y:
        :param size:
        :return:
        """
        assert X.shape[0] == y.shape[0], \
        "Размерности x и y не совпдают"

        N = X.shape[0]

        # НЕ БУДЕТ ЛИ ВЫХОДА ЗА ПРЕДЕЛЫ ?
        for ii in range(0, N, size):
            X_batch, y_batch = X[ii:ii+size], y[ii:ii+size]

            yield X_batch, y_batch

    def fit(self, X_train: ndarray, y_train: ndarray,
            X_test: ndarray, y_test: ndarray,
            epochs: int=10,
            eval_every: int=10,
            batch_size: int=32,
            seed_: int=1,
            restart: bool=True) -> None:
        """
        Подгонка нейросети под обучающие данные за некоторое число эпох.
        :param X_train:
        :param y_train:
        :param X_test: На обучение не влияет, нужно для оценки
        :param y_test: На обучение не влияет, нужно для оценки
        :param epochs:
        :param eval_every: Через каждые eval_every эпох выполняется оценка
        :param batch_size:
        :param seed_:
        :param restart: Если True, повторно инициализировать случайно параметры модели при вызове fit
        :return:
        """
        seed(seed_)

        if restart:
            for layer in self.net.layers:
                layer.first = True
                # first = True => запуск setup в слое => инициализация весов

        for e in range(epochs):
            X_train, y_train = permute_data(X_train, y_train)

            batch_generator = self.generate_batches(X_train, y_train, batch_size)

            for ii, (X_batch, y_batch) in enumerate(batch_generator):
                # Прогон веперд и назад (обновление градиентов)
                self.net.train_batch(X_batch, y_batch)
                # Обновление весов
                self.optim.step()

            # Пора ли делать оценку?
            if (e+1) % eval_every == 0:
                # Аналог predict
                test_preds = self.net._forward(X_test)
                loss = self.net.loss._forward(test_preds, y_test)
                print(f"Validation loss after {e+1} epochs is {loss:.3f}")

                # Далее по книге реализован откат на шаг к предыдущей оценке (net и optim), если оценка стала хуже. Но для обучения нельзя использвать тестовые данные. Хотя если разделить данные на train, validation и test, то можно использовать validation

In [356]:
def mae(y_true: ndarray, y_pred: ndarray):
    """
    Найти срднюю абсолную ошибку
    :param y_true:
    :param y_pred:
    :return:
    """
    return mean(abs(y_true - y_pred))

def rmse(y_true: ndarray, y_pred: ndarray):
    """
    Найти среднюю квадратическую ошибку
    :param y_true:
    :param y_pred:
    :return:
    """
    return sqrt(mean(power(y_true - y_pred, 2)))

def eval_regression_model(model: NeuralNetwork,
                          X_test: ndarray,
                          y_test: ndarray):
    """
    Найти абсолютную и квадратическую ошибку
    :param model:
    :param X_test:
    :param y_test:
    :return:
    """
    preds = model._forward(X_test).reshape(-1, 1)
    print("Mean absolute error {:.2f}".format(mae(preds, y_test)))
    print("Root mean squared error {:.2f}".format(rmse(preds, y_test)))

In [357]:
linear_regression = NeuralNetwork(
    layers=[Dense(neurons=1,
            activation=Linear())],
    loss = MeanSquaredError(),
    seed_=20190501
)

In [358]:
neural_network = NeuralNetwork(
    layers=[Dense(neurons=13,
                 activation=Sigmoid()),
            Dense(neurons=1, activation=Linear())],
    loss = MeanSquaredError(),
    seed_=20190501
)

In [359]:
dl = NeuralNetwork(
    layers=[Dense(neurons=13,
                  activation=Sigmoid()),
            Dense(neurons=13,
                  activation=Sigmoid()),
            Dense(neurons=1,
                  activation=Linear())],
    loss=MeanSquaredError(),
    seed_=20190501
)

In [360]:
from sklearn.datasets import load_boston

boston = load_boston()
data = boston.data
target = boston.target
features = boston.feature_names

In [361]:
from sklearn.preprocessing import StandardScaler
s = StandardScaler()
data = s.fit_transform(data)

In [362]:
def to_2d_np(a: ndarray,
             type: str="col") -> ndarray:
    """
    1D into 2D
    :param a:
    :param type:
    :return:
    """
    assert a.ndim == 1, \
    "Размерность должна быть 1"
    if type == "col":
        return a.reshape(-1, 1)
    elif type == "row":
        return a.reshape(1, -1)

In [363]:
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(data, target, test_size=0.3, random_state=80718)

y_train, y_test = to_2d_np(y_train), to_2d_np(y_test)

In [364]:
list_models = [linear_regression, neural_network, dl]
for model in list_models:
    optimizer = SGD(learning_rate = 0.01)
    trainer = Trainer(model, optimizer)
    trainer.fit(X_train, y_train, X_test, y_test,
                epochs=50,
                eval_every=10,
                seed_=20190501)

    eval_regression_model(model, X_test, y_test)
    print("________________________")

Validation loss after 10 epochs is 30.293
Validation loss after 20 epochs is 28.469
Validation loss after 30 epochs is 26.293
Validation loss after 40 epochs is 25.541
Validation loss after 50 epochs is 25.087
Mean absolute error 3.52
Root mean squared error 5.01
________________________
Validation loss after 10 epochs is 27.435
Validation loss after 20 epochs is 21.839
Validation loss after 30 epochs is 18.918
Validation loss after 40 epochs is 17.195
Validation loss after 50 epochs is 16.215
Mean absolute error 2.60
Root mean squared error 4.03
________________________
Validation loss after 10 epochs is 44.143
Validation loss after 20 epochs is 25.278
Validation loss after 30 epochs is 22.339
Validation loss after 40 epochs is 16.500
Validation loss after 50 epochs is 14.655
Mean absolute error 2.45
Root mean squared error 3.83
________________________


In [369]:
# Тестп "Почему не выходит за пределы массива"
def gener():
    N = 32
    X = range(N)
    y = range(N)
    print(X[30:35])
    size = 5
    for ii in range(0, N, size):
        X_batch, y_batch = X[ii:ii+size], y[ii:ii+size]
        yield X_batch, y_batch
print([i for i in gener()])

range(30, 32)
[(range(0, 5), range(0, 5)), (range(5, 10), range(5, 10)), (range(10, 15), range(10, 15)), (range(15, 20), range(15, 20)), (range(20, 25), range(20, 25)), (range(25, 30), range(25, 30)), (range(30, 32), range(30, 32))]
