In [13]:
import os
import numpy as np
import torch
from sklearn.model_selection import train_test_split
from torch.utils.data import Subset
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, random_split
from scipy.ndimage import zoom
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, classification_report
import seaborn as sns
import random

In [14]:
def load_and_split_data(path):
    """Load per-species ``.npy`` arrays from ``path`` and split them into train/test sets.

    Only files named ``<digits>_<rest>.npy`` are used; the leading digits are the
    integer species id, which is also used directly as the sample label. Species
    with fewer than two files are skipped because they cannot be split.

    Each species is split on its own (``test_size=0.1``, ``random_state=42``).
    File names and species ids are processed in sorted order so the split is
    the same on every run and every machine.

    Every array is rescaled along axis 1 to 128 columns with linear
    interpolation. Axis 0 is left untouched, so all files must already share
    the same number of rows for the final stacking to succeed.

    Args:
        path: Directory containing the ``.npy`` files.

    Returns:
        Tuple ``(train_data, test_data, train_label, test_label)``. The data
        arrays are stacked along a new last axis (``rows x 128 x n_samples``);
        the label arrays are 1-D with one species id per sample.

    Raises:
        ValueError: If no species with at least two usable files is found.
    """

    def _load_resized(file_path):
        # Rescale the second axis to 128 columns (order=1 -> linear interpolation).
        arr = np.load(file_path)
        return zoom(arr, (1, 128 / arr.shape[1]), order=1)

    # Group file paths by species id.
    species_dict = {}
    # os.listdir() returns entries in arbitrary, filesystem-dependent order;
    # sorting makes the fixed random_state below actually reproducible.
    for filename in sorted(os.listdir(path)):
        if not filename.endswith('.npy'):
            continue
        parts = filename.split('_')
        if len(parts) >= 2 and parts[0].isdigit():
            species_dict.setdefault(int(parts[0]), []).append(
                os.path.join(path, filename)
            )

    all_train_data = []
    all_train_label = []
    all_test_data = []
    all_test_label = []

    # Iterate species in numeric order so the sample order is deterministic too.
    for species_id in sorted(species_dict):
        file_list = species_dict[species_id]
        # A species needs at least 2 samples to appear in both splits.
        if len(file_list) < 2:
            continue

        # Per-species split keeps the class proportions roughly equal in both sets.
        train_files, test_files = train_test_split(
            file_list, test_size=0.1, random_state=42
        )

        for file_path in train_files:
            all_train_data.append(_load_resized(file_path))
            all_train_label.append(species_id)

        for file_path in test_files:
            all_test_data.append(_load_resized(file_path))
            all_test_label.append(species_id)

    if not all_train_data or not all_test_data:
        # np.dstack([]) would raise an opaque "need at least one array" error.
        raise ValueError(
            f"No species with at least two '<id>_*.npy' files found in {path!r}"
        )

    # Stack the 2-D samples along a new last axis.
    train_data = np.dstack(all_train_data)
    test_data = np.dstack(all_test_data)

    train_label = np.array(all_train_label)
    test_label = np.array(all_test_label)

    return train_data, test_data, train_label, test_label


In [15]:
# Directory (relative to the notebook's working directory) that holds the
# per-species .npy files; presumably Mel spectrograms, judging by the folder name.
file_paths = "Bird_Song/Mel"
# Load every usable file, resize it and split per species into train/test arrays.
train_data, test_data, train_label, test_label = load_and_split_data(file_paths)

In [16]:
# Sanity check: both arrays are stacked with the sample index on the last axis.
print(train_data.shape)
print(test_data.shape)

(128, 128, 2794)
(128, 128, 350)


In [17]:
# Prefer the GPU when CUDA is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"使用设备: {device}")

使用设备: cuda


