In [1]:
import os
import glob
import torch
import torchaudio
import torchaudio.transforms as T
import librosa
import IPython.display as ipd
import matplotlib.pyplot as plt
from torch.utils.data import DataLoader

In [2]:
# Count the audio files in each split/class folder.
# glob.glob already returns a list, so no extra wrapping is needed.
train_healthy_paths = glob.glob('./SVD/train/healthy/*.wav')
train_pathology_paths = glob.glob('./SVD/train/pathology/*.wav')
test_healthy_paths = glob.glob('./SVD/test/healthy/*.wav')
test_pathology_paths = glob.glob('./SVD/test/pathology/*.wav')

n_train_healthy = len(train_healthy_paths)
n_train_pathology = len(train_pathology_paths)
n_test_healthy = len(test_healthy_paths)
n_test_pathology = len(test_pathology_paths)

print(f'train healthy   : {n_train_healthy} audios')
print(f'train_pathology : {n_train_pathology} audios')
print(f'test_healthy    : {n_test_healthy} audios')
print(f'test_pathology  : {n_test_pathology} audios')

train healthy   : 532 audios
train_pathology : 762 audios
test_healthy    : 100 audios
test_pathology  : 100 audios


In [3]:
# Dataset definition
def load_audios(paths):
    """Load every audio file listed in ``paths`` into memory.

    Args:
        paths: Iterable of audio file paths; each one is read with
            ``torchaudio.load``.

    Returns:
        A list with one ``[waveform, sample_rate, name]`` entry per path, in
        input order. ``name`` is the file name with its directory and
        extension stripped.
    """

    def _read(path):
        # "dir/abc.wav" -> "abc"
        stem, _ext = os.path.splitext(os.path.basename(path))
        waveform, sample_rate = torchaudio.load(path)
        return [waveform, sample_rate, stem]

    return [_read(path) for path in paths]

In [4]:
# Build the in-memory datasets: one list of samples per split/class.
# The tuple order fixes both the loading order and the unpacking order.
h_train, p_train, h_test, p_test = map(
    load_audios,
    (
        train_healthy_paths,
        train_pathology_paths,
        test_healthy_paths,
        test_pathology_paths,
    ),
)

In [5]:
# Data loaders: one sample per batch, iterated in the stored order.
_LOADER_OPTIONS = dict(batch_size=1, shuffle=False)

loader_h_train = DataLoader(h_train, **_LOADER_OPTIONS)
loader_p_train = DataLoader(p_train, **_LOADER_OPTIONS)
loader_h_test = DataLoader(h_test, **_LOADER_OPTIONS)
loader_p_test = DataLoader(p_test, **_LOADER_OPTIONS)

In [6]:
# Data augmentation
def augmentation(mfcc_tensor):
    """Apply frequency masking followed by time masking to an MFCC tensor.

    Args:
        mfcc_tensor: Tensor accepted by ``T.FrequencyMasking`` and
            ``T.TimeMasking`` (the caller passes the output of ``T.MFCC``).

    Returns:
        The tensor after both masking transforms have been applied.
    """
    masking_steps = (
        T.FrequencyMasking(freq_mask_param=3),  # frequency masking first
        T.TimeMasking(time_mask_param=10),      # then time masking
    )
    for apply_mask in masking_steps:
        mfcc_tensor = apply_mask(mfcc_tensor)
    return mfcc_tensor
# MFCC image generation
def _save_mfcc_figure(mfcc_2d, title, out_path):
    """Draw one 2-D MFCC array with ``specshow`` and save the figure to ``out_path``."""
    # Imported here so the ``librosa.display`` submodule is loaded even on
    # librosa versions where ``import librosa`` alone does not expose it.
    import librosa.display

    # The drawing calls are kept exactly as in the original pipeline so that
    # regenerated images match the ones the classifiers were trained on.
    plt.figure()
    librosa.display.specshow(mfcc_2d, x_axis='time')
    plt.colorbar()
    plt.title(title)
    plt.xlabel('Time')
    plt.ylabel('MFCC Coefficients')
    plt.axis('off')
    plt.savefig(out_path, bbox_inches='tight', pad_inches=0)
    plt.close()  # release the figure; this runs once or twice per clip


