In [None]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import os
import pydot
import kymatio
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px
from termcolor import colored
import matplotlib.pyplot as plt 
import plotly.graph_objects as go
import plotly.figure_factory as ff
import torch
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import classification_report, confusion_matrix, f1_score, accuracy_score
from kymatio import Scattering1D
import librosa
import warnings
warnings.filterwarnings('ignore')
%matplotlib inline

In [None]:
from sklearn.model_selection import GridSearchCV
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv1D, MaxPooling1D, Flatten, Dense, Dropout, BatchNormalization, Activation, Input, Conv2D, MaxPooling2D
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
import tensorflow as tf
#import tensorflow_io as tfio

# Report how many GPU devices TensorFlow can see.
gpu_devices = tf.config.list_physical_devices('GPU')
print("Num GPUs Available: ", len(gpu_devices))


In [None]:
# Build a TF1-compat session whose GPU options request a 0.5 per-process memory
# fraction, then register it as the Keras backend session.
gpu_options = tf.compat.v1.GPUOptions(per_process_gpu_memory_fraction=0.5)
session_config = tf.compat.v1.ConfigProto(gpu_options=gpu_options)
sess = tf.compat.v1.Session(config=session_config)
tf.compat.v1.keras.backend.set_session(sess)

In [None]:
epochs = 20

# Keep only the weights that reach the best validation accuracy.
model_file_path = "best_model_file.hdf5"
checkpoint = ModelCheckpoint(model_file_path, monitor='val_accuracy', verbose=0, save_best_only=True, mode='max')

# Stop after 5 epochs without val_loss improvement and roll back to the best weights.
earlystop = tf.keras.callbacks.EarlyStopping(monitor='val_loss',
                          min_delta=0,
                          patience=5,
                          verbose=1,
                          restore_best_weights=True
                          )

# The plateau patience must be shorter than the early-stopping patience: both
# callbacks watch val_loss, so with equal patience the learning rate would only
# be cut on the very epoch training stops and the reduction could never help.
reduce_lr = tf.keras.callbacks.ReduceLROnPlateau(monitor='val_loss',
                              factor=0.2,
                              patience=2,
                              verbose=1,
                              min_delta=0.0001)

# Append per-epoch metrics to a log file.
csv_logger = tf.keras.callbacks.CSVLogger('Model.log')

# Polynomial (square-root) decay from 1e-3 to 1e-6 over 10000 steps.
# NOTE(review): the optimizers in this notebook are built with a constant
# learning rate, so this schedule is currently unused. If it is ever passed to
# an optimizer, drop reduce_lr from `callbacks` - presumably the two cannot be
# combined because a scheduled learning rate is not settable; confirm.
initial_learning_rate = 0.001
lr_schedule = tf.keras.optimizers.schedules.PolynomialDecay(
    initial_learning_rate,
    decay_steps=10000,
    end_learning_rate=0.000001,
    power=0.5,
    cycle=False
)

# NOTE(review): none of the fit() calls visible in this notebook pass
# `callbacks=callbacks`, so these callbacks have no effect until they are wired in.
callbacks = [earlystop, csv_logger, reduce_lr, checkpoint]

In [None]:
# Root folder of the heart-sound dataset. The original machine-specific location
# stays the default; set HEART_SOUND_DATA_DIR to run the notebook elsewhere
# without editing this cell.
DEFAULT_DATA_DIR = r'D:\MIET_HeartSound\Dataset\Dataset2\heart_sound'
data = os.environ.get('HEART_SOUND_DATA_DIR', DEFAULT_DATA_DIR)

# Fail early with a readable message instead of later, inside the tf.data pipeline.
if not os.path.isdir(data):
    raise FileNotFoundError(
        f"Dataset directory not found: {data!r}. "
        "Set the HEART_SOUND_DATA_DIR environment variable to the dataset root."
    )
data

In [None]:
# 'train' and 'val' sub-folders of the dataset root.
train_dir_path, valid_dir_path = (
    os.path.join(data, split_name) for split_name in ('train', 'val')
)

In [None]:
# Paths to one example recording of each class inside the training folder.
healthy, unhealthy = (
    os.path.join(train_dir_path, class_name, file_name)
    for class_name, file_name in (('healthy', 'a0007.wav'), ('unhealthy', 'a0002.wav'))
)

