# 載入套件

In [None]:
import numpy as np
import pandas as pd
import librosa
import librosa.display
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import glob
import warnings

from sklearn.model_selection import train_test_split, StratifiedKFold
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay, recall_score, f1_score, classification_report, accuracy_score

import torch
import torch.nn as nn
import torchvision
import torch.nn.functional as F

from torch.optim import Adam, SGD
from torchvision.transforms import transforms
from torch.utils.data import DataLoader, Dataset
from torch.optim.lr_scheduler import CosineAnnealingLR,CosineAnnealingWarmRestarts,StepLR, ReduceLROnPlateau
from torchvision.io import read_image
from torchvision import transforms
from PIL import Image
from torchvision import datasets
from torchvision.transforms import ToTensor

## voice preprocess

In [None]:
# MFCC
def make_mfcc(df):
    """Compute and save 13 MFCCs for every wave file listed in ``df['wave_path']``.

    Each file is loaded at 44.1 kHz and truncated to at most its first 44100
    samples. MFCCs are extracted with a 16 ms FFT window and an 8 ms hop, and
    the resulting array is saved next to the source file, with ``.wav`` in the
    path replaced by ``_mfcc_13.npy``.

    Args:
        df: DataFrame with a ``wave_path`` column of audio file paths.
    """
    for wav_file in df['wave_path'].to_list():
        audio, sr = librosa.load(wav_file, sr=44100)
        clip = audio[:44100]

        # Window and hop sizes expressed in samples.
        window = int(16/1000 * sr)
        hop = int(8/1000 * sr)

        coeffs = librosa.feature.mfcc(
            y=clip,
            sr=sr,
            n_fft=window,
            hop_length=hop,
            n_mfcc=13,
        )

        out_file = wav_file.replace('.wav', '_mfcc_13.npy')
        np.save(out_file, coeffs)

## functions

In [None]:
def medical_data_proccessing(df):
    """Encode and rescale the medical columns of *df* in place and return it.

    The passed DataFrame itself is modified:
      * ``Sex`` is shifted down by one,
      * missing ``PPD`` and ``Voice handicap index - 10`` values become 0,
      * ``Age`` is divided by 50 and ``Voice handicap index - 10`` by 40.

    Args:
        df: DataFrame containing the columns listed above.

    Returns:
        The same DataFrame object that was passed in.
    """
    vhi_col = 'Voice handicap index - 10'

    # Shift the sex code down by one.
    df['Sex'] = df['Sex'].sub(1)

    # Replace missing values with 0.
    df['PPD'] = df['PPD'].fillna(0)

    # Scale down the columns with large magnitudes.
    df['Age'] = df['Age'].div(50)
    df[vhi_col] = df[vhi_col].fillna(0).div(40)

    return df

def normalization(data):
    """Min-max scale *data* using its global minimum and maximum."""
    lowest = np.min(data)
    span = np.max(data) - lowest
    return (data - lowest) / span
 
def standardization(data):
    """Subtract the mean and divide by the standard deviation taken along axis 0."""
    centered = data - np.mean(data, axis=0)
    return centered / np.std(data, axis=0)

In [None]:
# Suppress all warnings raised after this point.
warnings.filterwarnings("ignore")

## Dataset

In [None]:
# Train Dataset
class CustomImageDataset(Dataset):
    """Labelled dataset pairing a saved MFCC array with a row of medical features.

    Args:
        source_df: DataFrame with ``ID``, ``mfcc_path`` and ``Disease category``
            columns; every remaining column (except an optional ``wave_path``)
            is treated as a medical feature.
        transform: Optional callable applied to the standardized MFCC array.
        target_transform: Optional callable applied to the label.
    """

    def __init__(self, source_df, transform=None, target_transform=None):
        self.dataframe = source_df
        self.transform = transform
        self.target_transform = target_transform

        # The bookkeeping columns are required (a missing one raises KeyError);
        # 'wave_path' is only present in some frames, so it is dropped tolerantly.
        self.medical = self.dataframe.drop(
            columns=['ID', 'mfcc_path', 'Disease category']
        ).drop(columns=['wave_path'], errors='ignore')
        self.path = self.dataframe['mfcc_path']
        self.label = self.dataframe['Disease category']

    def __len__(self):
        """Return the number of rows in the source DataFrame."""
        return len(self.dataframe)

    def __getitem__(self, idx):
        """Return ``(mfcc, medical_features, label, mfcc_path)`` for row *idx*."""
        img_path = self.path.iloc[idx]

        features = standardization(np.load(img_path))

        # Force a numeric array: a row mixing bool dummy columns with floats
        # would otherwise come back with object dtype, which the default
        # DataLoader collate function cannot batch.
        medicals = self.medical.iloc[idx].to_numpy(dtype=np.float64)
        label = self.label.iloc[idx]

        if self.transform:
            features = self.transform(features)

        if self.target_transform:
            label = self.target_transform(label)

        return features, medicals, label, img_path