def create_mfcc_images(loader, label, t, augmentation_ratio=0):
    """Save an MFCC image for every item of ``loader``, optionally with an augmented copy.

    Images are written to ``./SVD/mfcc/{t}/{label}/{name}.png``; augmented
    copies are written next to them as ``{name}_augmented.png``.

    Args:
        loader: Sized iterable of ``(waveform, sample_rate, name)`` batches.
            Only the first sample of each batch is used.
        label: Class folder name.
        t: Split folder name.
        augmentation_ratio: Fraction of the loader's batches, counted from the
            first one, that also get an augmented image.

    Returns:
        None. The function only writes files.
    """
    out_dir = f'./SVD/mfcc/{t}/{label}'  # renamed from `dir`, which shadowed the builtin
    os.makedirs(out_dir, exist_ok=True)

    augmented_count = int(len(loader) * augmentation_ratio)

    # One MFCC transform per distinct sample rate instead of one per clip.
    transforms_by_rate = {}

    for i, data in enumerate(loader):
        waveform = data[0][0]
        # Cast to a plain int: the value may arrive as a collated tensor.
        sample_rate = int(data[1][0])
        name = data[2][0]

        # MFCC conversion
        mfcc_transform = transforms_by_rate.get(sample_rate)
        if mfcc_transform is None:
            mfcc_transform = T.MFCC(
                sample_rate=sample_rate,
                n_mfcc=13,
                melkwargs={"n_fft": 400, "hop_length": 160, "n_mels": 40}
            )
            transforms_by_rate[sample_rate] = mfcc_transform
        mfcc = mfcc_transform(waveform)

        # Save the original image (before any augmentation touches `mfcc`).
        _save_mfcc_figure(mfcc[0].numpy(), 'MFCC', f'{out_dir}/{name}.png')

        # Augment and save the augmented image for the first `augmented_count` items.
        if i < augmented_count:
            augmented_mfcc = augmentation(mfcc)
            _save_mfcc_figure(
                augmented_mfcc[0].numpy(),
                'Augmented MFCC',
                f'{out_dir}/{name}_augmented.png',
            )


In [7]:
# Generate the spectrogram images (takes about 5 minutes).
# Only the training split gets augmented copies (ratio 1); the test split gets none.
for _loader, _label, _split, _ratio in (
    (loader_h_train, 'healthy', 'train', 1),
    (loader_p_train, 'pathology', 'train', 1),
    (loader_h_test, 'healthy', 'test', 0),
    (loader_p_test, 'pathology', 'test', 0),
):
    create_mfcc_images(_loader, _label, _split, augmentation_ratio=_ratio)

In [8]:
# Count the generated images in each split/class folder.
# glob.glob already returns a list, so no extra wrapping is needed.
train_healthy_images = glob.glob('./SVD/mfcc/train/healthy/*.png')
train_pathology_images = glob.glob('./SVD/mfcc/train/pathology/*.png')
test_healthy_images = glob.glob('./SVD/mfcc/test/healthy/*.png')
test_pathology_images = glob.glob('./SVD/mfcc/test/pathology/*.png')

n_train_healthy_images = len(train_healthy_images)
n_train_pathology_images = len(train_pathology_images)
n_test_healthy_images = len(test_healthy_images)
n_test_pathology_images = len(test_pathology_images)

print(f'train healthy   : {n_train_healthy_images} images')
print(f'train_pathology : {n_train_pathology_images} images')
print(f'test_healthy    : {n_test_healthy_images} images')
print(f'test_pathology  : {n_test_pathology_images} images')

train healthy   : 1064 images
train_pathology : 1524 images
test_healthy    : 100 images
test_pathology  : 100 images


# classification

In [9]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
import torchvision
from torchvision import transforms
from torchvision.datasets import ImageFolder
from torchvision.models import ResNet34_Weights
from sklearn.metrics import confusion_matrix
from torchaudio.transforms import MFCC


  from .autonotebook import tqdm as notebook_tqdm


In [11]:
# Preprocessing shared by both splits: convert to a tensor, then resize to 256x256.
image_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Resize((256, 256)),
])

