In [None]:
#Train Heart/Lung Sound Model One ~ Mboalab(Improve Digital Stethoscope AI App) ~ Outreachy 2023 
#Author: Sumaya Ahmed Salihs 
#Inspired by: https://www.kaggle.com/code/mohammadrezashafie/par-ebi

In [None]:
# Import all libraries needed to run this project
import glob
import os
import librosa as lib
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline
import librosa.display
import numpy as np
import IPython.display as ipd
import shutil
import soundfile
from tqdm import tqdm
from sklearn.model_selection import StratifiedShuffleSplit
import shutil
from sklearn.preprocessing import LabelEncoder
from keras.utils import to_categorical
from keras.models import Sequential
from keras.layers import Dense, Dropout, Activation, Flatten
from keras.layers import Conv2D, MaxPooling2D
from keras.optimizers import Adam
from keras.callbacks import ModelCheckpoint
from sklearn.metrics import classification_report, accuracy_score, confusion_matrix
# from mlxtend.plotting import plot_confusion_matrix

In [None]:
import os

# Root folder of the heartbeat dataset. It is written once here so that the
# working directory and every path derived from dir_path cannot drift apart.
dir_path='[Home_dir_path]/Desktop/HeartBeatSounds/archive/'
# Work from the dataset root so that relative paths resolve against it.
os.chdir(dir_path)

In [None]:
# Collects the paths that match a glob pattern
def get_fileNames(path):
    """Return a list with every filesystem path matched by the glob pattern `path`."""
    return list(glob.glob(path))

In [None]:
# Creating dataframe from the labelled audio files having duration equal to or more than 3 seconds
def create_dataframe(dataframe_name):
    """Build a DataFrame of the labelled recordings found in set_a/ and set_b/.

    The label of a recording is the part of its file name before the first
    underscore. A file is kept when its label is not one of the
    unlabelled-test prefixes and it lasts at least 3 seconds.

    Parameters
    ----------
    dataframe_name : str
        Kept for backward compatibility; it does not influence the result.

    Returns
    -------
    pandas.DataFrame
        Columns ``file_path`` and ``label``, one row per retained file.
    """
    audio = {'file_path': [], 'label': []}
    unlabeled_files = ['Aunlabelledtest', 'Bunlabelledtest']
    for folder in [dir_path+'set_a/', dir_path+'set_b/']:
        for file in get_fileNames(folder + '//**'):
            label = os.path.basename(file).split('_')[0]
            # Check the label first so unlabelled files are never opened to measure them.
            if label in unlabeled_files:
                continue
            # ">= 3" so that recordings of exactly 3 seconds are kept, as the
            # description above states.
            if lib.get_duration(filename=file) >= 3:
                audio['file_path'].append(file)
                audio['label'].append(label)

    return pd.DataFrame(audio)

In [None]:
# Creating a dataframe for the input audio data.
# The string argument is not used by create_dataframe when building the result.
raw_data = create_dataframe('raw_data')
# Show the dataframe as the cell output
raw_data
# print(raw_data.size)

In [None]:
# Show the Python type of the object returned by create_dataframe
type(raw_data)

In [None]:
# Counting the number of each labels in the dataframe
raw_data.label.value_counts()

In [None]:
# Bar chart of the number of rows per label
def data_distribution(data):
    """Draw a bar chart with the count of each value found in `data.label`."""
    plt.figure(figsize=(16,3))
    label_counts = data.label.value_counts()
    label_counts.plot.bar(title="Data Category distribution")
    plt.show()

In [None]:
# Plotting spectrogram of an audio signal
def spectrogram(file_path, label):
    """Plot the log-frequency spectrogram of one audio file.

    Parameters
    ----------
    file_path : str
        Audio file loaded with librosa.
    label : str
        Text placed at the start of the figure title.
    """
    y, sr = lib.load(file_path)
    plt.figure(figsize=(16,3))
    # Leading space keeps the label and the caption apart, as waveform() does in its title.
    plt.title(label + ' Log-Frequency Power Spectrogram')
    # STFT magnitude converted to dB relative to the largest magnitude in the signal.
    data = lib.amplitude_to_db(np.abs(lib.stft(y)), ref=np.max)
    # Pass the loaded sampling rate so the axes are scaled from the actual signal.
    lib.display.specshow(data, sr=sr, y_axis='log', x_axis='time')
    plt.colorbar();

