# Tobig's 정규세션 10주차 음성 과제 1

- 다음 5개의 질문에 **단답형**으로 답해주세요.

## Q1. 시간에 따른 소리의 진폭을 나타낸 그래프를 영어로 무엇이라고 하나요?

A1. Waveform

## Q2. 샘플링 주파수는 원래 신호의 최고 주파수의 2배 이상이 되어야 원래 신호로 복구할 수 있다는 정리의 이름을 영어로 무엇이라고 하나요?

A2. Nyquist-Shannon Sampling Theorem

## Q3. 소리를 시각화한 것으로, 단시간 푸리에 변환을 통해 시간, 주파수, 진폭 정보를 모두 담고 있는 것을 영어로 무엇이라고 하나요?

A3. Spectrogram

## Q4. 인간의 청각 시스템에 맞게 Mel-scale을 적용하여 주파수를 조절한 스펙트로그램을 영어로 무엇이라고 하나요?

A4. Mel Spectrogram

## Q5. Speech 정규세션 강의자가 수업 시작할 때 5명을 랜덤으로 지목하여 질문하겠다고 했으나, 실제로는 그보다 적은 n명에게 질문하였습니다. n의 값은 무엇입니까?

A5. 0

# Tobig's 정규세션 10주차 음성 과제 2

1. train 폴더 내의 2,000개 음원을 이용하여 음성 분류 모델을 만들어보세요.
2. 음성 분류 모델을 이용하여 test 폴더 내의 300개 음원을 분류하세요.

In [None]:
import librosa 
import librosa.display as dsp
from IPython.display import Audio
import pandas as pd
import numpy as np
from tqdm import tqdm
import os
import torch
import glob

# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [None]:
# Expected layout: <current working directory>/train/*.wav and .../test/*.wav.
data_dir = os.getcwd()
train_dir, test_dir = (os.path.join(data_dir, split) for split in ('train', 'test'))
audio_files = glob.glob(os.path.join(train_dir, '*.wav'))
test_files = glob.glob(os.path.join(test_dir, '*.wav'))

# The label is the single character right before the ".wav" extension.
labels = [int(name[-5]) for name in map(os.path.basename, audio_files)]


In [None]:
def load_audio(files, sr=44100):
    """Load audio files and derive an integer label from each file name.

    Args:
        files: Iterable of paths to audio files.
        sr: Sampling rate forwarded to ``librosa.load``.

    Returns:
        A tuple ``(audios, labels, filenames)`` in the order of ``files``:
        ``audios`` is a NumPy array with one waveform per file (a regular
        array when all clips have the same length, otherwise a 1-D object
        array whose elements are the individual waveforms), ``labels`` is a
        NumPy array of the parsed labels and ``filenames`` is a list of the
        base names.

    Raises:
        ValueError: If the fifth character from the end of a base name
            cannot be converted with ``int``.
    """
    audios = []
    labels = []
    filenames = []
    for file in files:
        filename = os.path.basename(file)
        # The label is the character right before a 4-character extension
        # such as ".wav" (index -5 of the base name).
        labels.append(int(filename[-5]))
        filenames.append(filename)

        audio, _ = librosa.load(file, sr=sr)
        audios.append(audio)

    if len({len(audio) for audio in audios}) <= 1:
        # All clips share one length (or there are none): a regular array.
        audio_array = np.array(audios)
    else:
        # Clips of different lengths cannot form a rectangular array, and
        # recent NumPy versions refuse to build a ragged array implicitly,
        # so fill an object array element by element.
        audio_array = np.empty(len(audios), dtype=object)
        for position, audio in enumerate(audios):
            audio_array[position] = audio

    return audio_array, np.array(labels), filenames

In [None]:
def train_dataset(files):
    """Build the training table: one row per clip with its waveform and label."""
    waveforms, targets, _ = load_audio(files)
    rows = list(zip(waveforms, targets))
    return pd.DataFrame(rows, columns=['data', 'label'])


# Materialise the training table and display it.
train_wav = train_dataset(audio_files)
train_wav


In [None]:
def test_dataset(files):
    """Build the test table: one row per clip with its waveform and file name.

    The labels that ``load_audio`` parses from the file names are discarded.
    """
    waveforms, _, names = load_audio(files)
    rows = list(zip(waveforms, names))
    return pd.DataFrame(rows, columns=['data', 'filename'])


