<a href="https://colab.research.google.com/github/jjbmsda/EnsembleModel/blob/main/EnsembleModel.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torchvision.models import resnet18, densenet121
import torchaudio
from torchaudio.datasets import LIBRISPEECH
from torchaudio.transforms import MFCC
from sklearn.metrics import accuracy_score, roc_curve
import numpy as np
import os
import torch.nn.functional as F

# Custom pad/trim helper
def pad_trim(audio, target_length, pad_value=0):
    """Pad or trim the last dimension of ``audio`` to ``target_length``.

    Args:
        audio: Tensor whose last dimension is treated as the time axis.
        target_length: Required size of the last dimension.
        pad_value: Fill value appended on the right when ``audio`` is shorter
            than ``target_length``. It is stored in ``audio``'s dtype.

    Returns:
        ``audio`` itself when it already has the requested length, a slice of
        it when it is longer, or a right-padded copy when it is shorter. The
        result always has the same dtype and device as the input.
    """
    length = audio.size(-1)
    if length > target_length:
        # Keep only the leading `target_length` steps of the last dimension.
        return audio[..., :target_length]
    if length < target_length:
        # Preserve every leading dimension; only the last one grows.
        pad_shape = (*audio.shape[:-1], target_length - length)
        # new_full inherits dtype and device from `audio`. A bare torch.full
        # infers its dtype from `pad_value`, which makes torch.cat promote the
        # result (e.g. int32 audio padded with 0 would come back as int64).
        padding = audio.new_full(pad_shape, pad_value)
        return torch.cat([audio, padding], dim=-1)
    return audio

# Input size adjustment helper
def pad_or_resize(audio, target_height=50, target_width=50):
    """Bring a 3-D tensor to ``(channels, target_height, target_width)``.

    Any axis shorter than its target is zero-padded at the end (bottom/right).
    If either axis was longer than its target, the (possibly padded) tensor is
    then resampled to the target size with bilinear interpolation. Handling
    the two axes independently matters for inputs that are too short on one
    axis and too long on the other, which previously were only padded and came
    back larger than the target.

    Args:
        audio: Tensor unpacked as ``(channels, height, width)``.
        target_height: Required size of the second dimension.
        target_width: Required size of the last dimension.

    Returns:
        ``audio`` itself when it already has the target size, otherwise a new
        tensor of shape ``(channels, target_height, target_width)``.
    """
    _, height, width = audio.shape

    pad_height = max(0, target_height - height)
    pad_width = max(0, target_width - width)
    if pad_height or pad_width:
        # F.pad takes pads for the last dimension first: (left, right, top, bottom).
        audio = F.pad(audio, (0, pad_width, 0, pad_height))

    if height > target_height or width > target_width:
        # Bilinear interpolation needs a batch dimension, added and removed here.
        audio = F.interpolate(
            audio.unsqueeze(0),
            size=(target_height, target_width),
            mode="bilinear",
            align_corners=False,
        ).squeeze(0)
    return audio

# LibriSpeechDataset class
class LibriSpeechDataset(Dataset):
    """Serve LIBRISPEECH utterances as ``(features, speaker_index)`` pairs.

    Args:
        dataset: Indexable dataset whose items follow the torchaudio
            LIBRISPEECH layout ``(waveform, sample_rate, transcript,
            speaker_id, chapter_id, utterance_id)``.
        transform: Optional callable applied to the waveform (e.g. MFCC).
        target_length: If set (non-zero), the transformed tensor is padded or
            trimmed along its last dimension to this length.
        label_to_index: Mapping from speaker ID to class index.
        target_height: Height passed to ``pad_or_resize``.
        target_width: Width passed to ``pad_or_resize``.
    """

    def __init__(self, dataset, transform=None, target_length=None, label_to_index=None,
                 target_height=50, target_width=50):
        self.dataset = dataset
        self.transform = transform
        self.target_length = target_length
        self.label_to_index = label_to_index
        self.target_height = target_height
        self.target_width = target_width

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, idx):
        """Return the processed feature tensor and its class index tensor.

        Raises:
            KeyError: If the utterance's speaker is missing from
                ``label_to_index``.
        """
        # Index 2 of a LIBRISPEECH item is the transcript; the speaker ID that
        # serves as the class label is at index 3.
        audio, _sample_rate, _transcript, speaker_id, *_ = self.dataset[idx]
        if self.transform is not None:
            audio = self.transform(audio)
        if self.target_length:
            audio = pad_trim(audio, self.target_length)
        # Resize to the spatial size the models are fed
        audio = pad_or_resize(
            audio,
            target_height=self.target_height,
            target_width=self.target_width,
        )
        try:
            label_idx = self.label_to_index[speaker_id]
        except KeyError:
            raise KeyError(
                f"speaker {speaker_id!r} is not present in label_to_index"
            ) from None
        label_tensor = torch.tensor(label_idx, dtype=torch.long)
        return audio, label_tensor

