# 1.导包

In [7]:
import os
import gc
import torch
import numpy as np
import pandas as pd
import torch.nn as nn
from PIL import Image
from scipy import signal
import torch.optim as optim
from scipy.io import loadmat
from scipy.signal import stft
from scipy.ndimage import zoom
import matplotlib.pyplot as plt
import torchvision.models as models
import torchvision.transforms as transforms
from torchvision.datasets import ImageFolder
from sklearn.preprocessing import LabelEncoder
from torch.optim.lr_scheduler import OneCycleLR
from sklearn.model_selection import train_test_split
from torch.utils.data import TensorDataset, DataLoader
from torchvision.models import vit_b_16, ViT_B_16_Weights

In [8]:
# 检查是否有可用的GPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

# 定义参数
FS = 512
TARGET_IMAGE_SIZE = 224
# 定义常量
WINDOW_SIZE = 512  # 减小窗口大小以减少内存使用
OVERLAP_RATE = 0.4  # 增加步长以减少生成的图像数量
STRIDE = int(WINDOW_SIZE * (1 - OVERLAP_RATE))
MAX_IMAGES_PER_COLUMN = 500 # 每列最多生成的图像数量
print('successful!')

cpu
successful!


# 2.数据预处理
## （1）数据转CSV格式

In [None]:
file_names = ['97.mat', '105.mat', '118.mat', '130.mat', '169.mat',
              '185.mat', '197.mat', '209.mat', '222.mat', '234.mat']
# 采用驱动端数据
data_columns = ['X097_DE_time', 'X105_DE_time', 'X118_DE_time', 'X130_DE_time', 'X169_DE_time',
                'X185_DE_time','X197_DE_time','X209_DE_time','X222_DE_time','X234_DE_time']
columns_name = ['de_normal','de_7_inner','de_7_ball','de_7_outer','de_14_inner','de_14_ball','de_14_outer','de_21_inner','de_21_ball','de_21_outer']
data_12k_1797_10c = pd.DataFrame()
for index in range(10):
    # 读取MAT文件
    data = loadmat(f'../data_deal/{file_names[index]}')
    dataList = data[data_columns[index]].reshape(-1)
    data_12k_1797_10c[columns_name[index]] = dataList[:121265]  # 121048  min: 121265
print(data_12k_1797_10c.shape)
# # 转换为CSV格式文件
# data_12k_1797_10c.set_index('de_normal',inplace=True)
# data_12k_1797_10c.to_csv('data_12k_1797_10c.csv')

(121265, 10)


## （2）一维数据转二维图像

In [10]:
def _stft_to_spectrogram(window, fs=FS):
    """
    使用短时傅里叶变换 (STFT) 计算信号的对数幅度谱图。
    
    :param window: 时域信号窗口 (1D numpy array)
    :param fs: 采样频率
    :return: 幅度谱矩阵 (频率维 x 时间维)
    """
    # 使用 STFT 计算：f: 频率轴, t: 时间轴, Zxx: STFT 结果 (复数)
    # nperseg 通常取 window_size，但为了简单和保持时间分辨率，此处使用默认。
    # 我们可以设置 nperseg=len(window)，然后 nfft 也可以是 len(window)
    f, t, Zxx = stft(window, fs=fs, nperseg=len(window), noverlap=0, nfft=len(window))
    
    # 取幅度，通常会取对数幅度 (dB) 来增强视觉效果
    # 转换为对数尺度 (分贝)
    spectrogram = np.log1p(np.abs(Zxx)) 
    
    # STFT 默认返回的是双边谱，但通常我们只关心单边谱 (0 到 Nyquist 频率)
    # nperseg=len(window) 时，STFT 返回的频率维度是 len(window)//2 + 1
    # stft(..., nperseg=N) 返回 N/2 + 1 个频率点。
    # Zxx 的维度是 (freq_bins, time_steps)
    
    # 由于 nperseg=len(window) 且 noverlap=0，time_steps 只有 1。
    # 这不是我们想要的。我们需要 STFT 在窗口内滑动。
    
    # 重新计算 STFT，以获得多个时间步
    # 我们希望 spectrogram 的形状是 (频率维, 时间维)，其中时间维要比 1 大。
    # 设置 nperseg 为一个较小的值（例如 1/8 窗口长度），并设置重叠
    nperseg = len(window) // 8
    noverlap = nperseg // 2
    
    f, t, Zxx = stft(window, fs=fs, nperseg=nperseg, noverlap=noverlap, nfft=nperseg)
    
    # 转换为对数尺度
    # 加上一个很小的数避免 log(0)
    spectrogram = np.log1p(np.abs(Zxx) + 1e-10)
    
    # 只取单边谱 (前半部分频率，包含 DC 分量)
    # Zxx 的维度是 (nperseg//2 + 1, num_time_slices)
    # spectrogram = spectrogram[:nperseg//2 + 1, :] # 已经是这个形状
    
    return spectrogram