In [18]:
# Acoustic data augmentation
class AcousticTransform:
    """Random spectrogram augmentation that avoids geometric flips/rotations.

    Each of the four augmentations below is applied independently with
    probability 0.5 on every call:

    * time warp   - a random time segment is stretched/compressed by ~10%
    * frequency mask - a random band of rows is set to 0
    * time mask   - a random band of columns is set to 0
    * additive Gaussian noise

    The last two dimensions of the input are treated as (frequency, time), so
    both ``(H, W)`` and ``(C, H, W)`` tensors are accepted.

    Args:
        time_warp: Scales the width of the warped segment (fraction of the time
            axis, multiplied by a standard-normal draw). ``0`` disables it.
        freq_mask: Upper bound of the masked band as a fraction of the frequency
            axis. ``0`` disables it.
        time_mask: Upper bound of the masked band as a fraction of the time
            axis. ``0`` disables it.
        noise_std: Standard deviation of the added noise. ``0`` disables it.

    NOTE(review): ``freq_mask`` / ``time_mask`` are fractions of the axis. The
    defaults of 2 are kept for backward compatibility, but any value >= 1 will
    frequently blank the whole spectrogram; values around 0.1-0.2 are probably
    what callers want.
    """
    def __init__(self, time_warp=0.1, freq_mask=2, time_mask=2, noise_std=0.05):
        self.time_warp = time_warp
        self.freq_mask = freq_mask
        self.time_mask = time_mask
        self.noise_std = noise_std

    @staticmethod
    def _random_span(fraction, size):
        """Return ``(start, length)`` of a random band that fits inside ``size``."""
        # Clamp so that fractions >= 1 cannot ask for more than the whole axis
        # (the unclamped value made torch.randint raise on an empty range).
        length = min(size, int(fraction * torch.rand(1).item() * size))
        # +1 because randint's upper bound is exclusive; this also lets the band
        # touch the last row/column.
        start = torch.randint(0, size - length + 1, (1,)).item()
        return start, length

    def __call__(self, spectrogram):
        """Return an augmented copy of ``spectrogram``; the input is not modified.

        Raises:
            ValueError: If ``spectrogram`` has fewer than 2 dimensions.
        """
        if spectrogram.dim() < 2:
            raise ValueError(
                f"expected a tensor with at least 2 dims (freq, time), got {tuple(spectrogram.shape)}"
            )

        # Work on a copy so a cached/shared input tensor is never corrupted.
        spectrogram = spectrogram.clone()
        height, width = spectrogram.shape[-2:]

        # Time warping: resample a random segment and write it back in place.
        if self.time_warp > 0 and width >= 2 and torch.rand(1).item() > 0.5:
            warp_amount = int(self.time_warp * width * torch.randn(1).item())
            warp_amount = max(1, min(width - 2, abs(warp_amount)))
            start = torch.randint(0, width - warp_amount, (1,)).item()
            scale = 1 + 0.1 * torch.randn(1).item()
            new_width = max(1, int(round(warp_amount * scale)))

            segment = spectrogram[..., start:start + warp_amount]
            # Only the time axis is rescaled; leading axes keep a factor of 1.
            factors = (1,) * (segment.dim() - 1) + (new_width / warp_amount,)
            warped = zoom(segment.detach().cpu().numpy(), factors, order=1)
            warped = torch.from_numpy(warped).to(spectrogram)

            # The resampled segment has a different width than the slot it came
            # from: crop it when longer, and overwrite only the leading part of
            # the slot when shorter, so the overall shape never changes.
            keep = min(warp_amount, warped.shape[-1])
            spectrogram[..., start:start + keep] = warped[..., :keep]

        # Frequency masking
        if self.freq_mask > 0 and torch.rand(1).item() > 0.5:
            mask_start, mask_height = self._random_span(self.freq_mask, height)
            spectrogram[..., mask_start:mask_start + mask_height, :] = 0

        # Time masking
        if self.time_mask > 0 and torch.rand(1).item() > 0.5:
            mask_start, mask_width = self._random_span(self.time_mask, width)
            spectrogram[..., mask_start:mask_start + mask_width] = 0

        # Additive Gaussian noise
        if self.noise_std > 0 and torch.rand(1).item() > 0.5:
            spectrogram += torch.randn_like(spectrogram) * self.noise_std

        return spectrogram

