In [None]:
%pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118
%pip install tensorboard

In [None]:
import torchvision.models as models


In [None]:
import os
import torch
from torch.utils.data import Dataset, DataLoader
import numpy as np


class MyDataset(Dataset):
    """Dataset of zero-padded feature vectors read from ``*embedding_txt`` files.

    Every non-blank line of every file in ``folder_path`` whose name ends with
    ``'embedding_txt'`` is parsed as whitespace-separated floats and becomes
    one sample. All samples are right-padded with ``0.0`` to the longest
    vector found in the folder. The label of a sample is derived from the
    first three characters of its file name (the speaker id).

    Args:
        folder_path: Directory that contains the feature files.
        label_map: Optional ``{speaker_id: int}`` mapping to use instead of
            building one. Pass the map of the training set (``.label_map``)
            to guarantee identical label ids across splits.

    Attributes:
        data: list of 1-D ``float32`` tensors, all of length ``max_length``.
        labels: list of integer class ids, parallel to ``data``.
        max_length: length of the longest feature vector in the folder.
        feature_count: number of matching files in the folder.
        label_map: the speaker-id -> class-id mapping that was used.

    Raises:
        KeyError: if ``label_map`` is given and a file's speaker id is missing
            from it.
    """

    def __init__(self, folder_path, label_map=None):
        self.data = []
        self.labels = []
        self.max_length = 0

        # Sorting makes both the sample order and the label assignment
        # independent of the arbitrary order returned by os.listdir, so the
        # same speaker gets the same class id in every split.
        file_names = sorted(
            name for name in os.listdir(folder_path)
            if name.endswith('embedding_txt')
        )
        self.feature_count = len(file_names)

        if label_map is None:
            speaker_ids = sorted({name[:3] for name in file_names})
            label_map = {speaker: index for index, speaker in enumerate(speaker_ids)}
        self.label_map = label_map

        dataset_size = self.feature_count
        print("Feature amounts: ",self.feature_count)
        print("Dataset size: ",dataset_size)

        # Single pass over the files: parse everything once, remember the
        # longest vector, and pad afterwards.
        parsed_rows = []
        for file_name in file_names:
            speaker_id = file_name[:3]
            if speaker_id not in label_map:
                raise KeyError(
                    f"speaker id {speaker_id!r} (file {file_name!r}) is not in label_map"
                )
            label = label_map[speaker_id]
            with open(os.path.join(folder_path, file_name), 'r') as f:
                for line in f:
                    mfcc_features = [float(x) for x in line.split()]
                    if not mfcc_features:
                        # A blank line carries no features; do not turn it
                        # into an all-zero training sample.
                        continue
                    self.max_length = max(self.max_length, len(mfcc_features))
                    parsed_rows.append((mfcc_features, label))

        for mfcc_features, label in parsed_rows:
            padded = self.paddingByMaxLength(mfcc_features)
            self.data.append(torch.tensor(padded, dtype=torch.float32))
            self.labels.append(label)
        print("Label size: ",len(self.labels))

    def paddingByMaxLength(self, features):
        """Return ``features`` right-padded with ``0.0`` up to ``self.max_length``.

        Lists that are already at least ``self.max_length`` long are returned
        unchanged.
        """
        if len(features) < self.max_length:
            padded_features = features + [0.0] * (self.max_length - len(features))
            return padded_features
        return features

    def __len__(self):
        """Number of samples (parsed lines), not number of files."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``{'features': float32 tensor, 'label': int64 scalar tensor}``."""
        features_tensor = self.data[idx]
        label_tensor = torch.tensor(int(self.labels[idx]), dtype=torch.long)
        return {'features': features_tensor, 'label': label_tensor}