In [None]:
def load_wav_16k_mono(filename):
    """Read a WAV file and return its samples as a 1-D tensor.

    Despite the name, no resampling is performed: the tfio-based resample step
    is disabled, so the samples keep whatever rate is stored in the file.

    Args:
        filename: Path to the WAV file (string or scalar string tensor).

    Returns:
        The decoded audio with the channel axis removed, i.e. a rank-1 tensor
        with one entry per sample.
    """
    file_contents = tf.io.read_file(filename)
    # desired_channels=1 forces a single channel; the decoded tensor still
    # carries that channel as a trailing axis. The sample rate is not needed
    # because nothing downstream resamples.
    wav, _ = tf.audio.decode_wav(file_contents, desired_channels=1)
    # Drop the trailing channel axis.
    return tf.squeeze(wav, axis=-1)

In [None]:
# One file-path dataset per class, matching every .wav in the class folder.
healthy_heart_train = tf.data.Dataset.list_files(f"{train_dir_path}/healthy/*.wav")
unhealthy_heart_train = tf.data.Dataset.list_files(f"{train_dir_path}/unhealthy/*.wav")
#healthy_heart_valid = tf.data.Dataset.list_files(valid_dir_path+'/healthy'+'/*.wav')
#unhealthy_heart_valid = tf.data.Dataset.list_files(valid_dir_path+'/unhealthy'+'/*.wav')

In [None]:
def _with_constant_label(files, make_labels):
    """Pair every file path in `files` with a label produced by `make_labels(n)`."""
    labels = tf.data.Dataset.from_tensor_slices(make_labels(len(files)))
    return tf.data.Dataset.zip((files, labels))


# Healthy recordings get label 1, unhealthy recordings label 0; the combined
# dataset lists all healthy pairs first, followed by all unhealthy pairs.
hhl_train = _with_constant_label(healthy_heart_train, tf.ones)
uhl_train = _with_constant_label(unhealthy_heart_train, tf.zeros)
train_data = hhl_train.concatenate(uhl_train)

#hhl_valid = tf.data.Dataset.zip((healthy_heart_valid, tf.data.Dataset.from_tensor_slices(tf.ones(len(healthy_heart_valid)))))
#uhl_valid = tf.data.Dataset.zip((unhealthy_heart_valid, tf.data.Dataset.from_tensor_slices(tf.zeros(len(unhealthy_heart_valid)))))
#valid_data = hhl_valid.concatenate(uhl_valid)

In [None]:
# Number of (file path, label) pairs in the combined dataset
train_length = train_data.cardinality().numpy()
#valid_length = tf.data.experimental.cardinality(valid_data).numpy()

# Show the count
print("Length of train_data:", train_length)
#print("Length of valid_data:", valid_length)

In [None]:
def extract_mfcc(file_path, label, wav_length=30000):
    """Load one recording and turn it into a fixed-length MFCC feature map.

    The clip is peak-normalised, cropped or zero-padded to `wav_length`
    samples, and converted to 7 MFCC coefficients per frame with librosa.

    Args:
        file_path: Path to a WAV file (string or scalar string tensor).
        label: Class label; returned unchanged.
        wav_length: Number of samples each clip is cropped / padded to.

    Returns:
        Tuple `(mfcc, label)`, where `mfcc` is a float32 tensor of shape
        `(7, n_frames, 1)` holding the absolute MFCC values.
    """
    def mfccs(wav):
        # Runs eagerly inside tf.py_function, so .numpy() is available here.
        # NOTE(review): sr=2000 is assumed rather than read from the file -
        # confirm that the recordings really are sampled at 2 kHz.
        mfcc = librosa.feature.mfcc(y=wav.numpy(), sr=2000, n_mfcc=7)
        # Match the dtype declared to tf.py_function below.
        return mfcc.astype(np.float32)

    wav = load_wav_16k_mono(file_path)
    wav = tf.cast(wav, dtype=tf.float32)  # Ensure wav is float32

    # Peak-normalise. divide_no_nan returns 0 where the peak is 0, so an
    # all-silent clip becomes zeros instead of NaN.
    wav = tf.math.divide_no_nan(wav, tf.reduce_max(tf.abs(wav)))

    # Crop, then zero-pad up to exactly wav_length samples. This branch-free
    # form behaves the same eagerly and inside Dataset.map.
    wav = wav[:wav_length]
    wav = tf.pad(wav, [[0, wav_length - tf.shape(wav)[0]]], "CONSTANT")

    # librosa works on NumPy arrays, so it has to be wrapped in tf.py_function.
    mfcc = tf.py_function(mfccs, [wav], tf.float32)
    # py_function loses static shape information; restore what is known
    # (n_mfcc=7 rows, frame count left open).
    mfcc.set_shape((7, None))
    mfcc = tf.abs(mfcc)
    # Add a trailing channel axis so the features look like a 1-channel image.
    mfcc = tf.expand_dims(mfcc, axis=2)
    return mfcc, label

