# Workbook

In [1]:
# import necessary packages

import os
os.environ["CUDA_VISIBLE_DEVICES"] = "-1" 
import pathlib
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import numpy as np
# import numba
import seaborn as sns
import tensorflow as tf

from IPython import display
import tensorflow as tf
import pandas as pd
import librosa 

# Set the seed value for experiment reproducibility.
# `seed` is also reused further down (e.g. as SMOTEENN's random_state), so it
# stays a notebook-level name. Only TensorFlow's and NumPy's global generators
# are seeded here.
seed = 42
tf.random.set_seed(seed)
np.random.seed(seed)

2024-11-17 04:01:19.041989: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [3]:
# Dataset root and the recording categories stored under it.
# NOTE(review): in the visible notebook these two names are only consumed by the
# commented-out get_directories(...) call; create_features_for_all defines its
# own local `audio_types` list.
audio_path = 'dataset/AUDIO_NEW/'
audio_types = ['breathing', 'cough', 'speech']

# Per-category input directories. create_features_for_all reads these
# module-level names directly; files under *_Positive are labelled 1 and files
# under *_Negative are labelled 0 by create_features.
pos_directory_breathing = './dataset/AUDIO_NEW/breathing/COVID_Positive'
neg_directory_breathing = './dataset/AUDIO_NEW/breathing/COVID_Negative'

pos_directory_cough = './dataset/AUDIO_NEW/cough/COVID_Positive'
neg_directory_cough = './dataset/AUDIO_NEW/cough/COVID_Negative'

pos_directory_speech = './dataset/AUDIO_NEW/speech/COVID_Positive'
neg_directory_speech = './dataset/AUDIO_NEW/speech/COVID_Negative'


In [4]:
def get_directories(audio_path, audio_type):
    """Build and print the positive/negative sample directories for one audio type.

    Parameters
    ----------
    audio_path : str
        Root directory of the dataset.
    audio_type : str
        Sub-directory name of the recording category (e.g. 'cough').

    Returns
    -------
    tuple[str, str]
        ``(positive_directory, negative_directory)``.
    """
    type_root = os.path.join(audio_path, audio_type)
    positive_dir = os.path.join(type_root, 'COVID_Positive')
    negative_dir = os.path.join(type_root, 'COVID_Negative')

    # Echo both paths so the notebook output shows which folders are in use.
    for line in (
        f"Positive Directory: {positive_dir}",
        f"Negative Directory: {negative_dir}",
    ):
        print(line)

    return positive_dir, negative_dir

### breathing mel spectrogram

In [None]:
# pos_directory_breathing, neg_directory_breathing = get_directories(audio_path, audio_types[0])

In [6]:
def create_mel_spectrogram(y, sr, file_path):
    """Compute a standardized log-mel spectrogram with a trailing channel axis.

    Parameters
    ----------
    y : np.ndarray
        Audio time series handed to ``librosa.feature.melspectrogram``.
    sr : int
        Sampling rate of ``y``.
    file_path : pathlib.Path
        Only used for the diagnostic message printed when the spectrogram has
        zero variance; must expose ``.resolve()``.

    Returns
    -------
    np.ndarray
        Log-mel spectrogram (128 mel bands, n_fft=512, hop_length=128), shifted
        to zero mean and scaled by (std + 1e-6), with one extra last axis.
    """
    mel_power = librosa.feature.melspectrogram(
        y=y, sr=sr, n_fft=512, hop_length=128, n_mels=128
    )
    log_mel = librosa.power_to_db(mel_power, ref=np.max)

    center = np.mean(log_mel)
    spread = np.std(log_mel)

    # A flat spectrogram (e.g. silent clip) cannot be scaled meaningfully;
    # report the offending file but keep going.
    if spread == 0:
        print("log_mel_spectrogram == 0: ", file_path.resolve())

    # Small offset keeps the division finite when spread is zero.
    eps = 1e-6
    standardized = (log_mel - center) / (spread + eps)

    # Append the channel dimension.
    return standardized[..., np.newaxis]
def create_zero_crossing_rate(y):
    """Return the mean of librosa's frame-wise zero-crossing rate for signal ``y``."""
    frame_rates = librosa.feature.zero_crossing_rate(y)
    return np.mean(frame_rates)
