# Import

In [2]:
import os
import random
import warnings

import librosa
import numpy as np
import pandas as pd
import torch
from sklearn.neural_network import MLPClassifier
from sklearn.preprocessing import OneHotEncoder
from tqdm.auto import tqdm

warnings.filterwarnings(action='ignore')

# Hyperparameter Setting

In [None]:
# Global hyperparameters shared by the seeding and feature-extraction cells.
CFG = dict(
    SR=16000,    # sampling rate handed to librosa.load
    N_MFCC=32,   # number of MFCC coefficients to extract
    SEED=42,     # value passed to seed_everything
)

# Fixed Random-Seed

In [None]:
def seed_everything(seed):
    """Seed every random number generator this notebook relies on.

    Seeds Python's ``random`` module, NumPy's global generator and PyTorch,
    and exports ``PYTHONHASHSEED`` to the environment.

    Args:
        seed: Integer seed applied to all generators.
    """
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    # The CNN is trained with PyTorch, so its generator must be seeded too;
    # otherwise weight initialisation and batch shuffling differ between runs.
    torch.manual_seed(seed)

seed_everything(CFG['SEED']) # Fix the random seed

# Data Pre-Processing 1

In [1]:
train_df = pd.read_csv('./train_data.csv')
test_df = pd.read_csv('./test_data.csv')

NameError: name 'pd' is not defined

In [None]:
def get_mfcc_feature(df, data_type, save_path):
    """Extract mean MFCC features for every recording and save them as CSV.

    For each value in ``df['id']`` the file
    ``./wav_dataset/<data_type>/<id zero-padded to 5 digits>.wav`` is loaded
    at ``CFG['SR']``, ``CFG['N_MFCC']`` MFCC coefficients are computed, and
    each coefficient is averaged over its frames. The averages are appended
    to ``df`` as columns ``mfcc_1`` .. ``mfcc_<N_MFCC>`` and the combined
    table is written to ``save_path``.

    Args:
        df: DataFrame with an ``id`` column identifying the wav files.
        data_type: Sub-folder of ``./wav_dataset`` to read from.
        save_path: Destination CSV path. If it already exists nothing is
            recomputed and the function returns immediately.

    Returns:
        None. The result is only written to disk.
    """
    if os.path.exists(save_path):
        print(f'{save_path} is exist.')
        return

    # Loop-invariant: every file of this split lives in the same folder.
    root_path = os.path.join('./wav_dataset', data_type)

    features = []
    for uid in tqdm(df['id']):
        path = os.path.join(root_path, str(uid).zfill(5) + '.wav')

        # Load the wav file, resampled to the configured rate.
        y, sr = librosa.load(path, sr=CFG['SR'])

        # One row per coefficient; average each row to get one value per
        # coefficient.
        mfcc = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=CFG['N_MFCC'])
        features.append([np.mean(coefficient) for coefficient in mfcc])

    columns = ['mfcc_' + str(i) for i in range(1, CFG['N_MFCC'] + 1)]
    # Reuse df's index so concat pairs rows one-to-one; a fresh RangeIndex
    # would misalign (and NaN-pad) whenever df was filtered or re-indexed.
    mfcc_df = pd.DataFrame(features, columns=columns, index=df.index)
    pd.concat([df, mfcc_df], axis=1).to_csv(save_path, index=False)
    print('Done.')

"\ndef get_mfcc_feature(df, data_type, save_path):\n    # Data Folder path\n    root_folder = './wav_dataset'\n    if os.path.exists(save_path):\n        print(f'{save_path} is exist.')\n        return\n    features = []\n    for uid in tqdm(df['id']):\n        root_path = os.path.join(root_folder, data_type)\n        path = os.path.join(root_path, str(uid).zfill(5)+'.wav')\n\n        # librosa패키지를 사용하여 wav 파일 load\n        y, sr = librosa.load(path, sr=CFG['SR'])\n        \n        # librosa패키지를 사용하여 mfcc 추출\n        mfcc = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=CFG['N_MFCC'])\n\n        y_feature = []\n        # 추출된 MFCC들의 평균을 Feature로 사용\n        for e in mfcc:\n            y_feature.append(np.mean(e))\n        features.append(y_feature)\n    \n    # 기존의 자가진단 정보를 담은 데이터프레임에 추출된 오디오 Feature를 추가\n    mfcc_df = pd.DataFrame(features, columns=['mfcc_'+str(x) for x in range(1,CFG['N_MFCC']+1)])\n    df = pd.concat([df, mfcc_df], axis=1)\n    df.to_csv(save_path, index=False)\n    print(