In [19]:
# Custom dataset
class AcousticDataset(Dataset):
    """Dataset over a 3-D array whose last axis indexes the samples.

    Args:
        data: Array indexed as ``data[:, :, idx]``; ``data.shape[2]`` is the
            number of samples.
        labels: Sequence with one integer label per sample.
        transform: Optional callable applied to the ``(1, H, W)`` float tensor
            of each sample.
    """

    def __init__(self, data, labels, transform=None):
        self.data, self.labels, self.transform = data, labels, transform

    def __len__(self):
        # Samples live on the third axis.
        n_samples = self.data.shape[2]
        return n_samples

    def __getitem__(self, idx):
        """Return ``(spectrogram, label)`` for sample ``idx``."""
        # Copy the 2-D slice into a float32 tensor and prepend a channel axis.
        image = torch.tensor(self.data[:, :, idx], dtype=torch.float32)[None, ...]

        # Optional augmentation
        if self.transform:
            image = self.transform(image)

        target = torch.tensor(self.labels[idx], dtype=torch.long)
        return image, target

In [20]:
# CNN for acoustic classification
class AcousticCNN(nn.Module):
    """Four conv stages followed by a two-layer classifier head.

    Each stage is Conv3x3 -> BatchNorm -> ReLU -> 2x2 max-pool, with channel
    widths 1 -> 32 -> 64 -> 128 -> 256. The feature map is then average-pooled
    to 4x4, flattened and fed through Dropout/Linear layers producing
    ``num_classes`` logits.

    Args:
        num_classes: Size of the output logit vector.
    """

    @staticmethod
    def _conv_stage(in_channels, out_channels):
        """Build one Conv-BN-ReLU-MaxPool stage (halves the spatial size)."""
        return nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
        )

    def __init__(self, num_classes=85):
        super().__init__()

        # Convolutional stages (attribute names are part of the checkpoint keys).
        self.conv1 = self._conv_stage(1, 32)
        self.conv2 = self._conv_stage(32, 64)
        self.conv3 = self._conv_stage(64, 128)
        self.conv4 = self._conv_stage(128, 256)

        # Adaptive pooling gives a fixed 4x4 map regardless of input size.
        self.adaptive_pool = nn.AdaptiveAvgPool2d((4, 4))

        # Classifier head
        hidden_units = 512
        self.fc = nn.Sequential(
            nn.Dropout(0.5),
            nn.Linear(256 * 4 * 4, hidden_units),
            nn.ReLU(inplace=True),
            nn.Dropout(0.4),
            nn.Linear(hidden_units, num_classes),
        )

    def forward(self, x):
        """Map a ``(N, 1, H, W)`` batch to ``(N, num_classes)`` logits."""
        for stage in (self.conv1, self.conv2, self.conv3, self.conv4):
            x = stage(x)
        pooled = self.adaptive_pool(x)
        flat = pooled.view(pooled.size(0), -1)
        return self.fc(flat)