def create_spectral_centroid(y, sr):
    """Return the mean of librosa's frame-wise spectral centroid for ``y`` at rate ``sr``."""
    frame_centroids = librosa.feature.spectral_centroid(y=y, sr=sr)
    return np.mean(frame_centroids)

In [7]:
def _fit_to_length(y, target_length):
    """Truncate or zero-pad a 1-D signal so it has exactly ``target_length`` samples."""
    y = y[:target_length]
    shortfall = target_length - y.size
    if shortfall > 0:
        y = np.pad(y, (0, shortfall))
    return y


def _collect_class_features(directory, label, target_length, sr=22050):
    """Extract features for every audio file in ``directory`` and tag them with ``label``.

    Returns three parallel lists: spectrograms, [zcr, centroid] pairs, labels.
    """
    spectrograms, other_features, labels = [], [], []

    # Sorted so the sample order (and anything seeded downstream) is reproducible;
    # os.listdir makes no ordering promise.
    for file_name in sorted(os.listdir(directory)):
        file_path = os.path.join(directory, file_name)
        if not os.path.isfile(file_path):
            continue  # sub-directories are not audio clips

        # librosa's `duration` is expressed in SECONDS, so convert the sample
        # budget before using it to limit how much audio is decoded.
        y, sr = librosa.load(file_path, sr=sr, duration=target_length / sr)
        if y.size == 0:
            continue

        # Every clip must have the same number of samples; otherwise the
        # spectrograms differ in width and cannot be stacked into one array.
        y = _fit_to_length(y, target_length)

        spectrograms.append(create_mel_spectrogram(y, sr, pathlib.Path(file_path)))
        other_features.append(
            [create_zero_crossing_rate(y), create_spectral_centroid(y, sr)]
        )
        labels.append(label)

    return spectrograms, other_features, labels


def create_features(pos_directory, neg_directory, target_length):
    """Build spectrogram and scalar features for one positive/negative directory pair.

    Parameters
    ----------
    pos_directory : str
        Directory of positive-class recordings (labelled 1).
    neg_directory : str
        Directory of negative-class recordings (labelled 0).
    target_length : int
        Number of audio samples (at 22050 Hz) each clip is cut or zero-padded to.
        With hop_length=128 this yields ``target_length // 128 + 1`` spectrogram
        frames per clip (626 for 80000, assuming librosa's default centered
        framing).

    Returns
    -------
    tuple[list, list, list]
        ``(spectrograms, other_features, labels)`` as parallel lists, positives
        first. Clips that decode to zero samples are skipped.
    """
    spectrograms, other_features, labels = [], [], []

    for directory, label in ((pos_directory, 1), (neg_directory, 0)):
        class_specs, class_others, class_labels = _collect_class_features(
            directory, label, target_length
        )
        spectrograms.extend(class_specs)
        other_features.extend(class_others)
        labels.extend(class_labels)

    return spectrograms, other_features, labels


In [8]:
def create_features_for_all(target_length):
    """Extract features for the breathing, cough and speech recordings and stack them.

    Reads the module-level ``pos_directory_*`` / ``neg_directory_*`` paths and
    calls ``create_features`` once per audio type, in the order breathing,
    cough, speech.

    Parameters
    ----------
    target_length :
        Forwarded unchanged to ``create_features``.

    Returns
    -------
    tuple[np.ndarray, np.ndarray, np.ndarray]
        ``(spectrograms, other_features, labels)`` concatenated over all types.
    """
    # Audio type -> (positive directory, negative directory).
    directories_by_type = {
        'breathing': (pos_directory_breathing, neg_directory_breathing),
        'cough': (pos_directory_cough, neg_directory_cough),
        'speech': (pos_directory_speech, neg_directory_speech),
    }

    collected_spectrograms = []
    collected_other_features = []
    collected_labels = []

    for positive_dir, negative_dir in directories_by_type.values():
        type_spectrograms, type_other_features, type_labels = create_features(
            positive_dir, negative_dir, target_length
        )
        collected_spectrograms += type_spectrograms
        collected_other_features += type_other_features
        collected_labels += type_labels

    # Hand back arrays rather than lists.
    return (
        np.array(collected_spectrograms),
        np.array(collected_other_features),
        np.array(collected_labels),
    )