# MFCC transform
mfcc_transform = MFCC(sample_rate=16000, n_mfcc=13, log_mels=True)

# Create the data directory
os.makedirs("./data", exist_ok=True)

# Download and load the LibriSpeech datasets
train_data = LIBRISPEECH("./data", url="train-clean-100", download=True)
test_data = LIBRISPEECH("./data", url="test-clean", download=True)

# Collect the speaker IDs and the longest MFCC length in a single pass.
# Each iteration decodes an audio file, so one pass instead of two halves the
# start-up cost.
speaker_ids = set()
max_length = 0
for audio, _, _, speaker_id, *_ in train_data:
    speaker_ids.add(speaker_id)
    audio = audio[:, :80000]  # trim long audio to 5 s (80000 samples at 16 kHz)
    mfcc = mfcc_transform(audio)
    max_length = max(max_length, mfcc.shape[-1])

# Build the label mapping
label_to_index = {label: idx for idx, label in enumerate(sorted(speaker_ids))}
index_to_label = {idx: label for label, idx in label_to_index.items()}

target_length = max(max_length, 50)  # guarantee a minimum length of 50

# Create the datasets
train_dataset = LibriSpeechDataset(
    train_data,
    transform=mfcc_transform,
    target_length=target_length,
    label_to_index=label_to_index
)
# NOTE(review): label_to_index only covers speakers seen in train-clean-100.
# LibriSpeech's published splits are understood to be speaker-disjoint, so
# reading items from test_dataset is expected to raise KeyError. Confirm, and
# for closed-set speaker classification consider evaluating on a held-out
# portion of train_data instead.
test_dataset = LibriSpeechDataset(
    test_data,
    transform=mfcc_transform,
    target_length=target_length,
    label_to_index=label_to_index
)

# Use a small batch size in the DataLoaders
train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True, num_workers=0)
test_loader = DataLoader(test_dataset, batch_size=8, shuffle=False, num_workers=0)

# ResNet and DenseNet definitions
class ResNetModel(nn.Module):
    """ResNet-18 classifier adapted to single-channel inputs.

    Args:
        num_classes: Number of output classes of the final layer.
    """

    def __init__(self, num_classes):
        super().__init__()
        # torchvision builds an untrained network by default, so the
        # deprecated `pretrained=False` keyword is not passed.
        self.resnet = resnet18(num_classes=num_classes)
        # Replace the stem with a 1-input-channel 3x3 stride-1 convolution and
        # turn the stem max-pool into a no-op.
        self.resnet.conv1 = nn.Conv2d(1, 64, kernel_size=3, stride=1, padding=1, bias=False)
        self.resnet.maxpool = nn.Identity()

    def forward(self, x):
        """Return the wrapped network's output for ``x``."""
        return self.resnet(x)

class DenseNetModel(nn.Module):
    """DenseNet-121 classifier adapted to single-channel inputs.

    Args:
        num_classes: Number of output classes of the final layer.
    """

    def __init__(self, num_classes):
        super().__init__()
        # torchvision builds an untrained network by default, so the
        # deprecated `pretrained=False` keyword is not passed.
        self.densenet = densenet121()
        # Replace the stem with a 1-input-channel 3x3 stride-1 convolution and
        # turn the stem pooling layer into a no-op.
        self.densenet.features.conv0 = nn.Conv2d(1, 64, kernel_size=3, stride=1, padding=1, bias=False)
        self.densenet.features.pool0 = nn.Identity()
        # Swap the classifier head for one with `num_classes` outputs.
        self.densenet.classifier = nn.Linear(self.densenet.classifier.in_features, num_classes)

    def forward(self, x):
        """Return the wrapped network's output for ``x``."""
        return self.densenet(x)