In [None]:
# Test dataset
class CustomImageDataset_test(Dataset):
    """Unlabelled dataset pairing a saved MFCC array with a row of medical features.

    Args:
        source_df: DataFrame with ``ID`` and ``mfcc_path`` columns; every
            remaining column (except an optional ``wave_path``) is treated as
            a medical feature.
        transform: Optional callable applied to the standardized MFCC array.
        target_transform: Accepted for signature compatibility with the
            training dataset. It is stored but never applied, because this
            dataset has no label to transform.
    """

    def __init__(self, source_df, transform=None, target_transform=None):
        self.dataframe = source_df
        self.transform = transform
        self.target_transform = target_transform

        # The bookkeeping columns are required (a missing one raises KeyError);
        # 'wave_path' is only present in some frames, so it is dropped tolerantly.
        self.medical = self.dataframe.drop(
            columns=['ID', 'mfcc_path']
        ).drop(columns=['wave_path'], errors='ignore')
        self.path = self.dataframe['mfcc_path']
        self.id = self.dataframe['ID']

    def __len__(self):
        """Return the number of rows in the source DataFrame."""
        return len(self.dataframe)

    def __getitem__(self, idx):
        """Return ``(mfcc, medical_features, sample_id)`` for row *idx*."""
        img_path = self.path.iloc[idx]

        features = standardization(np.load(img_path))

        # Force a numeric array: a row mixing bool dummy columns with floats
        # would otherwise come back with object dtype, which the default
        # DataLoader collate function cannot batch.
        medicals = self.medical.iloc[idx].to_numpy(dtype=np.float64)
        ids = self.id.iloc[idx]

        if self.transform:
            features = self.transform(features)

        # No target_transform step here: there is no label, and referencing
        # one used to raise NameError whenever target_transform was supplied.
        return features, medicals, ids

## Network

In [None]:
# Use the first CUDA GPU when one is available; otherwise fall back to the CPU
# so the notebook still runs on machines without CUDA.
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

