# 构建线性回归模型

### 必要的头文件

In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib
import seaborn as sns # 查看代码相关性
sns.set()    #设置画图空间为 Seaborn 默认风格
# 汉化字体
matplotlib.rc("font", family='DengXian')

### 加载数据集

In [None]:
def load_csv():
    data = pd.read_csv("boston.csv")
    print(data.head(5))
    return data

data = load_csv()

### 查看代码相关性，使用seaborn热力图

In [None]:
corrboston = data.corr()
plt.figure(figsize=(10,10))    #设置画布
sns.heatmap(corrboston,annot=True,cmap='RdGy')
plt.show()

### 数据预处理
- 查看缺失值

In [None]:
data.info()

- 重复值

In [None]:
data.drop_duplicates(subset=['name'], keep='first', inplace=True)

- 离群值

In [None]:
# 箱线图方法
# 使用箱线图的上下边界来判断异常值。
def remove_outliers_std(data, col):
    Q1 = data[col].quantile(0.25)
    Q3 = data[col].quantile(0.75)
    IQR = Q3 - Q1
    lower_bound = Q1 - 1.5 * IQR
    upper_bound = Q3 + 1.5 * IQR
    data = data[(data[col] >= lower_bound) & (data[col] <= upper_bound)]
    return data

# 标准差方法
# 根据数据的均值和标准差，将与均值相差超过几倍标准差的值视为异常值。
def remove_outliers_std(data, col, threshold=3):
    mean = data[col].mean()
    std = data[col].std()
    lower_bound = mean - threshold * std
    upper_bound = mean + threshold * std
    data = data[(data[col] >= lower_bound) & (data[col] <= upper_bound)]
    return data

# Z-score 方法
# 根据标准化后的数值来判断异常值，Z-score 超过一定阈值则被视为异常值。
def remove_outliers_zscore(data, col, threshold=3):
    z_scores = (data[col] - data[col].mean()) / data[col].std()
    data = data[abs(z_scores) < threshold]
    return data

### 归一化

In [4]:
# 最大最小z—score
def z_score_normalize_data(data, target_cols):
    for col in target_cols:
        col_min = data[col].min()
        col_max = data[col].max()
        data[col] = (data[col] - col_min) / (col_max - col_min)
    return data

# 标准化，适用标准差
def standardize_data(data, target_cols):
    for col in target_cols:
        mean = data[col].mean()
        std = data[col].std()
        data[col] = (data[col] - mean) / std
    return data

# 小数定标标准化
def decimal_scaling_normalize_data(data, target_cols):
    for col in target_cols:
        max_val = data[col].abs().max()
        data[col] = data[col] / 10**len(str(int(max_val)))
    return data

# 零均值归一化
# 零均值归一化是将数据转换为均值为0的分布，适用于处理不带明显上下界的数据集。
def zero_mean_normalize_data(data, target_cols):
    for col in target_cols:
        mean = data[col].mean()
        data[col] = data[col] - mean
    return data

代码执行

In [None]:
columns_to_process = ['CRIM', 'ZN', 'INDUS', 'RM', 'AGE', 'DIS', 'RAD', 'TAX', 'PIRATIO', 'B', 'LSTAT']
for col in columns_to_process:
    data = remove_outliers_std(data, col)
target_cols = ['ZN', 'INDUS', 'RM', 'AGE', 'DIS', 'RAD', 'TAX', 'PIRATIO', 'B', 'LSTAT']
data = z_score_normalize_data(data, target_cols)

### 数据集划分7:3或8:2

In [None]:
# 特征和标签分开
x = data.iloc[:, :-1].values
y = data.iloc[:, -1].values.reshape(-1, 1)
# 划分训练集和测试集
offset = int(x.shape[0] * 0.7)
x_train, x_test = x[:offset], x[offset:]
y_train, y_test = y[:offset], y[offset:]

### 两种方法训练模型

- 最小二乘法

In [None]:
# 采用正规方程来直接得出参数
def train_with_normal_equation(self, x, y):
    # 添加偏置项列
    X_b = np.c_[np.ones((x.shape[0], 1)), x]
    # 使用最小二乘法计算参数估计值
    theta = np.dot(np.dot(np.linalg.inv(np.dot(X_b.T, X_b)), X_b.T), y)
    self.b = theta[0]
    self.w = theta[1:]

