<a href="https://colab.research.google.com/github/llshao/NLP/blob/master/%E6%8B%9F%E5%90%88%E5%AE%9E%E9%AA%8C%E4%BB%A3%E7%A0%81.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **唐伟老师需要的预测模型**

In [None]:
import torch
import numpy as np
from torch.utils.data import Dataset, DataLoader
import matplotlib.pyplot as plt
import pandas as pd

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

# ---- 1. 数据准备 ----
class CustomDataset(Dataset):
    """Map-style dataset pairing feature rows with label rows.

    Both arrays are cast to ``float32`` once at construction time. Indexing
    returns a dict with the keys ``'input'`` and ``'target'``, each holding a
    freshly created tensor for the requested position.
    """

    def __init__(self, X_scaled, y_scaled):
        """Store float32 copies of the given feature and label arrays.

        Args:
            X_scaled: object exposing ``astype`` (e.g. a NumPy array) with
                the features.
            y_scaled: object exposing ``astype`` with the labels.
        """
        f32 = np.float32
        self.features = X_scaled.astype(f32)
        self.labels = y_scaled.astype(f32)

    def __len__(self):
        """Return the number of samples (length of the feature array)."""
        return len(self.features)

    def __getitem__(self, idx):
        """Return ``{'input': tensor, 'target': tensor}`` for position ``idx``."""
        sample_x = torch.tensor(self.features[idx])
        sample_y = torch.tensor(self.labels[idx])
        return dict(input=sample_x, target=sample_y)

# ---- 2. 模型定义 ----
class FixedParamNetwork(nn.Module):
    """Encoder/decoder MLP whose intermediate code is scaled by a fixed vector.

    The encoder maps 3 input features to a 41-wide code, the code is
    multiplied element-wise by ``fixed_params``, and the decoder maps the
    result back to 41 outputs. ``fixed_params`` is registered as a buffer, so
    it is part of ``state_dict()`` but is not returned by ``parameters()``.
    """

    def __init__(self, fixed_params):
        """Build the network.

        Args:
            fixed_params: tensor that must broadcast against the 41-wide
                encoder output in ``forward``.
        """
        super().__init__()
        width, code_dim = 512, 41
        self.register_buffer('fixed_params', fixed_params)
        # Encoder is created before the decoder so that weight
        # initialisation consumes the RNG in a stable order.
        self.encoder = self._two_layer_mlp(3, width, code_dim)
        self.decoder = self._two_layer_mlp(code_dim, width, code_dim)

    @staticmethod
    def _two_layer_mlp(n_in, n_hidden, n_out):
        """Return Linear -> ReLU -> Linear as an ``nn.Sequential``."""
        return nn.Sequential(
            nn.Linear(n_in, n_hidden),
            nn.ReLU(),
            nn.Linear(n_hidden, n_out),
        )

    def forward(self, x):
        """Encode ``x``, scale by the fixed vector, and decode."""
        return self.decoder(self.encoder(x) * self.fixed_params)

# ---- 3. 训练配置 ----
def train_model(list_step, X_scaled, y_scaled, model_savepath, *,
                num_epochs=1000, batch_size=2, lr=0.001,
                log_every=100, loss_threshold=0.0009):
    """Train a ``FixedParamNetwork`` and save its ``state_dict``.

    Args:
        list_step: NumPy array used as the network's fixed (non-trainable)
            modulation vector; it is converted to a float32 tensor.
        X_scaled: feature array passed to ``CustomDataset``.
        y_scaled: label array passed to ``CustomDataset``.
        model_savepath: destination passed to ``torch.save``.
        num_epochs: maximum number of passes over the data.
        batch_size: mini-batch size for the training ``DataLoader``.
        lr: Adam learning rate.
        log_every: the average loss is printed on epochs where
            ``epoch % log_every == 0``.
        loss_threshold: training stops early when the average epoch loss
            drops below this value. The check runs only on logging epochs.

    The keyword-only hyperparameters default to the values that were
    previously hard-coded, so existing four-argument calls behave as before.
    """
    # The fixed vector comes directly from list_step (it is not random).
    fixed_params = torch.from_numpy(list_step).float()

    model = FixedParamNetwork(fixed_params)

    dataset = CustomDataset(X_scaled, y_scaled)
    train_loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

    # model.parameters() does not include registered buffers, so the fixed
    # vector is never updated by the optimizer.
    optimizer = optim.Adam(model.parameters(), lr=lr)
    criterion = nn.MSELoss()

    for epoch in range(num_epochs):
        model.train()
        epoch_loss = 0

        for batch in train_loader:
            inputs = batch['input']
            targets = batch['target']

            # Forward pass.
            outputs = model(inputs)
            loss = criterion(outputs, targets)

            # Backward pass and parameter update.
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            epoch_loss += loss.item()

        avg_loss = epoch_loss / len(train_loader)
        if epoch % log_every == 0:
            print(f"Epoch {epoch+1}/{num_epochs} | Loss: {avg_loss:.4f}")
            # Early stopping is evaluated only on logging epochs.
            if avg_loss < loss_threshold:
                break

    torch.save(model.state_dict(), model_savepath)

