In [None]:
# import os
from glob import glob

import librosa
from librosa.filters import mel as librosa_mel_fn

import torch
from torch import nn
import torch.nn.functional as F

import torchaudio
from torch.utils.data import Dataset, DataLoader

In [None]:
# Pick the compute device: the GPU when CUDA is available, otherwise the CPU
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print(device)

In [None]:
import random

# Seed every random number generator used by the notebook with 0 so that
# repeated runs draw the same random numbers.
random.seed(0)
torch.manual_seed(0)

# The CUDA generator is seeded only when a GPU is actually present.
if torch.cuda.is_available():
    torch.cuda.manual_seed(0)

In [None]:
from google.colab import drive

# Mount Google Drive; force_remount=True remounts even when a mount already exists
drive.mount(mountpoint='/content/gdrive', force_remount=True)

# Dataset root inside the mounted Drive (the trailing slash is required because
# later cells build glob patterns by plain string concatenation)
filepath = '/content/gdrive/My Drive' + '/Colab Notebooks/speech_emotion_recognition/'

In [None]:
class SpeechDataset(Dataset):
    """Dataset that serves (log-mel spectrogram, emotion label) pairs from wav files.

    Wav files are discovered with ``glob(filepath + "*/*.wav")`` where ``filepath``
    is the notebook-level dataset root defined in an earlier cell. File names are
    expected to look like ``02-01-06-01-02-01-12.wav``; the third dash-separated
    field is the emotion code (01=neutral, 02=calm, 03=happy, 04=sad, 05=angry,
    06=fearful, 07=disgust, 08=surprised).

    Args:
        n_fft: FFT size used by the STFT and the mel filter bank.
        n_mels: Number of mel bands.
        sr: Target sampling rate; files with another rate are resampled.
        hop_size: STFT hop length in samples.
        win_size: STFT window length in samples.
        fmin: Lowest frequency of the mel filter bank.
        fmax: Highest frequency of the mel filter bank.
        max_sec: Duration (seconds) used to derive the common waveform length.
        center: Forwarded to ``torch.stft`` (True pads the signal, False does not).

    NOTE(review): every tensor is created on CUDA when it is available. Moving
    tensors to CUDA inside a dataset typically rules out DataLoader worker
    processes - confirm before raising ``num_workers``.
    """

    def __init__(self, n_fft=1024, n_mels=80, sr=22050, hop_size=256,
                 win_size=1024, fmin=0, fmax=8000, max_sec=6.0, center=False):
        # Collect every wav file that sits one directory level below the dataset
        # root, e.g. "<root>/Actor_01/xx.wav". `filepath` is the notebook-level
        # root path; adjust it to your own Drive layout.
        self.wav_path = sorted(glob(filepath + "*/*.wav"))
        self.wav_len = len(self.wav_path)
        self.n_fft = n_fft  # FFT size
        self.n_mels = n_mels  # number of mel bands
        self.sr = sr  # sampling rate
        self.hop_size = hop_size  # hop length
        self.win_size = win_size  # window length
        self.fmin = fmin  # lowest frequency
        self.fmax = fmax  # highest frequency
        self.center = center  # True: torch.stft pads the signal, False: no padding
        self.max_length = int(sr * max_sec)  # common length (samples) for all clips

        self.device = 'cuda' if torch.cuda.is_available() else 'cpu'

        self.mel_basis = {}  # mel filter bank, keyed by device
        self.hann_window = {}  # Hann window, keyed by device

        self.mel = librosa_mel_fn(sr=self.sr, n_fft=self.n_fft,
                                  n_mels=self.n_mels, fmin=self.fmin, fmax=self.fmax)
        self.mel_basis[self.device] = torch.from_numpy(self.mel).float().to(self.device)
        # The window must have `win_size` samples because torch.stft is called
        # with win_length=self.win_size (building it from n_fft only works
        # when both values happen to be equal).
        self.hann_window[self.device] = torch.hann_window(self.win_size).to(self.device)

        # Resample transforms keyed by source sampling rate, created on demand so
        # the transform is not rebuilt for every file.
        self._resamplers = {}

    def __len__(self):
        """Return the number of wav files found at construction time."""
        return self.wav_len

    def __getitem__(self, idx):
        """Return ``(spec, emotion)`` for the ``idx``-th wav file (sorted path order)."""
        emotion = self.get_emotion(self.wav_path[idx])  # zero-based emotion label

        waveform = self.load_audio(self.wav_path[idx])  # fixed-length mono waveform
        spec = self.mel_spectrogram(waveform).to(self.device)  # wav -> log-mel spectrogram

        return spec, emotion

    @staticmethod
    def _peak_normalize(waveform):
        """Scale ``waveform`` so that its largest absolute sample becomes 1.

        Silent (all-zero) and empty waveforms are returned without scaling
        instead of being divided by zero, which would fill the clip with NaN.
        """
        if waveform.numel() == 0:
            return waveform
        peak = waveform.abs().max()
        # Divide by 1 when the peak is 0; any non-zero peak is used unchanged.
        safe_peak = torch.where(peak > 0, peak, torch.ones_like(peak))
        return waveform / safe_peak

    def _get_resampler(self, orig_sr):
        """Return a cached ``Resample`` transform from ``orig_sr`` to ``self.sr``."""
        resampler = self._resamplers.get(orig_sr)
        if resampler is None:
            resampler = torchaudio.transforms.Resample(orig_sr, self.sr).to(self.device)
            self._resamplers[orig_sr] = resampler
        return resampler

    def load_audio(self, file_path):
        """Load a wav file as a mono, peak-normalized, length-adjusted waveform.

        Args:
            file_path: Path of the wav file to load.

        Returns:
            Tensor on ``self.device`` with a leading channel dimension of size 1.
        """
        waveform, sr = torchaudio.load(file_path)
        waveform = waveform.to(self.device)

        # Down-mix to mono when the file has more than one channel
        if waveform.size(0) != 1:
            waveform = waveform.mean(dim=0).unsqueeze(0)

        # Resample when the file's sampling rate differs from the target rate
        if sr != self.sr:
            waveform = self._get_resampler(sr)(waveform)

        # Peak normalization (safe for silent clips)
        waveform = self._peak_normalize(waveform)

        # Amount of zero padding per side so the clip reaches roughly
        # max_length - hop_size samples. For clips longer than that the value is
        # negative; NOTE(review): F.pad is expected to trim in that case - confirm.
        pad = int(self.max_length - waveform.size(1) - self.hop_size) // 2

        # Zero-pad both ends of the waveform
        waveform = torch.nn.functional.pad(waveform, (pad, pad), mode='constant')

        return waveform

    def mel_spectrogram(self, y):
        """Convert a waveform into a log-mel spectrogram.

        Args:
            y: Waveform tensor on ``self.device``; ``load_audio`` supplies it
                with a leading channel dimension of size 1.

        Returns:
            Natural log of the mel-scaled STFT magnitude, floored at 1e-5.
        """
        # Short-time Fourier transform (STFT)
        spec = torch.stft(y, self.n_fft, hop_length=self.hop_size, win_length=self.win_size,
                          window=self.hann_window[self.device], center=self.center,
                          pad_mode='reflect', normalized=False, onesided=True, return_complex=True)
        spec = torch.view_as_real(spec)  # complex tensor -> (..., 2) real tensor
        # Magnitude spectrogram; the small constant keeps sqrt away from exactly 0
        spec = torch.sqrt(spec.pow(2).sum(-1) + (1e-9))
        # Magnitude spectrogram -> mel spectrogram
        spec = torch.matmul(self.mel_basis[self.device], spec)
        # Natural-log compression; values are floored at 1e-5 so log never sees 0
        spec = torch.log(torch.clamp(spec, min=1e-5))

        return spec

    def get_emotion(self, file_path):
        """Read the zero-based emotion label from a wav file name.

        File names follow the pattern ``02-01-06-01-02-01-12.wav``; the third
        field (``06`` here, "fearful") is the emotion code, so this example
        yields label 5.

        Only the file name is parsed, so hyphens or the word "Actor" elsewhere
        in the directory path cannot shift the field that is read.

        Args:
            file_path: Path of the wav file.

        Returns:
            0-dim ``torch.long`` tensor on ``self.device`` holding ``code - 1``.
        """
        # Local import: the notebook's top-level `import os` is commented out.
        from os.path import basename

        fields = basename(file_path).split("-")
        emotion = int(fields[2])
        return torch.LongTensor([emotion - 1]).squeeze().to(self.device)