def generate_stft_images(column_data, window_size, stride, max_images_per_column, target_size=TARGET_IMAGE_SIZE):
    """
    使用短时傅里叶变换（STFT）生成频谱图图像。
    
    :param column_data: 列数据（1D numpy array）
    :param window_size: 滑动窗口大小 (用于提取数据片段)
    :param stride: 步长
    :param max_images_per_column: 每列最多生成的图像数量
    :param target_size: 降采样后的目标图像尺寸
    :return: 包含 (起始索引, 图像数组) 的列表
    """
    images = []
    image_count = 0
    
    for start in range(0, len(column_data) - window_size + 1, stride):
        if image_count >= max_images_per_column:
            break
            
        window = column_data[start:start + window_size].copy()
        
        # 1. 计算 STFT 频谱图
        # spectrogram 矩阵维度: (频率维, 时间维)
        spectrogram = _stft_to_spectrogram(window)
        
        # 2. 归一化到 [0, 1]
        min_val = np.min(spectrogram)
        max_val = np.max(spectrogram)
        if max_val - min_val > 1e-6:
            normalized_spec = (spectrogram - min_val) / (max_val - min_val)
        else:
            normalized_spec = spectrogram * 0.0

        # 3. 图像缩放/插值到目标尺寸 (目标尺寸是 TARGET_IMAGE_SIZE x TARGET_IMAGE_SIZE)
        h, w = normalized_spec.shape
        
        # STFT 时间维 w 依赖于 nperseg 和 noverlap，通常 w << window_size。
        # H 是 STFT 的频率维 (nperseg//2 + 1)。
        
        # 计算缩放比例： target_size / actual_size
        scale_h = target_size / h # 频率轴缩放比例
        scale_w = target_size / w # 时间轴缩放比例
        
        # 使用三次插值进行缩放
        # zoom 会自动处理目标尺寸不整除的情况，结果会是 (target_size, target_size)
        downscaled_image = zoom(normalized_spec, (scale_h, scale_w), order=3) 

        # 4. 堆叠成 3 通道 (RGB)，用于 CNN 输入
        image = np.stack((downscaled_image,) * 3, axis=-1)
        
        images.append((start, image))
        image_count += 1
        
    return images


# --- 4. 图像处理与保存函数 (修改名称和输出信息) ---