In [None]:
# Draw one random (file path, label) pair from the healthy class for a spot check.
# The dataset is already shuffled, so the first element is as random as any
# other; the former extra `iterator.next()` call only threw an element away and
# would fail on a dataset holding a single file.
iterator = hhl_train.shuffle(buffer_size=10000).as_numpy_iterator()
filepath, label = next(iterator)
print(filepath, label)

In [None]:
# Run the feature extractor directly on the sampled file and show the resulting feature shape.
mfcc, label = extract_mfcc(filepath, label)
print(mfcc.shape)

In [None]:
# Shuffle the (file path, label) pairs ONCE, before feature extraction:
#  * the buffer covers the whole dataset, so the class-ordered input
#    (all healthy files first, then all unhealthy) is mixed properly;
#  * reshuffle_each_iteration=False, and no shuffle after cache(), so every
#    pass over train_data replays the batches in the same order. The later
#    take()/skip() split depends on that: a shuffle that redraws the order on
#    each epoch would move batches between the train/val/test subsets.
# cache() then pins the extracted features after the first complete pass.
# Trade-off: training batches are no longer reshuffled between epochs.
file_count = int(tf.data.experimental.cardinality(train_data).numpy())
shuffle_buffer = file_count if file_count > 0 else 1000  # fallback if cardinality is unknown
train_data = train_data.shuffle(buffer_size=shuffle_buffer, reshuffle_each_iteration=False)
train_data = train_data.map(extract_mfcc)
train_data = train_data.cache()
train_data = train_data.batch(4)
train_data = train_data.prefetch(2)

In [None]:
# train_data is batched (4 clips per batch), so its cardinality counts batches, not clips.
num_samples = tf.data.experimental.cardinality(train_data).numpy()
print(f"Number of batches in train_data: {num_samples}")

In [None]:
import tensorflow as tf

# train_data yields batches, so every count below is a number of BATCHES.
# Counting with a generator avoids holding all batches in memory at once; the
# complete pass also lets the upstream cache() finish filling.
total_samples = sum(1 for _ in train_data.as_numpy_iterator())

# Split proportions: 70 % train, 20 % validation, remainder test
train_size = int(total_samples * 0.7)
val_size = int(total_samples * 0.20)
test_size = total_samples - train_size - val_size  # Ensures all data is used and accounts for rounding

# NOTE(review): take()/skip() only give disjoint subsets when train_data replays
# its batches in a fixed order; an upstream shuffle that reshuffles on each
# iteration would leak batches between the subsets.
train = train_data.take(train_size)
val = train_data.skip(train_size).take(val_size)
test = train_data.skip(train_size + val_size)

# Peek at one training batch to check shapes. The elements are already batched,
# so no extra .batch() is applied (that would add a spurious leading axis).
samples, labels = next(iter(train))
print("Sample shape:", samples.shape)
print("Labels:", labels)

# train, val and test can now be iterated for training and evaluation.


In [None]:
# Cardinality of each subset
train_length, val_length, test_length = (
    tf.data.experimental.cardinality(subset).numpy() for subset in (train, val, test)
)

# Show the three counts
print("Length of train:", train_length)
print("Length of validation:", val_length)
print("Length of test:", test_length)

CNN