# if __name__ == "__main__":
#     train_model(list_step_, X_scaled_, y_scaled_, f"trained_model.pth")

In [None]:


# ------------------- 2. 加载模型权重 -------------------
def load_model(model_path, fixed_params):
    """Rebuild a ``FixedParamNetwork`` and load saved weights into it.

    Args:
        model_path: path of a ``state_dict`` file written by ``torch.save``.
        fixed_params: tensor used to construct the network. It must have the
            same shape as the ``fixed_params`` entry stored in the
            checkpoint, because the buffer is part of the ``state_dict``.

    Returns:
        The model in evaluation mode, with tensors on the CPU.

    Key mismatches between the checkpoint and the model are reported as
    printed warnings. Previously a second ``strict=True`` load raised on any
    mismatch before the warnings could be printed. Shape mismatches still
    raise.
    """
    model = FixedParamNetwork(fixed_params)

    # map_location='cpu' forces every tensor onto the CPU regardless of the
    # device it was saved from.
    # NOTE: torch.load unpickles the file; only load checkpoints you trust.
    state_dict = torch.load(model_path, map_location='cpu')

    # Load once, tolerating key mismatches, and report what did not line up.
    missing, unexpected = model.load_state_dict(state_dict, strict=False)
    if missing:
        print(f"警告：缺少参数 {missing}")
    if unexpected:
        print(f"警告：意外参数 {unexpected}")

    return model.eval()

# ------------------- 3. 数据预处理 -------------------
class TestDataset(Dataset):
    """Map-style dataset wrapping inference inputs as one float32 tensor.

    The caller is responsible for applying the same preprocessing that was
    used for the training data before passing ``data`` in.
    """

    def __init__(self, data):
        """Convert ``data`` to a float32 tensor and keep it on ``self.data``."""
        as_float32 = torch.tensor(data, dtype=torch.float32)
        self.data = as_float32

    def __len__(self):
        """Return the size of the first dimension of the stored tensor."""
        stored = self.data
        return len(stored)

    def __getitem__(self, idx):
        """Return the entry of the stored tensor at position ``idx``."""
        stored = self.data
        return stored[idx]

# ------------------- 4. 执行预测 -------------------
def predict(model, test_data):
    """Run ``model`` over ``test_data`` one sample at a time.

    Args:
        model: module with at least one parameter; the device of its first
            parameter decides where each batch is moved.
        test_data: data accepted by ``TestDataset``.

    Returns:
        A NumPy array made by concatenating the per-sample outputs along
        axis 0.
    """
    target_device = next(model.parameters()).device

    loader = DataLoader(TestDataset(test_data), batch_size=1, shuffle=False)

    # Gradients are not needed for inference.
    with torch.no_grad():
        per_sample = [
            model(sample.to(target_device)).cpu().numpy()
            for sample in loader
        ]

    return np.concatenate(per_sample, axis=0)

# ------------------- 5. 使用示例 -------------------
# if __name__ == "__main__":
#     # 参数配置
#     MODEL_PATH = "trained_model.pth"
#     FIXED_PARAMS = torch.from_numpy(list_step).float()

#     # 模拟测试数据（实际替换为真实数据）
#     raw_test_data = X_scaled

#     # 加载模型
#     model = load_model(MODEL_PATH, FIXED_PARAMS)

