## 載入函式庫 & 掛載雲端硬碟

In [None]:
import os
from google.colab import drive
import copy
import numpy
import numpy as np
import pandas as pd
import librosa

import tensorflow as tf
from tensorflow.keras.layers import Activation, BatchNormalization, Dense, LayerNormalization
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from tensorflow.keras import datasets, layers, models

from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay, recall_score
from sklearn.preprocessing import MinMaxScaler

from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC
from sklearn import svm
from sklearn.metrics import classification_report, accuracy_score

import scipy.io.wavfile
from scipy.fftpack import dct

# Mount Google Drive inside the Colab VM, then make the project folder the
# working directory so the relative "./data/..." paths used below resolve.
DRIVE_MOUNT_POINT = '/content/drive'
PROJECT_DIR = '/content/drive/MyDrive/Colab Notebooks/Voice-Diease/'

drive.mount(DRIVE_MOUNT_POINT)
os.chdir(PROJECT_DIR)

Mounted at /content/drive


## 載入訓練資料

In [None]:
# Read the training data list (one row per recording).
DATALIST_CSV = './data/training_datalist.csv'
source_df = pd.read_csv(DATALIST_CSV)

print("source_df.shape :", source_df.shape)

# Per-class loss weights passed to `fit`; key i is the zero-based class index.
# NOTE(review): the values look like (largest class count / class count)
# ratios computed by hand -- confirm against the label distribution.
class_weight = dict(enumerate([1., 2.436, 3.19, 12.182, 16.75]))

source_df.shape : (1000, 28)


## 切分訓練與驗證資料

In [None]:
# Hold out 15% of the rows for validation.
# The split is stratified on the label column so that every disease category
# keeps the same proportion in both parts. The class weights above show the
# classes are heavily imbalanced; an unstratified split can leave a rare class
# under-represented (or absent) in the small validation set, which distorts
# the per-class recall reported later.
training_df, validate_df = train_test_split(
    source_df,
    test_size=0.15,
    random_state=333,
    stratify=source_df['Disease category'],
)

print("training_df shape :", training_df.shape, ", test_df shape :", validate_df.shape)

training_df shape : (850, 28) , test_df shape : (150, 28)


## 聲學音檔轉換為 log-Mel 濾波器組特徵(MFCC 的前一步,未做 DCT)

