Denoising

In [None]:
import os
import numpy as np
from scipy.io import wavfile
from scipy import signal
import pywt
from scipy.signal import morlet2, spectrogram

def bayesian_soft_thresholding(x, sigma_sq):
    """Soft-threshold *x* with a threshold of ``sqrt(3 * sigma_sq)``.

    Values whose magnitude is at or below the threshold become zero; all
    others are pulled toward zero by the threshold while keeping their sign.

    Args:
        x: Array (or scalar) of coefficients to shrink.
        sigma_sq: Variance estimate used to derive the threshold.

    Returns:
        The shrunken coefficients, same shape as *x*.
    """
    cutoff = np.sqrt(3 * sigma_sq)
    # Magnitude left over once the cutoff is subtracted, floored at zero
    shrunk_magnitude = np.maximum(0, np.abs(x) - cutoff)
    return np.sign(x) * shrunk_magnitude

def sym20_wavelet_bayesian_soft_thresholding(input_file, output_folder, num_levels):
    """Denoise one PCG WAV file with a sym20 wavelet decomposition and save it.

    The signal is decomposed with ``pywt.wavedec``, every coefficient band is
    soft-thresholded by ``bayesian_soft_thresholding`` using that band's own
    variance, and the signal is rebuilt with ``pywt.waverec``. The result is
    written as 16-bit PCM into *output_folder* under the input's base name,
    at the input's sample rate.

    Args:
        input_file: Path of the WAV file to read.
        output_folder: Directory that receives the denoised file.
        num_levels: Decomposition level passed to ``pywt.wavedec``.
    """
    # Load the PCG signal
    sample_rate, pcg_signal = wavfile.read(input_file)

    coeffs = pywt.wavedec(pcg_signal, 'sym20', level=num_levels)
    # NOTE(review): the approximation band (coeffs[0]) is thresholded along
    # with the detail bands -- confirm this is intended.
    thresholded_coeffs = [bayesian_soft_thresholding(c, np.var(c)) for c in coeffs]
    reconstructed_data = pywt.waverec(thresholded_coeffs, 'sym20')

    # waverec can return one sample more than the input when the input length
    # is odd; trim along the transformed (last) axis so lengths match.
    reconstructed_data = reconstructed_data[..., :pcg_signal.shape[-1]]

    # Round and clip before the cast: a bare astype(int16) truncates toward
    # zero and wraps around silently for out-of-range values.
    int16_range = np.iinfo(np.int16)
    pcm = np.clip(np.rint(reconstructed_data), int16_range.min, int16_range.max).astype(np.int16)

    # Save the filtered PCG signal
    output_file = os.path.join(output_folder, os.path.basename(input_file))
    wavfile.write(output_file, sample_rate, pcm)


# All paths below are resolved against the directory the code is run from
current_directory = os.getcwd()

# Sub-folders that are searched for raw PCG recordings
input_folders = [f'training-{suffix}' for suffix in 'abcdef']

# Destination folder for the denoised recordings (created on first use)
output_folder = os.path.join(current_directory, 'denoised_pcg_signals')
if not os.path.exists(output_folder):
    os.makedirs(output_folder)

# Denoise every .wav file found in each input folder
for folder in input_folders:
    input_folder = os.path.join(current_directory, folder)

    # Report missing folders and move on to the next one
    if not os.path.exists(input_folder):
        print(f"Folder '{folder}' not found.")
        continue

    for filename in os.listdir(input_folder):
        if not filename.endswith('.wav'):
            continue
        input_file = os.path.join(input_folder, filename)
        # sym20 wavelet, 10 decomposition levels, Bayesian soft thresholding
        reconstructed_data = sym20_wavelet_bayesian_soft_thresholding(input_file, output_folder, num_levels=10)

print("Denoising completed.")

Bandpass filter

In [None]:
import os
import numpy as np
import matplotlib.pyplot as plt
from scipy.io import wavfile
from scipy import signal

