In [1]:
import torch
import torch.nn as nn
from torch.utils.data import TensorDataset, DataLoader
from sklearn.metrics import classification_report, f1_score, accuracy_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
import pandas as pd
import numpy as np
from itertools import product
import copy
import os
import json
# 设置设备
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)

Using device: cpu


In [28]:
# ===================== 1. Load and preprocess data =====================
df = pd.read_excel('DataNT.xlsx')
X = df.iloc[:, 6:].values         # Features: sensor1 to sensor30
y = df.iloc[:, 1:5].values        # Labels: 6 gas presence (1/0)

scaler = MinMaxScaler()
X_scaled = scaler.fit_transform(X)

X_train, X_test, y_train, y_test = train_test_split(
    X_scaled, y, test_size=0.2, random_state=42
)

X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train, dtype=torch.float32)
X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
y_test_tensor = torch.tensor(y_test, dtype=torch.float32)

train_loader = DataLoader(TensorDataset(X_train_tensor, y_train_tensor), batch_size=32, shuffle=True)
test_loader = DataLoader(TensorDataset(X_test_tensor, y_test_tensor), batch_size=32, shuffle=False)

In [29]:
# ===================== 2. Define models =====================
class MLP(nn.Module):
    def __init__(self, input_dim, hidden_dim, dropout, num_layers):
        super().__init__()
        layers = []
        dim = input_dim
        for _ in range(num_layers):
            layers.append(nn.Linear(dim, hidden_dim))
            layers.append(nn.ReLU())
            layers.append(nn.Dropout(dropout))
            dim = hidden_dim
        layers.append(nn.Linear(dim, 4))
        self.model = nn.Sequential(*layers)

    def forward(self, x):
        return self.model(x)


class LSTMModel(nn.Module):
    def __init__(self, input_dim, hidden_dim, dropout, num_layers):
        super().__init__()
        self.lstm = nn.LSTM(input_dim, hidden_dim, num_layers=num_layers, batch_first=True, dropout=dropout)
        self.fc = nn.Linear(hidden_dim, 4)

    def forward(self, x):
        x = x.unsqueeze(1)  # (batch, seq=1, feature)
        _, (h_n, _) = self.lstm(x)
        out = self.fc(h_n[-1])
        return out


class TransformerModel(nn.Module):
    def __init__(self, input_dim, hidden_dim, num_heads, num_layers, dropout=0.1):
        super().__init__()
        self.embedding = nn.Linear(input_dim, hidden_dim)

        encoder_layer = nn.TransformerEncoderLayer(
            d_model=hidden_dim,
            nhead=num_heads,
            dropout=dropout,
            batch_first=True
        )
        self.encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)

        self.fc = nn.Linear(hidden_dim, 4)  # 输出 6 维

    def forward(self, x):
        x = x.unsqueeze(1)  # [32, 60] -> [32, 1, 60]
        x = self.embedding(x)
        x = self.encoder(x)
        x = x.squeeze(1)  # [32, 1, hidden_dim] -> [32, hidden_dim]
        x = self.fc(x)
        return x

In [4]:
# ===================== 3. Train & Evaluate Function =====================
# 训练和评估函数
def train_and_evaluate(model, train_loader, test_loader, epochs, lr):
    model = model.to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    criterion = nn.BCEWithLogitsLoss()

    for epoch in range(epochs):
        model.train()
        total_loss = 0
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            total_loss += loss.item()

    # 评估
    model.eval()
    all_preds, all_labels = [], []
    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            preds = torch.sigmoid(outputs).cpu().numpy()
            all_preds.append(preds)
            all_labels.append(labels.cpu().numpy())
    
    y_pred = (np.vstack(all_preds) > 0.5).astype(int)
    y_true = np.vstack(all_labels)

    # 多标签 accuracy（推荐：macro accuracy）
    acc_per_label = (y_true == y_pred).mean(axis=0)
    macro_acc = acc_per_label.mean()

    return macro_acc, model, y_pred

In [36]:
# ===================== 4. Grid Search Settings =====================
# 网格搜索
param_grid = {
    "hidden_dim": [48, 64, 128],
    "num_heads": [2],
    "lr": [0.001, 0.0005],
    "epochs": [300,500],
    "dropout": [0.1],
    "num_layers": [3]
}

# 过滤 hidden_dim 和 num_heads 兼容性
valid_params = []
for params in product(*param_grid.values()):
    hidden_dim, num_heads, lr, epochs, dropout, num_layers = params
    if hidden_dim % num_heads == 0:
        valid_params.append(params)

param_names = list(param_grid.keys())

models = {
    "MLP": MLP,
    "LSTM": LSTMModel,
    "Transformer": TransformerModel
}

results = []
best_acc = -1
best_config = None
best_model = None