In [None]:
def cnn(input_shape=(7, 59, 1)):
    """Build the small 2-D CNN used as the baseline binary classifier.

    Args:
        input_shape: Shape of one input sample without the batch axis.
            Defaults to (7, 59, 1), the shape this notebook has always used.

    Returns:
        An uncompiled Sequential model: two 2x2 ReLU convolutions (16 then 32
        filters), a flatten, a 128-unit ReLU dense layer and a single sigmoid
        output unit.
    """
    return Sequential([
        Conv2D(16, (2, 2), activation='relu', input_shape=input_shape),
        Conv2D(32, (2, 2), activation='relu'),
        Flatten(),
        Dense(128, activation='relu'),
        # One sigmoid unit: a single probability for the binary label.
        Dense(1, activation='sigmoid'),
    ])

In [None]:
# Build the CNN and compile it with Adam at a constant learning rate of 1e-4
# (no learning-rate schedule is attached here).
model0 = cnn()
optimizer = tf.keras.optimizers.Adam(learning_rate=1e-4)
model0.compile(
    loss="binary_crossentropy",
    optimizer=optimizer,
    metrics=["accuracy"],
)
model0.summary()

In [None]:
#tf.keras.utils.plot_model(model0, 
           #show_shapes = True,
          # show_dtype= True,
           #show_layer_names = True,
           #show_layer_activations= True,
           #rankdir = 'TB', 
           #expand_nested = False, 
           #dpi = 70)

In [None]:
# `train` is a tf.data.Dataset that is already batched upstream, so no
# batch_size is passed to fit(): that argument does not apply to dataset inputs
# and the value 16 never matched the real batch size.
# Validation uses `val`; `test` is kept untouched for the final evaluation so
# the test metrics are not used while monitoring training.
history0 = model0.fit(train,
                    epochs=30,
                    validation_data=val,
                    verbose=1,
                    )

In [None]:
# Write the trained CNN to an .h5 file.
cnn_model_path = "Model-mfcc-dummy1.h5"
model0.save(cnn_model_path)
print('Model save to Disk')

In [None]:
# Per-epoch metrics of the CNN run as a table (up to the first 50 epochs shown).
df = pd.DataFrame.from_dict(history0.history)
#df.to_excel("output.xlsx")
#df.to_csv("output.csv")
df.head(50)

In [None]:
# Training vs. validation curves for accuracy and loss, one panel per metric.
fig, ax = plt.subplots(1, 2, figsize=(10, 5))
ax = ax.ravel()

for panel, met in zip(ax, ('accuracy', 'loss')):
    panel.plot(history0.history[met])
    panel.plot(history0.history['val_' + met])
    panel.set_title('Model {}'.format(met))
    panel.set_xlabel('epochs')
    panel.set_ylabel(met)
    panel.legend(['train', 'val'])
plt.savefig("Model0-Results.png")

In [None]:
%%time
# Evaluate the CNN on the test batches using `evaluate`; the result holds the
# loss followed by the compiled accuracy metric.
print("Evaluate on test data")
results0 = model0.evaluate(test)
print("test loss, test acc:", results0)

In [None]:
import numpy as np
from sklearn.metrics import classification_report, confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns

# Step 4: Make predictions and evaluate on the test set.
# Labels and predictions are collected from the same pass over `test`, so they
# stay aligned batch by batch.
true_labels = []
predicted_labels = []

for samples, labels in test.as_numpy_iterator():
    # predict_on_batch skips the per-call progress bar that predict() prints.
    predictions = np.asarray(model0.predict_on_batch(samples))
    # The sigmoid output has shape (batch, 1); flatten it so each prediction is
    # a scalar class id rather than a 1-element array.
    predicted_classes = (predictions > 0.5).astype(int).ravel()  # Adjust threshold as needed
    # Labels come in as floats (1.0 / 0.0); cast so both lists hold ints.
    true_labels.extend(labels.astype(int))
    predicted_labels.extend(predicted_classes)

true_labels = np.array(true_labels)
predicted_labels = np.array(predicted_labels)

# Step 5: Classification report and confusion matrix
print(classification_report(true_labels, predicted_labels))
cm = confusion_matrix(true_labels, predicted_labels)
print(cm)

