In [7]:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler  # 使用MinMaxScaler进行归一化
import torch
import torch.nn as nn
from torch import optim

# Seed NumPy and PyTorch with the same value so that runs are reproducible.
SEED = 42
np.random.seed(SEED)
torch.manual_seed(SEED)

# Step 1: generate the synthetic dataset
def generate_data(n_samples=420):
    """
    Build a synthetic dataset with seven random features and a
    "Net Electricity" target.

    Every feature is drawn uniformly from [0, upper bound). The target is a
    linear combination of gross income, population and hourly load plus
    Gaussian noise; the remaining four features do not enter the target.

    :param n_samples: number of rows to generate
    :return: DataFrame with the seven feature columns followed by the target
    """
    # (column name, upper bound) pairs. The order fixes both the column order
    # of the result and the order in which random numbers are consumed.
    feature_ranges = (
        ('Gross Income', 1000),
        ('Population', 10000),
        ('Hourly load', 20),
        ('Import', 5),
        ('Export', 5),
        ('Gross Production', 15),
        ('Transmitted energy', 20),
    )
    columns = {}
    for column_name, upper_bound in feature_ranges:
        columns[column_name] = np.random.rand(n_samples) * upper_bound

    # Noise-free part of the target: weighted sum of three of the features.
    signal = (
        columns['Gross Income'] * 0.5
        + columns['Population'] * 0.3
        + columns['Hourly load'] * 0.2
    )
    # Gaussian noise with a standard deviation of 100.
    noise = np.random.randn(n_samples) * 100
    columns['Net Electricity'] = signal + noise
    return pd.DataFrame(columns)

# Build a 10,000-row dataset and preview its first rows.
data = generate_data(n_samples=10000)
print(data.head())

# Step 2: split the dataset and normalise it
# Feature matrix (every column except the target) and target vector.
X = data.drop(columns=['Net Electricity']).to_numpy()
y = data['Net Electricity'].to_numpy()

# 80 % of the rows go to training; the held-out 20 % is then halved into
# validation and test sets.
X_train, X_temp, y_train, y_temp = train_test_split(
    X, y, test_size=0.2, random_state=42
)
X_val, X_test, y_val, y_test = train_test_split(
    X_temp, y_temp, test_size=0.5, random_state=42
)

# Min-max scale the features. The scaler is fitted on the training split only
# and then applied unchanged to all three splits.
scaler = MinMaxScaler().fit(X_train)
X_train, X_val, X_test = (
    scaler.transform(split) for split in (X_train, X_val, X_test)
)

# Min-max scale the target the same way. The scaler expects a 2-D column, so
# each vector is reshaped for the call and flattened back afterwards.
y_scaler = MinMaxScaler().fit(y_train.reshape(-1, 1))
y_train, y_val, y_test = (
    y_scaler.transform(split.reshape(-1, 1)).flatten()
    for split in (y_train, y_val, y_test)
)

# Step 3: define the neural-network model
class NeuralNet(nn.Module):
    """
    Plain fully connected regression network with ReLU activations.

    The network is a stack of ``Linear -> ReLU`` hidden blocks followed by a
    single-output ``Linear`` layer. At least one hidden block is always
    created, even when ``num_layers`` is smaller than 1.

    :param input_size: number of input features
    :param num_layers: number of hidden layers
    :param num_neurons: number of neurons in every hidden layer
    """
    def __init__(self, input_size, num_layers, num_neurons):
        super().__init__()
        stack = []
        in_features = input_size
        # The first hidden block maps the inputs to the hidden width; every
        # further block keeps that width.
        for _ in range(max(num_layers, 1)):
            stack += [nn.Linear(in_features, num_neurons), nn.ReLU()]
            in_features = num_neurons
        # Output layer: hidden width -> one regression value.
        stack.append(nn.Linear(in_features, 1))
        self.model = nn.Sequential(*stack)

    def forward(self, x):
        """
        Run the forward pass.

        :param x: input tensor whose last dimension equals ``input_size``
        :return: output of the final linear layer
        """
        return self.model(x)