In [None]:
import librosa.display

In [None]:
# Finding the unique labels in the raw dataset (pandas returns them in order of first appearance)
unique_labels = raw_data.label.unique()
unique_labels

In [None]:
# Plotting Waveform of an audio signal
def waveform(file_path, label):
    """Plot the time-domain waveform of one audio file.

    Parameters
    ----------
    file_path : str
        Audio file loaded with librosa.
    label : str
        Text placed at the start of the figure title.
    """
    y, sr = lib.load(file_path)
    plt.figure(figsize=(16, 3))
    plt.title(label + ' Sound Wave')
    # Newer librosa releases provide display.waveshow and no longer ship
    # display.waveplot; use whichever one the installed version offers.
    plot_wave = getattr(librosa.display, 'waveshow', None) or librosa.display.waveplot
    plot_wave(y, sr=sr)


In [None]:
# Plotting graphs for Murmur Heartbeat Sound
# NOTE(review): unique_labels comes from raw_data.label.unique(), so index 0 is
# the murmur class only if that label appears first in raw_data — confirm.
path = raw_data[raw_data.label==unique_labels[0]].file_path.iloc[3]
print(path)
waveform(path, unique_labels[0])
spectrogram(path, unique_labels[0])
# Audio player for the same file (shown as the cell output)
ipd.Audio(path)

In [None]:
# Plotting graphs for Extrahls Heartbeat Sound
# NOTE(review): unique_labels comes from raw_data.label.unique(), so index 1 is
# the extrahls class only if that label appears second in raw_data — confirm.
path = raw_data[raw_data.label==unique_labels[1]].file_path.iloc[0]
print(path)
waveform(path, unique_labels[1])
spectrogram(path, unique_labels[1])
# Audio player for the same file (shown as the cell output)
ipd.Audio(path)

In [None]:
#Plotting graphs for Artifact Heartbeat Sound
# NOTE(review): unique_labels comes from raw_data.label.unique(), so index 2 is
# the artifact class only if that label appears third in raw_data — confirm.
path = raw_data[raw_data.label==unique_labels[2]].file_path.iloc[0]
print(path)
waveform(path,unique_labels[2])
spectrogram(path,unique_labels[2])
# Audio player for the same file (shown as the cell output)
ipd.Audio(path)

In [None]:
#Plotting graphs for Normal Heartbeat Sound
# NOTE(review): unique_labels comes from raw_data.label.unique(), so index 3 is
# the normal class only if that label appears fourth in raw_data — confirm.
path = raw_data[raw_data.label==unique_labels[3]].file_path.iloc[0]
print(path)
waveform(path,unique_labels[3])
spectrogram(path,unique_labels[3])
# Audio player for the same file (shown as the cell output)
ipd.Audio(path)

In [None]:
#Plotting graphs for Extrastole Heartbeat Sound
# NOTE(review): unique_labels comes from raw_data.label.unique(), so index 4 is
# the extrastole class only if that label appears fifth in raw_data — confirm.
path = raw_data[raw_data.label==unique_labels[4]].file_path.iloc[4]
print(path)
waveform(path,unique_labels[4])
spectrogram(path,unique_labels[4])
# Audio player for the same file (shown as the cell output)
ipd.Audio(path)

**Creating new audio files by using Data Augmentation Technique**

In [None]:
# Time-stretches every audio file of a folder by a given rate and saves the result
def changing_speed(speed_rate, src_path, dst_path):
    """Write a time-stretched copy of every file in `src_path` into `dst_path`.

    Each output is named ``<stem>_<speed_rate>.wav``, where ``<stem>`` is the
    source file name up to its first dot. `dst_path` is created when missing.
    """
    source_files = get_fileNames(src_path + "//**")
    if not os.path.exists(dst_path):
        os.makedirs(dst_path)
    # The suffix is the same for every file, so build it once.
    suffix = '_' + str(speed_rate) + ".wav"
    for audio_path in tqdm(source_files):
        stem = os.path.basename(audio_path).split('.')[0]
        samples, sample_rate = lib.load(audio_path)
        stretched = lib.effects.time_stretch(samples, rate=speed_rate)
        soundfile.write(dst_path + '//' + stem + suffix, stretched, sample_rate)