# Define function to filter PCG signals
def filter_pcg_signal(pcg_signal, sample_rate, resampled_rate=1000, lowcut=20, highcut=400, order=4):
    """Resample a PCG signal and band-pass it with a zero-phase Butterworth filter.

    The signal is first resampled (FFT-based ``scipy.signal.resample``) to
    *resampled_rate*, then filtered forward and backward with
    ``scipy.signal.filtfilt`` so no phase shift is introduced.

    Args:
        pcg_signal: Samples of the recording; expected to be 1-D (``resample``
            works along axis 0 while ``filtfilt`` works along the last axis).
        sample_rate: Sampling rate of *pcg_signal* in Hz.
        resampled_rate: Rate in Hz the signal is resampled to before filtering.
        lowcut: Lower band edge in Hz.
        highcut: Upper band edge in Hz; must be below ``resampled_rate / 2``.
        order: Order passed to ``scipy.signal.butter``.

    Returns:
        The filtered signal as a float array sampled at *resampled_rate*.
    """
    # Number of output samples that keeps the recording's duration
    target_length = int(len(pcg_signal) * (resampled_rate / sample_rate))
    resampled_pcg_signal = signal.resample(pcg_signal, target_length)

    # butter() expects band edges as fractions of the Nyquist frequency
    nyquist_freq = resampled_rate / 2
    band = [lowcut / nyquist_freq, highcut / nyquist_freq]
    b, a = signal.butter(order, band, btype='band')

    # Forward-backward filtering gives zero phase distortion
    return signal.filtfilt(b, a, resampled_pcg_signal)


# Function to process PCG WAV file
def process_pcg_wav_file(input_filename, output_folder):
    """Band-pass filter one PCG WAV file and save it under the same base name.

    The recording is read, passed through ``filter_pcg_signal`` and written
    to *output_folder* as 16-bit PCM.

    Args:
        input_filename: Path of the WAV file to read.
        output_folder: Directory that receives ``<basename>.wav``.
    """
    # filter_pcg_signal resamples to 1000 Hz, so the samples it returns must
    # be written with that rate -- writing them with the input rate would
    # mislabel the file's duration and frequency content.
    output_rate = 1000

    # Read PCG signal from WAV file
    fs, pcg_signal = wavfile.read(input_filename)

    # Resample and band-pass filter the signal
    filtered_signal = filter_pcg_signal(pcg_signal, fs)

    # Output keeps the input's base name with a .wav extension
    basename = os.path.splitext(os.path.basename(input_filename))[0]
    output_filename = os.path.join(output_folder, f"{basename}.wav")

    # Round and clip before the cast: a bare astype(int16) truncates toward
    # zero and wraps around silently for out-of-range values.
    int16_range = np.iinfo(np.int16)
    pcm = np.clip(np.rint(filtered_signal), int16_range.min, int16_range.max).astype(np.int16)

    # Save the filtered signal as WAV file
    wavfile.write(output_filename, output_rate, pcm)


# Script entry point: band-pass filter every denoised recording
if __name__ == "__main__":
    # Denoised recordings are read from here ...
    input_folder = "denoised_pcg_signals"
    # ... and the filtered results are written here
    output_folder = "final_filtered_pcg_signals"

    # Make sure the destination exists before any file is written
    if not os.path.exists(output_folder):
        os.makedirs(output_folder)

    if not os.path.isdir(input_folder):
        print(f"Error: Folder '{input_folder}' not found.")
    else:
        # Only .wav files are processed; everything else is ignored
        wav_names = (name for name in os.listdir(input_folder) if name.endswith('.wav'))
        for filename in wav_names:
            input_file_path = os.path.join(input_folder, filename)
            process_pcg_wav_file(input_file_path, output_folder)
print("Bandpass Filtering completed.")

Scalogram generation

In [None]:
import os
import numpy as np
from scipy.io import wavfile
import pywt
import matplotlib.pyplot as plt

def generate_scalogram(signal, wavelet='morl', scales=None):
    """Compute the squared-magnitude continuous wavelet transform of *signal*.

    Args:
        signal: Samples to transform.
        wavelet: Wavelet name passed to ``pywt.cwt``.
        scales: Scales passed to ``pywt.cwt``; ``range(1, 128)`` when omitted.

    Returns:
        ``(power, frequencies)`` on success, or ``(None, None)`` if the
        transform raised (the error is printed, not propagated).
    """
    scale_values = range(1, 128) if scales is None else scales

    try:
        cwt_coeffs, freqs = pywt.cwt(signal, scale_values, wavelet)
        return abs(cwt_coeffs) ** 2, freqs
    except Exception as e:
        print(f"Error generating scalogram for signal: {e}")
        return None, None

def plot_scalogram(power, frequencies):
    """Draw *power* as an axis-free image on the current matplotlib figure.

    Args:
        power: 2-D array of scalogram power values (rows follow *frequencies*).
        frequencies: Frequencies matching the rows of *power*; its first and
            last entries set the vertical extent of the image.
    """
    horizontal_extent = [0, len(power[0])]
    vertical_extent = [frequencies[-1], frequencies[0]]
    # Colour scale saturates at 1% of the peak power; adjust the factor as needed
    colour_cap = np.max(power) * 0.01
    plt.imshow(power, extent=horizontal_extent + vertical_extent, aspect='auto', cmap='jet', vmax=colour_cap)
    plt.axis('off')