def process_and_display_column(data, column_name, window_size, stride, max_images_per_column, target_size=TARGET_IMAGE_SIZE):
    """
    处理并保存指定列的 STFT 频谱图图像。
    """
    print(f"\n--- 正在使用 STFT 处理列: {column_name} ---")
    column_data = data[column_name].values
    
    # 1. 生成 STFT 图像列表
    stft_images = generate_stft_images(column_data, window_size, stride, max_images_per_column, target_size)
    
    # 创建 'dataset' 文件夹
    dataset_folder = 'dataset'
    if not os.path.exists(dataset_folder):
        os.makedirs(dataset_folder)
    
    # 创建列名对应的文件夹
    column_folder = os.path.join(dataset_folder, column_name)
    if not os.path.exists(column_folder):
        os.makedirs(column_folder)
    
    print(f"生成的图像数量: {len(stft_images)}")

    # 2. 遍历并保存图像
    for image_index, (start, image) in enumerate(stft_images):
        plt.figure(figsize=(2.24, 2.24)) 
        # 频谱图通常也用热力图，且通常低频在底部 (origin='lower')
        plt.imshow(image[:, :, 0], cmap='jet', origin='lower') # 换一个 STFT 常用色图
        plt.title(f'{column_name} STFT - Index: {start}', fontsize=8)
        plt.axis('off') # 不显示坐标轴
        
        # 保存图像到指定文件夹
        image_filename = os.path.join(column_folder, f'{column_name}_STFT_{image_index + 1}_S{start}.png')
        plt.savefig(image_filename, bbox_inches='tight', pad_inches=0.1) 
        plt.close() # 立即关闭 figure，释放内存
        
        # 内存管理
        if (image_index + 1) % 50 == 0:
            gc.collect() 
    
    # 3. 内存清理
    del column_data, stft_images
    gc.collect()

# 处理每一列的数据 (保持原代码结构)
for col in data_12k_1797_10c.columns:
    process_and_display_column(data_12k_1797_10c, col, WINDOW_SIZE, STRIDE, MAX_IMAGES_PER_COLUMN)

print("\n所有列处理完成。请检查 'dataset' 文件夹以查看生成的 STFT 频谱图图像。")


--- 正在使用 STFT 处理列: de_normal ---
生成的图像数量: 394

--- 正在使用 STFT 处理列: de_7_inner ---
生成的图像数量: 394

--- 正在使用 STFT 处理列: de_7_ball ---
生成的图像数量: 394

--- 正在使用 STFT 处理列: de_7_outer ---
生成的图像数量: 394

--- 正在使用 STFT 处理列: de_14_inner ---
生成的图像数量: 394

--- 正在使用 STFT 处理列: de_14_ball ---
生成的图像数量: 394

--- 正在使用 STFT 处理列: de_14_outer ---
生成的图像数量: 394

--- 正在使用 STFT 处理列: de_21_inner ---
生成的图像数量: 394

--- 正在使用 STFT 处理列: de_21_ball ---
生成的图像数量: 394

--- 正在使用 STFT 处理列: de_21_outer ---
生成的图像数量: 394

所有列处理完成。请检查 'dataset' 文件夹以查看生成的 STFT 频谱图图像。


# 3.数据集划分

In [None]:
# 根据需要模型接口调整图像大小
IMAGE_SIZE = (224, 224) 

# 定义数据集路径
dataset_path = 'dataset'

# 初始化数据列表和标签列表
images = []
labels = []

# 遍历数据集文件夹
for class_name in os.listdir(dataset_path):
    class_path = os.path.join(dataset_path, class_name)
    if os.path.isdir(class_path):
        for image_name in os.listdir(class_path):
            image_path = os.path.join(class_path, image_name)
            if image_path.endswith(('.png', '.jpg', '.jpeg', '.bmp', '.gif')):
                # 打开并调整图像大小
                img = Image.open(image_path).resize(IMAGE_SIZE, Image.LANCZOS)
                # 将图像转换为numpy数组
                img_array = np.array(img)
                # 添加到图像列表
                images.append(img_array)
                # 添加对应标签
                labels.append(class_name)

# 将图像数据和标签转换为numpy数组
images = np.array(images)
labels = np.array(labels)

# 将类别名称转换为数字标签
label_encoder = LabelEncoder()
labels_encoded = label_encoder.fit_transform(labels)

# 打印数据和标签的形状
print(f'Images shape: {images.shape}')
print(f'Labels shape: {labels_encoded.shape}')