In [None]:
# Changing only the pitch of an audio signal with different steps and saving it
def changing_pitch(step, src_path, dst_path):
    """Write a pitch-shifted copy of every file in `src_path` into `dst_path`.

    Parameters
    ----------
    step : int or float
        Value passed to librosa as ``n_steps``; also appended to the output file name.
    src_path : str
        Directory whose entries are loaded with librosa.
    dst_path : str
        Output directory; created when missing.

    Each output is named ``<stem>_<step>.wav``, where ``<stem>`` is the source
    file name up to its first dot.
    """
    files = get_fileNames(src_path + '//**')
    os.makedirs(dst_path, exist_ok=True)
    for file in tqdm(files):
        label = os.path.basename(file).split('.')[0]
        y, sr = lib.load(file)
        # sr is passed by keyword: recent librosa releases reject it as a positional argument.
        updated_y = lib.effects.pitch_shift(y, sr=sr, n_steps=step)
        soundfile.write(dst_path + '//' + label + '_' + str(step) + '.wav', updated_y, sr)

In [None]:
# Generates speed- and pitch-modified variants of every input file, then copies the originals
def sound_augmentation(src_path, dst_path):
    """Fill `dst_path` with augmented versions of the files in `src_path`.

    For every source file this writes four time-stretched variants, four
    pitch-shifted variants and finally an unmodified copy.
    """
    for rate in (1.08, 0.8, 1.10, 0.9):
        changing_speed(rate, src_path, dst_path)

    for n_steps in (2, -2, 2.5, -2.5):
        changing_pitch(n_steps, src_path, dst_path)

    # Keep the untouched originals alongside their augmented variants.
    for original in get_fileNames(src_path + '//**'):
        shutil.copy(original, dst_path)


In [None]:
# Create working/Data2 under the dataset root. Unlike `!mkdir`, this works from any
# current directory, needs no shell, and does not fail when the folders already exist.
os.makedirs(dir_path+'working/Data2', exist_ok=True)

In [None]:
# Copy every entry of set_a/ into working/Data2/
import os
import shutil

source = dir_path+'set_a/'
destination = dir_path+'working/Data2/'

# Get a list of files in the source directory
file_list = os.listdir(source)

# Iterate over the files and copy them to the destination directory
# (copy2 also attempts to preserve file metadata)
for file_name in file_list:
    source_file = os.path.join(source, file_name)
    destination_file = os.path.join(destination, file_name)
    shutil.copy2(source_file, destination_file)


In [None]:
# Copy every entry of set_b/ into working/Data2/
source = dir_path+'set_b/'
# Absolute destination (as in the set_a cell) so the copy does not depend on
# the current working directory when the cell is run.
destination = dir_path+'working/Data2/'

# Get a list of files in the source directory
file_list = os.listdir(source)

# Iterate over the files and copy them to the destination directory
for file_name in file_list:
    source_file = os.path.join(source, file_name)
    destination_file = os.path.join(destination, file_name)
    shutil.copy(source_file, destination_file)


In [None]:
# Switch to working/ (later cells save model files relative to the current directory)
os.chdir(dir_path+'working')
# Create OUT/ inside working/; unlike `!mkdir`, re-running this cell does not fail
# when the folder already exists.
os.makedirs('OUT', exist_ok=True)

In [None]:

# Checking and creating new directory for saving newly generated audio files using data augmentation.
# sound_augmentation writes 4 speed variants + 4 pitch variants + 1 plain copy per source file,
# so a complete OUT folder holds 9 entries for each entry of Data2. The expected count is derived
# from the current contents of Data2 instead of a fixed number, so the check follows the dataset.
variants_per_file = 9
expected_count = variants_per_file * len(get_fileNames(dir_path+'working/Data2//**'))