In [None]:
class Network(nn.Module):
    """Two-branch classifier combining a CNN over an MFCC map with an MLP over medical features.

    The CNN branch (conv1-conv3, each followed by batch norm and max pooling,
    then global average pooling) yields a 64-dimensional vector. The MLP
    branch maps 44 medical features to a 128-dimensional vector. Both are
    concatenated and passed through two linear layers to produce 5 class
    scores.

    ``forward`` returns raw logits. ``nn.CrossEntropyLoss`` applies
    log-softmax internally, so feeding it softmax output would normalise
    twice. Call ``self.soft`` on the result when probabilities are needed;
    ``argmax`` over the classes is the same either way.
    """

    def __init__(self):
        super(Network, self).__init__()

        self.maxpool = 2

        self.conv1 = nn.Conv2d(in_channels=1, out_channels=16, kernel_size=(3, 3), stride=1, padding=1)
        self.bn1 = nn.BatchNorm2d(16)
        self.pool1 = nn.MaxPool2d(self.maxpool, padding= 1)

        self.conv2 = nn.Conv2d(in_channels=16, out_channels=32, kernel_size=(3, 10), stride=1, padding=1)
        self.bn2 = nn.BatchNorm2d(32)
        self.pool2 = nn.MaxPool2d(self.maxpool, padding= 1)

        self.conv3 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=(3, 10), stride=1, padding=1)
        self.bn3 = nn.BatchNorm2d(64)
        self.pool3 = nn.MaxPool2d(self.maxpool, padding= 1)

        # conv4-conv7 and their batch norms are not used by forward(). They are
        # kept unchanged so parameter initialisation order and the state_dict
        # layout stay the same.
        self.conv4 = nn.Conv2d(in_channels=64, out_channels=64, kernel_size=3, stride=1, padding=1)
        self.bn4 = nn.BatchNorm2d(64)
        self.pool4 = nn.MaxPool2d(self.maxpool, padding= 1)

        self.conv5 = nn.Conv2d(in_channels=64, out_channels=64, kernel_size=3, stride=1, padding=1)
        self.bn5 = nn.BatchNorm2d(64)

        self.conv6 = nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, stride=1, padding=1)
        # NOTE(review): bn6 has 64 features while conv6 outputs 128 channels.
        # Harmless while unused; must become 128 if conv6 is ever wired in.
        self.bn6 = nn.BatchNorm2d(64)

        self.conv7 = nn.Conv2d(in_channels=128, out_channels=128, kernel_size=3, stride=1, padding=1)
        self.bn7 = nn.BatchNorm2d(128)

        self.conv_open = False

        # Global average pooling down to pools x pools per channel.
        self.pools = 1
        self.ave_pool = nn.AdaptiveAvgPool2d(self.pools)

        # MLP over the medical features.
        self.linear1= nn.Linear(44, 1024)
        self.l1 = nn.BatchNorm1d(1024)
        self.drop2 = nn.Dropout(0.1)
        self.linear2= nn.Linear(1024, 256)
        self.l2 = nn.BatchNorm1d(256)
        self.linear3= nn.Linear(256, 128)

        # Fully connected head over the concatenated CNN and MLP features.
        self.fc1_num = 128
        self.fc1 = nn.Linear(64 * self.pools * self.pools + 128, self.fc1_num)
        self.bnfc = nn.BatchNorm1d(self.fc1_num)
        self.fc2 = nn.Linear(self.fc1_num, 5)
        self.soft = nn.Softmax(dim=1)
        self.drop = nn.Dropout(0.3)

    def forward(self, input1, medical):
        """Return class logits for a batch.

        Args:
            input1: Tensor with a single channel, i.e. ``(batch, 1, H, W)``.
            medical: Tensor of medical features with last dimension 44.

        Returns:
            Tensor of shape ``(batch, 5)`` holding unnormalised class scores.
        """
        # CNN branch
        output = F.celu(self.conv1(input1))
        output = self.bn1(output)
        output = self.pool1(output)

        output = F.celu(self.conv2(output))
        output = self.bn2(output)
        output = self.pool2(output)

        output = F.celu(self.conv3(output))
        output = self.bn3(output)
        output = self.pool3(output)

        output = self.ave_pool(output)
        output = output.view(-1, 64*self.pools*self.pools)

        # Medical-feature branch
        x = F.celu(self.linear1(medical))
        x = F.celu(self.linear2(x))
        x = F.celu(self.linear3(x))

        # Concatenate both branches and classify.
        con = torch.cat((output, x), 1)
        con = F.celu(self.fc1(con))
        con = self.bnfc(con)

        # Return logits: CrossEntropyLoss expects unnormalised scores.
        return self.fc2(con)

## main

In [None]:
# train
# Generate the MFCC files for the training, public-test and private-test sets,
# then build an initial train/validation split, model, loss and optimizer.

# Training set: locate each wave file, write its MFCC array, record the .npy path.
df_train = pd.read_csv(r'..\Training_Dataset\training_datalist.csv')
df_train['wave_path'] = df_train['ID'].apply(lambda x: f'..\\Training_Dataset\\training_voice_data\\{x}.wav')
make_mfcc(df_train)
df_train['mfcc_path'] = df_train['ID'].apply(lambda x: f'..\\Training_Dataset\\training_voice_data\\{x}_mfcc_13.npy')

# Public test set: same steps.
source_df_pub = pd.read_csv(r'..\Public Testing Dataset\test_datalist_public.csv')
source_df_pub['wave_path'] = source_df_pub['ID'].apply(lambda x: f'..\\Public Testing Dataset\\test_data_public\\{x}.wav')
make_mfcc(source_df_pub)
source_df_pub['mfcc_path'] = source_df_pub['ID'].apply(lambda x: f'..\\Public Testing Dataset\\test_data_public\\{x}_mfcc_13.npy')

# Private test set: same steps.
source_df_pri = pd.read_csv(r'..\Private Testing Dataset\test_datalist_private.csv')
source_df_pri['wave_path'] = source_df_pri['ID'].apply(lambda x: f'..\\Private Testing Dataset\\test_data_private\\{x}.wav')
make_mfcc(source_df_pri)
source_df_pri['mfcc_path'] = source_df_pri['ID'].apply(lambda x: f'..\\Private Testing Dataset\\test_data_private\\{x}_mfcc_13.npy')


