In [55]:
import os
import Preprocessor as pp
import torch
import torch.nn as nn
import torch.optim as optim
import torchaudio
import torch.nn.functional as F
from torchvision import models
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from tqdm import tqdm

In [56]:
# Report the installed PyTorch build and whether a CUDA device is usable.
print(torch.__version__, torch.cuda.is_available(), sep="\n")

2.7.0+cu126
True


In [57]:
# Paths, labels and device
DATASET = "./Dataset/" # dataset root path
SAVE_BEST_PATH = "./result/efficientnet_v2_l_best.pt"
SAVE_LAST_PATH = "./result/efficientnet_v2_l_last.pt"
CLASS_NAME = ["danger", "fire", "gas", "non", "tsunami"] # classes to predict
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Reproducibility: seed PyTorch's global generator so DataLoader shuffling and
# layer initialisation repeat across Restart & Run All.
# NOTE(review): Preprocessor.spec_augmentation may draw from another RNG
# (random / numpy) — seed that too if it does.
SEED = 42
torch.manual_seed(SEED)

# Hyperparameters
CLASS_NUM = len(CLASS_NAME)
SAMPLE_RATE = 16000
DURATION = 1
BATCH_SIZE = 32
EPOCHS = 5
LEARNING_RATE = 1e-4
# The five values below are not referenced by any cell visible in this notebook.
# NOTE(review): presumably they mirror the log-mel settings inside Preprocessor — confirm.
WINDOW_SIZE = 512
HOP_SIZE = 160
MEL_BINS = 64
FMIN = 50
FMAX = 8000

In [58]:
class ClassNameError(Exception):
    """Raised when a dataset folder name is not one of the expected class names."""

    def __init__(self):
        message = "폴더이름과 클래스이름이 일치 하지 않습니다."
        Exception.__init__(self, message)

In [59]:
# Custom dataset definition
class AudioDataset(Dataset):
    """Dataset that loads audio files and yields ``(log-mel features, label)`` pairs.

    Every item is loaded with torchaudio, mixed down to one channel, resampled
    to ``sample_rate`` when the file uses a different rate, padded with zeros or
    truncated to ``sample_rate * duration`` samples, peak-normalised, and then
    converted by ``pp.logmel``.

    Args:
        filepaths: Indexable collection of audio file paths.
        labels: Indexable collection of labels aligned with ``filepaths``.
        training: When True, ``pp.spec_augmentation`` is applied to the features.
        sample_rate: Target sampling rate.
        duration: Clip length; multiplied by ``sample_rate`` to get the sample count.
    """

    def __init__(self, filepaths, labels, training=False, sample_rate=SAMPLE_RATE, duration=DURATION):
        self.filepaths = filepaths # audio file paths
        self.labels = labels # labels aligned with filepaths
        self.sample_rate = sample_rate # target sampling rate
        self.num_samples = int(sample_rate * duration) # fixed clip length in samples
        self.training = training
        # Resample transforms keyed by source rate, so the transform is built
        # once per distinct rate instead of on every __getitem__ call.
        self._resamplers = {}

    def __len__(self):
        """Return the number of files in the dataset."""
        return len(self.filepaths)

    def _get_resampler(self, source_rate):
        """Return a cached ``Resample`` transform from ``source_rate`` to the target rate."""
        resampler = self._resamplers.get(source_rate)
        if resampler is None:
            resampler = torchaudio.transforms.Resample(source_rate, self.sample_rate)
            self._resamplers[source_rate] = resampler
        return resampler

    def __getitem__(self, idx):
        """Load item ``idx`` and return ``(logmel, label)``."""
        filepath = self.filepaths[idx]
        label = self.labels[idx]

        waveform, sr = torchaudio.load(filepath) # waveform: (channel, length)

        # Mix down to mono (1 channel)
        if waveform.shape[0] > 1:
            waveform = waveform.mean(dim=0, keepdim=True)

        # Resample when the file's rate differs from the target rate
        if sr != self.sample_rate:
            waveform = self._get_resampler(sr)(waveform)

        # Force a fixed length: right-pad short clips with zeros (silence),
        # keep only the first num_samples of long ones
        current_length = waveform.shape[1]
        if current_length < self.num_samples:
            waveform = F.pad(waveform, (0, self.num_samples - current_length))
        else:
            waveform = waveform[:, :self.num_samples]

        # Peak normalisation into [-1, 1]; the epsilon avoids dividing by zero on silent clips
        waveform = waveform / (waveform.abs().max() + 1e-9)

        # Batches printed later in this notebook are [32, 1, 64, 101], so a single
        # item here has no batch dimension: (channel, mel_bins, time).
        # NOTE(review): pp.logmel is defined outside this file — confirm its output layout.
        logmel = pp.logmel(waveform)

        if self.training:
            logmel = pp.spec_augmentation(logmel)

        return logmel, label