# 归一化数据
images = images.astype('float32')
images = images / 255.0

# 划分数据集
X_train, X_temp, y_train, y_temp = train_test_split(images, labels_encoded, test_size=0.3, random_state=42, stratify=labels_encoded)
X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.333, random_state=42, stratify=y_temp)

# 打印划分后的数据集形状
print(f'Train images shape: {X_train.shape}')
print(f'Train labels shape: {y_train.shape}')
print(f'Validation images shape: {X_val.shape}')
print(f'Validation labels shape: {y_val.shape}')
print(f'Test images shape: {X_test.shape}')
print(f'Test labels shape: {y_test.shape}')

# # 将数据集保存到文件
# np.save('X_train.npy', X_train)
# np.save('y_train.npy', y_train)
# np.save('X_val.npy', X_val)
# np.save('y_val.npy', y_val)
# np.save('X_test.npy', X_test)
# np.save('y_test.npy', y_test)


Images shape: (3940, 224, 224, 4)
Labels shape: (3940,)
Train images shape: (2758, 224, 224, 4)
Train labels shape: (2758,)
Validation images shape: (788, 224, 224, 4)
Validation labels shape: (788,)
Test images shape: (394, 224, 224, 4)
Test labels shape: (394,)


# 4.定义数据加载器，导入模型

In [12]:
# # X_train 和 y_train 已经是 numpy 数组
# X_train = np.load('/kaggle/working/X_train.npy')
# y_train = np.load('/kaggle/working/y_train.npy')
# X_val = np.load('/kaggle/working/X_val.npy')
# y_val = np.load('/kaggle/working/y_val.npy')

# 将 numpy 数组转换为 torch 张量
X_train_tensor = torch.from_numpy(X_train).float()
y_train_tensor = torch.from_numpy(y_train).long()
X_val_tensor = torch.from_numpy(X_val).float()
y_val_tensor = torch.from_numpy(y_val).long()

# 创建 TensorDataset
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
val_dataset = TensorDataset(X_val_tensor, y_val_tensor)

# 定义数据加载器
batch_size = 32
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

print("Data loaded succesfull!")

Data loaded succesfull!


# 5.加载预训练模型，模型微调

In [25]:
# 检查是否有可用的GPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# 假设你有 10 个类别
num_classes = 10 

# 使用效果最好的 ViT-B/16 预训练权重版本
model = vit_b_16(weights=ViT_B_16_Weights.DEFAULT)
# 冻结所有参数 (特征提取阶段)
for param in model.parameters():
    param.requires_grad = False
# 解冻最后一个 Block (索引 11) 的参数
for param in model.encoder.layers[-1].parameters():
    param.requires_grad = True
print("注意：已解冻最后一个 Transformer 编码器块的参数。")

# 获取 ViT 分类头 (model.head) 的输入特征数
# 对于 ViT-B/16，这个值是 768
num_ftrs = 768

# 修改分类头 (model.head) 以适应你的分类任务
# ViT 的分类头就是模型在 `cls` token 上接的一个全连接层
model.head = nn.Sequential(
    # 第一层从 ViT 的输出 (768) 开始
    nn.Linear(num_ftrs, 768),  # 注意：这里我们使用 2048 来匹配你 ResNet 示例中的结构
    nn.ReLU(),
    nn.Dropout(0.4),
    
    # 输出层：连接到你的类别数
    nn.Linear(768, num_classes)
)

# 将模型移动到GPU（如果可用）
model.to(device)

# 打印模型结构
print(model)

# 打印可训练的参数数量 (用于确认冻结策略是否生效)
total_params = sum(p.numel() for p in model.parameters())
trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)

print(f"\n模型总参数量: {total_params / 1e6:.2f} M")
print(f"可训练参数量: {trainable_params / 1e6:.2f} M")
print(f"微调层参数量 (model.head): {sum(p.numel() for p in model.head.parameters()) / 1e6:.2f} M")