# trainset
train_path = './SVD/mfcc/train'
trainset = ImageFolder(root=train_path, transform=image_transform)
print(trainset)
print(f'\nclass : index\n{trainset.class_to_idx}')

# testset
test_path = './SVD/mfcc/test'
testset = ImageFolder(root=test_path, transform=image_transform)
print(testset)
print(f'\nclass : index\n{testset.class_to_idx}')

# dataloader: training batches are shuffled, test batches keep folder order
train_dataloader = DataLoader(trainset, batch_size=32, shuffle=True)
test_dataloader = DataLoader(testset, batch_size=16, shuffle=False)


Dataset ImageFolder
    Number of datapoints: 2588
    Root location: ./SVD/mfcc/train
    StandardTransform
Transform: Compose(
               ToTensor()
               Resize(size=(256, 256), interpolation=bilinear, max_size=None, antialias=True)
           )

class : index
{'healthy': 0, 'pathology': 1}
Dataset ImageFolder
    Number of datapoints: 200
    Root location: ./SVD/mfcc/test
    StandardTransform
Transform: Compose(
               ToTensor()
               Resize(size=(256, 256), interpolation=bilinear, max_size=None, antialias=True)
           )

class : index
{'healthy': 0, 'pathology': 1}


In [12]:
# Compute device: prefer Apple MPS, then CUDA, and fall back to the CPU so the
# notebook also runs on machines without an MPS backend.
_mps_backend = getattr(torch.backends, "mps", None)  # absent on older torch builds
if _mps_backend is not None and _mps_backend.is_available():
    device = torch.device("mps")
elif torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f'Using {device} device')

Using mps device


In [13]:
# 사전학습모델 불러오기
import numpy as np
from torchvision.models import resnet50, ResNet50_Weights, efficientnet_b0, EfficientNet_B0_Weights

# Both backbones start from the default pretrained weights and get a fresh
# head: dropout followed by a 2-way linear layer.

# ResNet50 (the head replaces `fc`)
resnet50_model = resnet50(weights=ResNet50_Weights.DEFAULT)
resnet50_model.fc = nn.Sequential(nn.Dropout(p=0.5), nn.Linear(2048, 2))

# EfficientNet-B0 (the head replaces `classifier`)
efficientnet_b0_model = efficientnet_b0(weights=EfficientNet_B0_Weights.DEFAULT)
efficientnet_b0_model.classifier = nn.Sequential(nn.Dropout(p=0.5), nn.Linear(1280, 2))

# Move both models to the selected device.
resnet50_model = resnet50_model.to(device)
efficientnet_b0_model = efficientnet_b0_model.to(device)

# Shared loss function; one Adam optimizer per model.
criterion = nn.CrossEntropyLoss()
resnet50_optimizer = optim.Adam(resnet50_model.parameters(), lr=0.0005)
efficientnet_b0_optimizer = optim.Adam(efficientnet_b0_model.parameters(), lr=0.0005)