In [60]:
# Audio file paths and their labels (the name of the class folder holding each file)
filepaths, labels = [], []

# os.walk silently yields nothing for a missing directory; fail early with a clear message.
if not os.path.isdir(DATASET):
    raise FileNotFoundError(f"Dataset directory not found: {DATASET}")

dataset_root = os.path.normpath(DATASET)

# Collect every ".wav" file under the dataset root, labelled by its folder name
for root, dirs, files in os.walk(DATASET):
    # os.walk returns entries in arbitrary, filesystem-dependent order. Sorting
    # `dirs` in place fixes the traversal order so the seeded split that follows
    # selects the same files on every machine.
    dirs.sort()

    # The dataset root itself is not a class folder. Identify it by path rather
    # than by an empty basename, which only works when DATASET ends with a slash.
    # Files placed directly in the root have no class and are skipped.
    if os.path.normpath(root) == dataset_root:
        continue

    folder_name = os.path.basename(root)

    # Any folder whose name is not a known class is an error
    if folder_name not in CLASS_NAME:
        raise ClassNameError

    for file in sorted(files):
        if not file.lower().endswith(".wav"):
            continue

        filepaths.append(os.path.join(root, file))
        labels.append(folder_name)

In [61]:
# Encode the folder-name labels as integer class ids; the fitted encoder is kept
# so ids can be mapped back to names.
label_encoder = LabelEncoder()
integer_labels = label_encoder.fit_transform(y=labels)

In [62]:
# Sanity check: one encoded label per collected file.
print(f"{integer_labels.shape}")

(3000,)


In [63]:
# Split the dataset into train / validation / test at an 8:1:1 ratio.
# Stratify on the same encoded array that is being split (as the second split
# does) rather than on the separate `labels` list: the class grouping is
# identical, and the split cannot go out of sync if `labels` is ever rebound.
X_train, X_temp, y_train, y_temp = train_test_split(
    filepaths, # X
    integer_labels, # y
    test_size=0.2, # train : temporary hold-out = 8 : 2
    stratify=integer_labels, # keep class proportions equal in both parts
    random_state=42 # seed
)

X_val, X_test, y_val, y_test = train_test_split(
    X_temp,
    y_temp,
    test_size=0.5, # split the hold-out 1 : 1 into validation and test
    stratify=y_temp,
    random_state=42
)

In [64]:
# Show "<n inputs> <n labels>" for the train, validation and test splits, one per line.
print(
    *(
        f"{len(split_inputs)} {len(split_targets)}"
        for split_inputs, split_targets in ((X_train, y_train), (X_val, y_val), (X_test, y_test))
    ),
    sep="\n",
)

2400 2400
300 300
300 300


In [65]:
# Dataset objects: only the training split has augmentation enabled (training=True).
train_dataset, val_dataset, test_dataset = (
    AudioDataset(split_paths, split_targets, training=is_training)
    for split_paths, split_targets, is_training in (
        (X_train, y_train, True),
        (X_val, y_val, False),
        (X_test, y_test, False),
    )
)

# One loader per split; only the training loader shuffles.
train_loader, val_loader, test_loader = (
    DataLoader(split_dataset, batch_size=BATCH_SIZE, shuffle=do_shuffle)
    for split_dataset, do_shuffle in (
        (train_dataset, True),
        (val_dataset, False),
        (test_dataset, False),
    )
)

In [66]:
# Peek at the first batch of each loader to confirm tensor shapes (and dtype for train).
# The batch is bound to batch_inputs / batch_labels rather than `labels`, so the
# module-level `labels` list built while scanning the dataset is not overwritten
# by a tensor (which would break re-running the earlier cells that read it).
train_iter = iter(train_loader)
batch_inputs, batch_labels = next(train_iter)
print(f"Inputs: {batch_inputs.shape}")
print(f"Labels: {batch_labels.shape}")
print(f"Dtype: {batch_inputs.dtype}\n")