In [21]:
# Evaluation helper (used for intermediate checks during training)
def evaluate_on_subset(model, test_loader, subset_size=200):
    """Evaluate ``model`` on a random subset of ``test_loader.dataset``.

    Prints the subset accuracy and the three worst-performing classes among the
    classes that actually occur in the subset.

    Args:
        model: Classifier returning ``(batch, num_classes)`` logits.
        test_loader: DataLoader whose ``.dataset`` is sampled from.
        subset_size: Maximum number of samples to evaluate.

    Returns:
        Accuracy on the subset in percent (``0.0`` if the subset is empty).
    """
    model.eval()

    # Run on whatever device the model lives on instead of relying on a
    # notebook-level global; parameter-less models fall back to the CPU.
    first_param = next(model.parameters(), None)
    model_device = first_param.device if first_param is not None else torch.device("cpu")

    # Draw the random subset.
    total_samples = len(test_loader.dataset)
    indices = random.sample(range(total_samples), min(subset_size, total_samples))
    subset_loader = DataLoader(Subset(test_loader.dataset, indices), batch_size=32, shuffle=False)

    pred_batches = []
    label_batches = []
    num_classes = 0
    with torch.no_grad():
        for inputs, labels in subset_loader:
            outputs = model(inputs.to(model_device))
            # The logit width tells us the number of classes; no hard-coded 85.
            num_classes = max(num_classes, outputs.size(1))
            pred_batches.append(outputs.argmax(dim=1).cpu())
            label_batches.append(labels.cpu())

    if label_batches:
        all_preds = torch.cat(pred_batches).numpy()
        all_labels = torch.cat(label_batches).numpy()
    else:
        all_preds = all_labels = np.empty(0, dtype=np.int64)

    total = int(all_labels.size)
    correct = int((all_preds == all_labels).sum())
    # Guard against an empty dataset instead of raising ZeroDivisionError.
    accuracy = 100 * correct / total if total else 0.0
    print(f'测试子集准确率: {accuracy:.2f}% ({correct}/{total})')

    if total == 0:
        return accuracy

    # Per-class counts; widen if a label exceeds the logit width.
    num_classes = max(num_classes, int(all_labels.max()) + 1)
    class_total = np.bincount(all_labels, minlength=num_classes)
    class_correct = np.bincount(all_labels[all_preds == all_labels], minlength=num_classes)

    # Rank only classes that occur in the subset: a class with 0 samples would
    # otherwise score 0% and always crowd out the genuinely worst classes.
    present = np.flatnonzero(class_total)
    present_acc = class_correct[present] / class_total[present] * 100
    order = np.argsort(present_acc, kind='stable')[:3]

    print("表现最差的3个类别:")
    for c, acc in zip(present[order], present_acc[order]):
        print(f'类别 {c}: 准确率 {acc:.2f}% ({int(class_correct[c])}/{int(class_total[c])})')

    return accuracy