# Plot the confusion matrix
plt.figure(figsize=(8,6))
sns.heatmap(cm, annot=True, fmt="d", cmap='Blues')
plt.title('Confusion Matrix')
plt.xlabel('Predicted labels')
plt.ylabel('True labels')
plt.show()


BiLSTM

In [None]:
import tensorflow as tf
from tensorflow.keras import layers, models

# Bidirectional-LSTM classifier on the MFCC feature maps.
# NOTE(review): after dropping the channel axis the input is (7, 59); the
# recurrent layers step along the first axis (length 7). If the 59-long axis is
# the time axis, the input probably needs transposing - confirm the intent.
model2 = models.Sequential([
    # Drop the trailing singleton channel axis: (7, 59, 1) -> (7, 59).
    # Reshape is used instead of a Lambda wrapping a Python lambda so the saved
    # model does not depend on serialized Python code when it is reloaded.
    layers.Reshape((7, 59), input_shape=(7, 59, 1)),
    # First BiLSTM layer, return sequences to pass to another LSTM layer
    layers.Bidirectional(layers.LSTM(64, return_sequences=True)),
    # Second BiLSTM layer, no need to return sequences as this is the final LSTM layer
    layers.Bidirectional(layers.LSTM(32)),
    # Dense layer for interpretation
    layers.Dense(64, activation='relu'),
    # Dropout for regularization
    layers.Dropout(0.5),
    # Output layer: one sigmoid unit for the binary label
    layers.Dense(1, activation='sigmoid')
])
 

In [None]:
# Compile the BiLSTM with Adam at a constant learning rate of 1e-4
# (no learning-rate schedule is attached here).
optimizer = tf.keras.optimizers.Adam(learning_rate=1e-4)
model2.compile(
    loss="binary_crossentropy",
    optimizer=optimizer,
    metrics=["accuracy"],
)
model2.summary()

In [None]:
# Train on the `train` subset only. Fitting on `train_data` (the full dataset)
# would also train on the batches that make up `val` and `test`, inflating both
# validation and test scores. batch_size is omitted because the dataset is
# already batched upstream.
history2 = model2.fit(train, epochs=50, verbose=1, validation_data=val)

In [None]:
# Write the trained BiLSTM to an .h5 file.
bilstm_model_path = "Model-Mfcc-bilstm-dummy1.h5"
model2.save(bilstm_model_path)
print('Model save to Disk')

In [None]:
# Per-epoch metrics of the BiLSTM run as a table (up to the first 50 epochs shown).
df = pd.DataFrame.from_dict(history2.history)
#df.to_excel("output.xlsx")
#df.to_csv("output.csv")
df.head(50)

In [None]:
# Training vs. validation curves for accuracy and loss, one panel per metric.
fig, ax = plt.subplots(1, 2, figsize=(10, 5))
ax = ax.ravel()

for panel, met in zip(ax, ('accuracy', 'loss')):
    panel.plot(history2.history[met])
    panel.plot(history2.history['val_' + met])
    panel.set_title('Model {}'.format(met))
    panel.set_xlabel('epochs')
    panel.set_ylabel(met)
    panel.legend(['train', 'val'])
plt.savefig("Model2-Results.png")

In [None]:
%%time
# Evaluate the BiLSTM on the test batches using `evaluate`.
# Stored under its own name so the CNN's `results0` is not overwritten.
print("Evaluate on test data")
results2 = model2.evaluate(test)
print("test loss, test acc:", results2)

In [None]:
import numpy as np
from sklearn.metrics import classification_report, confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns

# Step 4: Make predictions and evaluate on the test set.
# Labels and predictions are collected from the same pass over `test`, so they
# stay aligned batch by batch.
true_labels = []
predicted_labels = []

for samples, labels in test.as_numpy_iterator():
    # predict_on_batch skips the per-call progress bar that predict() prints.
    predictions = np.asarray(model2.predict_on_batch(samples))
    # The sigmoid output has shape (batch, 1); flatten it so each prediction is
    # a scalar class id rather than a 1-element array.
    predicted_classes = (predictions > 0.5).astype(int).ravel()  # Adjust threshold as needed
    # Labels come in as floats (1.0 / 0.0); cast so both lists hold ints.
    true_labels.extend(labels.astype(int))
    predicted_labels.extend(predicted_classes)

