In [None]:
# 드라이브 마운트
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
import librosa 
import librosa.display as dsp
from IPython.display import Audio

In [None]:
import pandas as pd
import numpy as np
from tqdm import tqdm
import os

In [None]:
import torch
import torchaudio

device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu') #GPU 할당

In [None]:
import random

def seed_everything(seed):
    """Seed every random number generator used by the notebook.

    Seeds Python's ``random`` module, NumPy and PyTorch (CPU and every CUDA
    device), exports ``PYTHONHASHSEED``, and configures cuDNN for
    deterministic execution.

    Args:
        seed: Integer seed applied to all generators.
    """
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    # manual_seed_all covers every visible GPU (and is a no-op without CUDA)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    # benchmark=True lets cuDNN auto-tune and pick different kernels between
    # runs, which defeats the determinism requested on the line above
    torch.backends.cudnn.benchmark = False

seed_everything(929)

In [None]:
import pandas as pd
train = pd.read_csv('/content/drive/My Drive/audio-data/train.csv')
train.head()

Unnamed: 0,file_name,label
0,001.wav,9
1,002.wav,0
2,004.wav,1
3,005.wav,8
4,006.wav,0


In [None]:
train.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 400 entries, 0 to 399
Data columns (total 2 columns):
 #   Column     Non-Null Count  Dtype 
---  ------     --------------  ----- 
 0   file_name  400 non-null    object
 1   label      400 non-null    int64 
dtypes: int64(1), object(1)
memory usage: 6.4+ KB


In [None]:
data, sample_rate = librosa.load('/content/drive/My Drive/audio-data/train/001.wav', sr = 16000)
print('sample_rate:', sample_rate, ', audio shape:', data.shape)
print('length:', data.shape[0]/float(sample_rate), 'secs')

sample_rate: 16000 , audio shape: (10192,)
length: 0.637 secs


In [None]:
def train_dataset(folder="/content/drive/My Drive/audio-data/train/", sr=16000):
    """Load every training ``.wav`` file together with its class label.

    Labels are read from the notebook-level ``train`` DataFrame (columns
    ``file_name`` and ``label``), so this must run while ``train`` still
    refers to that DataFrame.

    Args:
        folder: Directory containing the training ``.wav`` files.
        sr: Sample rate passed to ``librosa.load``.

    Returns:
        DataFrame with columns ``data`` (waveform), ``sr`` and ``label``.

    Raises:
        KeyError: If a ``.wav`` file has no row in ``train``.
    """
    # Build the lookup once instead of filtering the DataFrame for every file
    label_by_file = dict(zip(train['file_name'], train['label']))
    dataset = []
    # os.listdir order is arbitrary; sorting keeps seeded runs reproducible
    for file in tqdm(sorted(os.listdir(folder)), colour='green'):
        # Match the extension, not any file name that merely contains "wav"
        if not file.endswith('.wav'):
            continue
        if file not in label_by_file:
            raise KeyError(f"{file} has no label in the train table")
        abs_file_path = os.path.join(folder, file)
        data, file_sr = librosa.load(abs_file_path, sr=sr)
        dataset.append([data, file_sr, int(label_by_file[file])])

    print("Train Dataset 생성 완료")
    return pd.DataFrame(dataset, columns=['data', 'sr', 'label'])

In [None]:
def test_dataset(folder="/content/drive/My Drive/audio-data/test/", sr=16000):
    """Load every test ``.wav`` file and remember which file it came from.

    Args:
        folder: Directory containing the test ``.wav`` files.
        sr: Sample rate passed to ``librosa.load``.

    Returns:
        DataFrame with columns ``data`` (waveform), ``sr`` and ``file_name``.
    """
    dataset = []
    # os.listdir order is arbitrary; sorting keeps seeded runs reproducible
    for file in tqdm(sorted(os.listdir(folder)), colour='green'):
        # Match the extension, not any file name that merely contains "wav"
        if not file.endswith('.wav'):
            continue
        abs_file_path = os.path.join(folder, file)
        data, file_sr = librosa.load(abs_file_path, sr=sr)
        dataset.append([data, file_sr, file])

    print("Test Dataset 생성 완료")
    return pd.DataFrame(dataset, columns=['data', 'sr', 'file_name'])