In [None]:
# Build the full dataset once; both subsets below index into this same object
speech_dataset = SpeechDataset()

# Shuffled list of all sample indices
indices = [*range(len(speech_dataset))]
random.shuffle(indices)

# Split point: the first 70 % of the shuffled indices are used for training.
# (validation_ratio is kept for reference; the split itself only needs the index.)
train_ratio = 0.7
validation_ratio = 1 - train_ratio
train_split_index = int(train_ratio * len(speech_dataset))

# Train / validation subsets taken from the two sides of the split point
train_dataset, val_dataset = (
    torch.utils.data.Subset(speech_dataset, indices[:train_split_index]),
    torch.utils.data.Subset(speech_dataset, indices[train_split_index:]),
)

# Only the training loader reshuffles its samples
train_dataloader = DataLoader(dataset=train_dataset, batch_size=32, shuffle=True)
val_dataloader = DataLoader(dataset=val_dataset, batch_size=32, shuffle=False)

In [None]:
class CNN(nn.Module):
    """Four-stage 2-D convolutional classifier followed by two linear layers.

    Every stage is Conv2d(3x3, padding 1) -> ReLU -> MaxPool2d(2) -> Dropout(0.1),
    so each stage halves both spatial dimensions. The first linear layer expects
    32 * 5 * 32 features, i.e. an input of shape [b, 1, 80, 512]:

        [b, 1, 80, 512] -> [b, 16, 40, 256] -> [b, 16, 20, 128]
                        -> [b, 32, 10, 64]  -> [b, 32, 5, 32]

    Args:
        num_classes: Size of the output layer (number of classes).
    """

    def __init__(self, num_classes=8):
        super().__init__()

        # (input channels, output channels) of each convolutional stage
        channel_plan = [(1, 16), (16, 16), (16, 32), (32, 32)]

        stages = []
        for c_in, c_out in channel_plan:
            stages += [
                nn.Conv2d(c_in, c_out, kernel_size=3, stride=1, padding=1),
                nn.ReLU(True),
                nn.MaxPool2d(2, stride=2),
                nn.Dropout(0.1),
            ]
        self.cnn = nn.Sequential(*stages)

        self.fc1 = nn.Linear(32 * 5 * 32, 256)
        self.fc2 = nn.Linear(256, num_classes)

    def forward(self, x):
        """Return the class scores produced by the final linear layer for ``x``."""
        features = self.cnn(x)
        flat = features.flatten(start_dim=1)  # keep the batch dim, merge the rest
        hidden = F.relu(self.fc1(flat))
        return self.fc2(hidden)

