In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchaudio
import torchaudio.transforms as AT
from torch import nn, optim
from torch.cuda.amp import autocast, GradScaler
from torch.utils.data import Dataset, DataLoader
import matplotlib.pyplot as plt

### Tutorial. 음성 파일(.wav)을 torchaudio로 읽기
음성 파일은 일정한 시간 간격을 가지고 어떤 지점의 음압을 측정한 것으로, 이때 일정한 시간 간격으로 음압을 측정하는 주파수를 sampling rate라고 한다. \
torchaudio를 사용해 음성을 읽으면 음성 데이터와 sampling rate를 반환하는데 아래의 음성에서는 sampling rate가 16000Hz임을 알 수 있다. \
한편, 음성 데이터의 크기를 보면 [1,16000]으로 나와 있는데 1은 채널의 개수 즉, 녹음한 마이크의 개수를 의미하고 16000은 데이터의 길이를 의미한다. sampling rate가 16000Hz인데 길이가 16000이라는 것은 이 음성의 길이가 1초라는 것을 의미한다.

In [None]:
file_name = "./Data/test/test_00001.wav"
x, sr = torchaudio.load(file_name)
print(x.shape, sr)

#torch.Size([1,16000]) 16000

아래는 읽은 음성 데이터를 시각화한 것이다. 가로축은 시간을 세로축은 음압을 의미한다. 음압이 큰 부분에 사람의 목소리가 있음을 추측할 수 있다. 

In [None]:
_, ax = plt.subplots(1,1,figsize=(8,4))
ax.plot(x[0,:])
plt.show()

 다음으로 이 음성 데이터를 딥러닝 모델이 더 잘 이해할 수 있는 spectrogram이나 melspectrogram으로 변환하는 방법에 대해 알아보겠다. 
 
 1. 푸리에 변환: 음성 신호에 푸리에 변환을 적용하면 각 진동수 성분이 그 음성에 얼마나 들어있는지 알 수 있다. 쉽게 설명하자면 음성 신호에서 저음과 고음의 분포를 정량적으로 구할 수 있다는 의미이다.
 2. STFT (Short Time Fourier Transform): 1초짜리 음성에 푸리에 변환을 적용하면 음성 전체에서 각 진동수 성분이 얼마나 들어있는지를 구할 수 있다. 다만, 음성 속의 단어를 파악하기 위해서는 시간에 따라 변하는 진동수 분포의 양상을 파악해야 한다. 따라서, 음성을 보다 작게 (0.01초 수준) 잘라서 각각의 작은 조각에 푸리에 변환을 적용할 수 있다. 이것을 STFT라고 부르고 일반적으로 이 결과의 L2 Norm을 Spectrogram이라고 부른다. 
 3. Melspectrogram: Melspectrogram은 spectrogram에 mel-filter를 적용해서 얻을 수 있다. 이는 사람의 청각 기관이 고음에서 주파수의 변화에 덜 민감하고 저음에서 더 민감한 특징을 반영하고 있다. 딥러닝과 사람의 청각 반응은 관련이 없어 보일 수 있으나 음성 처리나 자연어 처리 분야에서도 melspectrogram은 널리 사용되고 있으며, 좋은 성능을 보여주고 있다. 또한 melspectrogram은 spectrogram보다 크기가 작아서 학습 속도 등에서 유리한 점이 있다. 
 
 torchaudio에서는 다음과 같이 spectrogram과 melspectrogram을 얻을 수 있는 프로세스를 정의할 수 있다. AmplituteToDB는 power 단위의 spectrogram 또는 melspectrogram을 dB(로그) 단위로 변환해준다. 개인적으로 dB 단위가 딥러닝 모델이 이해하기 편한 범위의 값을 제공한다고 생각한다. 
 
 1. n_fft: win_length의 크기로 잘린 음성의 작은 조각은 0으로 padding되어 n_fft의 크기로 조절된다. 이후 padding된 조각에 푸리에 변환을 적용한다. 따라서 n_fft는 win_length보다 크거나 같아야 하고, 일반적으로 빠른 학습을 위해 2^n의 값으로 설정된다. 
 2. win_length: 원래의 음성을 작게 잘랐을 때의 조각의 크기를 의미한다. 자연어 처리 분야에서는 25m의 크기를 기본으로 하고 있으며, 16000Hz인 음성에서는 400에 해당하는 값이다.
 3. hop_length: 음성을 작은 조각으로 자를 때의 간격을 의미한다. 즉, 이 길이만큼 옆으로 밀면서 작은 조각을 얻는다. 일반적으로 10ms의 크기를 기본으로 하고 있으며 16000Hz인 음성에서는 160에 해당하는 값이다. 
 4. n_mels: 적용할 mel filter의 개수를 의미한다.