if os.path.exists(dir_path+'working/OUT') and len(get_fileNames(dir_path+'working/OUT//**')) == expected_count:
    print('Sound Augmentation Already Done and Saved')
else:
    # A missing, empty or partially filled OUT folder is rebuilt from scratch.
    if os.path.exists(dir_path+'working/OUT'):
        shutil.rmtree(dir_path+'working/OUT')
    sound_augmentation(dir_path+'working/Data2', dir_path+'working/OUT')


In [None]:
# Builds a dataframe from the labelled audio files in working/OUT lasting 3 seconds or more
def create_dataframe(dataframe_name):
    """Return a DataFrame (columns ``file_path`` and ``label``) for working/OUT.

    The label is the part of the file name before the first underscore. A file
    is kept when it lasts at least 3 seconds and its label is not one of the
    unlabelled-test prefixes. `dataframe_name` does not influence the result.
    """
    excluded = ('Aunlabelledtest', 'Bunlabelledtest')
    kept_paths, kept_labels = [], []
    out_folder = dir_path+'working/OUT/'
    for wav in get_fileNames(out_folder + '//**'):
        tag = os.path.basename(wav).split('_')[0]
        long_enough = lib.get_duration(filename=wav) >= 3
        if long_enough and tag not in excluded:
            kept_paths.append(wav)
            kept_labels.append(tag)

    return pd.DataFrame({'file_path': kept_paths, 'label': kept_labels})



In [None]:
# Creating new dataframe from the Updated Audio Dataset.
# The string argument is not used by create_dataframe when building the result.
final_data = create_dataframe('final_data')
# Show the dataframe as the cell output
final_data

In [None]:
#Counting the number of files per label in the final dataframe
final_data.label.value_counts()

In [None]:
#Data Distribution Graph
data_distribution(final_data)

In [None]:
#Creating waveform for the normal and changed speed sound wave
# NOTE(review): `path` is assigned here, but the waveform calls below use
# hard-coded file names rather than this variable.
path = final_data[final_data.label==unique_labels[4]].file_path.iloc[4]
waveform(dir_path+"working/OUT//normal_noisynormal_101_1305030823364_B.wav","Normal(1x)")
waveform(dir_path+"working/OUT///normal_noisynormal_101_1305030823364_B_1.08.wav","Normal(1.08x)")

In [None]:
#Creating waveform for the normal and changed pitch sound wave
# NOTE(review): `path` is assigned here, but the waveform calls below use
# hard-coded file names rather than this variable.
path = final_data[final_data.label==unique_labels[4]].file_path.iloc[4]
waveform(dir_path+"working/OUT//normal_noisynormal_101_1305030823364_B.wav","Normal")
waveform(dir_path+"working/OUT//normal_noisynormal_101_1305030823364_B_2.5.wav","Normal(pitch=2.5)")

**Data Splitting into Training and Testing Data sets**



In [None]:
# Splitting the Data into Training Data and Testing Data in the proportion of 80:20 (Train:Test),
# keeping the label proportions the same in both parts
split = StratifiedShuffleSplit(n_splits=1, test_size=0.2, random_state=42)
# n_splits=1, so the splitter yields exactly one (train, test) pair of index arrays.
train_idx, test_idx = next(split.split(final_data, final_data.label))
# split() returns row positions, not index labels, so select with .iloc.
train_data = final_data.iloc[train_idx]
test_data = final_data.iloc[test_idx]

In [None]:
#Total Training data
train_data

In [None]:
#Total Testing data
test_data

MFCC Feature Extraction from Audio Files

In [None]:
# Loads the start of an audio file (at librosa's default sampling rate) and extracts its MFCC features
def feature_extraction(file_path, duration=3, n_mfcc=128):
    """Return the MFCC matrix computed from the beginning of an audio file.

    Parameters
    ----------
    file_path : str
        Audio file loaded with librosa.
    duration : float, optional
        Number of seconds read from the start of the file (default 3).
    n_mfcc : int, optional
        Number of MFCC coefficients requested from librosa (default 128).

    Returns
    -------
    numpy.ndarray
        The array returned by ``librosa.feature.mfcc`` for the loaded samples.
    """
    y, sr = lib.load(file_path, duration=duration)
    return lib.feature.mfcc(y=y, sr=sr, n_mfcc=n_mfcc)