# Materialise the test table and display it.
test_wav = test_dataset(test_files)
test_wav


In [None]:
# Pull the waveform column of each table out into a NumPy array.
train_x = np.array(train_wav['data'])
test_x = np.array(test_wav['data'])

In [None]:
def get_mini(data):
    """Return the length of the shortest item in ``data``.

    Args:
        data: Iterable of sized items (anything supporting ``len``).

    Returns:
        The smallest ``len(item)`` over ``data``. For empty ``data`` the
        value 9999999 is returned, as before, so existing callers that
        combine several results with a minimum keep working.
    """
    # min() yields the true minimum even when every item is longer than the
    # former 9999999 sentinel, which used to cap the result.
    return min((len(item) for item in data), default=9999999)

# Shortest clip length in each split.
train_mini, test_mini = get_mini(train_x), get_mini(test_x)

# Common length for all clips: the shortest one across both splits.
mini = np.min((train_mini, test_mini))

print('가장 작은 길이 :', mini)

In [None]:
def set_length(data, d_mini):
    """Keep only the first ``d_mini`` elements of every item in ``data``.

    Returns the truncated items collected into a single NumPy array.
    """
    return np.array([clip[:d_mini] for clip in data])

# Cut every clip in both splits to the shared minimum length.
train_x = set_length(train_x, mini)
test_x = set_length(test_x, mini)

In [None]:
# Show the array shapes after truncation.
print('train :', train_x.shape)
print('test :', test_x.shape)

In [None]:
# extracted_features = librosa.feature.mfcc(y=train_x[0], sr=44100, n_mfcc=40)
# extracted_features.shape

In [None]:
def preprocess_dataset(data, sr=44100, n_mfcc=40):
    """Compute MFCC features for every waveform in ``data``.

    Args:
        data: Iterable of waveforms, each passed as ``y`` to
            ``librosa.feature.mfcc``.
        sr: Sampling rate of the waveforms. Defaults to 44100, the value
            that used to be hard-coded here.
        n_mfcc: Number of MFCC coefficients to compute. Defaults to 40, the
            value that used to be hard-coded here.

    Returns:
        A list with one feature array per waveform, in input order. Per the
        librosa documentation each array has shape ``(n_mfcc, n_frames)``.
    """
    return [
        librosa.feature.mfcc(y=waveform, sr=sr, n_mfcc=n_mfcc)
        for waveform in data
    ]

In [None]:
# MFCC features for every training clip, stacked into one array.
train_mfccs = preprocess_dataset(train_x)
train_mfccs = np.array(train_mfccs)
# Append a trailing axis of size 1: (N, d1, d2) -> (N, d1, d2, 1).
train_mfccs = train_mfccs.reshape(-1, train_mfccs.shape[1], train_mfccs.shape[2], 1)


In [None]:
# Display the shape of the training feature array.
np.array(train_mfccs).shape

In [None]:
import torchvision.datasets as datasets
import torchvision.transforms as transforms

from torch.utils.data import DataLoader
from torch.utils.data import DataLoader, Dataset

class CustomDataset(Dataset):
    """Map-style dataset over samples ``X`` and, optionally, labels ``y``.

    Args:
        X: Indexable, sized collection of samples.
        y: Collection of labels aligned with ``X`` by position. May be
            ``None`` when ``train_mode`` is False.
        train_mode: When True, items are ``(sample, label)`` pairs; when
            False, items are the samples alone.
        transforms: Optional callable applied to each sample on access.
    """

    def __init__(self, X, y, train_mode=True, transforms=None):
        self.X = X
        self.y = y
        self.train_mode = train_mode
        self.transforms = transforms

    def __getitem__(self, index):
        """Return the sample at ``index`` (with its label in train mode)."""
        sample = self.X[index]

        if self.transforms is not None:
            sample = self.transforms(sample)

        if not self.train_mode:
            return sample

        labels = self.y
        # pandas objects treat ``labels[index]`` as a lookup by index label,
        # so a sliced Series (e.g. ``s[1800:]``) would raise KeyError or pick
        # the wrong row. Use positional access whenever it is available so
        # labels always line up with the samples.
        if hasattr(labels, 'iloc'):
            label = labels.iloc[index]
        else:
            label = labels[index]
        return sample, label

    def __len__(self):
        """Return the number of samples."""
        return len(self.X)

In [None]:
# The first 1800 feature arrays are used for training, the rest for validation.
train_X, vali_X = train_mfccs[:1800], train_mfccs[1800:]