# Build the dataloaders. medical_data_proccessing modifies df_train in place.
source_df = medical_data_proccessing(df_train)
source_df = pd.get_dummies(source_df, columns=['Smoking', 'frequency', 'Onset of dysphonia ', 'Noise at work', 'Diurnal pattern', 'Occupational vocal demand'])
# Shift the labels down by one so they can be used as class indices.
source_df['Disease category'] = source_df['Disease category'] - 1

# Keep only the first of four stratified folds: 3/4 for training, 1/4 held out.
skf = StratifiedKFold(n_splits=4, shuffle=True)
fold1 = list(skf.split(source_df, source_df['Disease category']))[0]

training_df = source_df.loc[fold1[0]]
test_df = source_df.loc[fold1[1]]

# Number of training samples per class, ordered by class index.
cat_num_list = training_df['Disease category'].value_counts().sort_index().to_list()

trans_comp = transforms.Compose([transforms.ToTensor()])
train_dataset = CustomImageDataset(training_df, transform=trans_comp)
test_dataset = CustomImageDataset(test_df, transform=trans_comp)

train_dl = DataLoader(train_dataset, batch_size=32, shuffle=True, drop_last=True)
test_dl = DataLoader(test_dataset, batch_size=32, shuffle=False)

# Model
model = Network().to(device)

# Weight each class by the inverse of its training-sample count.
weight = torch.tensor([1/cat_num_list[0], 1/cat_num_list[1], 1/cat_num_list[2], 1/cat_num_list[3], 1/cat_num_list[4]]).to(device)
criterion = nn.CrossEntropyLoss(weight=weight)

optimizer = SGD(model.parameters(), lr=0.01, weight_decay= 0.0001)

In [None]:
# Iterative pseudo-labelling.
#
# Each of the 15 rounds:
#   1. builds the training frame (from round 2 on, the labelled training data
#      plus the public/private test data labelled by the previous round),
#   2. trains a fresh Network for 150 epochs, checkpointing the epoch with the
#      best mean per-class recall on the held-out fold,
#   3. reloads that checkpoint and writes new pseudo-labels for both test sets.
#
# The pseudo-label CSVs store the RAW medical values. The test frames are
# processed on a copy, so values read back in the next round pass through
# medical_data_proccessing exactly once, just like the training list.

# Columns that are one-hot encoded before being fed to the network.
dummy_columns = ['Smoking', 'frequency', 'Onset of dysphonia ', 'Noise at work', 'Diurnal pattern', 'Occupational vocal demand']