In [None]:
# Extract one MFCC matrix per recording and stack them into a single array per split
x_train = np.asarray([feature_extraction(wav_path) for wav_path in train_data.file_path])
x_test = np.asarray([feature_extraction(wav_path) for wav_path in test_data.file_path])

In [None]:
# Append a trailing axis of size 1 to the Training and Testing inputs so that each
# array becomes 4-D, (samples, dim1, dim2, 1), which is the layout the Conv2D input
# of the CNN is built from. The actual sizes are printed below.
x_train = x_train.reshape(x_train.shape[0], x_train.shape[1], x_train.shape[2], 1)
x_test = x_test.reshape(x_test.shape[0], x_test.shape[1], x_test.shape[2], 1)
print('X_Train Shape: ', x_train.shape)
print('X_Test Shape: ', x_test.shape)


In [None]:
# Encode the labels into numbers from string values
encode = LabelEncoder()
# Learn the label -> integer mapping from the training labels only...
y_train = encode.fit_transform(train_data.label)
# ...and reuse that same mapping for the test labels. Re-fitting on the test
# labels could assign different integers if the two label sets ever differed.
y_test = encode.transform(test_data.label)

In [None]:
# One-hot encode the integer labels: one column per class known to the encoder
# (taken from the fitted encoder rather than hard-coded, matching how the model's
# output size is chosen)
n_label_classes = len(encode.classes_)
y_train = to_categorical(y_train, num_classes=n_label_classes)
y_test = to_categorical(y_test, num_classes=n_label_classes)
print('Y_Train Shape: ', y_train.shape)
print('Y_Test Shape: ', y_test.shape)

**CNN Model Creation**

In [None]:
#Builds the (uncompiled) CNN classifier
def CNN_model(n_width,n_height,n_channels,n_dropout,n_classes):
    """Assemble the convolutional network and return it without compiling.

    Layer stack: three 5x5 ReLU convolutions (24, 48 and 48 filters) with a
    (4, 2) max-pool after each of the first two, then flatten, dropout, a
    64-unit ReLU dense layer, dropout and a softmax output layer.

    Parameters
    ----------
    n_width, n_height, n_channels : int
        Used, in this order, as the ``input_shape`` of the first convolution.
    n_dropout : float
        Rate given to both Dropout layers.
    n_classes : int
        Number of units of the softmax output layer.
    """
    layer_stack = [
        Conv2D(filters=24, kernel_size=(5,5), strides=(1, 1),
               input_shape=(n_width,n_height,n_channels), activation='relu'),
        MaxPooling2D((4, 2), strides=(4, 2)),

        Conv2D(filters=48, kernel_size=(5,5), padding='valid', activation='relu'),
        MaxPooling2D((4, 2), strides=(4, 2)),

        Conv2D(filters=48, kernel_size=(5,5), padding='valid', activation='relu'),

        Flatten(),
        Dropout(rate=n_dropout),

        Dense(64, activation='relu'),
        Dropout(rate=n_dropout),

        Dense(n_classes, activation='softmax'),
    ]
    return Sequential(layer_stack)


In [None]:
# Build the network: input dimensions come from x_train, dropout rate is 0.5,
# and the output has one unit per class known to the label encoder
cnn_model = CNN_model(x_train.shape[1], x_train.shape[2], x_train.shape[3], 0.5, len(encode.classes_))


**Setting Hyperparameters for the model**

In [None]:
#Adam optimizer with a small learning rate; categorical cross-entropy loss, accuracy as the metric
optimizer = Adam(learning_rate=1e-4)
cnn_model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy'])
cnn_model.summary()

**Training and Saving the Best Model with Minimum Loss**

In [None]:
# Print the layer-by-layer architecture (same call as at the end of the compile cell)
cnn_model.summary()

In [None]:
%%time
%cd dir_path+woking #redundant, likely already in that directory
# Training the model over 300 times and having a batch size of 128 and saving the best model in a .hdf5 file
epochs = 300
batch_size = 128
file = 'cnn_heartbeat_classifier.hdf5'
path = os.path.join(file)