In [None]:
# Labels follow the same 1800 / remainder split; the validation part is
# re-indexed from 0.
train_y, vali_y = (
    train_wav['label'][:1800],
    train_wav['label'][1800:].reset_index(drop=True),
)

In [None]:
# Display the validation labels.
vali_y

In [None]:
# Training hyper-parameters.
num_epochs = 100
batch_size = 32

# NOTE(review): `train_dataset` is also the name of the function that builds
# the training DataFrame earlier in this file; this assignment rebinds it.
train_dataset = CustomDataset(X=train_X, y=train_y)
vali_dataset = CustomDataset(X=vali_X, y=vali_y)

# Only the training loader shuffles its samples.
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
vali_loader = DataLoader(vali_dataset, batch_size=batch_size, shuffle=False)

In [None]:
# Number of mini-batches per epoch in each loader.
train_batches, vali_batches = len(train_loader), len(vali_loader)

print('/ total train batches :', train_batches)
print('/ total valid batches :', vali_batches)

In [None]:
import torch
from tqdm.auto import tqdm
import torch.nn as nn 

class CNNclassification(torch.nn.Module):
    """Four-stage convolutional classifier producing 10 outputs per sample.

    The first convolution takes 40 input channels, so inputs are expected
    as ``(batch, 40, height, width)``. Each stage is Conv2d -> BatchNorm2d
    -> ReLU -> Dropout -> MaxPool2d. The stage outputs are reduced to one
    value per channel before the final linear layer, so the spatial size of
    the input no longer has to collapse to exactly 1x1.
    """

    def __init__(self):
        super().__init__()
        self.layer1 = self._conv_block(40, 10)
        self.layer2 = self._conv_block(10, 100)
        self.layer3 = self._conv_block(100, 200)
        self.layer4 = self._conv_block(200, 300)

        # Global average pooling to 1x1. It has no parameters (state_dict
        # keys are unchanged) and is the identity when the feature map is
        # already 1x1, which is the only case the fixed Linear(300, 10)
        # could handle before.
        self.global_pool = nn.AdaptiveAvgPool2d(1)

        # NOTE(review): the dropout here acts on the outputs of the last
        # linear layer while in training mode. Kept as-is so that training
        # behaviour and checkpoint keys stay the same.
        self.fc_layer = nn.Sequential(
            nn.Linear(300, 10),
            nn.Dropout(0.2))

    @staticmethod
    def _conv_block(in_channels, out_channels):
        """Build one Conv2d/BatchNorm2d/ReLU/Dropout/MaxPool2d stage."""
        return torch.nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=2, stride=1, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.MaxPool2d(kernel_size=2, stride=2))

    def forward(self, x):
        """Return a ``(batch, 10)`` tensor for input ``(batch, 40, H, W)``."""
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        x = self.global_pool(x)
        x = torch.flatten(x, start_dim=1)
        out = self.fc_layer(x)
        return out


In [None]:
import torch.optim as optim # contains the optimization algorithms

# Model and loss are placed on the selected device.
model = CNNclassification().to(device)
criterion = torch.nn.CrossEntropyLoss().to(device)
# Adam with a small weight decay.
optimizer = torch.optim.Adam(params = model.parameters(), lr = 1e-3,  weight_decay=1e-5)
# No learning-rate scheduler is used.
scheduler = None

In [None]:
# Smoke test: forward a random batch of shape (10, 40, 8, 1) through the model.
model(torch.rand(10, 40, 8, 1).to(device))

In [None]:
from tqdm.auto import tqdm

# The best checkpoint is written to <current working directory>/model/best_model.pth.
data_dir = os.getcwd()
model_save_path = os.path.join(data_dir, 'model/best_model.pth')
# Create the target directory up front; an existing directory is not an error.
os.makedirs(os.path.dirname(model_save_path), exist_ok=True)