In [None]:
spectrogram = nn.Sequential(
    AT.Spectrogram(n_fft=512, win_length=400, hop_length=160),
    AT.AmplitudeToDB()
)

mel_spectrogram = nn.Sequential(
    AT.MelSpectrogram(sample_rate=sr, n_fft=512, win_length=400, hop_length=160, n_mels=80),
    AT.AmplitudeToDB()
)

 이제 실제 음성 데이터로부터 spectrogram과 melspectrogram을 얻어보겠다. 각각의 크기는 채널을 무시하면 [257,101], [80,101]인데, 101은 시간축 방향 성분의 수, 257과 80은 주파수 방향 성분의 수를 의미한다. n_mel=80이었으므로 melspectrogram의 주파수 방향 성분의 수는 80인 것이다. spectrogram의 경우 (n_fft / 2 + 1)개의 주파수 방향 성분을 얻을 수 있다. \
 주파수 성분은 0Hz부터 sampling rate의 절반 즉, 8000Hz까지를 나타내게 된다. sampling rate의 절반까지밖에 표현하지 못하는 이유는 Nyquist frequency에 대해 알아보면 이해할 수 있을 것이다. 

In [None]:
spec = spectrogram(x)
mel = mel_spectrogram(x)
print(spec.shape, mel.shape)

# torch.Size([1,257,101]) torch.Size([1,80,101])

 마지막으로 spectorgram과 melspectrogram의 해상력에 대해 설명하겠다. \
 win_length가 커질수록 주파수 성분에 대한 해상력은 높아지지만, 즉 더 정밀해지지만, 시간 성분에 대한 해상력은 낮아지게 된다. 즉, 더 정밀한 주파수 분포를 얻을 수 있으나 시간에 따른 주파수 변화를 관찰하기가 어려워진다. \
 반대로 win_length가 작은 경우에는 주파수 성분에 대한 해상력은 낮아지지만, 시간 성분에 대한 해상력은 높아지게 된다. \
 따라서 적절한 값의 win_length를 찾는 것이 중요하다. 또한 n_fft를 키우는 경우 주파수 성분의 수는 증가할지 몰라도 실제 주파수의 해상력은 증가하지 않는다. 

In [None]:
_, ax = plt.subplots(1,2,figsize=(8,4))
ax[0].pcolor(spec[0])
ax[1].pcolor(mel[0])
plt.show()

 ### Feature 생성: Multi-Resolution Mel-Spectrogram
 4가지의 서로 다른 window_length로 다양한 해상력을 가진 mel-spectrogram을 만든다.  \
 이 4개의 mel-spectrogram을 stack한 후, 평균과 표준편차를 정규화한다. \
 또한, 각 주파수에 대하여 보정 값을 더해서 최종 feature를 완성한다.
 (feature의 크기는 (4,160,160)이다.)

In [None]:
import random
from glob import glob
from tqdm import tqdm

import numpy as np
import pandas as pd

torch.cuda.is_available()

In [None]:
random.seed(0)
np.random.seed(0)
torch.manual_seed(0)

DEVICE = "cuda:0" if torch.cuda.is_available() else "cpu"