In [None]:
'''
get_mfcc_feature(train_df, 'train', './train_mfcc_data.csv')
get_mfcc_feature(test_df, 'test', './test_mfcc_data.csv')
'''

"\nget_mfcc_feature(train_df, 'train', './train_mfcc_data.csv')\nget_mfcc_feature(test_df, 'test', './test_mfcc_data.csv')\n"

In [None]:
train_mfcc_data = pd.read_csv('./train_mfcc_data.csv')
test_mfcc_data = pd.read_csv('./test_mfcc_data.csv')
# get_mfcc_feature2('unlabeled', './unlabeled_mfcc_data.csv')

# Data Pre-Processing 2

In [None]:
# Training table: MFCC features of each wav file merged with the
# self-reported status columns.

# Split the table into the model input (x) and the label (y).
train_x = train_mfcc_data.drop(columns=['id', 'covid19'])
# Take the label from the same table as the features so the rows of x and y
# are guaranteed to line up.
train_y = train_mfcc_data['covid19']

In [None]:
def onehot_encoding(ohe, x):
    """Replace the ``gender`` column of ``x`` with its one-hot encoding.

    Args:
        ohe: An already fitted encoder exposing ``transform`` (returning a
            dense 2-D array) and ``categories_``.
        x: DataFrame containing a ``gender`` column.

    Returns:
        A new DataFrame without ``gender`` and with one extra column per
        category in ``ohe.categories_[0]``, in the same row order as ``x``.
    """
    encoded = ohe.transform(x['gender'].to_numpy().reshape(-1, 1))
    # Carry over x's index: with a fresh RangeIndex, concat(axis=1) would
    # align on labels and produce NaN-padded rows for any non-default index.
    encoded_df = pd.DataFrame(encoded, columns=ohe.categories_[0], index=x.index)
    return pd.concat([x.drop(columns=['gender']), encoded_df], axis=1)

In [None]:
# The 'gender' column is categorical and needs extra preprocessing:
# apply a OneHotEncoder.
# scikit-learn renamed `sparse` to `sparse_output` in 1.2 and removed the old
# keyword in 1.4, so try the current name first and fall back for old versions.
try:
    ohe = OneHotEncoder(sparse_output=False)
except TypeError:
    ohe = OneHotEncoder(sparse=False)
ohe.fit(train_x['gender'].values.reshape(-1,1))
train_x = onehot_encoding(ohe, train_x)

In [None]:
train_x = np.array(train_x)
train_x

array([[24.,  0.,  1., ...,  1.,  0.,  0.],
       [51.,  0.,  0., ...,  0.,  1.,  0.],
       [22.,  0.,  0., ...,  0.,  1.,  0.],
       ...,
       [26.,  0.,  0., ...,  1.,  0.,  0.],
       [27.,  0.,  0., ...,  1.,  0.,  0.],
       [49.,  1.,  1., ...,  1.,  0.,  0.]])

In [None]:
# Apply the same preprocessing to the test data as was applied to the
# training data.
# Reuse the table already loaded from './test_mfcc_data.csv' rather than
# re-reading it from a machine-specific Google Drive path.
test_x = test_mfcc_data.drop(columns=['id'])
# To avoid data leakage, use the encoder that was fitted on the training data
# only.
test_x = onehot_encoding(ohe, test_x)

In [None]:
test_x

NameError: ignored

In [None]:
def spec_to_image(spec, eps=1e-6):
    """Standardise a spectrogram and rescale it to the range [0, 1].

    The array is first normalised to zero mean and unit standard deviation
    (``eps`` guards the division) and then min-max scaled.

    Args:
        spec: NumPy array holding the spectrogram values.
        eps: Small constant added to the standard deviation.

    Returns:
        Array of the same shape as ``spec`` with values in [0, 1]. A constant
        input yields all zeros.
    """
    spec_norm = (spec - spec.mean()) / (spec.std() + eps)
    spec_min, spec_max = spec_norm.min(), spec_norm.max()
    value_range = spec_max - spec_min
    if value_range == 0:
        # Constant input: the min-max step would be 0/0 and return NaN.
        return np.zeros_like(spec_norm)
    return (spec_norm - spec_min) / value_range

In [None]:
# Per-sample feature containers: rescaled MFCC maps and rescaled log-mel
# spectrograms. The *_test lists stay empty because the test loop below is
# commented out.
audio_mfcc_train = []
audio_mfcc_test = []
audio_mels_train = []
audio_mels_test = []