def generate_and_save_scalogram(input_folder, output_folder, wavelet='morl', scales=None):
    """Render a scalogram PNG for every ``.wav`` file in *input_folder*.

    Files are processed in sorted filename order. Each recording is reduced
    to its first channel, peak-normalised, transformed with
    ``generate_scalogram`` and drawn with ``plot_scalogram``; the image is
    saved to *output_folder* as ``<name>_scalogram.png``. A failure on one
    file is printed and does not stop the remaining files.

    Args:
        input_folder: Directory containing the WAV files.
        output_folder: Directory for the PNG files (created if missing).
        wavelet: Wavelet name forwarded to ``generate_scalogram``.
        scales: Scales forwarded to ``generate_scalogram``.
    """
    # Create output folder if it doesn't exist
    os.makedirs(output_folder, exist_ok=True)

    for filename in sorted(os.listdir(input_folder)):
        if not filename.endswith('.wav'):
            continue

        input_file_path = os.path.join(input_folder, filename)
        output_file_path = os.path.join(output_folder, os.path.splitext(filename)[0] + '_scalogram.png')

        try:
            # Load PCG signal from WAV file (the sample rate is not needed here)
            _, pcg_signal = wavfile.read(input_file_path)

            # Only consider one channel if it's a stereo recording
            if pcg_signal.ndim > 1:
                pcg_signal = pcg_signal[:, 0]

            # Convert to float before taking abs(): on integer PCM data
            # abs() of the most negative value overflows and stays negative.
            pcg_signal = pcg_signal.astype(np.float32)
            max_abs = np.max(np.abs(pcg_signal))

            # Peak-normalise; an all-zero recording is left untouched because
            # dividing by zero would fill it with NaN.
            if max_abs > 0:
                pcg_signal /= max_abs

            # Generate scalogram
            power, frequencies = generate_scalogram(pcg_signal, wavelet=wavelet, scales=scales)
            if power is None or frequencies is None:
                print(f"Skipping processing of {filename}")
                continue

            # Plot and save; always close the figure so a failed save does
            # not leave it open for the next file to draw on.
            try:
                plot_scalogram(power, frequencies)
                plt.savefig(output_file_path, bbox_inches='tight', pad_inches=0)
            finally:
                plt.close()
            print(f"Scalogram saved to: {output_file_path}")
        except Exception as e:
            print(f"Error processing {filename}: {e}")

# Script entry point: turn every filtered recording into a scalogram image
if __name__ == "__main__":
    # Filtered PCG signals are read from the first folder; scalogram images
    # are written to the second
    input_folder, output_folder = "final_filtered_pcg_signals", "scalogram"

    # Default wavelet and scales are used
    generate_and_save_scalogram(input_folder, output_folder)




Resizing Scalogram

In [None]:
import os
from PIL import Image

def resize_images(input_folder, output_folder, size=(224, 224)):
    """Resize every image in *input_folder* and save it to *output_folder*.

    Files are selected by extension (case-insensitive) and saved under the
    same filename. A failure on one file is printed and the loop continues.

    Args:
        input_folder: Directory containing the source images.
        output_folder: Directory for the resized images (created if missing).
        size: Target ``(width, height)`` passed to ``Image.resize``.
    """
    image_suffixes = ('.png', '.jpg', '.jpeg', '.bmp', '.gif')

    # Ensure the output folder exists
    os.makedirs(output_folder, exist_ok=True)

    for filename in os.listdir(input_folder):
        # Skip anything that is not a recognised image type
        if not filename.lower().endswith(image_suffixes):
            continue

        source_path = os.path.join(input_folder, filename)
        target_path = os.path.join(output_folder, filename)
        try:
            with Image.open(source_path) as original:
                resized = original.resize(size, Image.LANCZOS)
                resized.save(target_path)
                print(f"Resized and saved {filename} to {output_folder}")
        except Exception as e:
            print(f"Could not process {filename}: {e}")

# Scalogram images are read from the first folder and the resized copies
# are written to the second
input_folder, output_folder = 'scalogram', 'last_scalogram'

# Resize with the function's default target size
resize_images(input_folder, output_folder)

VGG-19 Model

In [None]:
import os
import numpy as np
import pandas as pd
import tensorflow as tf
from keras.applications import VGG19
from keras.models import Model
from keras.layers import Dense, GlobalAveragePooling2D, Dropout
from keras.optimizers import Adam
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import classification_report, confusion_matrix
import matplotlib.pyplot as plt