true_labels = np.array(true_labels)
predicted_labels = np.array(predicted_labels)

# Step 5: Classification report and confusion matrix
print(classification_report(true_labels, predicted_labels))
cm = confusion_matrix(true_labels, predicted_labels)
print(cm)

# Plot the confusion matrix
plt.figure(figsize=(8,6))
sns.heatmap(cm, annot=True, fmt="d", cmap='Blues')
plt.title('Confusion Matrix')
plt.xlabel('Predicted labels')
plt.ylabel('True labels')
plt.show()


BiRNN

In [None]:
import tensorflow as tf
from tensorflow.keras import layers, models

# Bidirectional SimpleRNN classifier on the MFCC feature maps.
# NOTE(review): after dropping the channel axis the input is (7, 59); the
# recurrent layers step along the first axis (length 7). If the 59-long axis is
# the time axis, the input probably needs transposing - confirm the intent.
model3 = models.Sequential([
    # Drop the trailing singleton channel axis: (7, 59, 1) -> (7, 59).
    # Reshape is used instead of a Lambda wrapping a Python lambda so the saved
    # model does not depend on serialized Python code when it is reloaded.
    layers.Reshape((7, 59), input_shape=(7, 59, 1)),
    # First bidirectional SimpleRNN layer, return sequences to feed the next recurrent layer
    layers.Bidirectional(layers.SimpleRNN(64, return_sequences=True)),
    # Second bidirectional SimpleRNN layer, final recurrent layer so only the last output is kept
    layers.Bidirectional(layers.SimpleRNN(32)),
    # Dense layer for interpretation
    layers.Dense(64, activation='relu'),
    # Dropout for regularization
    layers.Dropout(0.5),
    # Output layer: one sigmoid unit for the binary label
    layers.Dense(1, activation='sigmoid')
])
 

In [None]:
# Compile the BiRNN with Adam at a constant learning rate of 1e-4
# (no learning-rate schedule is attached here).
optimizer = tf.keras.optimizers.Adam(learning_rate=1e-4)
model3.compile(
    loss="binary_crossentropy",
    optimizer=optimizer,
    metrics=["accuracy"],
)
model3.summary()

In [None]:
# Train on the `train` subset only. Fitting on `train_data` (the full dataset)
# would also train on the batches that make up `val` and `test`, inflating both
# validation and test scores. batch_size is omitted because the dataset is
# already batched upstream.
history3 = model3.fit(train, epochs=50, verbose=1, validation_data=val)

In [None]:
# Write the trained BiRNN to an .h5 file.
birnn_model_path = "Model-MFCC-birnn-dummy1.h5"
model3.save(birnn_model_path)
print('Model save to Disk')

In [None]:
# Per-epoch metrics of the BiRNN run as a table (up to the first 50 epochs shown).
df = pd.DataFrame.from_dict(history3.history)
#df.to_excel("output.xlsx")
#df.to_csv("output.csv")
df.head(50)

In [None]:
# Training vs. validation curves for accuracy and loss, one panel per metric.
fig, ax = plt.subplots(1, 2, figsize=(10, 5))
ax = ax.ravel()

for panel, met in zip(ax, ('accuracy', 'loss')):
    panel.plot(history3.history[met])
    panel.plot(history3.history['val_' + met])
    panel.set_title('Model {}'.format(met))
    panel.set_xlabel('epochs')
    panel.set_ylabel(met)
    panel.legend(['train', 'val'])
plt.savefig("Model3-Results.png")

In [None]:
%%time
# Evaluate the BiRNN on the test batches using `evaluate`.
# Stored under its own name so the CNN's `results0` is not overwritten.
print("Evaluate on test data")
results3 = model3.evaluate(test)
print("test loss, test acc:", results3)

In [None]:
import numpy as np
from sklearn.metrics import classification_report, confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns

# Step 4: Make predictions and evaluate on the test set.
# Labels and predictions are collected from the same pass over `test`, so they
# stay aligned batch by batch.
true_labels = []
predicted_labels = []