In [None]:


from torch.optim import Adam

# Network and loss function, both moved to the selected device
model = CNN()
model = model.to(device)
criterion = nn.CrossEntropyLoss()
criterion = criterion.to(device)

# Adam optimizer over all model parameters, with weight decay
opti = Adam(params=model.parameters(), lr=1e-3, weight_decay=1e-3)

In [None]:
from tqdm.notebook import tqdm

def train(model, dataloader, criterion, data_len, opti):
    """Train ``model`` for one pass over ``dataloader`` and return the accuracy.

    Args:
        model: Network to optimize; it is switched to training mode.
        dataloader: Iterable yielding ``(data, target)`` batches.
        criterion: Loss function called as ``criterion(output, target)``.
        data_len: Number of samples used as the accuracy denominator.
        opti: Optimizer that updates the model parameters.

    Returns:
        Accuracy in percent, counted on the predictions made during this pass.
    """
    model.train()

    n_correct = 0
    for batch, labels in tqdm(dataloader):
        batch, labels = batch.to(device), labels.to(device)

        logits = model(batch)
        loss = criterion(logits, labels)

        # Standard update step: clear old gradients, backpropagate, apply
        opti.zero_grad()
        loss.backward()
        opti.step()

        # Predicted class = index of the highest score in each row
        predicted = logits.argmax(dim=1, keepdim=True)
        n_correct += (predicted == labels.view_as(predicted)).sum().item()

    return 100. * n_correct / data_len

In [None]:
def evaluate(model, dataloader, criterion, data_len):
    """Measure the accuracy of ``model`` over ``dataloader`` without training it.

    The whole pass runs under ``torch.no_grad()`` so no autograd graph is built
    for the validation batches.

    Args:
        model: Network to evaluate; it is switched to evaluation mode.
        dataloader: Iterable yielding ``(data, target)`` batches.
        criterion: Unused. The accuracy does not depend on the loss; the
            parameter is kept so existing call sites keep working.
        data_len: Number of samples used as the accuracy denominator.

    Returns:
        Accuracy in percent.
    """
    correct = 0

    model.eval()
    with torch.no_grad():
        for data, target in tqdm(dataloader):
            data = data.to(device)
            target = target.to(device)

            output = model(data)

            # Predicted class = index of the highest score in each row
            pred = output.max(1, keepdim=True)[1]
            correct += pred.eq(target.view_as(pred)).sum().item()

    acc = 100. * correct / data_len
    return acc

In [None]:
# Idea: converting the .wav files to .npz beforehand looks like a good way to
# cut training time.
# Open question: would that mean converting wav -> MFCC?
epoch = 20

for i in range(epoch):
    # One optimization pass over the training set, then a pass over validation
    train_acc = train(
        model=model,
        dataloader=train_dataloader,
        criterion=criterion,
        data_len=len(train_dataloader.dataset),
        opti=opti,
    )
    val_acc = evaluate(
        model=model,
        dataloader=val_dataloader,
        criterion=criterion,
        data_len=len(val_dataloader.dataset),
    )

    summary = f"[Epoch: {i:2d}], [Train Acc: {train_acc:3.4f}], [Val Acc: {val_acc:3.4f}]" + '\n'
    print(summary)