In [None]:
train_wav = train_dataset()
test_wav = test_dataset()

100%|[32m██████████[0m| 400/400 [00:22<00:00, 18.02it/s]


Train Dataset 생성 완료


100%|[32m██████████[0m| 200/200 [00:07<00:00, 27.22it/s]

Test Dataset 생성 완료





In [None]:
train_wav.head()

Unnamed: 0,data,sr,label
0,"[-0.00020736177, -0.00032222085, -0.0002783999...",16000,2
1,"[-0.003955867, -0.006708248, -0.005994901, -0....",16000,0
2,"[-3.323972e-05, -9.3231734e-05, -3.423063e-05,...",16000,1
3,"[0.00013533195, 0.00020968198, 0.00022006914, ...",16000,8
4,"[0.00025892138, 0.00046078255, 0.00037030628, ...",16000,9


In [None]:
test_wav.head()

Unnamed: 0,data,sr,file_name
0,"[-0.0002413789, -0.00043204584, -0.00041909475...",16000,295.wav
1,"[4.9274193e-05, 8.647903e-05, 6.483143e-05, 5....",16000,567.wav
2,"[0.00013012115, 0.00020055204, 0.00018348989, ...",16000,305.wav
3,"[-0.0004026151, -0.0006202783, -0.00058203033,...",16000,446.wav
4,"[8.87213e-05, 0.00013668207, 0.000102160935, 8...",16000,404.wav


In [None]:
train_data = np.array(train_wav)

In [None]:
test_data = np.array(test_wav)

## add speed augmentation

In [None]:
def speed_augmentation(dataset):
  """Create one speed-perturbed copy of every clip in ``dataset``.

  Clips longer than 10000 samples get the sox ``speed`` effect with factor
  1.2; all other clips get factor 0.8. A ``rate`` effect at the clip's own
  sample rate follows in both cases.

  Args:
    dataset: Iterable of ``(waveform, sample_rate, label)`` rows, where
      ``waveform`` is a NumPy array.

  Returns:
    Object array with one ``[waveform, sample_rate, label]`` row per input
    row; each waveform is flattened to 1-D.
  """
  augmented_rows = []
  for clip, clip_sr, clip_label in dataset:
    factor = "1.2" if len(clip) > 10000 else "0.8"
    sox_chain = [["speed", factor], ["rate", f"{clip_sr}"],]

    # sox expects a (channels, samples) tensor
    clip_tensor = torch.tensor(clip.reshape(1,-1))
    out_tensor, out_sr = torchaudio.sox_effects.apply_effects_tensor(clip_tensor, clip_sr, sox_chain)
    flat_clip = np.array(out_tensor).reshape(-1,)
    augmented_rows.append([flat_clip, out_sr, clip_label])
  return np.array(augmented_rows, dtype=object)

In [None]:
train_data_aug = speed_augmentation(train_data)

In [None]:
train_data.shape

(400, 3)

In [None]:
train_data_aug.shape

(400, 3)

In [None]:
train_data = np.append(train_data, train_data_aug, axis = 0)

In [None]:
train_data.shape

(800, 3)

## add random padding

In [None]:
train_x = np.array(train_wav.data)
test_x = np.array(test_wav.data)

In [None]:
def get_max(data):
    """Return the length of the longest item in ``data``, never less than 1."""
    longest = max(map(len, data), default=1)
    # The floor of 1 also applies when every item is empty
    return max(longest, 1)

train_max = get_max(train_x)
test_max = get_max(test_x)

In [None]:
print(train_max, test_max)

15573 15744


In [None]:
def add_padding_train(dataset, target_len=16000, n_copies=5):
  """Zero-pad every clip to ``target_len`` at several random offsets.

  Each input row produces ``n_copies`` output rows that differ only in how
  many zeros precede the clip.

  Args:
    dataset: Iterable of ``(waveform, sample_rate, label)`` rows.
    target_len: Number of samples in every returned waveform.
    n_copies: Number of randomly shifted copies to create per clip.

  Returns:
    Object array of ``[waveform, sample_rate, label]`` rows.
  """
  data_augmented = []

  for waveform, sample_rate, label in dataset:
    # Clips longer than target_len are truncated instead of crashing below
    waveform = np.asarray(waveform).ravel()[:target_len]
    pad_need = target_len - len(waveform)
    for _ in range(n_copies):
      # randint(1, pad_need) raises ValueError unless pad_need > 1; with fewer
      # than two spare samples there is no offset to randomise
      offset = np.random.randint(1, pad_need) if pad_need > 1 else 0

      waveform2 = np.concatenate([np.zeros(offset), waveform, np.zeros(pad_need - offset)])

      data_augmented.append([waveform2, sample_rate, label])
  return np.array(data_augmented, dtype=object)

def add_padding_test(dataset, target_len=16000):
  """Zero-pad every test clip to ``target_len`` at one random offset.

  Args:
    dataset: Iterable of ``(waveform, sample_rate, file_name)`` rows.
    target_len: Number of samples in every returned waveform.

  Returns:
    Object array of ``[waveform, sample_rate, file_name]`` rows, one per
    input row and in the same order.
  """
  data_augmented = []

  for waveform, sample_rate, test_name in dataset:
    # Clips longer than target_len are truncated instead of crashing below
    waveform = np.asarray(waveform).ravel()[:target_len]
    pad_need = target_len - len(waveform)
    # randint(1, pad_need) raises ValueError unless pad_need > 1; with fewer
    # than two spare samples there is no offset to randomise
    offset = np.random.randint(1, pad_need) if pad_need > 1 else 0
    waveform2 = np.concatenate([np.zeros(offset), waveform, np.zeros(pad_need - offset)])
    data_augmented.append([waveform2, sample_rate, test_name])
  return np.array(data_augmented, dtype=object)

In [None]:
train_data = add_padding_train(train_data)

In [None]:
test_data.shape

(200, 3)

In [None]:
test_data = add_padding_test(test_data)

In [None]:
train_data.shape

(4000, 3)

In [None]:
train_x = train_data[:,0]

In [None]:
print(train_x.shape)

(4000,)


In [None]:
test_x = test_data[:, 0]

In [None]:
print('train :', train_x.shape)
print('test :', test_x.shape)

train : (4000,)
test : (200,)


In [None]:
train_y = train_data[:, 2]

In [None]:
train_y.shape

(4000,)

## make MFCC and mel spectrogram data

In [None]:
def spec_to_image(spec, eps=1e-6):
  """Standardise a spectrogram and min-max scale it into ``[0, 1]``.

  Args:
    spec: Array of spectrogram values.
    eps: Added to the standard deviation to avoid division by zero.

  Returns:
    Array of the same shape with minimum 0 and maximum 1. A constant input
    yields all zeros.
  """
  spec_norm = (spec - spec.mean()) / (spec.std() + eps)
  spec_min, spec_max = spec_norm.min(), spec_norm.max()
  value_range = spec_max - spec_min
  if value_range == 0:
    # Constant input: the min-max division would be 0/0 and fill the image
    # with NaN; spec_norm - spec_min is already all zeros
    return spec_norm - spec_min
  return (spec_norm - spec_min) / value_range

In [None]:
def _extract_mfcc_and_mels(waveforms, sr=16000, n_features=32):
  """Compute scaled MFCC and log-mel images for every waveform.

  Args:
    waveforms: Iterable of 1-D audio arrays.
    sr: Sample rate passed to librosa.
    n_features: Number of MFCC coefficients and of mel bands.

  Returns:
    Tuple ``(mfcc_images, mel_images)`` of lists, one entry per waveform,
    each entry passed through ``spec_to_image``.
  """
  mfcc_images, mel_images = [], []
  for waveform in waveforms:
    mfcc = librosa.feature.mfcc(y=waveform, sr=sr, n_mfcc=n_features)
    mfcc_images.append(spec_to_image(mfcc))

    mel_power = librosa.feature.melspectrogram(y=waveform, sr=sr, n_mels=n_features)
    # Convert power to decibels relative to the loudest bin
    mel_db = librosa.power_to_db(mel_power, ref=np.max)
    mel_images.append(spec_to_image(mel_db))
  return mfcc_images, mel_images

# Train and test go through the same helper so their features cannot drift apart
audio_mfcc_train, audio_mels_train = _extract_mfcc_and_mels(train_x)
audio_mfcc_test, audio_mels_test = _extract_mfcc_and_mels(test_x)

In [None]:
# Add a singleton channel axis: (N, 32, 32) -> (N, 1, 32, 32), as expected by Conv2d
IMAGE_BATCH_SHAPE = (-1, 1, 32, 32)

audio_mfcc_train = np.array(audio_mfcc_train).reshape(IMAGE_BATCH_SHAPE)
audio_mfcc_test = np.array(audio_mfcc_test).reshape(IMAGE_BATCH_SHAPE)
audio_mels_train = np.array(audio_mels_train).reshape(IMAGE_BATCH_SHAPE)
audio_mels_test = np.array(audio_mels_test).reshape(IMAGE_BATCH_SHAPE)

## create simple cnn model using pytorch

In [None]:
import torch.nn as nn

class CNNclassification(torch.nn.Module):
    """Four conv/ReLU/max-pool blocks followed by a 10-way linear classifier.

    Built for single-channel 32x32 inputs: each block halves the spatial
    size, so the last block emits 512 x 2 x 2 = 2048 features.
    """

    def __init__(self):
        super().__init__()
        # Attribute names and creation order are kept so that saved
        # state_dicts and seeded initialisation stay compatible
        self.layer1 = self._conv_block(1, 64)
        self.layer2 = self._conv_block(64, 128)
        self.layer3 = self._conv_block(128, 256)
        self.layer4 = self._conv_block(256, 512)
        # Output layer: flattened features -> class logits
        self.fc_layer = nn.Sequential(nn.Linear(2048, 10))

    @staticmethod
    def _conv_block(in_channels, out_channels):
        """Build one 3x3 convolution -> ReLU -> 2x2 max-pool block."""
        return nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=(3, 3), padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
        )

    def forward(self, x):
        """Return class logits for a batch shaped ``(N, 1, 32, 32)``."""
        for block in (self.layer1, self.layer2, self.layer3, self.layer4):
            x = block(x)
        # Flatten everything except the batch dimension
        features = torch.flatten(x, 1)
        return self.fc_layer(features)

model = CNNclassification()

In [None]:
pip install torchinfo

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
from torchinfo import summary

print(summary(model, input_size = (1,1,32,32)))

Layer (type:depth-idx)                   Output Shape              Param #
CNNclassification                        [1, 10]                   --
├─Sequential: 1-1                        [1, 64, 16, 16]           --
│    └─Conv2d: 2-1                       [1, 64, 32, 32]           640
│    └─ReLU: 2-2                         [1, 64, 32, 32]           --
│    └─MaxPool2d: 2-3                    [1, 64, 16, 16]           --
├─Sequential: 1-2                        [1, 128, 8, 8]            --
│    └─Conv2d: 2-4                       [1, 128, 16, 16]          73,856
│    └─ReLU: 2-5                         [1, 128, 16, 16]          --
│    └─MaxPool2d: 2-6                    [1, 128, 8, 8]            --
├─Sequential: 1-3                        [1, 256, 4, 4]            --
│    └─Conv2d: 2-7                       [1, 256, 8, 8]            295,168
│    └─ReLU: 2-8                         [1, 256, 8, 8]            --
│    └─MaxPool2d: 2-9                    [1, 256, 4, 4]            --
├─Seq

In [None]:
import torchvision.datasets as datasets # 데이터셋 집합체
import torchvision.transforms as transforms # 변환 툴

from torch.utils.data import DataLoader # 학습 및 배치로 모델에 넣어주기 위한 툴
from torch.utils.data import DataLoader, Dataset

class CustomDataset(Dataset):
    """Dataset over pre-computed features with optional labels.

    In train mode an item is ``(X[index], y[index])``; otherwise only
    ``X[index]`` is returned. An optional ``transforms`` callable is applied
    to the feature before it is returned.
    """

    def __init__(self, X, y, train_mode=True, transforms=None):
        self.X, self.y = X, y
        self.train_mode = train_mode
        self.transforms = transforms

    def __len__(self):
        """Number of samples."""
        return len(self.X)

    def __getitem__(self, index):
        """Return the sample at ``index`` (with its label in train mode)."""
        sample = self.X[index]
        if self.transforms is not None:
            sample = self.transforms(sample)

        if not self.train_mode:
            return sample
        return sample, self.y[index]

## train via mel spectrogram (MFCC branch disabled)

In [None]:
from tqdm.auto import tqdm

def train(model, optimizer, train_loader, scheduler, device,
          vali_loader=None, criterion=None, num_epochs=None,
          save_path='best_model.pth'):
    """Train ``model`` and checkpoint the weights with the best validation accuracy.

    Args:
        model: Network to train; it is moved to ``device``.
        optimizer: Optimizer over ``model``'s parameters.
        train_loader: Iterable of ``(features, labels)`` training batches.
        scheduler: Optional LR scheduler, stepped once per epoch; may be None.
        device: Device used for training and validation.
        vali_loader: Validation loader; must expose ``.dataset``. Defaults to
            the notebook-level ``vali_loader``.
        criterion: Loss function. Defaults to the notebook-level ``criterion``.
        num_epochs: Number of epochs. Defaults to the notebook-level
            ``num_epochs``.
        save_path: File that receives the best ``state_dict``.

    Returns:
        Best validation accuracy in percent.
    """
    # Earlier versions read these three names straight from the notebook
    # namespace; keep that as a fallback so existing calls still work
    notebook_globals = globals()
    if vali_loader is None:
        vali_loader = notebook_globals['vali_loader']
    if criterion is None:
        criterion = notebook_globals['criterion']
    if num_epochs is None:
        num_epochs = notebook_globals['num_epochs']

    model.to(device)
    # Start below any reachable accuracy so the first epoch always writes a
    # checkpoint; otherwise a stale file from a previous run could be loaded
    best_acc = -1.0

    # range(1, num_epochs) ran one epoch too few
    for epoch in range(1, num_epochs + 1):
        model.train()
        for wav, label in train_loader:
            wav = wav.to(device=device, dtype=torch.float32)
            label = label.to(device=device, dtype=torch.long)

            optimizer.zero_grad()
            loss = criterion(model(wav), label)
            loss.backward()
            optimizer.step()

        if scheduler is not None:
            scheduler.step()

        # Evaluate on the validation set
        model.eval()
        correct = 0
        with torch.no_grad():
            for wav, label in vali_loader:
                wav = wav.to(device=device, dtype=torch.float32)
                label = label.to(device=device, dtype=torch.long)
                # Predicted class = index of the largest logit
                pred = model(wav).argmax(dim=1, keepdim=True)
                correct += pred.eq(label.view_as(pred)).sum().item()
        vali_acc = 100 * correct / len(vali_loader.dataset)

        # Keep the best model seen so far
        if best_acc < vali_acc:
            best_acc = vali_acc
            torch.save(model.state_dict(), save_path)

    return best_acc

def predict_valid(model, vali_loader, device):
    """Return the accuracy (in percent) of ``model`` on ``vali_loader``.

    Args:
        model: Trained network.
        vali_loader: Loader yielding ``(features, labels)`` batches; must
            expose ``.dataset`` so the sample count is known.
        device: Device the batches are moved to before the forward pass.

    Returns:
        ``100 * correct / number_of_samples`` as a float.
    """
    model.eval()
    correct = 0
    with torch.no_grad():  # inference only, no gradients needed
        for wav, label in vali_loader:
            wav = wav.to(device=device, dtype=torch.float32)
            label = label.to(device=device, dtype=torch.long)

            # Predicted class = index of the largest logit
            pred = model(wav).argmax(dim=1, keepdim=True)
            correct += pred.eq(label.view_as(pred)).sum().item()
    return 100 * correct / len(vali_loader.dataset)

def predict(model, test_loader, device):
    """Run ``model`` over ``test_loader`` and collect the raw logits.

    Args:
        model: Trained network.
        test_loader: Loader yielding feature batches without labels.
        device: Device the batches are moved to before the forward pass.

    Returns:
        List with one list of logits per sample, in loader order.
    """
    model.eval()
    model_pred = []
    with torch.no_grad():
        for wav in test_loader:
            wav = wav.to(device=device, dtype=torch.float32)
            model_pred.extend(model(wav).tolist())
    return model_pred

In [None]:
from sklearn.model_selection import StratifiedKFold
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import accuracy_score

from sklearn.model_selection import StratifiedGroupKFold

label_encoder = LabelEncoder()

num_epochs = 70
batch_size = 32
n_folds = 10
PAD_COPIES = 5  # randomly padded copies created per clip by add_padding_train

# Every source recording appears 2 * PAD_COPIES times in the training arrays
# (original + speed-perturbed, each padded PAD_COPIES times). A plain
# StratifiedKFold puts copies of one recording in both the training and the
# validation fold, which inflates the validation score, so folds are grouped
# by source recording instead.
# NOTE(review): assumes the row layout built by the cells above (originals,
# then speed-augmented copies, each expanded PAD_COPIES times in place);
# update clip_ids if the augmentation pipeline changes.
n_clips = len(train_y) // (2 * PAD_COPIES)
assert len(train_y) == 2 * PAD_COPIES * n_clips, "unexpected augmentation layout"
clip_ids = (np.arange(len(train_y)) // PAD_COPIES) % n_clips

skf = StratifiedGroupKFold(n_splits = n_folds, random_state = 42, shuffle = True) # 10 folds

# The test loader is identical for every fold. Short names are used so the
# train_dataset() / test_dataset() loader functions are not overwritten.
test_ds = CustomDataset(X=audio_mels_test, y= None, train_mode=False)
test_loader = DataLoader(test_ds, batch_size = batch_size, shuffle=False)

y_encoded = label_encoder.fit_transform(train_y)

pred_list = []

# Only the mel-spectrogram model is trained; the MFCC branch was disabled.
for n, (train_index, valid_index) in enumerate(skf.split(audio_mels_train, y_encoded, groups=clip_ids), start=1):

  print("===== %d fold =====" %(n))

  ## melspectrogram
  X_train, X_valid = audio_mels_train[train_index], audio_mels_train[valid_index]
  y_train, y_valid = train_y[train_index], train_y[valid_index]

  train_ds = CustomDataset(X=X_train, y=y_train)
  train_loader = DataLoader(train_ds, batch_size = batch_size, shuffle=True)

  vali_ds = CustomDataset(X=X_valid, y=y_valid)
  vali_loader = DataLoader(vali_ds, batch_size = batch_size, shuffle=False)

  model = CNNclassification().to(device)
  criterion = torch.nn.CrossEntropyLoss().to(device)
  optimizer = torch.optim.Adam(params = model.parameters(), lr = 0.001)
  scheduler = None

  # train() picks up vali_loader, criterion and num_epochs from the notebook namespace
  train(model, optimizer, train_loader, scheduler, device)

  # Reload the checkpoint with the best validation accuracy
  checkpoint = torch.load('best_model.pth', map_location=device)
  model = CNNclassification().to(device)
  model.load_state_dict(checkpoint)

  print("%d fold mels score : %d%%" %(n, predict_valid(model, vali_loader, device)))

  mels_preds = predict(model, test_loader, device)

  pred_list.append(np.array(mels_preds))

===== 1 fold =====
1 fold mels score : 100%
===== 2 fold =====
2 fold mels score : 100%
===== 3 fold =====
3 fold mels score : 100%
===== 4 fold =====
4 fold mels score : 100%
===== 5 fold =====
5 fold mels score : 99%
===== 6 fold =====
6 fold mels score : 100%
===== 7 fold =====
7 fold mels score : 99%
===== 8 fold =====
8 fold mels score : 99%
===== 9 fold =====
9 fold mels score : 99%
===== 10 fold =====
10 fold mels score : 100%


## submit 10 fold cv result

In [None]:
# Sum the per-fold logits; the class with the largest total wins.
# np.array() copies, so the in-place += below leaves pred_list[0] untouched.
pred_proba = np.array(pred_list[0])
print(pred_proba[0])

# Iterate over whatever folds exist instead of hard-coding the fold count
for fold_logits in pred_list[1:]:
    pred_proba += fold_logits
    print(fold_logits[0])

pred_class = [np.argmax(sample_logits) for sample_logits in pred_proba]

[ 12.11338902 -29.68893051   2.38490057  -0.67388457 -16.66025162
 -11.27813816 -10.31386471  -1.9040705  -25.38279533   0.0583332 ]
[ 11.83879375 -25.16182137   5.76404667  -0.69165313 -13.26419449
 -10.08846664 -11.20291901  -0.33122888 -21.94761848   2.94140911]
[ 16.13591576 -31.49822426   3.54072833  -1.13393652 -24.62070656
 -13.91210556  -9.21488571  -2.41283989 -25.28068352   1.10930693]
[ 12.97489452 -21.92676926   2.32835364  -1.8964628  -16.05875587
 -10.98121071  -3.77511001   1.76216745 -12.91107559   1.78104377]
[ 15.19774342 -28.77054214   5.67140961  -4.32099009 -18.798769
  -9.71912479 -13.19395542   1.41473305 -20.65965652   1.69770455]
[ 13.09883785 -28.07902336   3.53971982  -2.20593429 -21.50437164
 -10.13330078  -4.8013339   -0.78275454 -18.53122139  -2.35971189]
[ 13.32914639 -26.17148781   5.58505535  -1.86449242 -15.96135712
  -8.01762009  -9.50854111   0.29280996 -17.20223999   1.69409251]
[ 11.9453516  -22.46689796   3.68034673   1.191149   -14.18681526
 -11.

In [None]:
# Build the prediction table from a fresh selection so test_wav itself is
# neither mutated nor overwritten; this cell and the earlier cells that read
# test_wav.data stay re-runnable.
pred_df = (
    test_wav[['file_name']]
    .assign(label=pred_class)
    .sort_values(by='file_name', ascending=True)
    .reset_index(drop=True)
)
pred_df.head()

Unnamed: 0,file_name,label
0,003.wav,0
1,008.wav,9
2,010.wav,3
3,015.wav,8
4,024.wav,2


In [None]:
submission = pd.read_csv('/content/drive/My Drive/audio-data/sample_submission.csv')
# Match predictions on file_name rather than on row position, so the result
# does not depend on the sample file being sorted the same way as pred_df
label_by_file = pred_df.set_index('file_name')['label']
submission['label'] = submission['file_name'].map(label_by_file)
assert submission['label'].notna().all(), "some submission rows have no prediction"
submission['label'] = submission['label'].astype(int)
submission.head()

Unnamed: 0,file_name,label
0,003.wav,0
1,008.wav,9
2,010.wav,3
3,015.wav,8
4,024.wav,2


In [None]:
submission.to_csv('/content/drive/My Drive/audio-data/submit3_mels_10fold.csv', index=False)