file_last = 'LAST_MODEL.hdf5'
path1 = os.path.join(file_last)

checkpoints_0 = ModelCheckpoint(filepath=path, save_best_only=True, verbose=1)
checkpoints_1 = ModelCheckpoint(filepath=path1, save_best_only=False, verbose=1)

cnn_history = cnn_model.fit(x_train, y_train, batch_size=batch_size, epochs=epochs, validation_data=(x_test, y_test),
                            callbacks=[checkpoints_0, checkpoints_1], verbose=1)


**Accuracy and Loss Graphs**

In [None]:
# Displaying the loss and accuracy of the trained model on the testing data.
# cnn_model holds the weights from the end of training (the checkpoints only write
# files), so these figures are not labelled as the "best"/"least" values.
testing_scores = cnn_model.evaluate(x_test, y_test)
print('Testing Loss:', testing_scores[0])
print('Testing Accuracy:', testing_scores[1])


In [None]:
#Calculating and displaying the Precision, Recall and F1 score for each class
preds = cnn_model.predict(x_test)

labels = encode.classes_
# argmax over the class axis turns one-hot rows / probability rows into class
# indices, which are then mapped back to the label strings.
y_actual = list(labels[np.argmax(y_test, axis=1)])
y_pred = list(labels[np.argmax(preds, axis=1)])

# classification_report expects (y_true, y_pred); passing them the other way
# round would exchange precision and recall in the printed table.
print(classification_report(y_actual, y_pred))

In [None]:
# Dictionary of per-epoch values recorded during fit; the plots below read
# its 'accuracy', 'val_accuracy', 'loss' and 'val_loss' entries
cnn_history_history = cnn_history.history

In [None]:
# Accuracy per epoch: training in blue, testing (validation) in red
plt.figure(figsize=(16,6))
plt.title('Model accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.plot(cnn_history_history['accuracy'], color="b")
plt.plot(cnn_history_history['val_accuracy'], color='r')
plt.legend(['Training Accuracy','Testing Accuracy'],loc='upper left')

In [None]:
# Loss per epoch: training in blue, testing (validation) in red
plt.figure(figsize=(16,8))
plt.title('Model loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.plot(cnn_history_history['loss'], color="b")
plt.plot(cnn_history_history['val_loss'], color="r")
plt.legend(['Training Loss','Testing Loss'],loc='upper right')

In [None]:
#ConfusionMatrixDisplay requires a higher version(sklearn>=1.0.0) sklearn, so function doesn't work, current version of sklearn is 0.21.3
# cm_mat = confusion_matrix(y_actual,y_pred, labels=['normal', 'artifact', 'extrahls', 'murmur', 'extrastole'])
# cm_display = metrics.ConfusionMatrixDisplay(confusion_matrix = cm_mat, display_labels = [False, True])
# cm_display.plot()
# plt.show()

In [None]:
# Evaluate the in-memory model on the test split and print its accuracy as a percentage.
# NOTE(review): the message says "Restored model", but cnn_model is the model trained
# above; no model has been loaded from disk at this point — confirm the wording.
loss, acc = cnn_model.evaluate(x_test, y_test, verbose=2)
print('Restored model, accuracy: {:5.2f}%'.format(100 * acc))


In [None]:
# Save the trained model under the path 'file_name' (relative to the current directory)
cnn_model.save('file_name')

In [None]:
# Load the model back from the path 'file_name' and print its architecture
import tensorflow as tf
fetched_model = tf.keras.models.load_model('file_name')
fetched_model.summary()

In [None]:
# Convert the saved Keras model to TensorFlow Lite and write it to 'file_name.tflite'
keras_model = tf.keras.models.load_model('file_name') #my_model.h5
converter = tf.lite.TFLiteConverter.from_keras_model(keras_model)
converter.experimental_new_converter = True
tflite_model = converter.convert()
# The context manager guarantees the file is flushed and closed once written.
with open('file_name.tflite', "wb") as tflite_file:
    tflite_file.write(tflite_model)