In [22]:
# 训练函数
def train_model(model, train_loader, val_loader, test_loader, criterion, optimizer, num_epochs=50):
    best_acc = 0.0
    train_losses = []
    val_losses = []
    val_accs = []
    test_subset_accs = []
    
    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        
        # 训练阶段
        for inputs, labels in train_loader:
            inputs = inputs.to(device)
            labels = labels.to(device)
            
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            
            running_loss += loss.item() * inputs.size(0)
        
        epoch_loss = running_loss / len(train_loader.dataset)
        train_losses.append(epoch_loss)
        
        # 验证阶段
        model.eval()
        val_loss = 0.0
        correct = 0
        total = 0
        
        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs = inputs.to(device)
                labels = labels.to(device)
                
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                val_loss += loss.item() * inputs.size(0)
                
                _, predicted = torch.max(outputs.data, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()
        
        val_loss = val_loss / len(val_loader.dataset)
        val_losses.append(val_loss)
        val_acc = 100 * correct / total
        val_accs.append(val_acc)
        
        print(f'Epoch {epoch+1}/{num_epochs} - '
              f'Train Loss: {epoch_loss:.4f} - '
              f'Val Loss: {val_loss:.4f} - '
              f'Val Acc: {val_acc:.2f}%')
        
        # 每5个epoch在测试集子集上评估
        if (epoch + 1) % 5 == 0 or epoch == 0:
            test_acc = evaluate_on_subset(model, test_loader, subset_size=200)
            test_subset_accs.append((epoch+1, test_acc))
        
        # 保存最佳模型
        if val_acc > best_acc:
            best_acc = val_acc
            torch.save(model.state_dict(), 'best_model.pth')
            print(f'保存新的最佳模型，验证准确率: {best_acc:.2f}%')
    
    # 绘制训练曲线
    plt.figure(figsize=(15, 10))
    
    plt.subplot(2, 2, 1)
    plt.plot(train_losses, label='Train Loss')
    plt.plot(val_losses, label='Val Loss')
    plt.title('训练和验证损失')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()
    
    plt.subplot(2, 2, 2)
    plt.plot(val_accs, label='Val Accuracy')
    plt.title('验证准确率')
    plt.xlabel('Epochs')
    plt.ylabel('Accuracy (%)')
    plt.legend()
    
    # 绘制测试子集准确率
    if test_subset_accs:
        epochs, accs = zip(*test_subset_accs)
        plt.subplot(2, 2, 3)
        plt.plot(epochs, accs, 'ro-', label='Test Subset Accuracy')
        plt.title('测试子集准确率')
        plt.xlabel('Epochs')
        plt.ylabel('Accuracy (%)')
        plt.legend()
    
    plt.tight_layout()
    plt.savefig('training_curve.png')
    
    return model

In [23]:
# 最终测试函数
def test_model(model, test_loader):
    model.eval()
    all_labels = []
    all_preds = []
    
    with torch.no_grad():
        correct = 0
        total = 0
        for inputs, labels in test_loader:
            inputs = inputs.to(device)
            labels = labels.to(device)
            
            outputs = model(inputs)
            _, predicted = torch.max(outputs.data, 1)
            
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            
            all_labels.extend(labels.cpu().numpy())
            all_preds.extend(predicted.cpu().numpy())
    
    accuracy = 100 * correct / total
    print(f'测试准确率: {accuracy:.2f}%')
    
    # 生成分类报告
    print("\n分类报告:")
    print(classification_report(all_labels, all_preds, target_names=[f'Class {i}' for i in range(85)], zero_division=0))
    
    # 绘制混淆矩阵
    cm = confusion_matrix(all_labels, all_preds)
    plt.figure(figsize=(20, 20))
    sns.heatmap(cm, annot=False, fmt='d', cmap='Blues', 
                xticklabels=False, yticklabels=False)
    plt.title('混淆矩阵')
    plt.xlabel('预测标签')
    plt.ylabel('真实标签')
    plt.savefig('confusion_matrix.png', dpi=300)
    
    return accuracy


In [None]:
transform = AcousticTransform(
    time_warp=0.1, 
    freq_mask=2, 
    time_mask=2, 
    noise_std=0.05
)
# 创建数据集
train_dataset = AcousticDataset(train_data, train_label, transform=transform)
test_dataset = AcousticDataset(test_data, test_label)

# 划分训练集和验证集 (90% 训练, 10% 验证)
train_size = int(0.9 * len(train_dataset))
val_size = len(train_dataset) - train_size
train_subset, val_subset = random_split(
    train_dataset, [train_size, val_size]
)

# 创建数据加载器
batch_size = 32
train_loader = DataLoader(train_subset, batch_size=batch_size, shuffle=True, num_workers=4)
val_loader = DataLoader(val_subset, batch_size=batch_size, shuffle=False, num_workers=4)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=4)

# 初始化模型
model = AcousticCNN(num_classes=85).to(device)
print(f"模型参数数量: {sum(p.numel() for p in model.parameters()):,}")

# 损失函数和优化器
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-4)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='max', factor=0.5, patience=5)

# 训练模型
print("开始训练...")
model = train_model(model, train_loader, val_loader, test_loader, criterion, optimizer, num_epochs=50)

# 加载最佳模型
print("加载最佳模型进行最终测试...")
model.load_state_dict(torch.load('best_model.pth'))

# 在测试集上评估
test_acc = test_model(model, test_loader)
print(f"最终测试准确率: {test_acc:.2f}%")

模型参数数量: 2,530,069
开始训练...