# Module-level configuration for the embedding-text datasets.
# NOTE(review): `dataset_size` is not read by any visible cell (MyDataset uses
# a local variable of the same name) — confirm whether it is still needed.
dataset_size = 0       
# Mini-batch size; read as a global by the *_dataloader helper functions.
batch_size = 64
# Folders holding the train / validation / test splits (relative to the
# notebook's working directory).
train_path = '../../split_data/train'
valid_path = '../../split_data/validation'
test_path = '../../split_data/test'
# Constructing a MyDataset reads and parses every matching file in its folder.
train_dataset = MyDataset(train_path)
val_dataset = MyDataset(valid_path)
test_dataset = MyDataset(test_path)
def train_dataloader(train_dataset):
    """Wrap ``train_dataset`` in a shuffling DataLoader.

    The batch size is taken from the module-level ``batch_size`` at call time.
    """
    loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    return loader

def val_dataloader(val_dataset):
    """Wrap ``val_dataset`` in a DataLoader that keeps the dataset order.

    The batch size is taken from the module-level ``batch_size`` at call time.
    """
    loader = DataLoader(val_dataset, batch_size=batch_size)
    return loader

def test_dataloader(test_dataset):
        return DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=False)

# Build one DataLoader per split for the embedding-text datasets.
# Only the training loader shuffles; validation and test keep dataset order.
trainloader = train_dataloader(train_dataset)
validationloader = val_dataloader(val_dataset)
testloader = test_dataloader(test_dataset)


In [None]:
from PIL import Image
from torchvision import transforms

class MyMelDataset(Dataset):
    """Dataset of pre-transformed PNG images labelled by speaker id.

    Every file in ``folder_path`` whose name ends with ``'png'`` is opened,
    passed through ``transform`` once at construction time, and kept in
    memory. The label of an image is derived from the first three characters
    of its file name (the speaker id).

    Args:
        folder_path: Directory that contains the PNG files.
        transform: Callable applied to each opened PIL image; its result is
            what ``__getitem__`` returns under ``'features'``.
        label_map: Optional ``{speaker_id: int}`` mapping to use instead of
            building one. Pass the training set's ``.label_map`` to guarantee
            identical label ids across splits.

    Attributes:
        data: list of transformed images.
        labels: list of integer class ids, parallel to ``data``.
        feature_count: number of PNG files that were loaded.
        label_map: the speaker-id -> class-id mapping that was used.

    Raises:
        KeyError: if ``label_map`` is given and a file's speaker id is missing
            from it.
    """

    def __init__(self, folder_path, transform, label_map=None):
        self.transform = transform
        self.data = []
        self.labels = []

        # Sorted so that sample order and label ids do not depend on the
        # arbitrary order returned by os.listdir.
        file_names = sorted(
            name for name in os.listdir(folder_path) if name.endswith('png')
        )
        self.feature_count = len(file_names)

        if label_map is None:
            label_map = self._build_label_map(file_names)
        self.label_map = label_map

        for file_name in file_names:
            speaker_id = file_name[:3]
            if speaker_id not in label_map:
                raise KeyError(
                    f"speaker id {speaker_id!r} (file {file_name!r}) is not in label_map"
                )
            img_name = os.path.join(folder_path, file_name)
            # The context manager releases the file handle as soon as the
            # transform has produced its output.
            with Image.open(img_name) as image:
                image_tensor = self.transform(image)
            self.data.append(image_tensor)
            self.labels.append(label_map[speaker_id])
        print("Feature amounts: ", self.feature_count)        
        print("Label size: ",len(self.labels))

    @staticmethod
    def _build_label_map(file_names):
        """Map each distinct 3-character file-name prefix to 0..K-1 in sorted order."""
        speaker_ids = sorted({name[:3] for name in file_names})
        return {speaker: index for index, speaker in enumerate(speaker_ids)}

    def __len__(self):
        """Number of loaded images."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``{'features': transformed image, 'label': int64 scalar tensor}``."""
        features_tensor = self.data[idx]
        label_tensor = torch.tensor(int(self.labels[idx]), dtype=torch.long)
        return {'features': features_tensor, 'label': label_tensor}
    