def print_model_info(self):
     print("Weights (w):")
     print(self.w)
     print("Bias (b):")
     print(self.b)

- 梯度下降法，结合成本函数（损失函数），正则化等

In [None]:
# 通过设置学习率，计算偏导数不断更新参数
class Network(object):
    # 初始化ωc参数，
    def __init__(self, num_of_weights, lambda_reg=1):
        np.random.seed(0)
        self.w = -2 * np.random.randn(num_of_weights, 1)
        self.b = 0.
        self.lambda_reg = lambda_reg  # 正则化参数

    # 计算预测值
    def forward(self, x):
        z = np.dot(x, self.w) + self.b
        return z

    # 成本函数
    def loss(self, z, y):
        cost = np.mean((z - y) ** 2)
        return cost

    # 正则化成本函数——稍后补充
    def loss_with_reg(self, z, y):
        cost = np.mean((z - y) ** 2)
        # 计算 L1 正则化项
        reg = self.lambda_reg * np.sum(np.abs(self.w))
        return cost + reg

    # 计算梯度
    def gradient(self, x, y):
        z = self.forward(x)
        error = z - y
        gradient_w = (np.dot(x.T, error) + self.lambda_reg * self.w) / x.shape[0]  #
        gradient_b = np.mean(error)
        return gradient_w, gradient_b
    
    # 计算R方
    def r_squared(self, x, y):
        z = self.forward(x)
        SS_res = np.sum((z - y) ** 2)  # 残差平方和
        SS_tot = np.sum((y - np.mean(y)) ** 2)  # 总平方和
        r2 = 1 - SS_res / SS_tot
        return r2

    # 更新参数
    def update(self, gradient_w, gradient_b, eta=0.01):
        self.w -= eta * gradient_w
        self.b -= eta * gradient_b

    # 训练模型
    def train(self, x, y, iterations=100, eta=0.01, lambda_reg=1):
        losses = []
        for i in range(iterations):
            z = self.forward(x)
            L = self.loss(z, y)
            gradient_w, gradient_b = self.gradient(x, y)
            self.update(gradient_w, gradient_b, eta)
            losses.append(L)
            if (i + 1) % 1000 == 0:
                print('iter {}, loss {}'.format(i, L))
                r2_test = self.r_squared(x, y)
                print('R方: {}'.format(r2_test))
        return losses, self.w, self.b

- 特别备注：正则化，防止模型过拟合

In [None]:
# L2正则化
def loss_with_reg(self, z, y):
    cost = np.mean((z - y) ** 2)
    # Calculate L2 regularization term
    # 损失加上参数绝平方的总和乘以正则化系数系数的积
    reg = self.lambda_reg * np.sum(self.w ** 2)
    return cost + reg

# L1正则化
def loss_with_reg(self, z, y):
    cost = np.mean((z - y) ** 2)
    # 损失加上参数绝对值的总和乘以正则化系数系数的积
    reg = self.lambda_reg * np.sum(np.abs(self.w)) 
    return cost + reg

### 执行代码

In [None]:
net = Network(x_train.shape[1], lambda_reg=0.1)
num_iterations = 10000
losses, fw, fb= net.train(x_train, y_train, iterations=num_iterations, eta=0.01)


# 在测试集上验证模型性能
predictions_test = np.dot(x_test, fw) + fb
test_loss = np.mean((predictions_test - y_test) ** 2)
r2_test = net.r_squared(x_test, y_test)
print('测试集损失:', test_loss)
print('测试集R方:', r2_test)

# 使用最小二乘法训练模型
net.train_with_normal_equation(x_train, y_train)
train_loss = net.loss(net.forward(x_train), y_train)
train_r2 = net.r_squared(x_train, y_train)
test_r2_ne = net.r_squared(x_test, y_test)


### 可视化

In [None]:
plt.figure(figsize=(8, 8), num="损失函数")
plot_x = np.arange(num_iterations)
plot_y = np.array(losses)
plt.plot(plot_x, plot_y)
plt.ylabel("损失率", fontsize=15)
plt.title("损失率变化图", fontsize=25)
plt.show()