# NOTE(review): at this point `train_x` is the array built from the tabular
# feature table, so each `data` is presumably one row of features rather than
# a waveform -- confirm that this is what should be passed as `y=` here.
for data in train_x:
  # MFCCs of the sample, min-max rescaled by spec_to_image.
  extracted_features = librosa.feature.mfcc(y=data, sr=16000, n_mfcc=32)
  audio_mfcc_train.append(spec_to_image(extracted_features))

  # Mel spectrogram converted to dB relative to its own maximum, then rescaled.
  extracted_features2 = librosa.feature.melspectrogram(y=data, sr=16000, n_mels=32)
  extracted_features2 = librosa.power_to_db(extracted_features2, ref=np.max)
  audio_mels_train.append(spec_to_image(extracted_features2))

# for data in test_x:
#   extracted_features = librosa.feature.mfcc(y=data, sr=16000, n_mfcc=32)
#   audio_mfcc_test.append(spec_to_image(extracted_features))

#   extracted_features2 = librosa.feature.melspectrogram(y=data, sr=16000, n_mels=32)
#   extracted_features2 = librosa.power_to_db(extracted_features2, ref=np.max)
#   audio_mels_test.append(spec_to_image(extracted_features2))

In [None]:
audio_mels_train

[array([[0.7247694 ],
        [1.        ],
        [0.9568583 ],
        [0.88617282],
        [0.96431696],
        [0.        ],
        [0.91366979],
        [0.73867666],
        [0.76635199],
        [0.75432086],
        [0.32666662],
        [0.40939441],
        [0.6111005 ],
        [0.66686597],
        [0.59287623],
        [0.67629087],
        [0.74241078],
        [0.86648121],
        [0.92361857],
        [0.94589533],
        [0.96028778],
        [0.95711742],
        [0.92930644],
        [0.88738351],
        [0.77869849],
        [0.65466623],
        [0.83105391],
        [0.94225373],
        [0.95570129],
        [0.91211783],
        [0.78329711],
        [0.90967257]]), array([[0.73166958],
        [0.9781134 ],
        [0.93521516],
        [0.87019673],
        [0.94789989],
        [0.        ],
        [0.95363504],
        [0.77962937],
        [0.84930157],
        [0.83733831],
        [0.72800115],
        [0.81026667],
        [0.39395843],
        [

# Modeling

In [None]:
# import tensorflow as tf

In [None]:
# model = tf.keras.models.Sequential([
#   tf.keras.layers.Flatten(input_shape=(28, 28)),
#   tf.keras.layers.Dense(128, activation='relu'),
#   tf.keras.layers.Dropout(0.2),
#   tf.keras.layers.Dense(10, activation='softmax')
# ])

# model.compile(optimizer='adam',
#               loss='binary_crossentropy',
#               metrics=['accuracy'])

In [None]:
import torch.nn as nn

class CNNclassification(nn.Module):
    """Four-stage convolutional classifier for single-channel 2-D inputs.

    Each stage is Conv2d(3x3, padding=1) -> ReLU -> MaxPool2d(2), widening the
    channels 1 -> 64 -> 128 -> 256 -> 512 while halving height and width. The
    flattened result is fed to a single linear output layer.

    The linear layer expects 2048 input features, i.e. 512 channels x 2 x 2,
    which is what a 32x32 input yields after four 2x poolings.

    Args:
        num_classes: Number of output logits. Defaults to 10, the size the
            output layer was originally hard-coded to.
    """

    def __init__(self, num_classes=10):
        super().__init__()
        self.layer1 = self._conv_block(1, 64)
        self.layer2 = self._conv_block(64, 128)
        self.layer3 = self._conv_block(128, 256)
        self.layer4 = self._conv_block(256, 512)

        # Kept as a Sequential so parameter names stay 'fc_layer.0.*' and
        # previously saved state dicts remain loadable.
        self.fc_layer = nn.Sequential(
            nn.Linear(2048, num_classes)  # fully connected output layer
        )

    @staticmethod
    def _conv_block(in_channels, out_channels):
        """Build one conv -> ReLU -> 2x2 max-pool stage."""
        return nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=(3, 3), padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
        )

    def forward(self, x):
        """Return class logits of shape (batch, num_classes) for ``x``."""
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        # Collapse everything except the batch dimension.
        x = x.flatten(1)

        return self.fc_layer(x)

model = CNNclassification()

In [None]:
pip install torchinfo

In [None]:
from torchinfo import summary

print(summary(model, input_size = (1,1,32,32)))

In [None]:
import torchvision.datasets as datasets # 데이터셋 집합체
import torchvision.transforms as transforms # 변환 툴

from torch.utils.data import DataLoader # 학습 및 배치로 모델에 넣어주기 위한 툴
from torch.utils.data import DataLoader, Dataset

class CustomDataset(Dataset):
    """Indexable dataset over samples ``X`` and, in train mode, labels ``y``.

    Args:
        X: Indexable collection of samples.
        y: Indexable collection of labels; only read when ``train_mode`` is
            true.
        train_mode: When true, items are ``(sample, label)`` pairs; otherwise
            only the sample is returned.
        transforms: Optional callable applied to each sample on access.
    """

    def __init__(self, X, y, train_mode=True, transforms=None):
        self.X, self.y = X, y
        self.train_mode = train_mode
        self.transforms = transforms

    def __len__(self):
        """Number of samples."""
        return len(self.X)

    def __getitem__(self, index):
        """Return the (optionally transformed) sample at ``index``."""
        sample = self.X[index]
        if self.transforms is not None:
            sample = self.transforms(sample)

        if not self.train_mode:
            return sample
        return sample, self.y[index]

# Train

In [None]:
'''
mlp = MLPClassifier(random_state=CFG['SEED']) # Sklearn에서 제공하는 Multi-layer Perceptron classifier 사용
mlp.fit(train_x, train_y) # Model Train
'''

In [None]:
from tqdm.auto import tqdm

def train(model, optimizer, train_loader, scheduler, device):
    """Train ``model`` and checkpoint the weights with the best validation accuracy.

    Besides its arguments, this function reads three module-level names that
    the calling cell must define before invoking it: ``num_epochs``,
    ``criterion`` and ``vali_loader``.

    After every epoch the model is scored with ``predict_valid`` on
    ``vali_loader``; whenever the accuracy improves, the state dict is written
    to ``best_model.pth`` in the working directory.

    Args:
        model: Network to optimise; moved to ``device``.
        optimizer: Optimiser bound to ``model``'s parameters.
        train_loader: Iterable of ``(wav, label)`` batches.
        scheduler: Optional LR scheduler stepped once per epoch, or ``None``.
        device: Device on which batches and the model are placed.
    """
    model.to(device)
    # Start below any reachable accuracy so the first epoch always writes a
    # checkpoint; with 0 and a strict '<', a run stuck at 0% would never save
    # and the caller would load a stale file from an earlier fold.
    best_acc = -1.0

    # Run exactly num_epochs epochs (range(1, num_epochs) ran one fewer).
    for _ in range(num_epochs):
        model.train()

        for wav, label in train_loader:
            wav = wav.to(device=device, dtype=torch.float32)
            label = label.to(device=device, dtype=torch.long)

            optimizer.zero_grad()  # reset gradients for this batch

            logit = model(wav)
            loss = criterion(logit, label)

            loss.backward()
            optimizer.step()

        if scheduler is not None:
            scheduler.step()

        # Validation accuracy in percent; predict_valid switches to eval mode.
        vali_acc = predict_valid(model, vali_loader, device)

        # Keep the best model seen so far.
        if best_acc < vali_acc:
            best_acc = vali_acc
            torch.save(model.state_dict(), 'best_model.pth')

def predict_valid(model, vali_loader, device):
    """Return the accuracy (in percent) of ``model`` on ``vali_loader``.

    Args:
        model: Network to evaluate; switched to eval mode.
        vali_loader: Loader yielding ``(wav, label)`` batches and exposing a
            ``dataset`` attribute whose length is the number of samples.
        device: Device on which the batches are evaluated.

    Returns:
        ``100 * correct / len(vali_loader.dataset)`` as a float.
    """
    model.eval()
    correct = 0
    with torch.no_grad():  # no parameter updates, so no gradients needed
        for wav, label in vali_loader:
            wav = wav.to(device=device, dtype=torch.float32)
            label = label.to(device=device, dtype=torch.long)

            # The predicted class is the index of the largest logit.
            pred = model(wav).argmax(dim=1)
            correct += (pred == label.view_as(pred)).sum().item()
    return 100 * correct / len(vali_loader.dataset)

def predict(model, test_loader, device):
    """Run ``model`` over ``test_loader`` and collect the raw logits.

    Args:
        model: Network to run; switched to eval mode.
        test_loader: Iterable yielding batches of inputs (no labels).
        device: Device on which the batches are evaluated.

    Returns:
        A list with one entry per sample, each a list of that sample's logits.
    """
    model.eval()
    model_pred = []
    with torch.no_grad():
        for wav in test_loader:
            # Cast and move in one step instead of forcing a CPU FloatTensor
            # first.
            wav = wav.to(device=device, dtype=torch.float32)
            model_pred.extend(model(wav).tolist())
    return model_pred

In [None]:
from sklearn.model_selection import StratifiedKFold
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import accuracy_score

label_encoder = LabelEncoder()

num_epochs = 70
batch_size = 32

# Device used by train / predict_valid / predict; it was never defined before.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

skf = StratifiedKFold(n_splits = 10, random_state = 42, shuffle = True) # 10 stratified folds

# Encode the labels once. The result is a NumPy array, so the positional fold
# indices below select the intended rows and CustomDataset can index it by
# position (a pandas Series would be indexed by label instead).
labels = label_encoder.fit_transform(train_y)

# The test set is identical for every fold, so build its loader once.
# NOTE(review): audio_mels_test is only filled by a loop that is currently
# commented out, so this loader is presumably empty -- confirm before relying
# on pred_list.
test_dataset = CustomDataset(X=audio_mels_test, y= None, train_mode=False)
test_loader = DataLoader(test_dataset, batch_size = batch_size, shuffle=False)

n = 0

pred_list = []

# Only the labels determine a stratified split, so a placeholder of the right
# length stands in for the features.
for n, (train_index, valid_index) in enumerate(skf.split(np.zeros(len(labels)), labels), start=1):

    print("===== %d fold =====" %(n))

    # Only the mel-spectrogram model is trained; the MFCC branch is disabled.

    ## melspectogram
    # Select samples one by one: audio_mels_train is a Python list, which
    # cannot be indexed with an array of indices.
    # NOTE(review): the model summary uses an input of (1, 1, 32, 32) --
    # confirm that each spectrogram here has a matching (1, 32, 32) shape.
    X_train = [audio_mels_train[i] for i in train_index]
    X_valid = [audio_mels_train[i] for i in valid_index]
    y_train, y_valid = labels[train_index], labels[valid_index]

    train_dataset = CustomDataset(X=X_train, y=y_train)
    train_loader = DataLoader(train_dataset, batch_size = batch_size, shuffle=True)

    vali_dataset = CustomDataset(X=X_valid, y=y_valid)
    vali_loader = DataLoader(vali_dataset, batch_size = batch_size, shuffle=False)

    model = CNNclassification().to(device)
    criterion = torch.nn.CrossEntropyLoss().to(device)
    optimizer = torch.optim.Adam(params = model.parameters(), lr = 0.001)
    scheduler = None

    # train() reads num_epochs, criterion and vali_loader from this cell.
    train(model, optimizer, train_loader, scheduler, device)

    # Reload the weights with the best validation accuracy of this fold.
    checkpoint = torch.load('best_model.pth', map_location=device)
    model = CNNclassification().to(device)
    model.load_state_dict(checkpoint)

    print("%d fold mels score : %d%%" %(n, predict_valid(model, vali_loader, device)))

    mels_preds = predict(model, test_loader, device)

    # One array of test logits per fold.
    pred_list.append(np.array(mels_preds))

# Inference

In [None]:
'''
# Model 추론
preds = model.predict(test_x)
'''

In [None]:
# NOTE(review): `model` is the torch CNNclassification instance left over from
# the k-fold cell, and that class defines no `evaluate` method in this
# notebook (this looks like a Keras-style call) -- confirm whether this cell
# is still needed.
model.evaluate(test_x, batch_size=28)

In [None]:
# Accuracy
# NOTE(review): in the visible notebook `mlp` and `preds` are only assigned
# inside string-literal (disabled) cells, so these lines presumably fail with
# NameError -- confirm. Scoring test_x against `preds` would also compare the
# model with its own predictions rather than with ground truth.

print("훈련 세트 정확도 : {:.3f}".format(mlp.score(train_x, train_y)))
print("테스트 세트 정확도 : {:.3f}".format(mlp.score(test_x, preds)))

# Submission

In [None]:
# Fill the sample submission's 'covid19' column with the predictions and save
# it as submit.csv.
# NOTE(review): `preds` is only assigned inside a disabled (string-literal)
# cell in the visible notebook; presumably it should be derived from the
# per-fold logits in `pred_list` -- confirm before submitting.
submission = pd.read_csv('./sample_submission.csv')
submission['covid19'] = preds
submission.to_csv('./submit.csv', index=False)