<a href="https://colab.research.google.com/github/jingvf/IDS/blob/main/Pytorch_CNNGRU_noDP.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import pandas as pd

# 加载数据
data_path = '/content/drive/MyDrive/2025paper/dataset/small_dataset.csv'  # 替换为你的数据路径
df = pd.read_csv(data_path)

In [2]:
!pip install opacus -qq

[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/254.4 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m[90m━[0m [32m245.8/254.4 kB[0m [31m8.3 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m254.4/254.4 kB[0m [31m6.5 MB/s[0m eta [36m0:00:00[0m
[?25h

In [3]:
# 输出类别数量
num_classes = df['Class'].nunique()
print(f"总类别数: {num_classes}")

# 输出每个类别的数量
class_counts = df['Class'].value_counts()
print("每个类别的样本数量：")
print(class_counts)
print(df.head())

总类别数: 5
每个类别的样本数量：
Class
Normal          946844
SpoofingRPM      65490
SpoofingGear     59725
DoS              10553
Fuzzy             8967
Name: count, dtype: int64
      Timestamp CAN ID  DLC Data[0] Data[1] Data[2] Data[3] Data[4] Data[5]  \
0  1.478192e+09    316    8      45      29      24      ff      29      24   
1  1.478192e+09    316    8      45      29      24      ff      29      24   
2  1.478195e+09   0140    8      00      00      00      00      08      28   
3  1.478191e+09   0545    8      d8      00      00      8a      00      00   
4  1.478195e+09   043f    8       1      45      60      ff      6b       0   

  Data[6] Data[7]         Class  
0       0      ff   SpoofingRPM  
1       0      ff   SpoofingRPM  
2      2f      15        Normal  
3      00      00        Normal  
4       0       0  SpoofingGear  


In [4]:
from sklearn.preprocessing import StandardScaler
import torch
from sklearn.preprocessing import LabelEncoder
# Define the device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# 1. CAN ID 转成 int（十六进制字符串转十进制数字）
df["CAN ID"] = df["CAN ID"].apply(lambda x: int(str(x), 16) if isinstance(x, str) else int(x))

# 2. 提取数值特征 (DLC + Data[0-7]) 并转为十进制
feature_cols = ["DLC"] + [f"Data[{i}]" for i in range(8)]
# Use .map instead of .applymap for consistency, although applymap was the one triggering the warning
X_features = df[feature_cols].astype(str).map(lambda x: int(x, 16) if isinstance(x, str) else int(x)).values

# 3. 标准化
scaler = StandardScaler()
X_features_scaled = scaler.fit_transform(X_features)

# 4. CAN ID 单独提取
X_canid = df["CAN ID"].values
le = LabelEncoder()
y_encoded = le.fit_transform(df["Class"].values)
# Calculate class weights
import numpy as np

counts = [946844, 65490, 59725, 10553, 8967] # Using the counts from the previous output
total = sum(counts)
class_weights = [total/c for c in counts]
class_weights = torch.tensor(class_weights, dtype=torch.float32).to(device)

Using device: cuda


In [5]:
from sklearn.model_selection import train_test_split

# ===== 第一步：划分训练集 + 临时集（验证+测试） =====
X_train_cid, X_temp_cid, X_train_feat, X_temp_feat, y_train, y_temp = train_test_split(
    X_canid, X_features_scaled, y_encoded, test_size=0.3, random_state=42
)
# 这里 test_size=0.3 表示 30% 留作验证+测试

# ===== 第二步：从临时集划分验证集和测试集 =====
X_val_cid, X_test_cid, X_val_feat, X_test_feat, y_val, y_test = train_test_split(
    X_temp_cid, X_temp_feat, y_temp, test_size=0.5, random_state=42
)
# test_size=0.5 表示临时集一半做测试，一半做验证

# ===== 输出检查 =====
print("原始数据：")
print(f"X_canid shape = {X_canid.shape}")
print(f"X_features_scaled shape = {X_features_scaled.shape}")
print(f"y_encoded shape = {y_encoded.shape}")

print("\n训练集：")
print(f"CAN IDs shape = {X_train_cid.shape}")
print(f"Features shape = {X_train_feat.shape}")
print(f"Labels shape = {y_train.shape}")

print("\n验证集：")
print(f"CAN IDs shape = {X_val_cid.shape}")
print(f"Features shape = {X_val_feat.shape}")
print(f"Labels shape = {y_val.shape}")

print("\n测试集：")
print(f"CAN IDs shape = {X_test_cid.shape}")
print(f"Features shape = {X_test_feat.shape}")
print(f"Labels shape = {y_test.shape}")

print("\n类别映射：")
print(dict(zip(le.classes_, le.transform(le.classes_))))


原始数据：
X_canid shape = (1091579,)
X_features_scaled shape = (1091579, 9)
y_encoded shape = (1091579,)

训练集：
CAN IDs shape = (764105,)
Features shape = (764105, 9)
Labels shape = (764105,)

验证集：
CAN IDs shape = (163737,)
Features shape = (163737, 9)
Labels shape = (163737,)

测试集：
CAN IDs shape = (163737,)
Features shape = (163737, 9)
Labels shape = (163737,)

类别映射：
{'DoS': np.int64(0), 'Fuzzy': np.int64(1), 'Normal': np.int64(2), 'SpoofingGear': np.int64(3), 'SpoofingRPM': np.int64(4)}


In [6]:
!pip install opacus==1.1.3 -q # 指定稳定版本
import opacus
print(opacus.__version__)

[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/181.7 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[90m╺[0m[90m━[0m [32m174.1/181.7 kB[0m [31m6.2 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m181.7/181.7 kB[0m [31m4.9 MB/s[0m eta [36m0:00:00[0m
[?25h1.1.3


In [7]:
import torch
from torch.utils.data import Dataset, DataLoader

# ======================
# 1. 数据集定义
# ======================
SEQ_LEN = 5
class CANSequenceDataset(Dataset):
    def __init__(self, canid, features, labels, seq_len=SEQ_LEN):
        self.seq_len = seq_len
        self.features = torch.tensor(np.array(features, dtype=np.float32))
        self.canid = torch.tensor(np.array(canid, dtype=np.int64))
        self.labels = torch.tensor(np.array(labels, dtype=np.int64))

    def __len__(self):
        return len(self.labels) - self.seq_len + 1

    def __getitem__(self, idx):
        seq_canid = self.canid[idx:idx+self.seq_len].clone().detach()
        seq_features = self.features[idx:idx+self.seq_len].clone().detach()
        label = self.labels[idx+self.seq_len-1].clone().detach()
        return seq_canid, seq_features, label

In [12]:
BATCH_SIZE = 128

train_dataset = CANSequenceDataset(X_train_cid, X_train_feat, y_train, seq_len=SEQ_LEN)
val_dataset   = CANSequenceDataset(X_val_cid, X_val_feat, y_val, seq_len=SEQ_LEN)
test_dataset  = CANSequenceDataset(X_test_cid, X_test_feat, y_test, seq_len=SEQ_LEN)

train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, drop_last=True)
val_loader   = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, drop_last=False)
test_loader  = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, drop_last=False)

# 打印一个 batch 的 shape
for batch in train_loader:
    cids, feats, labels = batch
    print("Batch CAN ID shape:", cids.shape)       # (BATCH_SIZE, SEQ_LEN)
    print("Batch Features shape:", feats.shape)    # (BATCH_SIZE, SEQ_LEN, 9)
    print("Batch Labels shape:", labels.shape)     # (BATCH_SIZE,)
    break

Batch CAN ID shape: torch.Size([128, 5])
Batch Features shape: torch.Size([128, 5, 9])
Batch Labels shape: torch.Size([128])


In [13]:
import torch
import torch.nn as nn

class CNN_GRU_Model(nn.Module):
    def __init__(self, num_features, hidden_dim, num_classes,
                 cnn_channels=64, kernel_size=3, num_layers=1):
        super(CNN_GRU_Model_V2, self).__init__()

        # CNN部分 - 处理每个时间步的特征
        self.cnn = nn.Sequential(
            nn.Conv1d(in_channels=num_features, out_channels=cnn_channels,
                     kernel_size=kernel_size, padding=1),
            nn.ReLU(),
            nn.BatchNorm1d(cnn_channels),
            nn.Dropout(0.2),
            nn.Conv1d(in_channels=cnn_channels, out_channels=cnn_channels,
                     kernel_size=kernel_size, padding=1),
            nn.ReLU(),
            nn.BatchNorm1d(cnn_channels),
            nn.Dropout(0.2)
        )

        # GRU部分
        self.gru = nn.GRU(input_size=cnn_channels, hidden_size=hidden_dim,
                         batch_first=True, bidirectional=True,  # 使用双向GRU
                         num_layers=num_layers)

        # 分类层
        self.fc = nn.Linear(hidden_dim * 2, num_classes)  # 双向GRU输出维度是2*hidden_dim
        self.dropout = nn.Dropout(0.3)

    def forward(self, x):
        # x shape: (batch_size, seq_len, num_features)
        # 转换维度用于CNN: (batch, features, seq_len)
        x = x.transpose(1, 2)
        # CNN特征提取
        cnn_out = self.cnn(x)  # (batch, cnn_channels, seq_len)
        # 转换回GRU需要的维度: (batch, seq_len, cnn_channels)
        gru_input = cnn_out.transpose(1, 2)
        # GRU时序建模
        gru_out, _ = self.gru(gru_input)  # (batch, seq_len, hidden_dim*2)
        # 取最后一个时间步的输出
        last_output = gru_out[:, -1, :]  # (batch, hidden_dim*2)
        # 分类
        output = self.fc(self.dropout(last_output))
        return output

In [14]:
import torch.optim as optim
from tqdm import tqdm
EPOCHS=5

for epoch in range(EPOCHS):
    model.train()
    total_loss, total_correct = 0, 0

    progress_bar = tqdm(train_loader, desc=f"Epoch {epoch+1}/{EPOCHS}", leave=False)
    for cids, feats, labels in progress_bar:
        feats, labels = feats.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(feats)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        preds = torch.argmax(outputs, dim=1)
        total_loss += loss.item() * labels.size(0)
        total_correct += (preds == labels).sum().item()

        # 实时更新进度条状态
        progress_bar.set_postfix({'loss': total_loss/((progress_bar.n+1)*BATCH_SIZE),
                                  'acc': total_correct/((progress_bar.n+1)*BATCH_SIZE)})

    train_loss = total_loss / len(train_dataset)
    train_acc  = total_correct / len(train_dataset)
    print(f"Epoch {epoch+1} → Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}")

    # 验证
    model.eval()
    val_correct, val_total = 0, 0
    with torch.no_grad():
        for cids, feats, labels in val_loader:
            feats, labels = feats.to(device), labels.to(device)
            outputs = model(feats)
            preds = torch.argmax(outputs, dim=1)
            val_correct += (preds == labels).sum().item()
            val_total += labels.size(0)
    val_acc = val_correct / val_total
    print(f"Epoch {epoch+1} → Val Acc: {val_acc:.4f}")

    # 测试集最终评估
    # -------------------------------
    model.eval()
    test_correct, test_total = 0, 0
    with torch.no_grad():
        for cids, feats, labels in test_loader:
            feats, labels = feats.to(device), labels.to(device)
            outputs = model(feats)
            preds = torch.argmax(outputs, dim=1)
            test_correct += (preds == labels).sum().item()
            test_total += labels.size(0)

    test_acc = test_correct / test_total
    print(f"\nFinal Test Accuracy: {test_acc:.4f}")



Epoch 1 → Train Loss: 0.0102, Train Acc: 0.9877
Epoch 1 → Val Acc: 0.9889

Final Test Accuracy: 0.9889




Epoch 2 → Train Loss: 0.0086, Train Acc: 0.9883
Epoch 2 → Val Acc: 0.9886

Final Test Accuracy: 0.9885




Epoch 3 → Train Loss: 0.0081, Train Acc: 0.9885
Epoch 3 → Val Acc: 0.9893

Final Test Accuracy: 0.9892




Epoch 4 → Train Loss: 0.0077, Train Acc: 0.9886
Epoch 4 → Val Acc: 0.9892

Final Test Accuracy: 0.9892




Epoch 5 → Train Loss: 0.0074, Train Acc: 0.9887
Epoch 5 → Val Acc: 0.9893

Final Test Accuracy: 0.9893


After installing the library, please run the code cell again.