VisionTransformer(
  (conv_proj): Conv2d(3, 768, kernel_size=(16, 16), stride=(16, 16))
  (encoder): Encoder(
    (dropout): Dropout(p=0.0, inplace=False)
    (layers): Sequential(
      (encoder_layer_0): EncoderBlock(
        (ln_1): LayerNorm((768,), eps=1e-06, elementwise_affine=True)
        (self_attention): MultiheadAttention(
          (out_proj): NonDynamicallyQuantizableLinear(in_features=768, out_features=768, bias=True)
        )
        (dropout): Dropout(p=0.0, inplace=False)
        (ln_2): LayerNorm((768,), eps=1e-06, elementwise_affine=True)
        (mlp): MLPBlock(
          (0): Linear(in_features=768, out_features=3072, bias=True)
          (1): GELU(approximate='none')
          (2): Dropout(p=0.0, inplace=False)
          (3): Linear(in_features=3072, out_features=768, bias=True)
          (4): Dropout(p=0.0, inplace=False)
        )
      )
      (encoder_layer_1): EncoderBlock(
        (ln_1): LayerNorm((768,), eps=1e-06, elementwise_affine=True)
        (self_a

In [27]:
# 定义损失函数
criterion = nn.CrossEntropyLoss()

# 自定义优化器和学习率调度器
# ViT 通常使用更低的 LR。
LR_CUSTOM_HEAD = 1e-4 # 0.0001 (自定义分类头，可以稍高)
LR_LAST_BLOCK = 1e-5   # 0.00001 (解冻的最后一个 Transformer Block，非常低)

param_groups = [
    # 参数组 1: 自定义分类头 (model.head)
    # 这部分是新初始化的，使用相对较高的 LR
    {'params': model.head.parameters(), 'lr': LR_CUSTOM_HEAD, 'group_name': 'custom_head'},
    
    # 参数组 2: 解冻的最后一个 Transformer 编码器块 (model.encoder.layers[-1])
    # 这部分是预训练的，使用非常低的 LR 进行微调
    {'params': model.encoder.layers[-1].parameters(), 'lr': LR_LAST_BLOCK, 'group_name': 'last_block_fine_tune'}
]

optimizer = optim.AdamW(
    param_groups, 
    weight_decay=1e-4 # 应用于所有参数组
)

# 定义总训练周期和每个周期的批次数量
num_epochs = 200
steps_per_epoch = len(train_loader) 

# OneCycleLR 的 max_lr 参数：对应优化器中每个参数组的最大学习率
scheduler = OneCycleLR(
    optimizer, 
    max_lr=[LR_CUSTOM_HEAD, LR_LAST_BLOCK], # 顺序必须与 param_groups 保持一致
    steps_per_epoch=steps_per_epoch, 
    epochs=num_epochs
)

print("Successful! Vision Transformer 学习率调度策略已设置。")

Successful! Vision Transformer 学习率调度策略已设置。


# 6.模型训练和验证

In [None]:
# --- 早停策略配置 ---
patience = 40 # 连续10个epoch验证损失没有改善就停止
best_val_loss = float('inf') # 初始最佳验证损失设置为无穷大
BEST_LOSS_THRESHOLD = 0.01 # 新增的停止阈值
epochs_no_improve = 0 # 记录没有改善的epoch数量

# 训练模型
for epoch in range(num_epochs):
    # 训练阶段
    model.train()
    train_losses = []
    train_accuracies = []
    running_loss = 0.0
    correct = 0
    total = 0
    for batch_idx, (inputs, labels) in enumerate(train_loader):
        inputs = inputs.permute(0, 3, 1, 2)
        inputs = inputs[:, :3, :, :]
        inputs, labels = inputs.to(device), labels.to(device)
        
        optimizer.zero_grad()
        
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        
        loss.backward()
        optimizer.step()
        scheduler.step()
        
        running_loss += loss.item()
        
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
    
    epoch_loss = running_loss / len(train_loader)
    train_accuracy = correct / total
    train_losses.append(epoch_loss)
    train_accuracies.append(train_accuracy)
    
    print(f'Epoch {epoch+1}/{num_epochs}, Train Loss: {epoch_loss:.4f}, Train Accuracy: {train_accuracy:.4f}')

    # 验证阶段
    model.eval()
    val_loss = 0.0
    val_losses = []
    val_accuracies = []
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs = inputs.permute(0, 3, 1, 2)
            inputs = inputs[:, :3, :, :]
            inputs, labels = inputs.to(device), labels.to(device)
            
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            
            val_loss += loss.item()
            
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    
    val_accuracy = correct / total
    val_epoch_loss = val_loss / len(val_loader)
    val_losses.append(val_epoch_loss)
    val_accuracies.append(val_accuracy)
    
    print(f'Validation Loss: {val_epoch_loss:.4f}, Validation Accuracy: {val_accuracy:.4f}')

    # 策略 1: 检查是否达到预设的最低损失阈值
    if val_epoch_loss <= BEST_LOSS_THRESHOLD:
        print(f"\n✨ 验证损失 {val_epoch_loss:.4f} 达到或低于阈值 {BEST_LOSS_THRESHOLD}，停止训练。")
        torch.save(model.state_dict(), "best_vit_model.pth")
        print("模型已保存。")
        break
        
    # 策略 2: 标准的基于 patience 的早停
    if val_epoch_loss < best_val_loss:
        best_val_loss = val_epoch_loss
        epochs_no_improve = 0
        # 保存最佳模型
        torch.save(model.state_dict(), "best_vit_model.pth")
        print("验证损失降低，保存最佳模型。")
    else:
        epochs_no_improve += 1
        print(f"验证损失未改善，耐心计数: {epochs_no_improve}/{patience}")

    if epochs_no_improve >= patience:
        print(f"\n连续 {patience} 个epoch验证损失没有改善，停止训练。")
        break

RuntimeError: element 0 of tensors does not require grad and does not have a grad_fn

# 7.模型测试

In [None]:
# # 加载 numpy 数组
# X_test = np.load('/kaggle/working/X_test.npy')
# y_test = np.load('/kaggle/working/y_test.npy')

# 将 numpy 数组转换为 torch 张量
# X_test 转换为 float 类型以匹配模型输入
X_test_tensor = torch.from_numpy(X_test).float()
# y_test 转换为 long 类型以匹配 CrossEntropyLoss 的期望
y_test_tensor = torch.from_numpy(y_test).long()

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# 创建 TensorDataset
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)

# 定义数据加载器
batch_size = 32
# 测试集通常不需要打乱，因此 shuffle=False
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

MODEL_PATH = "best_vit_model.pth"
print(f"\n正在加载模型文件：{MODEL_PATH}")
# 使用 map_location 将模型加载到正确的设备上
model.load_state_dict(torch.load(MODEL_PATH, map_location=device))
print("模型加载完成。")

print("\n开始进行最终测试...")
model.eval()
all_labels = []
all_preds = []
test_loss = 0.0
with torch.no_grad():
    for inputs, labels in test_loader:
        # 数据预处理
        inputs = inputs.permute(0, 3, 1, 2)
        inputs = inputs[:, :3, :, :]
        inputs, labels = inputs.to(device), labels.to(device)
        
        # 前向传播
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        test_loss += loss.item()
        
        _, predicted = torch.max(outputs.data, 1)
        
        # 收集所有真实标签和预测标签
        all_labels.extend(labels.cpu().numpy())
        all_preds.extend(predicted.cpu().numpy())

test_accuracy = np.mean(np.array(all_labels) == np.array(all_preds))
test_epoch_loss = test_loss / len(test_loader)
print(f'Test Loss: {test_epoch_loss:.4f}, Test Accuracy: {test_accuracy:.4f}')