class MRMS(nn.Module):
    def __init__(self):
        super(MRMS, self).__init__()
        self.sr, self.n_fft, self.hop, self.pad, self.f_min, self.f_max, self.n_mels = 16000, 2048, 100, 50, 25, 7500, 160
        self.tf_0 = self.create_tf(250)
        self.tf_1 = self.create_tf(500)
        self.tf_2 = self.create_tf(750)
        self.tf_3 = self.create_tf(1000)
        self.cali = torch.linspace(-0.5, 0.5, steps=160, device=DEVICE).view(1,-1,1)
    
    def create_tf(self, win_length):
        tf = nn.Sequential(
            AT.MelSpectrogram(sample_rate=self.sr, n_fft=self.n_fft, win_length=win_length,
                              hop_length=self.hop, pad=self.pad, f_min=self.f_min, 
                              f_max=self.f_max, n_mels=self.n_mels),
            AT.AmplitudeToDB()
        )
        return tf
    
    def forward(self, x):
        with torch.no_grad():
            spec_0 = self.tf_0(x)[0, :, 1:-1]
            spec_1 = self.tf_1(x)[0, :, 1:-1]
            spec_2 = self.tf_2(x)[0, :, 1:-1]
            spec_3 = self.tf_3(x)[0, :, 1:-1]
            
            out = torch.stack([spec_0, spec_1, spec_2, spec_3])
            out = (out - out.mean(dim=[1,2], keepdim=True)) / 20 + self.cali
            return out

In [None]:
extractor = MRMS().to(DEVICE)

for file_name in tqdm(np.sort(glob("./Data/train/*.wav"))):
    x, _ = torchaudio.load(file_name)
    x = x.to(DEVICE)
    spec = extractor(x)
    name = "./Cache/train/" + file_name.split("/")[-1].split(".")[0] + ".pt"
    torch.save(spec.to("cpu"), name)

for file_name in tqdm(np.sort(glob("./Data/test/*.wav"))):
    x, _ = torchaudio.load(file_name)
    x = x.to(DEVICE)
    spec = extractor(x)
    name = "./Cache/test/" + file_name.split("/")[-1].split(".")[0] + ".pt"
    torch.save(spec.to("cpu"), name)

 ### Data Augmentation
 이 문제를 이미지 탐지 및 분류 문제로 접근해보자. \
 즉, 모델은 mel-spectrogram의 모양을 보고 특정 클래스를 탐지하는 것이다. 따라서, 이미지 분류 및 컴퓨터 비전 분야에서 효과적으로 사용되는 augmentation 기법을 적용하였다. 음성 자체에 적용되는 pitch shifting과 같은 방법보다는 계산적으로도 유리하고 단순한 a) Random Crop과 b) Mixup 기법을 적용하였다. 추가로 Time/Frequency Masking을 적용하였다. 

In [None]:
class CustomDataset(Dataset):
    
    def __init__(self, file_list, gt_list, augmentation):
        self.file_list = file_list
        self.gt_list = gt_list
        self.augmentation = augmentation
        self.spec_augment = nn.Sequential(
            AT.FrequencyMasking(32, False), AT.TimeMasking(12, False), AT.TimeMasking(12, False),
        )
    
    def __len__(self):
        return len(self.file_list)
    
    def __getitem__(self, index):
        with torch.no_grad():
            x = torch.load(self.file_list[index])
            gt = self.gt_list[index]
            
            if self.augmentation:
                x = self.spec_augment(x)
                i,j = random.randrange(64), random.randrange(64)
                x = F.pad(x, [32,32,32,32])
                x = x[:, i:i+160, j:j+160]
                
                if random.random() > 0.25:
                    mixup_lambda = random.uniform(0.05, 0.25)
                    mixup_index = random.randrange(self.__len__())
                    mixup_x = torch.load(self.file_list[mixup_index])
                    mixup_gt = self.gt_list[mixup_index]
                    x = (1 - mixup_lambda) * x + mixup_lambda * mixup_x
                    gt = (1 - mixup_lambda) * gt + mixup_lambda * mixup_gt
        
        return x, gt

 ### Set learning parameters and DataLoader