# Function to read scalogram images from folder
def read_images_from_folder(folder):
    """Load every ``.png`` file in *folder* into a single array.

    Files are read in sorted filename order. Each image is loaded at
    ``(img_height, img_width)``; both names are looked up as module-level
    globals when the function is called.

    Args:
        folder: Directory containing the PNG images.

    Returns:
        ``np.array`` built from the per-image arrays, in sorted filename order.
    """
    image_utils = tf.keras.preprocessing.image
    png_names = [name for name in sorted(os.listdir(folder)) if name.endswith(".png")]
    arrays = [
        image_utils.img_to_array(
            image_utils.load_img(os.path.join(folder, name), target_size=(img_height, img_width))
        )
        for name in png_names
    ]
    return np.array(arrays)

def read_labels_from_csv(csv_file):
    """Return the ``Abnormality`` column of *csv_file* as a NumPy array.

    Args:
        csv_file: Path or file-like object accepted by ``pandas.read_csv``.

    Returns:
        The column's values in file order.
    """
    return pd.read_csv(csv_file)['Abnormality'].values

# Load scalogram images and labels
# NOTE(review): the resize step earlier in this file writes its output to
# 'last_scalogram', not 'resized_final_scalogram' -- confirm this folder name.
folder_path = "resized_final_scalogram"
csv_file = "output.csv"
# Read as globals by read_images_from_folder, so they must be set before it is called
img_height, img_width = 224, 224  # Define your desired image dimensions

# Images come back in sorted-filename order; labels come back in CSV row order.
# NOTE(review): assumes the CSV rows are in the same order as the sorted PNG
# filenames -- TODO confirm, nothing here checks the pairing.
images = read_images_from_folder(folder_path)
labels = read_labels_from_csv(csv_file)

# Preprocess the data (e.g., normalization)
images = images / 255.0  # Normalize pixel values to [0, 1]

# Convert labels to numerical format
label_encoder = LabelEncoder()
labels = label_encoder.fit_transform(labels)

# Split the data into training and testing sets (80/20, stratified by label, fixed seed)
X_train, X_test, y_train, y_test = train_test_split(images, labels, test_size=0.2, stratify=labels, random_state=42)

# Load the pre-trained VGG19 model without its classification head
base_model = VGG19(weights='imagenet', include_top=False, input_shape=(img_height, img_width, 3))

# Add a global spatial average pooling layer
x = base_model.output
x = GlobalAveragePooling2D()(x)

# Add a fully-connected layer
x = Dense(1024, activation='relu')(x)

# Add dropout
x = Dropout(0.3)(x)

# Add a logistic layer for binary classification
predictions = Dense(1, activation='sigmoid')(x)

# This is the model we will train
model = Model(inputs=base_model.input, outputs=predictions)

# Freeze the base model so only the newly added layers are trained
for layer in base_model.layers:
    layer.trainable = False

# Instantiate the Adam optimizer with the desired learning rate
optimizer = Adam(learning_rate=0.001)

# Compile the model with the Adam optimizer
model.compile(optimizer=optimizer, loss='binary_crossentropy', metrics=['accuracy'])

# Train the model
# NOTE(review): the test split is also passed as validation data, so the
# 'val_*' curves below are test-set curves; there is no separate validation set.
history = model.fit(X_train, y_train, epochs=200, batch_size=16, validation_data=(X_test, y_test))

# Evaluate the model
test_loss, test_acc = model.evaluate(X_test, y_test)
print('\nTest accuracy:', test_acc)

# Make predictions (this rebinds 'predictions', which previously held the output layer tensor)
predictions = model.predict(X_test)

# Convert predictions to binary classes by rounding the sigmoid outputs
binary_predictions = np.round(predictions).flatten()

# Generate classification report
print('\nClassification Report:')
print(classification_report(y_test, binary_predictions))

# Generate confusion matrix
conf_matrix = confusion_matrix(y_test, binary_predictions)
print('\nConfusion Matrix:')
print(conf_matrix)

# Plot accuracy and loss graphs side by side
plt.figure(figsize=(12, 6))

# Plot training & validation accuracy values
plt.subplot(1, 2, 1)
plt.plot(history.history['accuracy'])
plt.plot(history.history['val_accuracy'])
plt.title('Model accuracy')
plt.ylabel('Accuracy')
plt.xlabel('Epoch')
plt.legend(['Train', 'Test'], loc='upper left')

# Plot training & validation loss values
plt.subplot(1, 2, 2)
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('Model loss')
plt.ylabel('Loss')
plt.xlabel('Epoch')
plt.legend(['Train', 'Test'], loc='upper left')

plt.tight_layout()
plt.show()

# Save the trained model to an HDF5 file
model.save("cvd_vgg19_200_model.h5")