for samples, labels in test.as_numpy_iterator():
    # predict_on_batch skips the per-call progress bar that predict() prints.
    predictions = np.asarray(model3.predict_on_batch(samples))
    # The sigmoid output has shape (batch, 1); flatten it so each prediction is
    # a scalar class id rather than a 1-element array.
    predicted_classes = (predictions > 0.5).astype(int).ravel()  # Adjust threshold as needed
    # Labels come in as floats (1.0 / 0.0); cast so both lists hold ints.
    true_labels.extend(labels.astype(int))
    predicted_labels.extend(predicted_classes)

true_labels = np.array(true_labels)
predicted_labels = np.array(predicted_labels)

# Step 5: Classification report and confusion matrix
print(classification_report(true_labels, predicted_labels))
cm = confusion_matrix(true_labels, predicted_labels)
print(cm)

# Plot the confusion matrix
plt.figure(figsize=(8,6))
sns.heatmap(cm, annot=True, fmt="d", cmap='Blues')
plt.title('Confusion Matrix')
plt.xlabel('Predicted labels')
plt.ylabel('True labels')
plt.show()

SVM

In [None]:
import numpy as np

def preprocess_dataset(dataset):
    """Flatten a batched dataset into a 2-D feature matrix and a label vector.

    Args:
        dataset: Object whose `as_numpy_iterator()` yields `(samples, labels)`
            batches, with `samples` shaped `(batch, ...)` and `labels` shaped
            `(batch,)`.

    Returns:
        Tuple `(all_samples, all_labels)`: `all_samples` has one row per sample
        with all remaining axes flattened, `all_labels` is the matching 1-D
        label array. An empty dataset yields two empty arrays instead of an
        error from stacking nothing.
    """
    sample_chunks = []
    label_chunks = []

    for samples, labels in dataset.as_numpy_iterator():
        # Keep the batch axis, collapse everything else:
        # (batch, d1, d2, ...) -> (batch, d1*d2*...)
        sample_chunks.append(samples.reshape(samples.shape[0], -1))
        label_chunks.append(labels)

    # np.vstack / np.concatenate raise on an empty list.
    if not sample_chunks:
        return np.empty((0, 0)), np.empty((0,))

    return np.vstack(sample_chunks), np.concatenate(label_chunks)

# Flatten the whole batched dataset (train_data, before the take/skip split) into
# a feature matrix X and a label vector y for the scikit-learn model.
X, y = preprocess_dataset(train_data)  # Assuming train_data contains both train and test



In [None]:
from sklearn.model_selection import train_test_split

# Hold out 30 % of the samples for testing. stratify=y keeps the
# healthy/unhealthy ratio the same in both parts, so precision and recall on
# the test part are not skewed by an unlucky draw.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.3, random_state=48, stratify=y
)


In [None]:
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn import svm

# Pipeline: standardise each feature, then classify with an RBF-kernel SVM.
svm_steps = (
    StandardScaler(),
    svm.SVC(kernel='rbf', random_state=84),
)
model_svm = make_pipeline(*svm_steps)

# Fit scaler and classifier on the training split
model_svm.fit(X_train, y_train)


In [None]:
# Mean accuracy of the fitted pipeline on the held-out split.
accuracy = model_svm.score(X_test, y_test)
print(f'Test accuracy: {accuracy}')


In [None]:
from sklearn import svm, metrics
from sklearn.model_selection import train_test_split
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
# Predicted labels for the held-out split
y_pred = model_svm.predict(X_test)

# Score the predictions with each metric in turn
accuracy, precision, recall, f1 = (
    score_fn(y_test, y_pred)
    for score_fn in (
        metrics.accuracy_score,
        metrics.precision_score,
        metrics.recall_score,
        metrics.f1_score,
    )
)

# Report the scores
print(f'Accuracy: {accuracy}')
print(f'Precision: {precision}')
print(f'Recall: {recall}')
print(f'F1 Score: {f1}')

# Confusion matrix as an annotated heatmap
cm = metrics.confusion_matrix(y_test, y_pred)
plt.figure(figsize=(8,6))
sns.heatmap(cm, annot=True, fmt="d")
plt.title('Confusion Matrix')
plt.xlabel('Predicted labels')
plt.ylabel('True labels')
plt.show()