for i in range(15):
    # Read the labelled training list.
    df_train = pd.read_csv(r'..\Training_Dataset\training_datalist.csv')
    df_train['mfcc_path'] = df_train['ID'].apply(lambda x: f'..\\Training_Dataset\\training_voice_data\\{x}_mfcc_13.npy')

    if i > 0:
        # Add the test sets pseudo-labelled at the end of the previous round.
        df_public = pd.read_csv(r'.\pesudo_pub_mfcc_13.csv')
        df_public['mfcc_path'] = df_public['ID'].apply(lambda x: f'..\\Public Testing Dataset\\test_data_public\\{x}_mfcc_13.npy')

        df_private = pd.read_csv(r'.\pesudo_pri_mfcc_13.csv')
        df_private['mfcc_path'] = df_private['ID'].apply(lambda x: f'..\\Private Testing Dataset\\test_data_private\\{x}_mfcc_13.npy')

        source_df = pd.concat([df_train, df_public, df_private], axis=0, ignore_index=True)
    else:
        source_df = df_train

    # Build the dataloaders.
    source_df = medical_data_proccessing(source_df)
    source_df = pd.get_dummies(source_df, columns=dummy_columns)
    # Shift the labels down by one so they can be used as class indices.
    source_df['Disease category'] = source_df['Disease category'] - 1

    # Keep only the first of four stratified folds: 3/4 train, 1/4 held out.
    skf = StratifiedKFold(n_splits=4, shuffle=True)
    fold1 = list(skf.split(source_df, source_df['Disease category']))[0]

    training_df = source_df.loc[fold1[0]]
    test_df = source_df.loc[fold1[1]]

    # Number of training samples per class, ordered by class index.
    cat_num_list = training_df['Disease category'].value_counts().sort_index().to_list()

    trans_comp = transforms.Compose([transforms.ToTensor()])
    train_dataset = CustomImageDataset(training_df, transform=trans_comp)
    test_dataset = CustomImageDataset(test_df, transform=trans_comp)

    train_dl = DataLoader(train_dataset, batch_size=32, shuffle=True, drop_last=True)
    test_dl = DataLoader(test_dataset, batch_size=32, shuffle=False)

    # A fresh model, loss and optimizer for every round.
    model = Network().to(device)

    # Weight each class by the inverse of its training-sample count.
    weight = torch.tensor([1/cat_num_list[0], 1/cat_num_list[1], 1/cat_num_list[2], 1/cat_num_list[3], 1/cat_num_list[4]]).to(device)
    criterion = nn.CrossEntropyLoss(weight=weight)

    optimizer = SGD(model.parameters(), lr=0.01, weight_decay= 0.0001)

    # Train.
    train_acc_list = [0]
    test_acc_list = [0]
    before_acc = 0.
    n_epochs = 150
    for epoch in range(n_epochs):
        pred_list = []
        label_list = []
        losses = 0.

        # Back to training mode: the validation pass below switches to eval.
        model.train()
        for np_araay, medicals, labels, img_path in train_dl:
            inputs, medicals, labels = np_araay.float().to(device), medicals.float().to(device), labels.to(device)
            optimizer.zero_grad()
            preds = model(inputs, medicals)

            loss = criterion(preds, labels)
            losses = losses + loss.item()

            # Backpropagation
            loss.backward()
            optimizer.step()

            pred_list += preds.cpu().argmax(1).tolist()
            label_list += labels.cpu().tolist()

        y_pred = pred_list
        y_true = label_list

        # Mean of the per-class recalls on the training split.
        results_recall = recall_score(y_true, y_pred, average=None)
        train_acc_list.append(results_recall.mean())

        # Evaluate on the held-out fold.
        pred_list = []
        label_list = []
        model.eval()
        with torch.no_grad():
            for np_araay, medicals, labels, img_path in test_dl:
                inputs, medicals, labels = np_araay.float().to(device), medicals.float().to(device), labels.to(device)

                preds = model(inputs, medicals)

                pred_list += preds.cpu().argmax(1).tolist()
                label_list += labels.cpu().tolist()

        # recall_score takes (y_true, y_pred): ground truth first.
        y_true_test = label_list
        y_pred_test = pred_list

        results_recall_test = recall_score(y_true_test, y_pred_test, average=None)
        test_mean = results_recall_test.mean()
        test_acc_list.append(test_mean)

        # Checkpoint whenever the held-out mean recall improves.
        if test_mean > before_acc:
            before_acc = test_mean
            torch.save(model.state_dict(), "{}.pth".format("mfcc_13"))

    # Restore the best checkpoint before pseudo-labelling.
    model.load_state_dict(torch.load("{}.pth".format("mfcc_13"), map_location=device))

    # Public test set
    source_df_pub = pd.read_csv(r'..\Public Testing Dataset\test_datalist_public.csv')
    source_df_pub['mfcc_path'] = source_df_pub['ID'].apply(lambda x: f'..\\Public Testing Dataset\\test_data_public\\{x}_mfcc_13.npy')

    # Process a copy so the frame written to disk keeps its raw values.
    source_df_pub_pro = medical_data_proccessing(source_df_pub.copy())
    source_df_pub_pro = pd.get_dummies(source_df_pub_pro, columns=dummy_columns)

    trans_comp = transforms.Compose([transforms.ToTensor()])
    pub_dataset = CustomImageDataset_test(source_df_pub_pro, transform=trans_comp)
    pub_dl = DataLoader(pub_dataset, batch_size=32, shuffle=False)

    pub_pred_list = []
    model.eval()
    with torch.no_grad():
        for np_araay, medicals, img_path in pub_dl:
            inputs, medicals= np_araay.float().to(device), medicals.float().to(device)
            preds = model(inputs, medicals)
            pub_pred_list += preds.cpu().argmax(1).tolist()

    y_pub = pub_pred_list
    # Labels are stored 1-based, matching the training list.
    source_df_pub['Disease category'] = [cat + 1 for cat in y_pub]
    source_df_pub.to_csv('pesudo_pub_mfcc_13.csv', index=False)

    # Private test set
    source_df_pri = pd.read_csv(r'..\Private Testing Dataset\test_datalist_private.csv')
    source_df_pri['mfcc_path'] = source_df_pri['ID'].apply(lambda x: f'..\\Private Testing Dataset\\test_data_private\\{x}_mfcc_13.npy')

    # Process a copy so the frame written to disk keeps its raw values.
    source_df_pri_pro = medical_data_proccessing(source_df_pri.copy())
    source_df_pri_pro = pd.get_dummies(source_df_pri_pro, columns=dummy_columns)

    pri_dataset = CustomImageDataset_test(source_df_pri_pro, transform=trans_comp)
    pri_dl = DataLoader(pri_dataset, batch_size=32, shuffle=False)

    pri_pred_list = []
    model.eval()
    with torch.no_grad():
        for np_araay, medicals, img_path in pri_dl:
            inputs, medicals= np_araay.float().to(device), medicals.float().to(device)
            preds = model(inputs, medicals)
            pri_pred_list += preds.cpu().argmax(1).tolist()

    y_pri = pri_pred_list
    # Labels are stored 1-based, matching the training list.
    source_df_pri['Disease category'] = [cat + 1 for cat in y_pri]
    source_df_pri.to_csv('pesudo_pri_mfcc_13.csv', index=False)

