# numpy实现MLP回归

## 安装依赖

In [None]:
import numpy as np
import pandas as pd
import math
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error

## 获取数据集

因为keras现在不能直接调用load_boston()函数获取数据
我们选择使用url加载数据集并划分训练集和测试集
之后将特征标准化

In [None]:
def load_data():
    data_url = "http://lib.stat.cmu.edu/datasets/boston"
    raw_df = pd.read_csv(data_url, sep="\s+", skiprows=22, header=None)
    print(raw_df.shape)
    data = np.hstack([raw_df.values[::2, :], raw_df.values[1::2, :2]])
    target = raw_df.values[1::2, 2]
    X = data
    y = target
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    #标准化特征
    scale = StandardScaler()
    X_train_s = scale.fit_transform(X_train)
    X_test_s = scale.transform(X_test)
    return X_train_s, y_train, X_test_s, y_test

## 初始化训练参数 定义激活函数

In [None]:
def initialize_parameters(layer_dims):
    parameters = {}
    L = len(layer_dims)
    for l in range(1, L):
        parameters['W' + str(l)] = np.random.randn(layer_dims[l], layer_dims[l-1]) * 0.01
        parameters['b' + str(l)] = np.zeros((layer_dims[l], 1))
    return parameters

def relu(x):
    return np.maximum(0, x)

def relu_derivative(x):
    return np.where(x < 0, 0, 1)

def linear_activation_forward(A_prev, W, b):
    Z = np.dot(W, A_prev) + b
    A = relu(Z)  # 使用 Relu 激活函数
    return A

def compute_cost(AL, Y):
    m = Y.shape[0]
    cost = np.mean((AL - Y)**2)
    return cost

def linear_activation_backward(dA, A_prev, W, A):
    m = A.shape[1]
    dZ = dA * relu_derivative(A)
    dW = np.dot(dZ, A_prev.T) / m
    db = np.sum(dZ, axis=1, keepdims=True) / m
    dA_prev = np.dot(W.T, dZ)
    return dA_prev, dW, db

def update_parameters(parameters, grads, learning_rate):
    L = len(parameters) // 2
    for l in range(L):
        parameters["W" + str(l+1)] -= learning_rate * grads["dW" + str(l+1)]
        parameters["b" + str(l+1)] -= learning_rate * grads["db" + str(l+1)]
    return parameters

## MLP模型

In [None]:
def MLP_model(X, Y, layers_dims, learning_rate = 0.0075, num_iterations = 5000):
    np.random.seed(1)
    costs = []
    parameters = initialize_parameters(layers_dims)
    for i in range(num_iterations):
        A = X.T
        cache = []
        # 前向传播
        for l in range(1, len(layers_dims)):
            A_prev = A
            A = linear_activation_forward(A_prev, parameters['W' + str(l)], parameters['b' + str(l)])
            cache.append((A_prev, A, parameters['W' + str(l)], parameters['b' + str(l)]))

        cost = compute_cost(A, Y)
        costs.append(cost)

        # 启动反向传播
        grads = {}
        dA = A - Y.reshape(A.shape) 
        for l in reversed(range(len(cache))):
            A_prev, A, W, b = cache[l]
            dA, dW, db = linear_activation_backward(dA, A_prev, W, A)
            grads["dW" + str(l+1)] = dW
            grads["db" + str(l+1)] = db

        parameters = update_parameters(parameters, grads, learning_rate)

        if i % 100 == 0:
            print("Cost after iteration {}: {}".format(i, cost))

    return parameters, costs

## 读取数据进行训练

In [None]:
X_train, y_train, X_test, y_test = load_data()
layers_dims = [13, 100, 50, 1]  # 输入层13个特征、输出层1个单元
parameters, costs = MLP_model(X_train, y_train, layers_dims)

## 预测并对结果进行评价

In [4]:
def predict(X, parameters, layers_dims):
    A = X.T
    for l in range(1, len(layers_dims)):
        A_prev = A
        W = parameters['W' + str(l)]
        b = parameters['b' + str(l)]
        A = linear_activation_forward(A_prev, W, b)
    return A