# Step 4: hyperparameter search ("SSA optimizer")
class SSAOptimizer:
    """
    Population-based hyperparameter search for a neural-network model class.

    NOTE: the class is named after the Sparrow Search Algorithm (SSA), but it
    only samples an initial population and evaluates it. The population is
    never updated between iterations, so each iteration trains a freshly
    constructed model for the very same candidates.

    :param hyperparameter_space: dict mapping a hyperparameter name to either
        a list of discrete candidates or a ``(low, high)`` pair of bounds
    :param population_size: number of candidates in the population
    :param max_iterations: number of passes over the population
    """
    def __init__(self, hyperparameter_space, population_size=10, max_iterations=50):
        self.hyperparameter_space = hyperparameter_space
        self.population_size = population_size
        self.max_iterations = max_iterations
        self.population = self.initialize_population()

    @staticmethod
    def _sample_value(bounds):
        """Draw one value: a random list element, or uniform in (low, high)."""
        if isinstance(bounds, list):
            return np.random.choice(bounds)
        return np.random.uniform(bounds[0], bounds[1])

    def initialize_population(self):
        """
        Sample the initial population.

        :return: list of ``population_size`` dicts, each mapping every
            hyperparameter name to one sampled value
        """
        return [
            {
                param: self._sample_value(bounds)
                for param, bounds in self.hyperparameter_space.items()
            }
            for _ in range(self.population_size)
        ]

    @staticmethod
    def _validation_loss(model_class, individual, train_data, val_data, epochs):
        """Train a fresh model for one candidate and return its validation MSE."""
        train_inputs, train_targets = train_data
        val_inputs, val_targets = val_data
        model = model_class(
            input_size=train_inputs.shape[1],
            num_layers=int(individual['num_layers']),
            num_neurons=int(individual['num_neurons']),
        )
        optimizer = optim.Adam(model.parameters(), lr=float(individual['learning_rate']))
        loss_fn = nn.MSELoss()
        # Full-batch training: one optimizer step per epoch.
        for _ in range(epochs):
            optimizer.zero_grad()
            loss = loss_fn(model(train_inputs), train_targets)
            loss.backward()
            optimizer.step()
        model.eval()
        with torch.no_grad():
            return loss_fn(model(val_inputs), val_targets).item()

    def optimize(self, model_class, X_train, y_train, X_val, y_val, epochs=100):
        """
        Evaluate the population and return the best hyperparameter set.

        Each candidate must provide the keys ``num_layers``, ``num_neurons``
        and ``learning_rate``. A new ``model_class`` instance is trained with
        Adam on the full training set and scored by MSE on the validation set.

        :param model_class: model class, called as
            ``model_class(input_size=..., num_layers=..., num_neurons=...)``
        :param X_train: training features, 2-D NumPy array
        :param y_train: training targets, 1-D NumPy array
        :param X_val: validation features, 2-D NumPy array
        :param y_val: validation targets, 1-D NumPy array
        :param epochs: training epochs per candidate (default 100)
        :return: the population entry with the lowest validation loss, or
            ``None`` if no candidate produced a loss below infinity (for
            example when every loss is NaN, or ``max_iterations`` is 0)
        """
        # The data is identical for every candidate and epoch: convert once.
        train_data = (
            torch.from_numpy(X_train).float(),
            torch.from_numpy(y_train).float().unsqueeze(1),
        )
        val_data = (
            torch.from_numpy(X_val).float(),
            torch.from_numpy(y_val).float().unsqueeze(1),
        )

        best_hyperparams = None
        best_loss = float('inf')
        for iteration in range(self.max_iterations):
            for idx, individual in enumerate(self.population):
                val_loss = self._validation_loss(
                    model_class, individual, train_data, val_data, epochs
                )
                print(f"Iteration {iteration+1}, Sparrow {idx+1}: Val Loss = {val_loss}")
                # A NaN loss never compares as smaller, so diverged runs are skipped.
                if val_loss < best_loss:
                    best_loss = val_loss
                    best_hyperparams = individual
        return best_hyperparams

# Step 5: run the hyperparameter search
# Search space: every entry is a list of discrete candidate values.
hyperparameter_space = {
    'learning_rate': [1e-6, 1e-5, 1e-4, 1e-3, 1e-2, 1e-1],  # learning rate
    'num_layers': [1, 2, 3],  # number of hidden layers
    'num_neurons': [16, 32, 64]  # neurons per hidden layer
}