In [None]:
def MFCCs(filename, duration=3.5):
  """Compute mean-normalised log Mel filter-bank features for one WAV file.

  Despite the name, no DCT is applied: the returned values are log filter-bank
  energies (in dB), i.e. the step that precedes MFCCs.

  The recording is zero-padded or truncated to exactly `duration` seconds at
  the file's own sample rate, so every file yields the same time span.
  Multi-channel recordings are averaged down to mono.

  Args:
    filename: path of the WAV file to read.
    duration: clip length in seconds (default 3.5).

  Returns:
    2-D float array of shape (num_frames, 40); each filter-bank column has its
    mean over frames removed. For a 44.1 kHz file and the default duration this
    is (348, 40).

  Raises:
    ValueError: if `duration` is too short to produce a single frame.
  """
  sample_rate, raw_signal = scipy.io.wavfile.read(filename)
  raw_signal = numpy.asarray(raw_signal)
  if raw_signal.ndim > 1:
    # scipy returns (n_samples, n_channels) for multi-channel files.
    raw_signal = raw_signal.mean(axis=1)

  # Fix the clip to `duration` seconds using the file's real sample rate
  # (previously hard-coded to 44100 Hz).
  target_length = int(duration * sample_rate)
  signal = numpy.zeros(target_length)
  n_copy = min(len(raw_signal), target_length)
  signal[:n_copy] = raw_signal[:n_copy]

  # Pre-emphasis is intentionally disabled; the signal is used as-is.
  emphasized_signal = signal

  # Framing: 25 ms windows every 10 ms, converted from seconds to samples.
  frame_size = 0.025
  frame_stride = 0.01
  frame_length = int(round(frame_size * sample_rate))
  frame_step = int(round(frame_stride * sample_rate))
  signal_length = len(emphasized_signal)
  num_frames = int(numpy.ceil(float(numpy.abs(signal_length - frame_length)) / frame_step))
  if num_frames < 1:
    raise ValueError("duration is too short to produce a single analysis frame")

  # Zero-pad so that every frame has the same number of samples.
  pad_signal_length = num_frames * frame_step + frame_length
  padding = numpy.zeros((pad_signal_length - signal_length))
  pad_signal = numpy.append(emphasized_signal, padding)

  # Row i of `indices` holds the sample positions of frame i.
  frame_starts = frame_step * numpy.arange(num_frames)
  indices = numpy.arange(frame_length)[numpy.newaxis, :] + frame_starts[:, numpy.newaxis]
  frames = pad_signal[indices.astype(numpy.int32, copy=False)]

  # Hamming window applied to each frame.
  frames *= numpy.hamming(frame_length)

  # Magnitude FFT and power spectrum.
  NFFT = 512
  mag_frames = numpy.absolute(numpy.fft.rfft(frames, NFFT))
  pow_frames = ((1.0 / NFFT) * ((mag_frames) ** 2))

  # Triangular Mel filter bank: nfilt filters equally spaced on the Mel scale
  # between 0 Hz and the Nyquist frequency.
  nfilt = 40
  low_freq_mel = 0
  high_freq_mel = (2595 * numpy.log10(1 + (sample_rate / 2) / 700))  # Hz -> Mel
  mel_points = numpy.linspace(low_freq_mel, high_freq_mel, nfilt + 2)
  hz_points = (700 * (10**(mel_points / 2595) - 1))  # Mel -> Hz
  fft_bins = numpy.floor((NFFT + 1) * hz_points / sample_rate)

  fbank = numpy.zeros((nfilt, int(numpy.floor(NFFT / 2 + 1))))
  for m in range(1, nfilt + 1):
    left = int(fft_bins[m - 1])
    center = int(fft_bins[m])
    right = int(fft_bins[m + 1])

    # Rising edge of the triangle.
    for k in range(left, center):
      fbank[m - 1, k] = (k - fft_bins[m - 1]) / (fft_bins[m] - fft_bins[m - 1])
    # Falling edge of the triangle.
    for k in range(center, right):
      fbank[m - 1, k] = (fft_bins[m + 1] - k) / (fft_bins[m + 1] - fft_bins[m])

  filter_banks = numpy.dot(pow_frames, fbank.T)
  # Replace exact zeros with machine epsilon so the logarithm stays finite.
  filter_banks = numpy.where(filter_banks == 0, numpy.finfo(float).eps, filter_banks)
  filter_banks = 20 * numpy.log10(filter_banks)  # dB
  # Remove the per-filter mean taken over all frames.
  filter_banks -= (numpy.mean(filter_banks, axis=0) + 1e-8)

  return filter_banks

In [None]:
def audioData_preprocessing(input_df, file_path):
  """Extract the audio feature matrix of every recording listed in `input_df`.

  Each value of the `ID` column is turned into the file name
  `file_path + "<ID>.wav"` and passed to `MFCCs`.

  Args:
    input_df: DataFrame with an `ID` column naming the recordings.
    file_path: prefix prepended to each file name (the callers in this
      notebook pass a directory path ending in "/").

  Returns:
    numpy array stacking the per-file feature matrices in row order.
  """
  features = [
      MFCCs(file_path + "{}.wav".format(sample_id))
      for sample_id in input_df.ID.tolist()
  ]
  return np.array(features)

In [None]:
# Build the model inputs (audio features) and one-hot targets for both splits.
#
# The targets are derived directly from the 'Disease category' column instead
# of slicing the last five columns of a get_dummies() frame: that slice picks
# up an unrelated column whenever a split happens to lack one of the classes.
# Categories are shifted by one to get zero-based class indices, matching the
# `- 1` used in the evaluation cells and the keys of `class_weight`.
NUM_CLASSES = 5
AUDIO_DIR = "./data/training_data/"

train_class_idx = training_df['Disease category'].to_numpy().astype(int) - 1
y_train = np.eye(NUM_CLASSES, dtype='float32')[train_class_idx]
x_train = audioData_preprocessing(training_df, file_path = AUDIO_DIR)
print(x_train.shape, y_train.shape)