val_iter = iter(val_loader)
batch_inputs, batch_labels = next(val_iter)
print(f"Inputs: {batch_inputs.shape}")
print(f"Labels: {batch_labels.shape}\n")

test_iter = iter(test_loader)
batch_inputs, batch_labels = next(test_iter)
print(f"Inputs: {batch_inputs.shape}")
print(f"Labels: {batch_labels.shape}\n")

Inputs: torch.Size([32, 1, 64, 101])
Labels: torch.Size([32])
Dtype: torch.float32

Inputs: torch.Size([32, 1, 64, 101])
Labels: torch.Size([32])

Inputs: torch.Size([32, 1, 64, 101])
Labels: torch.Size([32])



In [67]:
# Training function
def train_epoch(model, loader, optimizer, criterion, device):
    """Train ``model`` for one pass over ``loader``.

    Args:
        model: Module called on each input batch; its output is argmax-ed over dim 1.
        loader: Iterable yielding ``(inputs, labels)`` batches.
        optimizer: Optimizer stepped once per batch.
        criterion: Loss called as ``criterion(outputs, labels)``.
        device: Device the batches are moved to.

    Returns:
        ``(avg_loss, accuracy)``: the batch losses averaged with batch-size
        weights, and the fraction of samples whose argmax prediction equals
        the label.

    Raises:
        ValueError: If ``loader`` yields no samples.
    """
    model.train() # training mode
    running_loss = 0.0
    total_samples = 0
    total_correct = 0

    # Iterate batch by batch
    for features, labels in tqdm(loader, desc="학습중", leave=True):
        features = features.to(device)
        labels = labels.to(device).long()
        batch_size = labels.size(0)

        optimizer.zero_grad() # clear gradients from the previous step

        outputs = model(features)

        loss = criterion(outputs, labels)
        loss.backward() # backpropagation
        optimizer.step() # weight update

        # Weight each batch loss by its size so a smaller final batch does not
        # skew the epoch average (this treats the loss as a per-batch mean)
        running_loss += loss.item() * batch_size

        # Single-label classification: the predicted class is the arg-max logit
        predicts = outputs.argmax(dim=1)

        total_correct += (predicts == labels).sum().item()
        total_samples += batch_size

    if total_samples == 0:
        raise ValueError("train_epoch received a loader that yielded no samples")

    avg_loss = running_loss / total_samples # mean loss per sample
    accuracy = total_correct / total_samples # correct predictions / all predictions

    return avg_loss, accuracy

# Validation function
def validate(model, loader, criterion, device):
    """Evaluate ``model`` over ``loader`` without computing gradients.

    Args:
        model: Module called on each input batch; its output is argmax-ed over dim 1.
        loader: Iterable yielding ``(inputs, labels)`` batches.
        criterion: Loss called as ``criterion(outputs, labels)``.
        device: Device the batches are moved to.

    Returns:
        ``(avg_loss, accuracy)``: the batch losses averaged with batch-size
        weights, and the fraction of samples whose argmax prediction equals
        the label.

    Raises:
        ValueError: If ``loader`` yields no samples.
    """
    model.eval() # evaluation mode
    running_loss = 0.0
    total_samples = 0
    total_correct = 0

    # No backward pass during evaluation, so gradient tracking is switched off
    with torch.no_grad():
        for features, labels in tqdm(loader, desc="검증중", leave=True):
            features = features.to(device)
            labels = labels.to(device).long()
            batch_size = labels.size(0)

            outputs = model(features)
            loss = criterion(outputs, labels)

            # Weight each batch loss by its size (treats the loss as a per-batch mean)
            running_loss += loss.item() * batch_size

            # Single-label classification: the predicted class is the arg-max logit
            predicts = outputs.argmax(dim=1)

            total_correct += (predicts == labels).sum().item()
            total_samples += batch_size

    if total_samples == 0:
        raise ValueError("validate received a loader that yielded no samples")

    avg_loss = running_loss / total_samples
    accuracy = total_correct / total_samples

    return avg_loss, accuracy