In [9]:
# Length budget forwarded through create_features_for_all to create_features.
# NOTE(review): later cells hard-code a spectrogram width of 626 frames, which is
# what 80000 *samples* give at hop_length=128 (80000 // 128 + 1, assuming
# librosa's default centered framing). Confirm the loader interprets this value
# as a sample count and not as seconds.
target_length = 80000
spectrograms, other_features, labels = create_features_for_all(target_length)

# Quick sanity check of the stacked array shapes.
print(f"Spectrograms: {spectrograms.shape}")
print(f"Other Features: {other_features.shape}")
print(f"Labels: {labels.shape}")

: 

In [None]:
# # List to store spectrograms and labels
# spectrograms = []
# zero_crossing_rates = []
# spectral_centroids = []
# labels = []

# for file_path in pos_directory.iterdir():
#     y, sr = librosa.load(file_path.resolve(), sr=16000)
#     y = y[:80000]

#     zero_padding = np.zeros(80000 - len(y), dtype=np.float32)
#     y = np.concatenate([y, zero_padding],axis=0)
    
#     mel_spectrogram = librosa.feature.melspectrogram(y=y, sr=sr, n_fft=512, hop_length=128, n_mels=128)
#     log_mel_spectrogram = librosa.power_to_db(mel_spectrogram, ref=np.max)

#     # Normalize the spectrogram
#     log_mel_spectrogram = (log_mel_spectrogram - np.mean(log_mel_spectrogram)) / np.std(log_mel_spectrogram)

#     log_mel_spectrogram = np.expand_dims(log_mel_spectrogram, axis=-1)  # Add channel dimension

#     # Append to list
#     spectrograms.append(log_mel_spectrogram)
    
#     zero_crossing_rate = np.mean(librosa.feature.zero_crossing_rate(y))
#     spectral_centroid = np.mean(librosa.feature.spectral_centroid(y=y, sr=sr))
    
#     zero_crossing_rates.append(zero_crossing_rate)
#     spectral_centroids.append(spectral_centroid)
    
#     labels.append(1)  # Assuming binary classification

# for file_path in neg_directory.iterdir():
#     y, sr = librosa.load(file_path.resolve(), sr=16000)
#     y = y[:80000]

#     zero_padding = np.zeros(80000 - len(y), dtype=np.float32)
#     y = np.concatenate([y, zero_padding],axis=0)
    
#     mel_spectrogram = librosa.feature.melspectrogram(y=y, sr=sr, n_fft=512, hop_length=128, n_mels=128)
#     log_mel_spectrogram = librosa.power_to_db(mel_spectrogram, ref=np.max)

#     if np.std(log_mel_spectrogram) == 0:
#         print(file_path.resolve())
#     # Normalize the spectrogram
#     epsilon = 1e-6  # Small value to avoid division by zero
#     log_mel_spectrogram = (log_mel_spectrogram - np.mean(log_mel_spectrogram)) / (np.std(log_mel_spectrogram) + epsilon)
    
#     log_mel_spectrogram = np.expand_dims(log_mel_spectrogram, axis=-1)  # Add channel dimension
#     #print(log_mel_spectrogram.shape)
#     # Append to list
#     spectrograms.append(log_mel_spectrogram)

#     zero_crossing_rate = np.mean(librosa.feature.zero_crossing_rate(y))
#     spectral_centroid = np.mean(librosa.feature.spectral_centroid(y=y, sr=sr))
    
#     zero_crossing_rates.append(zero_crossing_rate)
#     spectral_centroids.append(spectral_centroid)
    
#     labels.append(0)  

In [None]:
# Make sure both feature sets are ndarrays before reshaping.
spectrograms = np.array(spectrograms)
other_features = np.array(other_features)

# One row per sample: collapse every remaining axis of each spectrogram so the
# rows can be stacked next to the scalar features.
spectrograms_flattened = spectrograms.reshape(len(spectrograms), -1)

In [None]:
spectrograms.shape

In [None]:
spectrograms

In [None]:
spectrograms_flattened

In [None]:
X_combined = np.hstack([spectrograms_flattened, other_features])

In [None]:
# Stratified 80/20 split. random_state pins the split to `seed` so it no longer
# depends on how much of NumPy's global random state earlier cells consumed.
X_train, X_test, y_train, y_test = train_test_split(
    X_combined, labels, test_size=0.2, stratify=labels, random_state=seed
)

