## 手动实现bp网络

In [None]:
#! /usr/bin/env python
# -*-coding: utf-8 -*-

"""
function: implement a multilayer perceptron (MLP) and back propagation with numpy
"""
import sys
sys.path.append("../d2l_func/")
import numpy as np
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from data_prepare import data_iter
from sqdm import sqdm


class MLP(object):
    """
    function: define a MLP network with one ReLU hidden layer and a softmax
        output layer, trained by hand-written back propagation in numpy
    params input_num: the neuron number in input layer
    params hidden_num: the neuron number in hidden layer
    params output_num: the neuron number in output layer
    params alpha: learning rate used by the gradient descent step in `fit`
    """
    def __init__(self, input_num, hidden_num, output_num, alpha=0.01):
        self.input_num = input_num
        self.hidden_num = hidden_num
        self.output_num = output_num
        # number of samples in the batch most recently passed to `fit`
        self.sample_num = None
        # small random weights break the symmetry between hidden units
        self.w1 = np.random.normal(0, 0.01, size=(input_num, hidden_num))
        self.b1 = np.zeros(hidden_num)
        self.w2 = np.random.normal(0, 0.01, size=(hidden_num, output_num))
        self.b2 = np.zeros(output_num)
        self.alpha = alpha

    @staticmethod
    def relu(y_pred):
        """
        function: element-wise max(0, y_pred)
        """
        return np.maximum(0, y_pred)

    @staticmethod
    def softmax(y_pred):
        """
        function: row-wise softmax
        params y_pred: the output before softmax, shape (sample_num, class_num)
        return: array of the same shape whose rows sum to 1
        """
        # softmax is invariant to a per-row shift; subtracting the row max
        # keeps every exponent <= 0 so np.exp can not overflow
        shifted = y_pred - np.max(y_pred, axis=1, keepdims=True)
        exp_scores = np.exp(shifted)
        return exp_scores / exp_scores.sum(axis=1, keepdims=True)

    @staticmethod
    def linreg(X, w, b):
        """
        function: affine map X @ w + b
        """
        return X@w + b

    def predict_prod(self, X):
        """
        function: forward pass
        params X: input feature, shape (sample_num, input_num)
        return: class probability, shape (sample_num, output_num)
        """
        a1 = self.relu(self.linreg(X, self.w1, self.b1))
        a2 = self.softmax(self.linreg(a1, self.w2, self.b2))
        return a2

    @staticmethod
    def entropy_loss(y_pred, y):
        """
        function: mean cross entropy over the batch
        params y_pred: predicted probability, shape (sample_num, class_num)
        params y: true label in one-hot, same shape as y_pred
        """
        # clip away exact zeros so that log never produces -inf / nan
        safe_pred = np.clip(y_pred, 1e-12, None)
        return -(y * np.log(safe_pred)).sum() / len(y)

    @staticmethod
    def cal_error_grad(y_pred, y):
        """
        the derivative of "y^T@log(a)" about z is
        "[diag(a) - aa^T] @ diag(1/a) @ y", where y is the true label in
        one-hot, a is the softmax output and z is the output before softmax.
        Expanding the product gives "y - a * sum(y)", which is what is
        computed here; it needs no division by a, so it stays finite even
        when some probability is 0.
        params y_pred: softmax output of one sample, c values (any shape)
        params y: true label of one sample, c values (any shape)
        return: row vector with shape (1, c)
        """
        a = np.asarray(y_pred, dtype=float).reshape(-1)
        t = np.asarray(y, dtype=float).reshape(-1)
        return (t - a * t.sum()).reshape(1, -1)

    def cal_batch_error_grad(self, y_pred, y):
        """
        the function "cal_error_grad" calculate the error item grad of a sample,
        this function does the same for a whole batch at once and turns it
        into the grad of the mean cross entropy loss about z
        params y_pred: softmax output, shape (sample_num, output_num)
        params y: true label in one-hot, shape (sample_num, output_num)
        return: array with shape (sample_num, output_num)
        """
        y_pred = np.asarray(y_pred, dtype=float)
        y = np.asarray(y, dtype=float)
        # take the batch size from the data so that this method also works
        # when it is called without going through `fit`
        sample_num = y_pred.shape[0]
        per_sample = y - y_pred * y.sum(axis=1, keepdims=True)
        # the loss is the negative log-likelihood averaged over the batch
        return -per_sample / sample_num

    def back_propagation(self, X, y_pred, y):
        """
        function: calculate the grad of the mean cross entropy loss about
            every weight and bias
        params X: input feature, shape (sample_num, input_num)
        params y_pred: softmax output of the forward pass on X
        params y: true label in one-hot, same shape as y_pred
        return: (db1, dw1, db2, dw2), each with the shape of the parameter
            it belongs to
        """
        # output in each layer
        a0 = X
        z1 = self.linreg(X, self.w1, self.b1)
        a1 = self.relu(z1)

        # error item
        error_grad2 = self.cal_batch_error_grad(y_pred, y)
        # the derivative of relu is 1 where its input is positive, else 0
        error_grad1 = (error_grad2 @ self.w2.T) * (z1 > 0)

        # grad of weight and bias; a bias is shared by all samples, so its
        # grad is the sum of the error item over the batch
        dw2 = a1.T @ error_grad2
        db2 = error_grad2.sum(axis=0)
        dw1 = a0.T @ error_grad1
        db1 = error_grad1.sum(axis=0)

        return db1, dw1, db2, dw2

    def fit(self, X, y):
        """
        function: run one gradient descent step on the batch (X, y)
        params X: input feature, shape (sample_num, input_num)
        params y: true label in one-hot, shape (sample_num, output_num)
        """
        # sample num in each iteration
        self.sample_num = X.shape[0]
        # predict
        y_pred = self.predict_prod(X)

        # bp
        db1, dw1, db2, dw2 = self.back_propagation(X, y_pred, y)

        # update grad
        self.w1 -= self.alpha * dw1
        self.b1 -= self.alpha * db1
        self.w2 -= self.alpha * dw2
        self.b2 -= self.alpha * db2

    def predict(self, X):
        """
        function: predict the class index of every sample
        return: 1-D array with shape (sample_num,)
        """
        y_pred = self.predict_prod(X)
        y_pred = np.argmax(y_pred, axis=1).reshape(-1)
        return y_pred

    def score(self, X, y):
        """
        function: accuracy of the prediction on X
        params y: true label, either class index with shape (sample_num,)
            or one-hot with shape (sample_num, output_num)
        """
        y_pred = self.predict(X)
        y = np.asarray(y)
        if y.ndim == 2:
            # one-hot label --> class index, so that it lines up with y_pred
            y = np.argmax(y, axis=1)
        return np.mean(y_pred == y)


# training hyper-parameters: number of passes over the training split and the
# number of samples handed to `MLP.fit` per step
params = {
    "epoch_num": 10,
    "batch_size": 1,
}

# deal with label in iris data, such as (0 --> [1, 0, 0])
label_dict = {
    0: [1, 0, 0],
    1: [0, 1, 0],
    2: [0, 0, 1]
}

iris = load_iris()
x = iris.data
# one-hot encode the class indices; a comprehension avoids a lambda whose
# parameter would shadow the module-level `x`
y = np.array([label_dict[label] for label in iris.target])

model = MLP(4, 6, 3)
xtrain, xtest, ytrain, ytest = train_test_split(x, y)

for epoch in range(params["epoch_num"]):
    # NOTE(review): assumes data_iter yields (feature batch, label batch)
    # pairs of array-likes -- confirm against d2l_func/data_prepare.py
    for xdata, ydata in data_iter(params["batch_size"], xtrain, ytrain):
        model.fit(xdata, ydata)
        print(model.predict(xdata))
        # `predict` returns class indices, so decode the one-hot batch before
        # scoring; comparing indices with one-hot rows gives a bogus accuracy
        print(model.score(xdata, np.argmax(ydata, axis=1)))