In [68]:
# Load EfficientNetV2-L with its ImageNet weights and prepare a 1-channel stem convolution.
weights = models.EfficientNet_V2_L_Weights.IMAGENET1K_V1
model = models.efficientnet_v2_l(weights=weights)
pre_conv = model.features[0][0] # first conv of the network; its printed repr shows 3 input channels

# Mirror the pretrained stem's own hyper-parameters instead of hard-coding them,
# so the replacement always matches that layer — including whether it has a bias,
# which the weight-copy step relies on.
replace_conv = nn.Conv2d(
    in_channels=1,
    out_channels=pre_conv.out_channels,
    kernel_size=pre_conv.kernel_size,
    stride=pre_conv.stride,
    padding=pre_conv.padding,
    bias=pre_conv.bias is not None,
)
print(pre_conv)
print(replace_conv)
print(model)


Conv2d(3, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
Conv2d(1, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
EfficientNet(
  (features): Sequential(
    (0): Conv2dNormActivation(
      (0): Conv2d(3, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
      (1): BatchNorm2d(32, eps=0.001, momentum=0.1, affine=True, track_running_stats=True)
      (2): SiLU(inplace=True)
    )
    (1): Sequential(
      (0): FusedMBConv(
        (block): Sequential(
          (0): Conv2dNormActivation(
            (0): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
            (1): BatchNorm2d(32, eps=0.001, momentum=0.1, affine=True, track_running_stats=True)
            (2): SiLU(inplace=True)
          )
        )
        (stochastic_depth): StochasticDepth(p=0.0, mode=row)
      )
      (1): FusedMBConv(
        (block): Sequential(
          (0): Conv2dNormActivation(
            (0): Conv2d(32, 32, kernel_s

In [69]:
# Initialise the 1-channel conv from the pretrained stem by averaging its kernels
# over the input-channel axis (dim=1), which keeps the shape (out, 1, kH, kW).
with torch.no_grad():
    replace_conv.weight.copy_(pre_conv.weight.mean(dim=1, keepdim=True))
    # Copy the bias only when both layers have one: a conv built with bias=False
    # has `bias is None`, and calling copy_ on it would raise AttributeError.
    if pre_conv.bias is not None and replace_conv.bias is not None:
        replace_conv.bias.copy_(pre_conv.bias)

In [70]:
# Install the 1-channel stem conv and replace the classification head.
model.features[0][0] = replace_conv
# Take the input width from the layer being replaced and the output width from
# the configured class count, so neither number is duplicated here.
model.classifier[1] = nn.Linear(
    in_features=model.classifier[1].in_features,
    out_features=CLASS_NUM,
    bias=True,
)
model.to(DEVICE)
print(model)

EfficientNet(
  (features): Sequential(
    (0): Conv2dNormActivation(
      (0): Conv2d(1, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
      (1): BatchNorm2d(32, eps=0.001, momentum=0.1, affine=True, track_running_stats=True)
      (2): SiLU(inplace=True)
    )
    (1): Sequential(
      (0): FusedMBConv(
        (block): Sequential(
          (0): Conv2dNormActivation(
            (0): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
            (1): BatchNorm2d(32, eps=0.001, momentum=0.1, affine=True, track_running_stats=True)
            (2): SiLU(inplace=True)
          )
        )
        (stochastic_depth): StochasticDepth(p=0.0, mode=row)
      )
      (1): FusedMBConv(
        (block): Sequential(
          (0): Conv2dNormActivation(
            (0): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
            (1): BatchNorm2d(32, eps=0.001, momentum=0.1, affine=True, track_running_stats=True)
  

In [71]:
# Cross-entropy loss over the class logits, and Adam over every model parameter.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=model.parameters(), lr=LEARNING_RATE)

In [72]:
# Training loop

# Create the checkpoint directories up front, so torch.save cannot fail on a
# missing folder after an epoch of training has already been spent.
for checkpoint_path in (SAVE_BEST_PATH, SAVE_LAST_PATH):
    checkpoint_dir = os.path.dirname(checkpoint_path)
    if checkpoint_dir:
        os.makedirs(checkpoint_dir, exist_ok=True)

# Start below any attainable accuracy so the first epoch always writes a "best"
# checkpoint; the reload step then never picks up a stale file from an earlier run.
best_val_accuracy = float("-inf")

for epoch in range(EPOCHS):
    print(f"\nEpoch[{epoch+1}/{EPOCHS}]")

    # Train
    train_loss, train_acc = train_epoch(model=model, loader=train_loader, optimizer=optimizer, criterion=criterion, device=DEVICE)
    print(f"Train Loss: {train_loss:.4f}, Accuracy: {train_acc:.4f}")

    # Validate
    val_loss, val_acc = validate(model=model, loader=val_loader, criterion=criterion, device=DEVICE)
    print(f"Validation Loss: {val_loss:.4f}, Accuracy: {val_acc:.4f}")

    # Save the model whenever validation accuracy strictly improves (ties keep the earlier epoch).
    # The whole module object is saved, not a state_dict, because the reload step
    # in this notebook reads it back as a full model object.
    if val_acc > best_val_accuracy:
        best_val_accuracy = val_acc
        torch.save(model, SAVE_BEST_PATH)

# Always keep the final-epoch model as well
torch.save(model, SAVE_LAST_PATH)


Epoch[1/5]


학습중: 100%|██████████| 75/75 [00:17<00:00,  4.35it/s]


Train Loss: 0.5151, Accuracy: 0.8313

검증중: 100%|██████████| 10/10 [00:00<00:00, 12.38it/s]


Validation Loss: 0.3419, Accuracy: 0.9700

Epoch[2/5]


학습중: 100%|██████████| 75/75 [00:17<00:00,  4.36it/s]


Train Loss: 0.0976, Accuracy: 0.9725

검증중: 100%|██████████| 10/10 [00:00<00:00, 12.65it/s]


Validation Loss: 0.0157, Accuracy: 0.9967

Epoch[3/5]


학습중: 100%|██████████| 75/75 [00:17<00:00,  4.38it/s]


Train Loss: 0.0573, Accuracy: 0.9850

검증중: 100%|██████████| 10/10 [00:00<00:00, 11.86it/s]


Validation Loss: 0.1598, Accuracy: 0.9767

Epoch[4/5]


학습중: 100%|██████████| 75/75 [00:20<00:00,  3.72it/s]


Train Loss: 0.0390, Accuracy: 0.9892

검증중: 100%|██████████| 10/10 [00:00<00:00, 12.27it/s]


Validation Loss: 0.0159, Accuracy: 0.9967

Epoch[5/5]


학습중: 100%|██████████| 75/75 [00:17<00:00,  4.40it/s]


Train Loss: 0.0229, Accuracy: 0.9946

검증중: 100%|██████████| 10/10 [00:00<00:00, 12.64it/s]


Validation Loss: 0.0474, Accuracy: 0.9833


In [73]:
# Reload the best checkpoint written by the training loop.
# NOTE(review): weights_only=False unpickles arbitrary Python objects; only load
# checkpoint files produced by a trusted run.
model = torch.load(SAVE_BEST_PATH, map_location=DEVICE, weights_only=False)
# Move to the target device and switch to evaluation mode; the returned module is
# the cell's last expression, so its repr is displayed.
model.to(DEVICE).eval()

EfficientNet(
  (features): Sequential(
    (0): Conv2dNormActivation(
      (0): Conv2d(1, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
      (1): BatchNorm2d(32, eps=0.001, momentum=0.1, affine=True, track_running_stats=True)
      (2): SiLU(inplace=True)
    )
    (1): Sequential(
      (0): FusedMBConv(
        (block): Sequential(
          (0): Conv2dNormActivation(
            (0): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
            (1): BatchNorm2d(32, eps=0.001, momentum=0.1, affine=True, track_running_stats=True)
            (2): SiLU(inplace=True)
          )
        )
        (stochastic_depth): StochasticDepth(p=0.0, mode=row)
      )
      (1): FusedMBConv(
        (block): Sequential(
          (0): Conv2dNormActivation(
            (0): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
            (1): BatchNorm2d(32, eps=0.001, momentum=0.1, affine=True, track_running_stats=True)
  

In [74]:
# Evaluate the reloaded model on the held-out test split.
test_loss, test_acc = validate(model=model, loader=test_loader, criterion=criterion, device=DEVICE)
print(f"Test Loss: {test_loss:.4f}, Accuracy: {test_acc:.4f}")

검증중: 100%|██████████| 10/10 [00:00<00:00, 12.05it/s]

Test Loss: 0.0128, Accuracy: 1.0000