In [None]:
X_combined[0]

In [None]:
# Flatten each spectrogram into a single vector
# Resulting shape will be (num_samples, height * width * channels)
# X_train_numpy = np.array(X_train)
# X_train_flattened = X_train_numpy.reshape(X_train_numpy.shape[0], -1)

# print(f"Original shape: {X_train_numpy.shape}")
# print(f"Flattened shape: {X_train_flattened.shape}")

In [None]:
from imblearn.combine import SMOTEENN
from collections import Counter

# Rebalance the classes with SMOTEENN. Only the training split is resampled;
# X_test / y_test are left as produced by train_test_split.
smote_enn = SMOTEENN(random_state=seed)
X_train_resampled, y_train_resampled = smote_enn.fit_resample(X_train, y_train)

# Print resampled class distribution
print(f"Resampled class distribution: {Counter(y_train_resampled)}")

In [None]:
# from imblearn.combine import SMOTEENN
# from collections import Counter

# smote_enn = SMOTEENN(random_state=seed)
# X_resampled, y_resampled = smote_enn.fit_resample(X_train_flattened, y_train)

# # Print resampled class distribution
# print(f"Resampled class distribution: {Counter(y_resampled)}")

In [None]:
# # Reshape back to original format after resampling
# X_resampled = X_resampled.reshape(X_resampled.shape[0], 128, 626, 1)

# print(f"Reshaped back to original format: {X_resampled.shape}")

In [None]:
# Number of leading columns in each X_combined row that belong to the flattened
# spectrogram (the scalar features follow). Taken from the array that was
# actually stacked, so it stays correct if the spectrogram size changes; with
# 128 mel bands x 626 frames this equals 128 * 626.
num_spectrogram_features = spectrograms_flattened.shape[1]

In [None]:
# Cut each resampled row at the spectrogram/scalar boundary: the first
# num_spectrogram_features columns are the flattened spectrogram, the rest are
# the other features.
spectrograms_resampled, other_features_resampled = np.split(
    X_train_resampled, [num_spectrogram_features], axis=1
)

In [None]:
spectrograms_resampled

In [None]:
# Restore the per-sample layout the spectrograms had before flattening
# (mel bands, frames, channel). The shape is read from `spectrograms` instead
# of being hard-coded, so it cannot drift from the feature-extraction settings.
spectrograms_resampled_reshaped = spectrograms_resampled.reshape(
    spectrograms_resampled.shape[0], *spectrograms.shape[1:]
)

In [None]:
# Optionally add a channel dimension for CNN input (e.g., shape: [samples, 128, 626, 1])
#spectrograms_resampled_reshaped = np.expand_dims(spectrograms_resampled_reshaped, axis=-1)

In [None]:
spectrograms_resampled_reshaped.shape

In [None]:
other_features_resampled.shape

In [None]:
import matplotlib.pyplot as plt
import librosa.display

# Function to plot spectrogram
def plot_spectrogram(spectrogram, title):
    """Display one mel spectrogram with time on the x-axis and mel frequency on the y-axis.

    Parameters
    ----------
    spectrogram : np.ndarray
        Array laid out as (mel bands, frames) or (mel bands, frames, 1), i.e.
        the layout produced by create_mel_spectrogram.
    title : str
        Figure title.
    """
    plt.figure(figsize=(10, 4))

    # Remove channel dimension if present for visualization
    if spectrogram.ndim == 3 and spectrogram.shape[-1] == 1:
        spectrogram = spectrogram.squeeze(-1)

    # specshow expects rows = frequency bins and columns = frames, so the array
    # is passed as-is (no transpose). hop_length must match the value used to
    # compute the spectrogram (128), otherwise the time axis is scaled for
    # specshow's default of 512.
    librosa.display.specshow(
        spectrogram, sr=22050, hop_length=128, x_axis='time', y_axis='mel'
    )

    # Values are mean/std-standardized in create_mel_spectrogram, so they are
    # no longer in dB; label the colour scale accordingly.
    plt.colorbar(format='%+2.1f', label='standardized log-mel')
    plt.title(title)
    plt.tight_layout()
    plt.show()