In [14]:
def train_model(model, optimizer, dataloader, loss_fn, device, num_epochs=10, log_every=10):
    """Train ``model`` in place and print running loss/accuracy.

    Args:
        model: Network to optimise; it is switched to training mode first.
        optimizer: Optimizer built over ``model``'s parameters.
        dataloader: Iterable of ``(images, labels)`` batches.
        loss_fn: Callable ``loss_fn(outputs, labels)`` returning a scalar loss.
        device: Device the batches are moved to before the forward pass.
        num_epochs: Number of passes over ``dataloader``.
        log_every: Size of the logging window in batches. Loss is averaged and
            accuracy computed over the window, then the counters are reset.
            A trailing window shorter than ``log_every`` is not reported.
    """
    # Needed when this cell is re-run after the evaluation cell, which leaves
    # the models in eval mode.
    model.train()

    for epoch in range(num_epochs):
        running_loss = 0.0
        correct = 0
        total = 0

        for i, data in enumerate(dataloader):
            images, labels = data[0].to(device), data[1].to(device)

            optimizer.zero_grad()
            outputs = model(images)
            loss = loss_fn(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            _, predicted = torch.max(outputs.detach(), 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

            if (i + 1) % log_every == 0:
                accuracy = 100 * correct / total
                # Divide by the window size; the EfficientNet loop previously
                # divided by 100 for a 10-batch window.
                print(f'[epoch: {epoch+1} / batch: {i+1:3d}] loss: {running_loss/log_every:.4f}, accuracy: {accuracy:.2f}%')
                running_loss = 0.0
                correct = 0
                total = 0


# resnet training
train_model(resnet50_model, resnet50_optimizer, train_dataloader, criterion, device)
print('Finished Training')

# efficientnet training
train_model(efficientnet_b0_model, efficientnet_b0_optimizer, train_dataloader, criterion, device)
print('Finished Training')

# Save the ResNet50 weights
resnet50_path = './SVD/resnet50_model.pth'
torch.save(resnet50_model.state_dict(), resnet50_path)

# Save the EfficientNet-B0 weights
efficientnet_b0_path = './SVD/efficientnet_b0_model.pth'
torch.save(efficientnet_b0_model.state_dict(), efficientnet_b0_path)


[epoch: 1 / batch:  10] loss: 0.6259, accuracy: 62.81%
[epoch: 1 / batch:  20] loss: 0.6094, accuracy: 65.00%
[epoch: 1 / batch:  30] loss: 0.4955, accuracy: 75.31%
[epoch: 1 / batch:  40] loss: 0.5073, accuracy: 74.69%
[epoch: 1 / batch:  50] loss: 0.4809, accuracy: 77.81%
[epoch: 1 / batch:  60] loss: 0.4675, accuracy: 78.12%
[epoch: 1 / batch:  70] loss: 0.4503, accuracy: 80.31%
[epoch: 1 / batch:  80] loss: 0.4231, accuracy: 78.75%
[epoch: 2 / batch:  10] loss: 0.4361, accuracy: 79.69%
[epoch: 2 / batch:  20] loss: 0.3762, accuracy: 81.88%
[epoch: 2 / batch:  30] loss: 0.4430, accuracy: 79.69%
[epoch: 2 / batch:  40] loss: 0.3863, accuracy: 83.75%
[epoch: 2 / batch:  50] loss: 0.2850, accuracy: 89.69%
[epoch: 2 / batch:  60] loss: 0.3121, accuracy: 86.56%
[epoch: 2 / batch:  70] loss: 0.3035, accuracy: 86.88%
[epoch: 2 / batch:  80] loss: 0.3940, accuracy: 82.81%
[epoch: 3 / batch:  10] loss: 0.3434, accuracy: 84.69%
[epoch: 3 / batch:  20] loss: 0.3258, accuracy: 85.94%
[epoch: 3 

In [18]:

# Ensemble predictions for a data loader
def confusion(resnet50_model, efficientnet_b0_model, loader, resnet50_weight, efficientnet_b0_weight):
    """Compute the confusion matrix of the weighted two-model ensemble.

    Both models are switched to eval mode. For each batch their logits are
    turned into probabilities with softmax, combined as
    ``resnet50_weight * p_resnet + efficientnet_b0_weight * p_efficientnet``,
    and the arg-max class is taken as the prediction.

    Args:
        resnet50_model: First classifier.
        efficientnet_b0_model: Second classifier.
        loader: Iterable of ``(images, labels)`` batches.
        resnet50_weight: Weight of the first model's probabilities.
        efficientnet_b0_weight: Weight of the second model's probabilities.

    Returns:
        The ``confusion_matrix`` result for labels ``[0, 1]``.
    """
    y_true = []
    ensemble_preds = []

    resnet50_model.eval()
    efficientnet_b0_model.eval()

    with torch.no_grad():
        for data in loader:
            images, labels = data[0].to(device), data[1].to(device)

            # Normalise each model's logits with softmax before mixing them.
            resnet50_probabilities = nn.functional.softmax(resnet50_model(images), dim=1)
            efficientnet_b0_probabilities = nn.functional.softmax(efficientnet_b0_model(images), dim=1)

            ensemble_probabilities = (
                resnet50_weight * resnet50_probabilities
                + efficientnet_b0_weight * efficientnet_b0_probabilities
            )
            preds = torch.argmax(ensemble_probabilities, dim=1)

            y_true.extend(labels.tolist())
            ensemble_preds.extend(preds.tolist())

    # Passing labels explicitly keeps the matrix 2x2 even when one class never
    # appears in the targets or the predictions.
    return confusion_matrix(y_true, ensemble_preds, labels=[0, 1])

# Ensemble weight search
def weight_search(resnet50_model, efficientnet_b0_model, loader):
    """Grid-search the ensemble weights that maximise accuracy on ``loader``.

    The ResNet50 weight is swept from 0.3 to 0.7 in steps of 0.05 and the
    EfficientNet-B0 weight is its complement to 1. Each candidate is scored
    with ``confusion`` and ``metrics``. On ties the first candidate wins.

    Args:
        resnet50_model: First classifier, passed through to ``confusion``.
        efficientnet_b0_model: Second classifier, passed through to ``confusion``.
        loader: Batches used to score every candidate.

    Returns:
        ``(best_weights, best_metrics)`` where ``best_weights`` is
        ``(resnet50_weight, efficientnet_b0_weight)`` and ``best_metrics`` is
        ``(accuracy, recall, precision, f1)`` for that candidate.
    """
    # Start below any reachable accuracy so the first candidate is always
    # recorded; starting at 0 returned (None, None) when every candidate
    # scored exactly 0.
    best_accuracy = float('-inf')
    best_weights = None
    best_metrics = None

    # Weight range (0.3 ~ 0.7); the 0.71 stop makes arange include 0.7.
    for candidate in np.arange(0.3, 0.71, 0.05):
        # Plain Python floats keep numpy scalars out of the tensor arithmetic.
        resnet50_weight = float(candidate)
        efficientnet_b0_weight = 1.0 - resnet50_weight

        # Evaluate the ensemble with these weights.
        cm = confusion(resnet50_model, efficientnet_b0_model, loader, resnet50_weight, efficientnet_b0_weight)
        accuracy, recall, precision, f1 = metrics(cm)

        # Keep the candidate with the highest accuracy so far.
        if accuracy > best_accuracy:
            best_accuracy = accuracy
            best_weights = (resnet50_weight, efficientnet_b0_weight)
            best_metrics = (accuracy, recall, precision, f1)

    return best_weights, best_metrics

# Metric computation
def metrics(cm):
    """Derive accuracy, recall, precision and F1 from a 2x2 confusion matrix.

    Args:
        cm: Array with a ``ravel()`` method that flattens to
            ``(tn, fp, fn, tp)``.

    Returns:
        ``(accuracy, recall, precision, f1)`` as Python floats. A ratio whose
        denominator is zero is reported as 0.0 instead of NaN.
    """
    tn, fp, fn, tp = (int(v) for v in cm.ravel())

    def _ratio(numerator, denominator):
        # Guard against empty classes or no positive predictions.
        return numerator / denominator if denominator else 0.0

    accuracy = _ratio(tp + tn, tp + tn + fp + fn)
    recall = _ratio(tp, tp + fn)
    precision = _ratio(tp, tp + fp)
    f1 = _ratio(2 * recall * precision, recall + precision)

    return accuracy, recall, precision, f1

# Search for the best ensemble weights.
# NOTE(review): the weights are selected on test_dataloader, the same data the
# metrics below are reported on, so the numbers are likely optimistic - confirm
# whether a separate validation split should be used for the search.
best_weights, best_metrics = weight_search(resnet50_model, efficientnet_b0_model, test_dataloader)
best_resnet50_weight, best_efficientnet_b0_weight = best_weights
accuracy, recall, precision, f1 = best_metrics

# Confusion matrix for the selected weights.
cm = confusion(
    resnet50_model,
    efficientnet_b0_model,
    test_dataloader,
    best_resnet50_weight,
    best_efficientnet_b0_weight,
)

# Report the selected weights, the confusion matrix, then the metrics.
print(f"Best Weights: ResNet50 = {best_resnet50_weight:.2f}, EfficientNet-B0 = {best_efficientnet_b0_weight:.2f}")
print(cm)
print(f'accuracy: {accuracy:.4f}, recall: {recall:.4f}, precision: {precision:.4f}, f1: {f1:.4f}')

[[83 17]
 [23 77]]
accuracy: 0.8000, recall: 0.7700, precision: 0.8191, f1: 0.7938