#     # 执行预测
#     predictions = predict(model, raw_test_data)
#     it_predictions = scaler_y.inverse_transform(predictions)


In [None]:
def plot(y, y_, list_step, suffix_, savepath):
    """Plot true and predicted curves, save the figure, then display it.

    Args:
        y: true values, plotted against ``list_step``.
        y_: predicted values, plotted against ``list_step``.
        list_step: x-axis values shared by both curves.
        suffix_: unused; kept so existing callers do not break.
        savepath: destination passed to ``savefig``.

    The figure is saved before ``plt.show()`` is called. Showing first can
    leave ``savefig`` writing an empty figure. The figure is closed at the
    end so repeated calls in a loop do not accumulate open figures.
    """
    fig, ax = plt.subplots(figsize=(12, 6), dpi=100)

    # True values: solid blue line with hollow circle markers.
    ax.plot(list_step, y,
            color='#2c7bb6',
            linewidth=2.5,
            linestyle='-',
            marker='o',
            markersize=6,
            markerfacecolor='white',
            markeredgewidth=1.5,
            label='True Values')

    # Predicted values: dashed red line with hollow square markers.
    ax.plot(list_step, y_,
            color='#d7191c',
            linewidth=2.5,
            linestyle='--',
            marker='s',
            markersize=6,
            markerfacecolor='white',
            markeredgewidth=1.5,
            label='Predicted Values')

    ax.set_title('True vs Predicted Values Comparison', fontsize=14, pad=20)
    ax.set_xlabel('Frequency', fontsize=12, labelpad=10)
    ax.set_ylabel('Values', fontsize=12, labelpad=10)
    ax.grid(True, color='gray', linestyle='--', linewidth=0.5, alpha=0.6)
    ax.legend(loc='upper left', frameon=True, fontsize=10)

    # Lay out after the title and labels exist so they are accounted for.
    fig.tight_layout()

    fig.savefig(savepath, dpi=300, bbox_inches='tight')
    plt.show()
    plt.close(fig)

In [None]:
import numpy as np
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score


def calculate_errors(true, pred):
    """Compute common regression error metrics.

    Args:
        true: array-like of ground-truth values.
        pred: array-like of predicted values, same shape as ``true``.

    Returns:
        dict with the keys ``'MSE'``, ``'RMSE'``, ``'MAE'``, ``'R²'``,
        ``'MaxError'``, ``'MAPE'`` (in percent), ``'RelativeError'`` and one
        entry per absolute-error quantile (25 %, 50 %, 75 %).

    Inputs are converted with ``np.asarray`` so plain lists work as well as
    arrays.

    NOTE: ``'MAPE'`` divides by ``true`` element-wise and ``'RelativeError'``
    divides by the norm of ``true``; zeros there give inf/nan results.
    """
    true = np.asarray(true)
    pred = np.asarray(pred)

    residual = true - pred
    abs_residual = np.abs(residual)
    mse = mean_squared_error(true, pred)  # computed once, reused for RMSE

    errors = {
        'MSE': mse,
        'RMSE': np.sqrt(mse),
        'MAE': mean_absolute_error(true, pred),
        'R²': r2_score(true, pred),
        'MaxError': np.max(abs_residual),
        'MAPE': np.mean(np.abs(residual / true)) * 100,  # percentage
    }

    # Norm of the residual relative to the norm of the true values.
    errors['RelativeError'] = np.linalg.norm(residual) / np.linalg.norm(true)

    # Quantiles of the absolute error.
    quantiles = [0.25, 0.5, 0.75]
    quantile_errors = np.quantile(abs_residual, quantiles)
    for q, err in zip(quantiles, quantile_errors):
        errors[f'{int(q*100)}%分位数误差'] = err

    return errors