In [31]:
for model_name, model_class in models.items():
    for param_values in valid_params:
        params = dict(zip(param_names, param_values))

        if model_name == "Transformer":
            model = model_class(
                input_dim=60,
                hidden_dim=params["hidden_dim"],
                num_heads=params["num_heads"],
                num_layers=params["num_layers"],
                dropout=params["dropout"]
            )
        elif model_name == "LSTM":
            model = model_class(
                input_dim=60,
                hidden_dim=params["hidden_dim"],
                num_layers=params["num_layers"],
                dropout=params["dropout"]
            )
        else:  # MLP
            model = model_class(
                input_dim=60,
                hidden_dim=params["hidden_dim"],
                num_layers=params["num_layers"],
                dropout=params["dropout"]
            )

        acc, trained_model, y_pred = train_and_evaluate(
            model, train_loader, test_loader, 
            params["epochs"], params["lr"]
        )

        results.append((model_name, params, acc))
        print(f"Model: {model_name}, Params: {params}, Accuracy: {acc:.4f}")

        if acc > best_acc:
            best_acc = acc
            best_config = (model_name, params)
            best_model = copy.deepcopy(trained_model)

Model: MLP, Params: {'hidden_dim': 32, 'num_heads': 2, 'lr': 0.001, 'epochs': 300, 'dropout': 0.1, 'num_layers': 3}, Accuracy: 0.7396
Model: MLP, Params: {'hidden_dim': 32, 'num_heads': 2, 'lr': 0.001, 'epochs': 300, 'dropout': 0.1, 'num_layers': 4}, Accuracy: 0.7083
Model: MLP, Params: {'hidden_dim': 32, 'num_heads': 2, 'lr': 0.001, 'epochs': 500, 'dropout': 0.1, 'num_layers': 3}, Accuracy: 0.7708
Model: MLP, Params: {'hidden_dim': 32, 'num_heads': 2, 'lr': 0.001, 'epochs': 500, 'dropout': 0.1, 'num_layers': 4}, Accuracy: 0.7396
Model: MLP, Params: {'hidden_dim': 32, 'num_heads': 2, 'lr': 0.0005, 'epochs': 300, 'dropout': 0.1, 'num_layers': 3}, Accuracy: 0.7500
Model: MLP, Params: {'hidden_dim': 32, 'num_heads': 2, 'lr': 0.0005, 'epochs': 300, 'dropout': 0.1, 'num_layers': 4}, Accuracy: 0.6875
Model: MLP, Params: {'hidden_dim': 32, 'num_heads': 2, 'lr': 0.0005, 'epochs': 500, 'dropout': 0.1, 'num_layers': 3}, Accuracy: 0.7917
Model: MLP, Params: {'hidden_dim': 32, 'num_heads': 2, 'lr'

In [32]:
# 创建保存目录
os.makedirs("model_output", exist_ok=True)

# 保存模型权重
torch.save(best_model.state_dict(), "model_output/best_model.pth")

# 保存配置信息（模型名 + 参数 + F1 分数）
best_model_info = {
    "model_name": best_config[0],
    "params": best_config[1],
    "Acc_score": best_acc
}

with open("model_output/best_model_config.json", "w") as f:
    json.dump(best_model_info, f, indent=4)
print(best_acc)
print("✅ 模型和配置已保存！")


0.8229166666666666
✅ 模型和配置已保存！


In [42]:
from sklearn.metrics import classification_report, multilabel_confusion_matrix
# ==== 读取保存的模型配置信息 ====
with open("model_output/best_model_config_4.0.1.json.", "r") as f:
    model_info = json.load(f)

model_name = model_info["model_name"]
params = model_info["params"]

# ==== 重新构建模型结构 ====
if model_name == "MLP":
    model = MLP(input_dim=60, hidden_dim=params["hidden_dim"], dropout=params["dropout"], num_layers=params["num_layers"])
elif model_name == "LSTM":
    model = LSTMModel(input_dim=60, hidden_dim=params["hidden_dim"], dropout=params["dropout"], num_layers=params["num_layers"])
elif model_name == "Transformer":
    model = TransformerModel(input_dim=60, hidden_dim=params["hidden_dim"], dropout=params["dropout"], num_layers=params["num_layers"])
else:
    raise ValueError("Unknown model name!")

# ==== 加载模型参数 ====
model.load_state_dict(torch.load("model_output/best_model_4.0.1.pth"))
model.eval()

print("✅ 模型加载完毕！现在可以用 model(inputs) 进行预测啦~")
#具体如何预测
with torch.no_grad():
    outputs = model(torch.tensor(X_test, dtype=torch.float32).to(device))
    preds = (torch.sigmoid(outputs) > 0.5).int().cpu().numpy()

    print("\n📊 Classification Report:")
    print(classification_report(y_test, preds))

    # ==== 混淆矩阵 ====
    cm = multilabel_confusion_matrix(y_test, preds)
    print("\n🌀 Confusion Matrix:")
    print(cm)


✅ 模型加载完毕！现在可以用 model(inputs) 进行预测啦~

📊 Classification Report:
              precision    recall  f1-score   support

           0       1.00      0.83      0.91        12
           1       0.54      0.88      0.67         8
           2       0.83      0.91      0.87        11
           3       0.77      0.83      0.80        12

   micro avg       0.77      0.86      0.81        43
   macro avg       0.79      0.86      0.81        43
weighted avg       0.81      0.86      0.82        43
 samples avg       0.77      0.81      0.76        43


🌀 Confusion Matrix:
[[[12  0]
  [ 2 10]]

 [[10  6]
  [ 1  7]]

 [[11  2]
  [ 1 10]]

 [[ 9  3]
  [ 2 10]]]


  model.load_state_dict(torch.load("model_output/best_model_4.0.1.pth"))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