def rmse(predictions, targets):
    MSE = 0
    MSE = pow(predictions - targets, 2).mean()
    RMSE = math.sqrt(MSE)
    return RMSE

def rsquare(predictions, targets):
    MSE = 0
    MSE = pow(predictions - targets, 2).mean()
    Rsquare = 1 - MSE / np.var(targets)
    return Rsquare

def mse(predictions, targets):
    MSE = 0
    MSE = pow(predictions - targets, 2).mean()
    return MSE

# 使用测试集进行预测
predictions = predict(X_test, parameters, layers_dims).flatten()

# 计算性能指标
test_rmse = rmse(predictions, y_test)
test_r2 = rsquare(predictions, y_test)
test_mse = mse(predictions, y_test)

print(f"Test RMSE: {test_rmse}")
print(f"Test MSE: {test_mse}")
print(f"Test Rsquare: {test_r2}")

print("Mean squared error: %.2f" % mean_squared_error(predictions, y_test))
print('Variance score: %.2f' % explained_variance_score(y_test, predictions))

(1012, 11)
Cost after iteration 0: 606.5528045123015
Cost after iteration 100: 20.098778089310056
Cost after iteration 200: 20.362578282122755
Cost after iteration 300: 19.849909108458384
Cost after iteration 400: 19.20070716232758
Cost after iteration 500: 18.860574782135647
Cost after iteration 600: 18.838029609944353
Cost after iteration 700: 18.462708517500886
Cost after iteration 800: 18.273200081790712
Cost after iteration 900: 18.216676668401576
Cost after iteration 1000: 18.052823974412718
Cost after iteration 1100: 17.933686596996214
Cost after iteration 1200: 17.903966866996615
Cost after iteration 1300: 17.81764847547403
Cost after iteration 1400: 17.74270917474376
Cost after iteration 1500: 17.745836974570448
Cost after iteration 1600: 17.686345485328545
Cost after iteration 1700: 17.63641890481917
Cost after iteration 1800: 17.59294591127666
Cost after iteration 1900: 17.61739103589796
Cost after iteration 2000: 17.581587193207877
Cost after iteration 2100: 17.547531098919

# 调用sklearn中MLP回归方法

## 安装依赖

In [None]:
import numpy as np
import math
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.neural_network import MLPRegressor
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error,explained_variance_score

## 获取数据集

In [None]:
def load_data():
    data_url = "http://lib.stat.cmu.edu/datasets/boston"
    raw_df = pd.read_csv(data_url, sep="\s+", skiprows=22, header=None)
    print(raw_df.shape)
    data = np.hstack([raw_df.values[::2, :], raw_df.values[1::2, :2]])
    target = raw_df.values[1::2, 2]
    X = data
    y = target
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    #标准化特征
    scale = StandardScaler()
    X_train_s = scale.fit_transform(X_train)
    X_test_s = scale.transform(X_test)
    return X_train_s, y_train, X_test_s, y_test

## 加载数据进行训练

In [None]:
X_train, y_train, X_test, y_test = load_data()
regr = MLPRegressor(hidden_layer_sizes=(100,50), activation='relu', random_state=1, max_iter=5000).fit(X_train, y_train)

## 预测并对结果进行评价

In [6]:
y_pred = regr.predict(X_test)
regr.score(X_test, y_test)
MSE = 0
MSE = pow(y_pred-y_test,2).mean()
RMSE = math.sqrt(MSE)
Rsquare = 1-MSE/np.var(y_test)
print(f"RMSE: {RMSE:0.4f}, MSE: {MSE:0.4f}, Rsquare: {Rsquare:0.4f}")

print("Mean squared error: %.2f" % mean_squared_error(y_pred, y_test))
print('Variance score: %.2f' % explained_variance_score(y_test, y_pred))

(1012, 11)
RMSE: 3.6094, MSE: 13.0280, Rsquare: 0.8223
Mean squared error: 13.03
Variance score: 0.83