val_class_idx = validate_df['Disease category'].to_numpy().astype(int) - 1
y_val = np.eye(NUM_CLASSES, dtype='float32')[val_class_idx]
x_val = audioData_preprocessing(validate_df, file_path = AUDIO_DIR)
print(x_val.shape, y_val.shape)

(850, 348, 40) (850, 5)
(150, 348, 40) (150, 5)


## 模型

In [None]:
def create_AudioModel(input_shape):
  """Build and compile the 2-D CNN classifier for the audio feature maps.

  Architecture: four Conv2D(3x3, same) -> normalisation -> ReLU -> MaxPooling
  stages with 32/64/96/192 filters (LayerNormalization in the first stage,
  BatchNormalization in the others), followed by Flatten, Dense(64) with
  BatchNormalization + ReLU, and a 5-way softmax. The model summary is printed
  as a side effect.

  Args:
    input_shape: shape of one input sample. Accepted forms:
      * an int -- number of frames; 40 filter banks and one channel are
        assumed, i.e. (input_shape, 40, 1). This is how the notebook calls it.
      * a (frames, bands) pair -- a trailing channel axis of 1 is appended.
      * a (frames, bands, channels) triple -- used as given.

  Returns:
    A compiled `Sequential` model (Adam, lr=1e-4, categorical cross-entropy,
    accuracy metric).

  Raises:
    ValueError: if `input_shape` is a sequence of unsupported length.
  """
  # Previously the argument was ignored and the input was always (348, 40, 1).
  if isinstance(input_shape, (int, np.integer)):
    model_input_shape = (int(input_shape), 40, 1)
  else:
    model_input_shape = tuple(int(dim) for dim in input_shape)
    if len(model_input_shape) == 2:
      model_input_shape = model_input_shape + (1,)
    if len(model_input_shape) != 3:
      raise ValueError(
          "input_shape must be an int, (frames, bands) or (frames, bands, channels)")

  model = Sequential()
  model.add(layers.Input(shape=model_input_shape))

  # First convolutional stage uses layer normalisation.
  model.add(layers.Conv2D(32, (3, 3), padding='same'))
  model.add(LayerNormalization())
  model.add(Activation('relu'))
  model.add(layers.MaxPooling2D())

  # Remaining stages share one structure and differ only in filter count.
  for n_filters in (64, 96, 192):
    model.add(layers.Conv2D(n_filters, (3, 3), padding='same'))
    model.add(BatchNormalization())
    model.add(Activation('relu'))
    model.add(layers.MaxPooling2D())

  # Classification head.
  model.add(layers.Flatten())
  model.add(layers.Dense(64))
  model.add(BatchNormalization())
  model.add(Activation('relu'))
  model.add(layers.Dense(5, activation='softmax'))
  model.summary()

  model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=0.0001),
    loss='categorical_crossentropy',
    metrics=['accuracy']
  )
  return model

## 訓練