In [None]:
N_MODEL = 16
N_EPOCH = 200
BATCH_SIZE = 128
MODEL_FACTOR = 24
LEARNING_RATE = 0.2
MOMENTUM = 0.9
WEIGHT_DECAY = 0.0001
LOADER_PARAM = {
        "batch_size": BATCH_SIZE, "num_workers": 3, "pin_memory": True
}

train_x = np.sort(glob("./Cache/train/*.pt"))
test_x = np.sort(glob("./Cache/test/*.pt"))
train_y = torch.tensor(pd.read_csv("./Data/train_answer.csv").to_numpy()[:,1:], dtype=torch.float32)

train_loader = DataLoader(CustomDataset(train_x, train_y, augmentation=True), shuffle=True, 
                          drop_last=True, **LOADER_PARAM)
test_loader = DataLoader(CustomDataset(test_x, list(range(10000)), augmentation=False), shuffle=False,
                         drop_last=False, **LOADER_PARAM)

 ### Model
 최근 이미지 분야에서 매우 효과적인 모델(e.g. DARTS, EfficientNet 등)이 있지만 이 대회에서는 간단하게 VGG와 비슷한 모델을 구성하였다. 아래와 같이 Pooling 방법을 달리한 2가지 모델을 만들었다. 

In [None]:
class bn_relu_conv(nn.Module):
    
    def __init__(self, ks, n_in, n_out):
        super(bn_relu_conv, self).__init__()
        self.bn = nn.BatchNorm2d(n_in)
        self.relu = nn.LeakyReLU(0.1)
        self.conv = nn.Conv2d(n_in, n_out, kernel_size=ks, padding=ks // 2)
    
    def forward(self, x):
        return self.conv(self.relu(self.bn(x)))

class MAC(nn.Module):
    
    def __init__(self, pool_method):
        super(MAC, self).__init__()
        self.factor = MODEL_FACTOR
        self.pool = nn.AvgPool2d(2) if pool_method == "Avg" else nn.MaxPool2d(2)
        self.initialize = nn.Conv2d(4, 2*self.factor, 6, stride=2, padding=2)
       
        self.lg_0 = nn.Sequential(
            bn_relu_conv(1, 2*self.factor, self.factor),
            bn_relu_conv(3, self.factor, self.factor),
            bn_relu_conv(1, self.factor, self.factor),
            bn_relu_conv(3, self.factor, self.factor)
        )
        self.lg_1 = nn.Sequential(
            bn_relu_conv(1, self.factor, 2*self.factor),
            bn_relu_conv(3, 2*self.factor, 2*self.factor),
            bn_relu_conv(1, 2*self.factor, 2*self.factor),
            bn_relu_conv(3, 2*self.factor, 2*self.factor)
        )
        self.lg_2 = nn.Sequential(
            bn_relu_conv(1, 2*self.factor, 4*self.factor),
            bn_relu_conv(3, 4*self.factor, 4*self.factor),
            bn_relu_conv(1, 4*self.factor, 4*self.factor),
            bn_relu_conv(3, 4*self.factor, 4*self.factor)
        )
        self.lg_3 = nn.Sequential(
            bn_relu_conv(1, 4*self.factor, 6*self.factor),
            bn_relu_conv(3, 6*self.factor, 6*self.factor),
            bn_relu_conv(1, 6*self.factor, 6*self.factor),
            bn_relu_conv(3, 6*self.factor, 6*self.factor)
        )
        self.lg_4 = nn.Sequential(
            bn_relu_conv(1, 6*self.factor, 8*self.factor),
            bn_relu_conv(3, 8*self.factor, 8*self.factor),
            bn_relu_conv(1, 8*self.factor, 8*self.factor),
            bn_relu_conv(3, 8*self.factor, 8*self.factor)
        )
        
        self.finalize = nn.Sequential(
            bn_relu_conv(1, 8*self.factor, 16*self.factor),
            nn.AdaptiveAvgPool2d(1)
        ) if pool_method == "Avg" else nn.Sequential(
            bn_relu_conv(1, 8*self.factor, 16*self.factor),
           nn.AdaptiveMaxPool2d(1)
        )
        
        self.dense = nn.Sequential(
            nn.Linear(16*self.factor, 8*self.factor),
            nn.Dropout(0.25),
            nn.BatchNorm1d(8*self.factor),
            nn.LeakyReLU(0.25),
            nn.Linear(8*self.factor, 30)
        )
    
    def forward(self, x):
        x = self.initialize(x)
        x = self.pool(self.lg_0(x))
        x = self.pool(self.lg_1(x))
        x = self.pool(self.lg_2(x))
        x = self.pool(self.lg_3(x))
        x = self.finalize(self.lg_4(x))
        x = self.dense(x.view(x.shape[0], -1))
        
        return F.log_softmax(x, dim=1)

 ### Train
 오차 함수로는 KL-Divergence,Optimizer로는 SGD를 사용했다. 이는 다양한 실험에서 SGD가 Adam에 비해 더 좋은 값으로 수렴함을 확인할 수 있었기 때문이다. 또한, PyTorch의 AMP(Automatic Mixed Precision)을 이용해 메모리를 절약하고 학습 시간을 단축했다. \
 Max Pooling을 적용한 모델 8개 및 Avg Pooling을 적용한 모델 8개를 전체 학습 데이터에 대해 학습하였고 총 16개의 결과를 평균하여 결과를 제출하였다. 

In [None]:
def train(model_no, pool_method):
    model = MAC(pool_method).to(DEVICE)
    criterion = nn.KLDivLoss(reduction="batchmean")
    optimizer = optim.SGD(model.parameters(), lr=LEARNING_RATE, momentum=MOMENTUM,
                          weight_decay=WEIGHT_DECAY)
    scheduler = optim.lr_scheduler.CosineAnnealingWarmRestarts(
        optimizer, T_0=N_EPOCH, eta_min=LEARNING_RATE / 40)
    scaler = GradScaler()
    
    for epoch in range(N_EPOCH):
        model.train()
        running_loss, running_count = 0., 0
        
        for i, (xx,yy) in tqdm(enumerate(train_loader), leave=False):
            optimizer.zero_grad()
            
            with autocast():
                xx,yy = xx.to(DEVICE), yy.to(DEVICE)
                pred = model(xx)
                loss = criterion(pred, yy)
            
            scaler.scale(loss).backward()
            scaler.step(optimzier)
            scaler.update()
            scheduler.step(epoch + i / len(train_loader))
            
            with torch.no_grad():
                running_loss += loss.item() * len(yy)
                running_count += len(yy)
        print("{}Pool Model {:01d} Epoch {:03d} | Train {:7.5f}"
              .format(pool_method, model_no+1, epoch+1, running_loss/running_count))
    
    model.eval()
    prediction = torch.zeros((10000,30), dtype=torch.float32)
    
    with torch.no_grad():
        for idx, (xx,_) in enumerate(test_loader):
            xx = xx.to(DEVICE)
            pred = model(xx).detach().exp().to("cpu")
            prediction[BATCH_SIZE * idx:min(BATCH_SIZE * (idx+1), len(prediction))] = pred[:,:]
    
    df = pd.read_csv("./Data/submission.csv")
    df.iloc[:,1:] = prediction.numpy()
    df.to_csv("./SubResult/{}{:01d}.csv".format(pool_method, model_no+1), index=False)

In [None]:
for m in range(N_MODEL // 2):
    train(m, "Avg")
    train(m, "Max")

out = np.zeros((10000, 30), dtype=np.float32)
file_list = glob("./SubResult/*.csv")

for file_name in file_list:
    print(file_name)
    out += pd.read_csv(file_name).to_numpy()[:,1:]
out /= len(file_list)

df = pd.read_csv("./data/submission.csv")
df.iloc[:,1:] = out
df.to_csv("./PredictionFinal.csv", index=False)

 ### Result
 Public LB: 0.34680 \
 Private LB: 0.34732