# Preprocessing applied by MyMelDataset to every image at load time.
# NOTE(review): Normalize is given a single mean/std value; the models below
# expect 4-channel input, so this presumably relies on torchvision applying
# the one value to every channel — confirm.
transform = transforms.Compose([
    transforms.Resize((277, 109)),  # resize every image to a fixed size
    transforms.ToTensor(),           # convert the PIL image to a tensor
    transforms.Normalize(mean=[0.5], std=[0.5])  # normalise with mean 0.5 and std 0.5
])

# Folders holding the image splits (relative to the notebook's working
# directory).
train_path_Mel = '../../split_data/train_Mel'
valid_path_Mel = '../../split_data/validation_Mel'
test_path_Mel = '../../split_data/test_Mel'

# Constructing a MyMelDataset opens and transforms every PNG in its folder.
train_dataset_Mel= MyMelDataset(train_path_Mel,transform=transform)
val_dataset_Mel = MyMelDataset(valid_path_Mel,transform = transform)
test_dataset_Mel = MyMelDataset(test_path_Mel, transform= transform)

# Reuse the loader helpers defined for the embedding datasets; only the
# training loader shuffles.
trainloader_Mel = train_dataloader(train_dataset_Mel)
validationloader_Mel = val_dataloader(val_dataset_Mel)
testloader_Mel = test_dataloader(test_dataset_Mel)

In [None]:

# Sanity check: walk the whole image training loader and print, for every
# batch, the feature tensor shape, the label tensor shape and the labels.
for data in trainloader_Mel:
    batch_labels = data['label']
    print("Features shape:", data['features'].shape)
    print("Labels shape:", batch_labels.shape)
    print(batch_labels)

In [None]:
resNetModel = models.resnet50(pretrained = True)
# Pretrained kernels of the first convolution (they take 3 input channels).
conv1_weight = resNetModel.conv1.weight

# The input images have 4 channels, so build 4-channel kernels: keep the three
# pretrained channel kernels unchanged and initialise the extra channel with
# their mean. (Slicing the 3-channel weight with [:, :4] cannot produce a
# fourth channel, which is why the earlier direct assignment did not fit.)
with torch.no_grad():
    extra_channel_weight = conv1_weight.mean(dim=1, keepdim=True)
    modified_conv1_weight = torch.cat([conv1_weight, extra_channel_weight], dim=1)