# Release cached GPU memory
torch.cuda.empty_cache()

# Model initialisation: one output class per entry in `speaker_ids`; both
# models are moved to the GPU when one is available, otherwise to the CPU.
num_classes = len(speaker_ids)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
resnet_model = ResNetModel(num_classes).to(device)
densenet_model = DenseNetModel(num_classes).to(device)

# Loss function and optimizers (one Adam optimizer per model)
criterion = nn.CrossEntropyLoss()
optimizer_resnet = optim.Adam(resnet_model.parameters(), lr=0.001)
optimizer_densenet = optim.Adam(densenet_model.parameters(), lr=0.001)

# Training function with mixed precision
def train(model, dataloader, criterion, optimizer):
    """Train ``model`` for one pass over ``dataloader``.

    Mixed precision (autocast plus gradient scaling) is enabled only when the
    model lives on a CUDA device; on CPU the same code path runs in full
    precision without emitting "CUDA is not available" warnings.

    Args:
        model: Module to train; it is switched to training mode.
        dataloader: Iterable yielding ``(inputs, labels)`` batches.
        criterion: Callable computing the loss from ``(outputs, labels)``.
        optimizer: Optimizer that updates ``model``'s parameters.

    Returns:
        Mean of the per-batch loss values, or ``0.0`` if ``dataloader``
        yielded no batches.
    """
    model.train()

    # Follow the model's own placement so the batches always end up on the
    # same device as the weights.
    first_param = next(model.parameters(), None)
    model_device = first_param.device if first_param is not None else torch.device("cpu")
    use_amp = model_device.type == "cuda"

    scaler = torch.cuda.amp.GradScaler(enabled=use_amp)
    total_loss = 0.0
    num_batches = 0
    for inputs, labels in dataloader:
        inputs, labels = inputs.to(model_device), labels.to(model_device)
        optimizer.zero_grad()
        with torch.cuda.amp.autocast(enabled=use_amp):
            outputs = model(inputs)
            loss = criterion(outputs, labels)
        # With AMP disabled the scaler passes the loss and the optimizer step
        # through unchanged.
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()
        total_loss += loss.item()
        num_batches += 1
    # Count batches instead of calling len() so an empty loader cannot divide by zero.
    return total_loss / num_batches if num_batches else 0.0

# Evaluation function
def evaluate(model, dataloader):
    """Score ``model`` on every batch of ``dataloader``.

    The model is switched to evaluation mode and run without gradient
    tracking. Batches are moved to the module-level ``device``.

    Args:
        model: Module producing per-class scores along dimension 1.
        dataloader: Iterable yielding ``(inputs, labels)`` batches.

    Returns:
        A tuple ``(accuracy, predictions, targets)``: the accuracy computed by
        ``accuracy_score`` and the flat lists of predicted and true class
        indices in iteration order.
    """
    model.eval()
    all_preds = []
    all_targets = []
    with torch.no_grad():
        for batch_inputs, batch_labels in dataloader:
            batch_inputs = batch_inputs.to(device)
            batch_labels = batch_labels.to(device)
            scores = model(batch_inputs)
            # torch.max along dim 1 returns (values, indices); the indices are
            # the predicted classes.
            predicted = torch.max(scores, 1)[1]
            all_preds.extend(predicted.cpu().numpy())
            all_targets.extend(batch_labels.cpu().numpy())
    return accuracy_score(all_targets, all_preds), all_preds, all_targets

# Training loop: each epoch trains the ResNet and then the DenseNet on the
# same training loader and prints the mean loss returned by train() for each.
epochs = 5
for epoch in range(epochs):
    resnet_loss = train(resnet_model, train_loader, criterion, optimizer_resnet)
    densenet_loss = train(densenet_model, train_loader, criterion, optimizer_densenet)
    print(f"Epoch {epoch+1}/{epochs}")
    print(f"ResNet Loss: {resnet_loss:.4f}")
    print(f"DenseNet Loss: {densenet_loss:.4f}")