# Create the optimizer and search for the best hyperparameters.
ssa_optimizer = SSAOptimizer(hyperparameter_space, population_size=5, max_iterations=2)
best_hyperparams = ssa_optimizer.optimize(NeuralNet, X_train, y_train, X_val, y_val)
if best_hyperparams is None:
    # Fail with a clear message instead of a TypeError on the lookups below.
    raise RuntimeError("Hyperparameter search did not return a usable candidate")

# Step 6: train the final model with the best hyperparameters
final_model = NeuralNet(input_size=X_train.shape[1], num_layers=int(best_hyperparams['num_layers']), num_neurons=int(best_hyperparams['num_neurons']))
# Adam optimizer with the selected learning rate
optimizer = optim.Adam(final_model.parameters(), lr=best_hyperparams['learning_rate'])
# Mean-squared-error loss
loss_fn = nn.MSELoss()

# Training loss of every epoch (plotted later)
train_losses = []

# The training tensors never change, so build them once outside the loop.
inputs = torch.from_numpy(X_train).float()
targets = torch.from_numpy(y_train).float().unsqueeze(1)

# Full-batch training: one optimizer step per epoch.
num_epochs = 100
# Log roughly ten times per run; max() avoids a modulo-by-zero for num_epochs < 10.
log_every = max(1, num_epochs // 10)
for epoch in range(num_epochs):
    optimizer.zero_grad()
    outputs = final_model(inputs)
    loss = loss_fn(outputs, targets)
    loss.backward()
    optimizer.step()
    train_losses.append(loss.item())  # record the loss of this epoch
    if epoch % log_every == 0:
        print(f"Epoch {epoch}, Loss: {loss.item()}")

# Step 7: evaluate the model on the test set
final_model.eval()
with torch.no_grad():
    test_inputs = torch.from_numpy(X_test).float()
    test_targets = torch.from_numpy(y_test).float().unsqueeze(1)
    test_outputs = final_model(test_inputs)
    # Loss on the normalised target scale
    test_loss = loss_fn(test_outputs, test_targets)
    print(f"Test Loss: {test_loss.item()}")

# Step 8: metrics on the original target scale
# Reuse the forward pass from above instead of running the model a second time.
predictions = test_outputs.numpy().squeeze()

# Undo the target normalisation for both predictions and ground truth.
predictions = y_scaler.inverse_transform(predictions.reshape(-1, 1)).flatten()
y_test_original = y_scaler.inverse_transform(y_test.reshape(-1, 1)).flatten()

# Evaluation metrics. MSE is computed on the original scale so that it is
# consistent with RMSE (RMSE == sqrt(MSE)); the normalised-scale loss is the
# "Test Loss" printed above.
errors = y_test_original - predictions
mse_val = np.mean(errors ** 2)
rmse_val = np.sqrt(mse_val)
mape_val = np.mean(np.abs(errors / y_test_original)) * 100
r2 = 1 - (np.sum(errors ** 2) / np.sum((y_test_original - np.mean(y_test_original)) ** 2))
# NOTE(review): this equals MAPE whenever every true value is positive; the
# denominator is not wrapped in abs(), so negative targets reduce it.
pcd_val = np.mean(np.abs(predictions - y_test_original) / y_test_original) * 100

# Print the evaluation results.
print(f"RMSE: {rmse_val}")
print(f"MAPE: {mape_val}%")
print(f"MSE: {mse_val}")
print(f"R²: {r2}")
print(f"PCD: {pcd_val}%")

   Gross Income   Population  Hourly load    Import    Export  \
0    374.540119  3736.408185    14.599966  3.190723  1.494560   
1    950.714306  3329.120962     3.690240  2.296462  0.474089   
2    731.993942  1761.539125     6.932794  4.822493  0.631796   
3    598.658484  6072.666701    13.265613  1.094892  0.903356   
4    156.018640  4766.241605     9.641787  2.939282  1.018267   

   Gross Production  Transmitted energy  Net Electricity  
0         12.708549           14.831104      1227.238914  
1          7.417756           17.622038      1377.514922  
2          2.931984            9.263598       953.224988  
3         11.049627            5.783575      2077.593870  
4          6.280172            6.376931      1552.279551  
Iteration 1, Sparrow 1: Val Loss = 0.0007156179053708911
Iteration 1, Sparrow 2: Val Loss = 0.12109171599149704
Iteration 1, Sparrow 3: Val Loss = 0.351617693901062
Iteration 1, Sparrow 4: Val Loss = 0.0007220886182039976
Iteration 1, Sparrow 5: Val Loss 

In [11]:
import plotly.graph_objs as go
import plotly.offline as pyo

def _figure_layout(title, x_title, y_title):
    """Return the layout settings shared by both figures as a keyword dict."""
    return dict(
        title=title,
        title_font=dict(size=20, family='Times New Roman', color='black'),
        xaxis=dict(title=x_title, title_font=dict(size=14, family='Times New Roman', color='black'), gridcolor='lightgray'),
        yaxis=dict(title=y_title, title_font=dict(size=14, family='Times New Roman', color='black'), gridcolor='lightgray'),
        plot_bgcolor='whitesmoke',  # plotting-area background
        paper_bgcolor='whitesmoke',  # background around the plotting area
        font=dict(family='Times New Roman', size=12, color='black'),
        legend=dict(
            x=0.5, y=1.05,  # centred, just above the plotting area
            xanchor='center',
            yanchor='bottom',
            traceorder='normal',
            orientation='h',  # lay the legend entries out in one row
            font=dict(family='Times New Roman', size=12, color='black'),
            bgcolor='rgba(255, 255, 255, 0.5)',
            bordercolor='black', borderwidth=1
        ),
        showlegend=True,
        width=1200,
        height=600
    )


# 1. Interactive plot of the training loss
train_loss_fig = go.Figure()

# One point per recorded epoch; the x range is derived from the recorded
# losses so that x and y always have the same length.
train_loss_fig.add_trace(go.Scatter(
    x=list(range(1, len(train_losses) + 1)),  # epoch number, starting at 1
    y=train_losses,  # training loss of that epoch
    mode='lines',
    name='Training Loss',
    line=dict(color='royalblue', width=3, dash='dash')
))

# The plotted model is the SSA-tuned neural network, so the title names it.
train_loss_fig.update_layout(**_figure_layout(
    'SSA-NN Prediction: Training Loss Curve', 'Epoch', 'Loss'
))

# Draw axis lines, hide the zero line and set the outer margins.
train_loss_fig.update_layout(
    xaxis=dict(showgrid=True, zeroline=False, showline=True, linecolor='gray'),
    yaxis=dict(showgrid=True, zeroline=False, showline=True, linecolor='gray'),
    margin=dict(l=50, r=50, t=50, b=50)
)

train_loss_fig.show()

# 2. Interactive plot of predicted vs. actual values
# Show at most `timestep` test samples; clamp to the available data so the
# x and y sequences cannot differ in length.
timestep = 100
n_points = min(timestep, len(y_test_original), len(predictions))
sample_index = list(range(n_points))

# Actual values: smoothed line with white-filled circle markers.
actual_trace = go.Scatter(
    x=sample_index,
    y=y_test_original[:n_points].flatten(),
    mode='lines+markers',
    name='Actual',
    line=dict(color='mediumseagreen', width=3, shape='spline'),
    marker=dict(symbol='circle', size=8, color='white', line=dict(color='mediumseagreen', width=2))
)

# Predicted values: dotted line with 'x' markers.
predicted_trace = go.Scatter(
    x=sample_index,
    y=predictions[:n_points].flatten(),
    mode='lines+markers',
    name='Predicted',
    line=dict(color='indianred', width=3, dash='dot'),
    marker=dict(symbol='x', size=8, color='white', line=dict(color='indianred', width=2))
)

comparison_fig = go.Figure(data=[actual_trace, predicted_trace])

# Shared layout, with semi-transparent background colours for this figure.
comparison_layout = _figure_layout(
    'SSA-NN Prediction: Actual vs Predicted', 'Time Step', 'Load'
)
comparison_layout.update(
    paper_bgcolor='rgba(240, 240, 240, 0.9)',
    plot_bgcolor='rgba(255, 255, 255, 0.9)',
)
comparison_fig.update_layout(**comparison_layout)

comparison_fig.show()