# Assuming 'original_spectrogram' is your original unflattened spectrogram
# and 'reshaped_spectrogram' is your reshaped version after processing

# Plot original spectrogram (before flattening)
plot_spectrogram(spectrograms[0], "Original Spectrogram")

# Plot reshaped spectrogram (after reshaping back)
plot_spectrogram(spectrograms_resampled_reshaped[100], "Reshaped Spectrogram")


In [None]:
# For X_test (no resampling needed), separate the flattened spectrogram columns
# from the scalar features and restore the spectrogram layout. The target shape
# is read from `spectrograms` rather than hard-coded so it always matches the
# extracted features.
spectrograms_test = X_test[:, :num_spectrogram_features]
other_features_test = X_test[:, num_spectrogram_features:]
spectrograms_test_reshaped = spectrograms_test.reshape(
    spectrograms_test.shape[0], *spectrograms.shape[1:]
)

In [None]:
y_train_resampled = np.array(y_train_resampled)
y_test = np.array(y_test)

In [None]:
# y_resampled = np.array(y_resampled)

### Single input model

In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Input, GlobalAveragePooling2D

In [None]:
# Single-input CNN that classifies from the spectrogram alone.
model = Sequential()

# Input layout must match the tensors built by the reshape cells:
# (128 mel bands, 626 frames, 1 channel). The previous (625, 128, 1) had the
# axes swapped and was one frame short, so no prepared tensor could be fed in.
model.add(Input(shape=(128, 626, 1)))

# Four Conv2D + MaxPooling stages; each pooling halves both spatial dimensions.
for n_filters in (16, 16, 32, 64):
    model.add(Conv2D(n_filters, (3,3), activation='relu'))
    model.add(MaxPooling2D(pool_size=(2,2)))

# Global average pooling (instead of Flatten) collapses each feature map to a
# single value before the dense head.
model.add(GlobalAveragePooling2D())

model.add(Dense(128, activation='relu'))
model.add(Dense(1, activation='sigmoid'))

In [None]:
model.compile('Adam', loss='BinaryCrossentropy', metrics=[tf.keras.metrics.Recall(),tf.keras.metrics.Precision()])

In [None]:
model.summary()

In [None]:
# The CNN consumes spectrogram tensors, not X_train_resampled (the flattened
# spectrogram + scalar-feature matrix that SMOTEENN worked on).
#
# Keras carves validation_split off the END of the arrays before any shuffling,
# and the resampled output is not guaranteed to have both classes mixed at its
# tail, so permute the samples first (seeded for reproducibility).
shuffle_idx = np.random.default_rng(seed).permutation(len(y_train_resampled))

hist = model.fit(
    spectrograms_resampled_reshaped[shuffle_idx],
    np.asarray(y_train_resampled)[shuffle_idx],
    batch_size=16,
    verbose=2,
    epochs=20,
    validation_split=0.1,
)


### Fit Model, View Loss and KPI Plots

In [None]:
X_test = np.array(X_test)
y_test = np.array(y_test)

# Predict on the reshaped test spectrograms. X_test itself is the flattened
# spectrogram + scalar-feature matrix and does not have the 4-D layout the
# single-input CNN takes.
y_pred = model.predict(spectrograms_test_reshaped)

In [None]:
# Convert sigmoid scores to hard 0/1 labels.
# NOTE(review): 0.99 is a very strict cutoff (0.5 is the usual default for a
# sigmoid output) and will suppress most positive predictions — confirm this is
# intentional.
y_pred_binary = (y_pred > 0.99).astype(int)

In [None]:
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
cm = confusion_matrix(y_test, y_pred_binary)

In [None]:
len(y_test)

In [None]:
disp = ConfusionMatrixDisplay(cm)

disp.plot()
plt.show()

### Multi-input model

In [None]:
import tensorflow as tf
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, GlobalAveragePooling2D, Dense, Concatenate, Dropout

# Define input for Mel spectrogram (shape: 128 mel bins x 626 time frames x 1 channel),
# matching the layout the flattened spectrograms are reshaped back to.
mel_input = Input(shape=(128, 626, 1), name='mel_spectrogram')