## make submission

In [None]:
# Restore the checkpoint written during training, predict both test sets and
# fill in the combined submission template.
checkpoint_file = "{}.pth".format("mfcc_13")
model.load_state_dict(torch.load(checkpoint_file))

# Public test set. The MFCC files already exist, so make_mfcc is not re-run.
source_df_pub = pd.read_csv(r'..\Public Testing Dataset\test_datalist_public.csv')
source_df_pub['wave_path'] = source_df_pub['ID'].map(
    lambda x: f'..\\Public Testing Dataset\\test_data_public\\{x}.wav'
)
source_df_pub['mfcc_path'] = source_df_pub['ID'].map(
    lambda x: f'..\\Public Testing Dataset\\test_data_public\\{x}_mfcc_13.npy'
)

source_df_pub_pro = pd.get_dummies(
    medical_data_proccessing(source_df_pub),
    columns=['Smoking', 'frequency', 'Onset of dysphonia ', 'Noise at work', 'Diurnal pattern', 'Occupational vocal demand'],
)

trans_comp = transforms.Compose([transforms.ToTensor()])
pub_dataset = CustomImageDataset_test(source_df_pub_pro, transform=trans_comp)
pub_dl = DataLoader(pub_dataset, batch_size=32, shuffle=False)

pub_pred_list = []
model.eval()
with torch.no_grad():
    for mfcc_batch, medicals, sample_ids in pub_dl:
        inputs = mfcc_batch.float().to(device)
        medicals = medicals.float().to(device)
        preds = model(inputs, medicals)
        # Keep the highest-scoring class index of every sample.
        pub_pred_list.extend(preds.cpu().argmax(1))

y_pub = [class_idx.item() for class_idx in pub_pred_list]

# Private test set. Same steps as above.
source_df_pri = pd.read_csv(r'..\Private Testing Dataset\test_datalist_private.csv')
source_df_pri['wave_path'] = source_df_pri['ID'].map(
    lambda x: f'..\\Private Testing Dataset\\test_data_private\\{x}.wav'
)
source_df_pri['mfcc_path'] = source_df_pri['ID'].map(
    lambda x: f'..\\Private Testing Dataset\\test_data_private\\{x}_mfcc_13.npy'
)

source_df_pri_pro = pd.get_dummies(
    medical_data_proccessing(source_df_pri),
    columns=['Smoking', 'frequency', 'Onset of dysphonia ', 'Noise at work', 'Diurnal pattern', 'Occupational vocal demand'],
)

pri_dataset = CustomImageDataset_test(source_df_pri_pro, transform=trans_comp)
pri_dl = DataLoader(pri_dataset, batch_size=32, shuffle=False)

pri_pred_list = []
model.eval()
with torch.no_grad():
    for mfcc_batch, medicals, sample_ids in pri_dl:
        inputs = mfcc_batch.float().to(device)
        medicals = medicals.float().to(device)
        preds = model(inputs, medicals)
        pri_pred_list.extend(preds.cpu().argmax(1))

y_pri = [class_idx.item() for class_idx in pri_pred_list]

# Combine: public predictions first, then private, written into column 1 of
# the template as 1-based categories.
tem_pub_pri = pd.read_csv(r'..\Private Testing Dataset\submission_template_public+private.csv', header=None)
y_pub_pri = [*y_pub, *y_pri]
tem_pub_pri[1] = [class_idx + 1 for class_idx in y_pub_pri]
tem_pub_pri.to_csv('pub_pri_mfcc_13.csv', header=False, index=False)