In [None]:
# Build the classifier; the number of frames per sample is passed as the
# input-shape argument.
frames_per_sample = x_train.shape[1]
audio_model = create_AudioModel(input_shape = frames_per_sample)
print(x_train.shape)

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv2d (Conv2D)             (None, 348, 40, 32)       320       
                                                                 
 layer_normalization (LayerN  (None, 348, 40, 32)      64        
 ormalization)                                                   
                                                                 
 activation (Activation)     (None, 348, 40, 32)       0         
                                                                 
 max_pooling2d (MaxPooling2D  (None, 174, 20, 32)      0         
 )                                                               
                                                                 
 conv2d_1 (Conv2D)           (None, 174, 20, 64)       18496     
                                                                 
 batch_normalization (BatchN  (None, 174, 20, 64)      2

In [None]:
# Callbacks.
# NOTE(review): early stopping watches the *training* loss with a patience of
# 2000 out of 3000 epochs, so it only triggers very late, if at all -- confirm
# this is intended.
early_stopping = EarlyStopping(monitor='loss', patience=2000, mode='auto')
# Keeps only the weights of the epoch with the best validation accuracy.
best_checkpoint = ModelCheckpoint("audio_model.h5", save_best_only=True, monitor='val_accuracy')

train_history = audio_model.fit(
    x_train,
    y_train,
    batch_size=256,
    epochs=3000,
    class_weight=class_weight,
    callbacks=[early_stopping, best_checkpoint],
    validation_data=(x_val, y_val),
)

# Loss/accuracy of the model as it stands after the last epoch, measured on
# the validation split (printed under the label "Test").
score = audio_model.evaluate(x_val, y_val, verbose=0)
final_loss, final_accuracy = score[0], score[1]
print('Test loss:', final_loss)
print('Test accuracy:', final_accuracy)

[1;30;43m串流輸出內容已截斷至最後 5000 行。[0m
Epoch 502/3000
Epoch 503/3000
Epoch 504/3000
Epoch 505/3000
Epoch 506/3000
Epoch 507/3000
Epoch 508/3000
Epoch 509/3000
Epoch 510/3000
Epoch 511/3000
Epoch 512/3000
Epoch 513/3000
Epoch 514/3000
Epoch 515/3000
Epoch 516/3000
Epoch 517/3000
Epoch 518/3000
Epoch 519/3000
Epoch 520/3000
Epoch 521/3000
Epoch 522/3000
Epoch 523/3000
Epoch 524/3000
Epoch 525/3000
Epoch 526/3000
Epoch 527/3000
Epoch 528/3000
Epoch 529/3000
Epoch 530/3000
Epoch 531/3000
Epoch 532/3000
Epoch 533/3000
Epoch 534/3000
Epoch 535/3000
Epoch 536/3000
Epoch 537/3000
Epoch 538/3000
Epoch 539/3000
Epoch 540/3000
Epoch 541/3000
Epoch 542/3000
Epoch 543/3000
Epoch 544/3000
Epoch 545/3000
Epoch 546/3000
Epoch 547/3000
Epoch 548/3000
Epoch 549/3000
Epoch 550/3000
Epoch 551/3000
Epoch 552/3000
Epoch 553/3000
Epoch 554/3000
Epoch 555/3000
Epoch 556/3000
Epoch 557/3000
Epoch 558/3000
Epoch 559/3000
Epoch 560/3000
Epoch 561/3000
Epoch 562/3000
Epoch 563/3000
Epoch 564/3000
Epoch 565/3000
Epoch

## 結果

In [None]:
# Reload the checkpoint written during training (best validation accuracy),
# replacing the in-memory model from the last epoch.
CHECKPOINT_PATH = "audio_model.h5"
audio_model = load_model(CHECKPOINT_PATH)

In [None]:
# Evaluate the reloaded model on the training split.
train_probabilities = audio_model.predict(x_train)
y_pred = train_probabilities.argmax(axis=1)
# Shift the category column by one so it lines up with the argmax indices
# (assumes categories are coded 1..5 -- TODO confirm).
y_true = training_df['Disease category'] - 1

# Per-class recall; its plain mean is the unweighted average recall (UAR).
results_recall = recall_score(y_true, y_pred, average=None)
print("Training UAR(Unweighted Average Recall) :", results_recall.mean())
train_confusion = confusion_matrix(y_true, y_pred)
ConfusionMatrixDisplay(train_confusion).plot(cmap='Blues')

Training UAR(Unweighted Average Recall) : 0.9947383784572722


<sklearn.metrics._plot.confusion_matrix.ConfusionMatrixDisplay at 0x7f63405b7df0>

In [None]:
# Evaluate the reloaded model on the held-out validation split
# (reported under the label "Test").
val_probabilities = audio_model.predict(x_val)
y_pred = val_probabilities.argmax(axis=1)
# Shift the category column by one so it lines up with the argmax indices
# (assumes categories are coded 1..5 -- TODO confirm).
y_true = validate_df['Disease category'] - 1

# Per-class recall; its plain mean is the unweighted average recall (UAR).
results_recall = recall_score(y_true, y_pred, average=None)
print("Test UAR(Unweighted Average Recall) :", results_recall.mean())
val_confusion = confusion_matrix(y_true, y_pred)
ConfusionMatrixDisplay(val_confusion).plot(cmap='Blues')