In [None]:
def data_prepare(X, y):
    """Standardise features and targets and build the step axis.

    Args:
        X: feature matrix passed to ``StandardScaler.fit_transform``.
        y: target matrix passed to ``StandardScaler.fit_transform``.

    Returns:
        Tuple ``(scaler_X, scaler_y, X_scaled, y_scaled, list_step)`` where
        the scalers are fitted on ``X`` and ``y`` respectively and
        ``list_step`` runs from 92.0 to 96.0 in steps of 0.1.
    """
    lo, hi, inc = 92.0, 96.0, 0.1

    # Half a step is added to the upper bound so the end point 96.0 is kept.
    list_step = np.arange(lo, hi + inc/2, inc)

    # Separate scalers so predictions can be mapped back to original units.
    scaler_X = StandardScaler()
    X_scaled = scaler_X.fit_transform(X)

    scaler_y = StandardScaler()
    y_scaled = scaler_y.fit_transform(y)

    return scaler_X, scaler_y, X_scaled, y_scaled, list_step

# Evaluate one saved model per column group and collect the predictions.
#
# Columns 0-2 of both CSVs are used as inputs. The remaining columns are
# treated as 8 interleaved groups: group `start` (3..10) takes columns
# start, start+8, ..., start+320, i.e. 41 target columns per model.
df_data = pd.read_csv("/content/drive/MyDrive/tangwei/汇总-MA-训练v3.csv")
starts = list(range(3, 11))
deltas = [i*8 for i in range(0, 41)]

df_test = pd.read_csv("/content/drive/MyDrive/tangwei/汇总-MA-测试.csv")

predicted_results = []
predicted_columns = []

for start in starts:
    pos = [start + delta for delta in deltas]
    columns = [df_data.columns[i] for i in pos]
    predicted_columns += columns
    # The last two characters of the first selected column name tag the
    # output plots for this group.
    suffix = columns[0][-2:]
    X = df_data.iloc[:, :3].values
    y = df_data.iloc[:, pos].values
    y_test = df_test.iloc[:, pos].values
    X_test = df_test.iloc[:, :3].values
    # Scalers are fitted on the training data only and reused on test data.
    scaler_X_, scaler_y_, X_scaled_, y_scaled_, list_step_ = data_prepare(X, y)
    scaled_X_test = scaler_X_.transform(X_test)

    # Uncomment to (re)train the model for this group before evaluating:
    # print(f"training the {start}-th model ...")
    # train_model(list_step_, X_scaled_, y_scaled_, f"/content/drive/MyDrive/tangwei/trained_model_{start}.pth")

    MODEL_PATH = f"/content/drive/MyDrive/tangwei/trained_model_{start}.pth"
    FIXED_PARAMS = torch.from_numpy(list_step_).float()

    raw_test_data = scaled_X_test

    model = load_model(MODEL_PATH, FIXED_PARAMS)

    # Predict in scaled space, then map back to the original target units.
    predictions = predict(model, raw_test_data)
    inverse_transform_predictions = scaler_y_.inverse_transform(predictions)

    error_metrics = calculate_errors(y_test, inverse_transform_predictions)

    print("误差指标评估报告：")
    print("="*40)
    for metric, value in error_metrics.items():
        if metric == 'MAPE':
            print(f"{metric:>15}: {value:.2f}%")
        else:
            print(f"{metric:>15}: {value:.4f}")
    predicted_results.append(inverse_transform_predictions)

    # One figure per test row. The row index is part of the file name so
    # later rows no longer overwrite earlier ones within the same group.
    for i, (true_row, predicted_row) in enumerate(
            zip(y_test, inverse_transform_predictions)):
        plot(true_row, predicted_row, list_step_, suffix,
             f"/content/drive/MyDrive/tangwei/predicted_true_comparison_{suffix}_{i}.png")



In [None]:
# Side-by-side matrix: the three input columns followed by every group's
# predictions, in the order the groups were evaluated.
results = np.concatenate([X_test, *predicted_results], axis=1)
# Matching column labels: the first three test columns, then the predicted ones.
columns = [*df_test.columns[:3], *predicted_columns]

In [None]:
# Build the prediction table, then reorder its columns to match df_test.
df_preditced = pd.DataFrame(results, columns=columns)
df_preditced = df_preditced[df_test.columns]

In [None]:
# Notebook cell: evaluating the bare name shows the test DataFrame as the
# cell output for visual inspection.
df_test

In [None]:
# index=False keeps the row index out of the file, so the output has the same
# column layout as the test CSV whose column order the frame was aligned to.
df_preditced.to_csv("/content/drive/MyDrive/tangwei/汇总-MA-预测.csv", index=False)