# Replace the first convolution with a 4-channel one and load the kernels
# built above, so the first layer is not trained from a random start.
resNetModel.conv1 = torch.nn.Conv2d(4, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
with torch.no_grad():
    resNetModel.conv1.weight.copy_(modified_conv1_weight)

# Show the modified model structure.
print(resNetModel)

In [None]:
from torch import nn
from torch import optim
from torch.utils.tensorboard import SummaryWriter
from matplotlib import pyplot as plt

# Train resNetModel on the image training loader for 100 epochs, validate
# after every epoch, and keep the weights with the best validation accuracy.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
resNetModel.to(device)

# Define the loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(resNetModel.parameters(), lr=0.001)

log_dir = "./logs/resNet50"
writer = SummaryWriter(log_dir)

best_accuracy = 0.0  # best validation accuracy seen so far
best_model_path = "./best_restNetmodel.pth"  # where the best weights are saved

train_losses = []
validations = []
num_batches = len(trainloader_Mel)
print("Number of mini-batches in one epoch:", num_batches)
for epoch in range(100):
    # BatchNorm must use batch statistics while training.
    resNetModel.train()
    running_loss = 0.0  
    for i, data in enumerate(trainloader_Mel, 0):
        inputs_origin = data['features']
        # squeeze() only drops size-1 dimensions; presumably a no-op for
        # 4-channel image batches — TODO confirm and remove.
        inputs_new = inputs_origin.squeeze(1).squeeze(2)
        inputs, labels = inputs_new.to(device), data['label'].to(device)  
        optimizer.zero_grad()

        outputs = resNetModel(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        if i == num_batches - 1:
            # running_loss holds the sum over the whole epoch, so the mean is
            # obtained by dividing by the number of batches (not a fixed 100).
            average_loss = running_loss / num_batches
            print(f"Epoch {epoch+1}, Batch {i+1}, Loss: {average_loss:.6f}")

            train_losses.append(average_loss)
            writer.add_scalar("Loss", average_loss, epoch * num_batches + i)

            running_loss = 0.0

    # Evaluate on the validation set after every epoch. eval() freezes the
    # BatchNorm running statistics so validation data cannot alter the model.
    resNetModel.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for data in validationloader_Mel:
            inputs_origin = data['features']
            inputs_new = inputs_origin.squeeze(1).squeeze(2)
            inputs, labels = inputs_new.to(device), data['label'].to(device)
            outputs = resNetModel(inputs)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    accuracy = 100 * (correct / total)
    validations.append(accuracy)
    print(f"Epoch {epoch+1}, Validation Accuracy: {accuracy:.6f}%")

    # Save the weights whenever validation accuracy improves on the best so far.
    if accuracy > best_accuracy:
        best_accuracy = accuracy
        torch.save(resNetModel.state_dict(), best_model_path)
        print("Best model saved with validation accuracy:", best_accuracy)
writer.close()  # flush pending TensorBoard events to disk
print("After training, the best model saved with validation accuracy:", best_accuracy)    
plt.plot(validations, label = "Validation Accuracy")
plt.xlabel('epoch')
plt.ylabel('accuracy')
plt.legend()
print("resNetModel Training finished")

In [None]:
# Load MobileNetV2 with pretrained weights, then swap its first convolution
# for a freshly initialised one that accepts 4 input channels.
mobileNetModel = models.mobilenet_v2(pretrained=True)
mobilenet_first_conv = nn.Conv2d(
    in_channels=4,
    out_channels=32,
    kernel_size=3,
    stride=2,
    padding=1,
    bias=False,
)
mobileNetModel.features[0][0] = mobilenet_first_conv

# Show the modified model structure.
print(mobileNetModel)

In [None]:
# Train mobileNetModel on the image training loader for 100 epochs, validate
# after every epoch, and keep the weights with the best validation accuracy.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
mobileNetModel.to(device)

# Define the loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(mobileNetModel.parameters(), lr=0.001)

log_dir = "./logs/mobileNet_v2"
writer = SummaryWriter(log_dir)

best_accuracy = 0.0  # best validation accuracy seen so far
best_model_path = "./best_mobilemodel.pth"  # where the best weights are saved

validations = []

num_batches = len(trainloader_Mel)
print("Number of mini-batches in one epoch:", num_batches)
for epoch in range(100):
    # Dropout and BatchNorm must be in training mode while optimising.
    mobileNetModel.train()
    running_loss = 0.0  
    for i, data in enumerate(trainloader_Mel, 0):
        inputs_origin = data['features']
        # squeeze() only drops size-1 dimensions; presumably a no-op for
        # 4-channel image batches — TODO confirm and remove.
        inputs_new = inputs_origin.squeeze(1).squeeze(2)
        inputs, labels = inputs_new.to(device), data['label'].to(device)  
        optimizer.zero_grad()

        outputs = mobileNetModel(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        if i == num_batches - 1:
            # running_loss holds the sum over the whole epoch, so the mean is
            # obtained by dividing by the number of batches (not a fixed 100).
            average_loss = running_loss / num_batches
            print(f"Epoch {epoch+1}, Batch {i+1}, Loss: {average_loss:.6f}")
            writer.add_scalar("Loss", average_loss, epoch * num_batches + i)

            running_loss = 0.0

    # Evaluate on the validation set after every epoch. eval() disables
    # dropout and freezes the BatchNorm running statistics.
    mobileNetModel.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for data in validationloader_Mel:
            inputs_origin = data['features']
            inputs_new = inputs_origin.squeeze(1).squeeze(2)
            inputs, labels = inputs_new.to(device), data['label'].to(device)
            outputs = mobileNetModel(inputs)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    accuracy = 100 * (correct / total)
    validations.append(accuracy)
    print(f"Epoch {epoch+1}, Validation Accuracy: {accuracy:.6f}%")

    # Save the weights whenever validation accuracy improves on the best so far.
    if accuracy > best_accuracy:
        best_accuracy = accuracy
        torch.save(mobileNetModel.state_dict(), best_model_path)
        print("Best model saved with validation accuracy:", best_accuracy)

writer.close()  # flush pending TensorBoard events to disk
print("After training, the best model saved with validation accuracy:", best_accuracy)   
plt.plot(validations, label = "Validation Accuracy")
plt.xlabel('epoch')
plt.ylabel('accuracy')
plt.legend()
print("mobileNetModel Training finished")

In [None]:
# Load DenseNet-201 with pretrained weights, then swap its first convolution
# for a freshly initialised one that accepts 4 input channels.
denseNetModel = models.densenet201(pretrained=True)
densenet_first_conv = nn.Conv2d(
    in_channels=4,
    out_channels=64,
    kernel_size=7,
    stride=2,
    padding=3,
    bias=False,
)
denseNetModel.features.conv0 = densenet_first_conv

# Show the modified model structure.
print(denseNetModel)

In [None]:
# Train denseNetModel on the image training loader for 100 epochs, validate
# after every epoch, and keep the weights with the best validation accuracy.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
denseNetModel.to(device)

# Define the loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(denseNetModel.parameters(), lr=0.001)

# Own log directory, so these curves are not written into the ResNet run.
log_dir = "./logs/denseNet201"
writer = SummaryWriter(log_dir)

best_accuracy = 0.0  # best validation accuracy seen so far
best_model_path = "./best_denseNetmodel.pth"  # where the best weights are saved
validations = []

# The loop iterates trainloader_Mel, so its length (not that of the embedding
# loader `trainloader`) defines the last batch and the global step.
num_batches = len(trainloader_Mel)
print("Number of mini-batches in one epoch:", num_batches)
for epoch in range(100):
    # BatchNorm must use batch statistics while training.
    denseNetModel.train()
    running_loss = 0.0  
    for i, data in enumerate(trainloader_Mel, 0):
        inputs_origin = data['features']
        # squeeze() only drops size-1 dimensions; presumably a no-op for
        # 4-channel image batches — TODO confirm and remove.
        inputs_new = inputs_origin.squeeze(1).squeeze(2)
        inputs, labels = inputs_new.to(device), data['label'].to(device)  
        optimizer.zero_grad()

        outputs = denseNetModel(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        if i == num_batches - 1:
            # running_loss holds the sum over the whole epoch, so the mean is
            # obtained by dividing by the number of batches (not a fixed 100).
            average_loss = running_loss / num_batches
            print(f"Epoch {epoch+1}, Batch {i+1}, Loss: {average_loss:.6f}")

            global_step = epoch * num_batches + i
            writer.add_scalar("Loss", average_loss, global_step)

            running_loss = 0.0

    # Evaluate on the validation set after every epoch. eval() freezes the
    # BatchNorm running statistics so validation data cannot alter the model.
    denseNetModel.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for data in validationloader_Mel:
            inputs_origin = data['features']
            inputs_new = inputs_origin.squeeze(1).squeeze(2)
            inputs, labels = inputs_new.to(device), data['label'].to(device)
            outputs = denseNetModel(inputs)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    accuracy = 100 * (correct / total)
    validations.append(accuracy)
    print(f"Epoch {epoch+1}, Validation Accuracy: {accuracy:.6f}%")

    # Save the weights whenever validation accuracy improves on the best so far.
    if accuracy > best_accuracy:
        best_accuracy = accuracy
        
        torch.save(denseNetModel.state_dict(), best_model_path)
        print("Best model saved with validation accuracy:", best_accuracy)
writer.close()  # flush pending TensorBoard events to disk
print("After training, the best model saved with validation accuracy:", best_accuracy)   
plt.plot(validations, label = "Validation Accuracy")
plt.xlabel('epoch')
plt.ylabel('accuracy')
plt.legend()
print("denseNetModel Training finished")

In [None]:
from torch import nn
from torch import optim
from torch.utils.tensorboard import SummaryWriter
from matplotlib import pyplot as plt

class _Layer1(nn.Module):
    """Conv -> BatchNorm -> ReLU -> Conv block with a concatenating skip.

    The second convolution always emits 32 channels, which are concatenated
    onto the unmodified input along the channel axis, so the output has
    ``in_channels + 32`` channels and the same spatial size as the input.

    Args:
        in_channels: number of channels of the input tensor.
        growth_rate: number of channels between the two convolutions.
    """

    def __init__(self, in_channels, growth_rate):
        super().__init__()
        # Sub-module names and registration order are part of the
        # state_dict layout and are kept as conv1, bn1, relu1, conv2.
        self.conv1 = nn.Conv2d(
            in_channels, growth_rate, kernel_size=3, stride=1, padding=1, bias=False
        )
        self.bn1 = nn.BatchNorm2d(growth_rate)
        self.relu1 = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv2d(
            growth_rate, 32, kernel_size=3, stride=1, padding=1, bias=False
        )

    def forward(self, x):
        """Return ``x`` with 32 newly computed feature channels appended."""
        hidden = self.conv1(x)
        hidden = self.bn1(hidden)
        hidden = self.relu1(hidden)
        new_features = self.conv2(hidden)
        return torch.cat((x, new_features), dim=1)
    

class _Layer2(nn.Module):
    """Pre-activation block: (BatchNorm -> ReLU -> Conv) twice, then concatenate.

    The second convolution always emits 32 channels, which are concatenated
    onto the unmodified input along the channel axis, so the output has
    ``in_channels + 32`` channels and the same spatial size as the input.

    Args:
        in_channels: number of channels of the input tensor.
        growth_rate: number of channels between the two convolutions.
    """

    def __init__(self, in_channels, growth_rate):
        super().__init__()
        # Sub-module names and registration order are part of the
        # state_dict layout and are kept as bn1, relu1, conv1, bn2, relu2, conv2.
        self.bn1 = nn.BatchNorm2d(in_channels)
        self.relu1 = nn.ReLU(inplace=True)
        self.conv1 = nn.Conv2d(
            in_channels, growth_rate, kernel_size=3, stride=1, padding=1, bias=False
        )
        self.bn2 = nn.BatchNorm2d(growth_rate)
        self.relu2 = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv2d(
            growth_rate, 32, kernel_size=3, stride=1, padding=1, bias=False
        )

    def forward(self, x):
        """Return ``x`` with 32 newly computed feature channels appended."""
        hidden = self.bn1(x)
        hidden = self.relu1(hidden)
        hidden = self.conv1(hidden)
        hidden = self.bn2(hidden)
        hidden = self.relu2(hidden)
        new_features = self.conv2(hidden)
        return torch.cat((x, new_features), dim=1)
    
class _myNet(nn.Module):
    """Small densely connected CNN classifier for 4-channel images.

    Pipeline: a 3x3 stem convolution (4 -> 64 channels) with BatchNorm and
    ReLU, three concatenating blocks that each append 32 channels
    (64 -> 96 -> 128 -> 160), then BatchNorm, ReLU, global average pooling
    and a bias-free linear classifier.

    Args:
        num_classes: number of output classes (default 10).

    ``forward`` takes a ``(batch, 4, H, W)`` tensor and returns
    ``(batch, num_classes)`` logits, the shape ``nn.CrossEntropyLoss`` expects
    together with 1-D integer class labels.
    """

    def __init__(self, num_classes=10):
        super(_myNet,self).__init__()
        self.features = torch.nn.Sequential(
        torch.nn.Conv2d(4,64,kernel_size=3,stride=1,padding=1,bias=False),
        torch.nn.BatchNorm2d(64),
        torch.nn.ReLU(inplace=True),
        )
        self.layer1 = _Layer1(64,128)
        self.layer2 = _Layer2(96,128)
        self.layer3 = _Layer2(128,128)
        self.norm = nn.BatchNorm2d(160)
        self.classifier = nn.Linear(in_features=160, out_features=num_classes, bias=False)

    def forward(self, x):
        out = self.features(x)
        out = self.layer3(self.layer2(self.layer1(out)))
        # Classification head: without it the network would return the raw
        # (batch, 160, H, W) feature map and `norm` / `classifier` would
        # never be trained.
        out = torch.relu(self.norm(out))
        out = nn.functional.adaptive_avg_pool2d(out, (1, 1))  # -> (batch, 160, 1, 1)
        out = torch.flatten(out, 1)                           # -> (batch, 160)
        return self.classifier(out)

# Instantiate the custom network and print its module structure.
myModel = _myNet()
print(myModel)

In [None]:

# Train myModel on the image training loader for 100 epochs, validate after
# every epoch, and keep the weights with the best validation accuracy.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
myModel.to(device)

# Define the loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(myModel.parameters(), lr=0.001)

best_accuracy = 0.0  # best validation accuracy seen so far
best_model_path = "./best_mymodel.pth"  # where the best weights are saved
validations = []

# The loop iterates trainloader_Mel, so its length (not that of the embedding
# loader `trainloader`) defines the last batch of an epoch.
num_batches = len(trainloader_Mel)
print("Number of mini-batches in one epoch:", num_batches)
for epoch in range(100):
    # BatchNorm must use batch statistics while training.
    myModel.train()
    running_loss = 0.0  
    for i, data in enumerate(trainloader_Mel, 0):
        inputs_origin = data['features']
        # squeeze() only drops size-1 dimensions; presumably a no-op for
        # 4-channel image batches — TODO confirm and remove.
        inputs_new = inputs_origin.squeeze(1).squeeze(2)
        inputs, labels = inputs_new.to(device), data['label'].to(device)  
        optimizer.zero_grad()

        outputs = myModel(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        if i == num_batches - 1:
            # running_loss holds the sum over the whole epoch, so the mean is
            # obtained by dividing by the number of batches (not a fixed 100).
            average_loss = running_loss / num_batches
            print(f"Epoch {epoch+1}, Batch {i+1}, Loss: {average_loss:.6f}")
            running_loss = 0.0

    # Evaluate on the validation set after every epoch. eval() freezes the
    # BatchNorm running statistics so validation data cannot alter the model.
    myModel.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for data in validationloader_Mel:
            inputs_origin = data['features']
            inputs_new = inputs_origin.squeeze(1).squeeze(2)
            inputs, labels = inputs_new.to(device), data['label'].to(device)
            outputs = myModel(inputs)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    accuracy = 100 * (correct / total)
    validations.append(accuracy)
    print(f"Epoch {epoch+1}, Validation Accuracy: {accuracy:.6f}%")

    # Save the weights whenever validation accuracy improves on the best so
    # far. The checkpoint must hold myModel's weights, not those of the
    # DenseNet trained in an earlier cell.
    if accuracy > best_accuracy:
        best_accuracy = accuracy
        
        torch.save(myModel.state_dict(), best_model_path)
        print("Best model saved with validation accuracy:", best_accuracy)
print("After training, the best model saved with validation accuracy:", best_accuracy)   
plt.plot(validations, label = "Validation Accuracy")
plt.xlabel('epoch')
plt.ylabel('accuracy')
plt.legend()
print("myModel Training finished")