def train(model, optimizer, train_loader, scheduler, device):
    """Train ``model`` and save the weights with the best validation accuracy.

    Besides its arguments, the function reads these module-level names:
    ``num_epochs`` (number of epochs to run), ``criterion`` (loss function),
    ``vali_loader`` (validation batches) and ``model_save_path`` (checkpoint
    destination).

    Args:
        model: Module to optimise; it is moved to ``device``.
        optimizer: Optimizer that updates the parameters of ``model``.
        train_loader: Iterable yielding ``(wav, label)`` training batches.
        scheduler: Optional scheduler stepped once per epoch, or ``None``.
        device: Device on which batches and the model are placed.

    Returns:
        None. Progress is printed and the best state dict is written to
        ``model_save_path``.
    """
    model.to(device)
    best_acc = 0

    # Epochs are numbered from 1; the upper bound is inclusive so that
    # exactly ``num_epochs`` epochs run.
    for epoch in range(1, num_epochs + 1):
        model.train()
        running_loss = 0.0

        for wav, label in tqdm(train_loader):
            wav, label = wav.to(device), label.to(device)
            optimizer.zero_grad()

            logit = model(wav)
            loss = criterion(logit, label)

            loss.backward()
            optimizer.step()
            running_loss += loss.item()

        print('[%d] Train loss: %.10f' %(epoch, running_loss / len(train_loader)))

        if scheduler is not None:
            scheduler.step()

        model.eval()
        vali_loss = 0.0
        correct = 0

        with torch.no_grad():
            for wav, label in tqdm(vali_loader):
                wav, label = wav.to(device), label.to(device)
                logit = model(wav)
                # .item() keeps the running sum a plain Python float rather
                # than a tensor living on the device.
                vali_loss += criterion(logit, label).item()
                pred = logit.argmax(dim=1, keepdim=True)
                correct += pred.eq(label.view_as(pred)).sum().item()

        n_vali = len(vali_loader.dataset)
        vali_acc = 100 * correct / n_vali
        print('Vail set: Loss: {:.4f}, Accuracy: {}/{} ( {:.0f}%)\n'.format(vali_loss / len(vali_loader), correct, n_vali, vali_acc))

        # Keep only the checkpoint with the highest validation accuracy so far.
        if best_acc < vali_acc:
            best_acc = vali_acc
            torch.save(model.state_dict(), model_save_path)
            print('Model Saved.')

In [None]:
# Run the training loop; the best checkpoint is written to model_save_path.
train(model, optimizer, train_loader, scheduler, device)

In [None]:
# MFCC features for every test clip, stacked into one array.
test_mfccs = preprocess_dataset(test_x)
test_mfccs = np.array(test_mfccs)
# Append a trailing axis of size 1: (N, d1, d2) -> (N, d1, d2, 1).
test_mfccs = test_mfccs.reshape(-1, test_mfccs.shape[1], test_mfccs.shape[2], 1)

In [None]:
# Display the shape of the test feature array.
test_mfccs.shape

In [None]:
def predict(model, test_loader, device):
    """Return the predicted class index for every sample in ``test_loader``.

    The model is switched to evaluation mode and run without gradient
    tracking; predictions are returned as a flat list in loader order.
    """
    predictions = []
    model.eval()
    with torch.no_grad():
        for batch in tqdm(test_loader):
            logits = model(batch.to(device))
            # Index of the largest output along the class axis.
            predictions += logits.argmax(dim=1).tolist()
    return predictions

In [None]:
# Label-free dataset for inference; order is preserved (no shuffling).
# NOTE(review): `test_dataset` is also the name of the function that builds the
# test DataFrame earlier in this file; this assignment rebinds it.
test_dataset = CustomDataset(X=test_mfccs, y= None, train_mode=False)
test_loader = DataLoader(test_dataset, batch_size = batch_size, shuffle=False)

In [None]:
# Restore the best checkpoint into a fresh model. map_location remaps the
# saved tensors onto the current device, so a checkpoint written on a GPU
# can also be loaded on a CPU-only machine.
checkpoint = torch.load(model_save_path, map_location=device)
model = CNNclassification().to(device)
model.load_state_dict(checkpoint)

# Predict the test set and peek at the first five predictions.
preds = predict(model, test_loader, device)
preds[0:5]

In [None]:
# Number of predictions produced.
len(preds)

In [None]:
test_wav['label'] = preds
# The test table stores file names in the column 'filename'; selecting
# 'file_name' raised KeyError. Select the existing column and rename it so
# the result carries the 'file_name' header the original selection asked for.
test_wav = test_wav[['filename', 'label']].rename(columns={'filename': 'file_name'})

# Final prediction table, sorted by file name with a fresh 0-based index.
pred_df = test_wav.copy()
pred_df = pred_df.sort_values(by=[pred_df.columns[0]], ascending=[True]).reset_index(drop=True)
pred_df.head()