# Define CNN sub-network for Mel spectrogram:
# four Conv2D + MaxPooling stages followed by global average pooling.
x = Conv2D(16, (3,3), activation='relu')(mel_input)
x = MaxPooling2D(pool_size=(2,2))(x)
x = Conv2D(16, (3,3), activation='relu')(x)
x = MaxPooling2D(pool_size=(2,2))(x)
x = Conv2D(32, (3,3), activation='relu')(x)
x = MaxPooling2D(pool_size=(2,2))(x)
x = Conv2D(64, (3,3), activation='relu')(x)
x = MaxPooling2D(pool_size=(2,2))(x)
x = GlobalAveragePooling2D()(x)

# Define input for other features (zero-crossing rate and spectral centroid, in that order)
other_input = Input(shape=(2,), name='other_features')

# Define a simple dense sub-network for other features
y = Dense(64, activation='relu')(other_input)
y = Dense(128, activation='relu')(y)  # Widen the representation
y = Dropout(0.3)(y)                   # Add dropout to prevent overfitting
y = Dense(128, activation='relu')(y)   # Another dense layer
y = Dense(64, activation='relu')(y)    # Narrow back to 64 units before merging

# Concatenate both sub-networks
combined = Concatenate()([x, y])

# Add final classification layers
z = Dense(128, activation='relu')(combined)
z = Dense(1, activation='sigmoid')(z)  # Binary classification

In [None]:
# Short aliases for the resampled training inputs of the multi-input model.
# NOTE(review): this rebinds `y_train`, which until now held the un-resampled
# labels returned by train_test_split. Re-running the SMOTEENN cell after this
# one would pair X_train with the resampled labels; consider a distinct name.
X_train_spectrograms = spectrograms_resampled_reshaped
X_train_others = other_features_resampled
y_train = y_train_resampled

In [None]:
# Define the model with two inputs
model = tf.keras.Model(inputs=[mel_input, other_input], outputs=z)

# Compile the model
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy','recall','precision'])

# Print model summary
model.summary()

In [None]:
# Keras takes validation_split from the END of the arrays before any shuffling,
# and the resampled training data is not guaranteed to have both classes mixed
# at its tail. Apply one seeded permutation to both inputs and the labels so
# the held-out 10% is a random sample.
shuffle_idx = np.random.default_rng(seed).permutation(len(y_train))

history = model.fit(
    [X_train_spectrograms[shuffle_idx],
     X_train_others[shuffle_idx]],
    np.asarray(y_train)[shuffle_idx],
    validation_split = 0.1,
    verbose = 2,
    epochs=20,
    batch_size=16
)

In [None]:
X_train_spectrograms.shape

In [None]:
X_train_others

In [None]:
y_pred_train = model.predict([X_train_spectrograms, X_train_others])

In [None]:
# Convert the multi-input model's training-set scores to hard 0/1 labels.
# NOTE(review): 0.99 is a very strict cutoff for a sigmoid output — confirm it
# is intentional.
y_pred_train_binary = (y_pred_train > 0.99).astype(int)

In [None]:
from sklearn.metrics import accuracy_score

accuracy_train = accuracy_score(y_train, y_pred_train_binary)

In [None]:
accuracy_train

In [None]:
cm = confusion_matrix(y_train, y_pred_train_binary)

# Step 4: Visualize or print the confusion matrix
disp = ConfusionMatrixDisplay(confusion_matrix=cm)
disp.plot(cmap=plt.cm.Blues)
plt.title("Confusion Matrix for Training Data")
plt.show()

In [None]:
y_pred_test = model.predict([spectrograms_test_reshaped, other_features_test])

In [None]:
# Threshold the MULTI-INPUT model's test scores (y_pred_test). The previous
# code thresholded `y_pred`, which holds the single-input model's predictions,
# so the test confusion matrix and accuracy below described the wrong model.
# NOTE(review): 0.99 is a very strict cutoff for a sigmoid output — confirm it
# is intentional.
y_pred_test_binary = (y_pred_test > 0.99).astype(int)

In [None]:
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
cm = confusion_matrix(y_test, y_pred_test_binary)

In [None]:
disp = ConfusionMatrixDisplay(cm)

disp.plot()
plt.title("Confusion Matrix for Testing Data")
plt.show()

In [None]:
from sklearn.metrics import accuracy_score

accuracy_test = accuracy_score(y_test, y_pred_test_binary)

In [None]:
accuracy_test