In [44]:
from tensorflow.keras.applications.vgg16 import VGG16, preprocess_input
from tensorflow.keras.applications.vgg19 import VGG19, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras.models import Model
from tensorflow.keras.preprocessing import image

from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
import numpy as np
from PIL import Image
import cv2
import os

# Set the path to your ECG image directory
ecg_dir = "/Users/vinayaka/Desktop/Physionet-24/physionet/records100_images/Train/"

# Set the target image size for VGG16
target_size = (224, 224)

# Load the pre-trained VGG16 model without the top classification layer
vgg_model = VGG19(weights='imagenet', include_top=False, input_shape=(224, 224, 3))

# Create a new model, specifying the 'block5_pool' output
model = Model(inputs=vgg_model.input, outputs=vgg_model.get_layer('block5_pool').output)

# Function to preprocess and extract features from an ECG image
def extract_features(record):
    images = load_image(record)
    all_features = []  # Store features for all images in this record 
    for image in images:
        image = np.asarray(image)
        image = cv2.resize(image, target_size)
        image = img_to_array(image)
        image = np.expand_dims(image, axis=0)
        image = preprocess_input(image)

        feature = model.predict(image)
        print(f"Features shape per image before flattening: {feature.shape}")
        feature = feature.flatten()
        print(f"Features shape per image after flattening: {feature.shape}")
        all_features.append(feature)

    # Combine features correctly, handling cases with zero or multiple images:
    if len(all_features) > 0:
        features = np.vstack(all_features)  # Use vstack to vertically stack feature vectors
    else:
        features = np.array([])  # Handle cases with no images
    print(f"Features shape per record after concatenation: {features.shape}")
    return features

# Load the image(s) for a record.
def load_image(record):
    path = os.path.split(record)[0]
    image_files = get_image_files(record)
    
    images = list()
    for image_file in image_files:
        image_file_path = os.path.join(path, image_file)
        if os.path.isfile(image_file_path):
            image = Image.open(image_file_path)
            images.append(image)
    
    return images

# Get the image file(s) for a record.
def get_image_files(record):
    header_file = get_header_file(record)
    header = load_text(header_file)
    image_files = get_image_files_from_header(header)
    return image_files

# Get the image file(s) from a header or a similar string.
def get_image_files_from_header(string):
    images, has_image = get_variables(string, '#Image:')
    if not has_image:
        raise Exception('No images available: did you forget to generate or include the images?')
    return images

# Load the header for a record.
def load_header(record):
    header_file = get_header_file(record)
    header = load_text(header_file)
    return header

# Get the header file for a record.
import os

# Get the header file for a record.
def get_header_file(record):
    if not record.endswith('.hea'):
        header_file = record + '.hea'
    else:
        header_file = record
    return header_file

# Function to load dx labels from a record
def load_dx(record):
    header = load_header(record)
    dx = get_dxs_from_header(header)
    return dx

# Get the dx class(es) from a header or a similar string.
def get_dxs_from_header(string):
    dxs, has_dx = get_variables(string, '#Dx:')
    if not has_dx:
        raise Exception('No dx classes available: are you trying to load the classes from the held-out dataset, or did you forget to prepare the data to include the classes?')
    return dxs

# Get variables from a text file.
def get_variables(string, variable_name, sep=','):
    variables = list()
    has_variable = False
    for l in string.split('\n'):
        if l.startswith(variable_name):
            variables += [variable.strip() for variable in l[len(variable_name):].strip().split(sep)]
            has_variable = True
    return variables, has_variable

# Load a text file as a string.
def load_text(filename):
    with open(filename, 'r') as f:
        string = f.read()
    return string

# Function to compute one-hot encoding for the dx labels
def compute_one_hot_encoding(dxs, classes):
    num_records = len(dxs)
    num_classes = len(classes)
    one_hot = np.zeros((num_records, num_classes), dtype=int)
    for i in range(num_records):
        for dx in dxs[i]:
            if dx in classes:
                j = classes.index(dx)
                one_hot[i, j] = 1
    return one_hot

# Find the records in a folder and its subfolders.
def find_records(folder):
    records = set()
    for root, directories, files in os.walk(folder):
        for file in files:
            extension = os.path.splitext(file)[1]
            if extension == '.hea':
                record = os.path.relpath(os.path.join(root, file), folder)[:-4]
                records.add(record)
    records = sorted(records)
    return records


# Load and preprocess the ECG images
features = list()
dxs = list()

records = find_records(ecg_dir)
num_records = len(records)

if num_records == 0:
    raise FileNotFoundError('No data was provided.')

for i in range(num_records):
    width = len(str(num_records))
    print(f'- {i+1:>{width}}/{num_records}: {records[i]}...')

    record = os.path.join(ecg_dir, records[i])
    
    # Extract the features from the image, but only if the image has one or more dx classes.
    dx = load_dx(record)
    if dx:
        current_features = extract_features(record)
        features.append(current_features)
        dxs.append(dx)

if not dxs:
    raise Exception('There are no labels for the data.')

features = np.array(features) 
print(f"Final Features Shape for the Entire Dataset: {features.shape}")

classes = sorted(set.union(*map(set, dxs)))
dxs = compute_one_hot_encoding(dxs, classes)

-    1/1987: 00000/00001_lr...
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 253ms/step
Features shape per image before flattening: (1, 7, 7, 512)
Features shape per image after flattening: (25088,)
Features shape per record after concatenation: (1, 25088)
-    2/1987: 00000/00002_lr...
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 12ms/step
Features shape per image before flattening: (1, 7, 7, 512)
Features shape per image after flattening: (25088,)
Features shape per record after concatenation: (1, 25088)
-    3/1987: 00000/00003_lr...
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 12ms/step
Features shape per image before flattening: (1, 7, 7, 512)
Features shape per image after flattening: (25088,)
Features shape per record after concatenation: (1, 25088)
-    4/1987: 00000/00004_lr...
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 15ms/step
Features shape per image before flattening: (1, 7, 7, 512)
Features shape per im

In [8]:
# #Training the model

# from sklearn.ensemble import RandomForestClassifier
# from sklearn.multiclass import OneVsRestClassifier
# import joblib
# import os

# # Save your trained dx classification model.
# def save_dx_model(model_folder, model, classes):
#     d = {'model': model, 'classes': classes}
#     filename = os.path.join(model_folder, 'rf_dx_model.sav')
#     joblib.dump(d, filename, protocol=0)


# model_folder = '/home/vinayaka/physionet/Model_folder'

# # Define parameters for random forest classifier.
# n_estimators = 123  # Number of trees in the forest.
# max_leaf_nodes = 456  # Maximum number of leaf nodes in each tree.
# random_state = 789  # Random state; set for reproducibility.

# # Fit the model.
# rf_model = RandomForestClassifier(
#     n_estimators=n_estimators,
#     max_leaf_nodes=max_leaf_nodes,
#     random_state=random_state
# )

# # Use the OneVsRestClassifier to handle multi-label cases. Uses binary relevance technique to handle multi-label classification scenario
# ovr_rf_classifier = OneVsRestClassifier(rf_model)

# # Train the classifier on the dataset
# ovr_rf_classifier.fit(features, dxs)

# # Save the model.
# save_dx_model(model_folder, ovr_rf_classifier, classes)

In [45]:
#Training the model

from sklearn.ensemble import RandomForestClassifier
import joblib
import os

# Save your trained dx classification model.
def save_dx_model(model_folder, model, classes):
    d = {'model': model, 'classes': classes}
    filename = os.path.join(model_folder, 'rf_dx_model.sav')
    joblib.dump(d, filename, protocol=0)


model_folder = '/Users/vinayaka/Desktop/Physionet-24/physionet/Model_folder'

# Define parameters for random forest classifier.
n_estimators = 123  # Number of trees in the forest.
max_leaf_nodes = 456  # Maximum number of leaf nodes in each tree.
random_state = 789  # Random state; set for reproducibility.

# Reshape features to 2D before passing to the classifier
features = features.reshape(features.shape[0], -1)
print(features.shape)

# Fit the model.
rf_model = RandomForestClassifier(
    n_estimators=n_estimators,
    max_leaf_nodes=max_leaf_nodes,
    random_state=random_state
).fit(features, dxs)

# Save the model.
save_dx_model(model_folder, rf_model, classes)

(1987, 25088)


In [46]:
#Run the model - Load the saved trained model and run it on the test data.

import os
import joblib
import numpy as np
import cv2

# Set the target image size for VGG16
target_size = (224, 224)

# Load the pre-trained VGG16 model without the top classification layer
vgg_model = VGG19(weights='imagenet', include_top=False, input_shape=(224, 224, 3))

# Create a new model, specifying the 'block5_pool' output
model = Model(inputs=vgg_model.input, outputs=vgg_model.get_layer('block5_pool').output)

# Load your trained dx classification model. This function is *required*. You should edit this function to add your code, but do
# *not* change the arguments of this function. If you do not train a dx classification model, then you can return None.
def load_dx_model(model_folder):
    filename = os.path.join(model_folder, 'rf_dx_model.sav')
    return joblib.load(filename)

# Function to preprocess and extract features from an ECG image
def extract_features(record):
    images = load_image(record)
    features = []
    for image in images:
        image = np.asarray(image)
        image = cv2.resize(image, target_size)
        image = img_to_array(image)
        image = np.expand_dims(image, axis=0)
        image = preprocess_input(image)
        feature = model.predict(image)
        features.append(feature)
    features = np.concatenate(features, axis=1)
    features = features.flatten()
    return features

# Run your trained dx classification model. This function is *required*. You should edit this function to add your code, but do
# *not* change the arguments of this function.
def run_dx_model(dx_model, record):
    model = dx_model['model']
    classes = dx_model['classes']

    # Extract features.
    features = extract_features(record)
    features = features.reshape(1, -1)

    # Get model probabilities.
    probabilities = model.predict_proba(features)
    #probabilities = np.asarray(probabilities, dtype=np.float32)[:, 0, 1]

    # Choose the class(es) with the highest probability as the label(s).
    max_probability = np.nanmax(probabilities)
    #labels = [classes[i] for i, probability in enumerate(probabilities) if probability == max_probability]
    #labels = [classes[i] for i, prob in enumerate(probabilities) for element in prob if element == max_probability]
    
    labels = [classes[i] for i, prob in enumerate(probabilities) if (prob == max_probability).any()]
    
    return labels

# Save the dx class(es) for a record.
def save_dx(record, dx):
    header_file = get_header_file(record)
    header = load_text(header_file)
    header += '#Dx: ' + ', '.join(dx) + '\n'
    save_text(header_file, header)
    return header

def save_dxs(record, dxs):
    return save_dx(record, dxs)

# Save a string as a text file.
def save_text(filename, string):
    with open(filename, 'w') as f:
        f.write(string)
        
# Get the header file for a record.
def get_header_file(record):
    if not record.endswith('.hea'):
        header_file = record + '.hea'
    else:
        header_file = record
    return header_file

# Save the header for a record.
def save_header(record, header):
    header_file = get_header_file(record)
    save_text(header_file, header)
    

#Driver code starts from here

model_folder = '/Users/vinayaka/Desktop/Physionet-24/physionet/Model_folder'
data_folder = '/Users/vinayaka/Desktop/Physionet-24/physionet/records100_images/Test'
output_folder = "/Users/vinayaka/Desktop/Physionet-24/physionet/records100_images/test_predictions_output"

#Load the saved model      
dx_model = load_dx_model(model_folder) ### Teams: Implement this function!!!

# Find the Challenge data (test data).
print('Finding the Challenge data...')

records = find_records(data_folder)
num_records = len(records)

if num_records==0:
    raise Exception('No data were provided.')

# Create a folder for the Challenge outputs if it does not already exist.
os.makedirs(output_folder, exist_ok=True)

# Run the team's model(s) on the Challenge data.
print('Running the Challenge model(s) on the Challenge data...')

# Iterate over the records.
for i in range(num_records):
    width = len(str(num_records))
    print(f'- {i+1:>{width}}/{num_records}: {records[i]}...')

    data_record = os.path.join(data_folder, records[i])
    output_record = os.path.join(output_folder, records[i])
    
    # Run the dx classification model. Allow or disallow the model to fail on some of the data, which can be helpful for debugging.
    dx = run_dx_model(dx_model, data_record) ### Teams: Implement this function!!!
    
    # Save Challenge outputs.
    output_path = os.path.split(output_record)[0]
    os.makedirs(output_path, exist_ok=True)

    data_header = load_header(data_record)
    save_header(output_record, data_header)

    # if signal is not None:
    #     save_signal(output_record, signal)

    #     comment_lines = [l for l in data_header.split('\n') if l.startswith('#')]
    #     signal_header = load_header(output_record)
    #     signal_header += ''.join(comment_lines) + '\n'
    #     save_header(output_record, signal_header)

    if dx is not None:
        save_dx(output_record, dx)

print('Done.')


Finding the Challenge data...
Running the Challenge model(s) on the Challenge data...
-   1/998: 02000/02000_lr...
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 242ms/step
-   2/998: 02000/02001_lr...
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 13ms/step
-   3/998: 02000/02002_lr...
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 15ms/step
-   4/998: 02000/02003_lr...
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 12ms/step
-   5/998: 02000/02004_lr...
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 12ms/step
-   6/998: 02000/02005_lr...
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 15ms/step
-   7/998: 02000/02006_lr...
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 12ms/step
-   8/998: 02000/02007_lr...
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 12ms/step
-   9/998: 02000/02008_lr...
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 12ms/step
-  1

In [47]:
# Evaluate the models by computing their macro F-measure.

import numpy as np
import pandas as pd
from sklearn.metrics import confusion_matrix


# Compute macro F-measure.

# def evaluate_model(label_folder, output_folder, extra_scores=False):
#     # Find the records.
#     records = find_records(label_folder)
#     num_records = len(records)

#     # Compute the classification metrics.
#     records_completed_classification = list()
#     label_dxs = list()
#     output_dxs = list()

#     # Iterate over the records.
#     for record in records:
#         # Load the classes, if available.
#         label_record = os.path.join(label_folder, record)
#         label_dx = load_dx(label_record)

#         if label_dx:
#             output_record = os.path.join(output_folder, record)
#             output_dx = load_dx(output_record)

#             if output_dx:
#                 records_completed_classification.append(record)

#             label_dxs.append(label_dx)
#             output_dxs.append(output_dx)

#     if records_completed_classification:
#         f_measure, per_class_f_measures, classes = compute_f_measure(label_dxs, output_dxs)

#         # --- Print per-class F-measures ---
#         print("Per-Class F-measures:")
#         for i, class_name in enumerate(classes):
#             print(f"  Class: {class_name}, F-measure: {per_class_f_measures[i]:.4f}")
#     else:
#         f_measure = float('nan')

#     # Return the results.
#     return f_measure

#########################################################################################################################################################################


def evaluate_model(label_folder, output_folder, extra_scores=False):
    records = find_records(label_folder)
    num_records = len(records)

    records_completed_classification = list()
    label_dxs = list()
    output_dxs = list()

    for record in records:
        label_record = os.path.join(label_folder, record)
        label_dx = load_dx(label_record)

        if label_dx:
            output_record = os.path.join(output_folder, record)
            output_dx = load_dx(output_record)

            if output_dx:
                records_completed_classification.append(record)

            label_dxs.append(label_dx)
            output_dxs.append(output_dx)

    print("Shape of label_dxs:", np.shape(label_dxs))
    print("Shape of output_dxs:", np.shape(output_dxs))

    if records_completed_classification:
        f_measure, per_class_f_measures, classes = compute_f_measure(label_dxs, output_dxs)

        # --- Print per-class F-measures ---
        print("Per-Class F-measures:")
        for i, class_name in enumerate(classes):
            print(f"  Class: {class_name}, F-measure: {per_class_f_measures[i]:.4f}")

        # --- Confusion Matrix ---
        cm = confusion_matrix(np.argmax(label_dxs, axis=1), np.argmax(output_dxs, axis=1))
        plt.figure(figsize=(8, 6))
        sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", xticklabels=classes, yticklabels=classes)
        plt.xlabel("Predicted")
        plt.ylabel("True")
        plt.title("Confusion Matrix")
        plt.show()

    else:
        f_measure = float('nan')

    return f_measure




def compute_f_measure(labels, outputs):
    # Compute confusion matrix.
    classes = sorted(set.union(*map(set, labels)))
    labels = compute_one_hot_encoding(labels, classes)
    outputs = compute_one_hot_encoding(outputs, classes)
    A = compute_one_vs_rest_confusion_matrix(labels, outputs, classes)

    num_classes = len(classes)
    per_class_f_measure = np.zeros(num_classes)
    for k in range(num_classes):
        tp, fp, fn, tn = A[k, 0, 0], A[k, 0, 1], A[k, 1, 0], A[k, 1, 1]
        if 2 * tp + fp + fn > 0:
            per_class_f_measure[k] = float(2 * tp) / float(2 * tp + fp + fn)
        else:
            per_class_f_measure[k] = float('nan')

    if np.any(np.isfinite(per_class_f_measure)):
        macro_f_measure = np.nanmean(per_class_f_measure)
    else:
        macro_f_measure = float('nan')

    return macro_f_measure, per_class_f_measure, classes

def compute_one_vs_rest_confusion_matrix(labels, outputs, classes):
    assert np.shape(labels) == np.shape(outputs)

    num_instances = len(labels)
    num_classes = len(classes)

    A = np.zeros((num_classes, 2, 2))
    for i in range(num_instances):
        for j in range(num_classes):
            if labels[i, j] == 1 and outputs[i, j] == 1: # TP
                A[j, 0, 0] += 1
            elif labels[i, j] == 0 and outputs[i, j] == 1: # FP
                A[j, 0, 1] += 1
            elif labels[i, j] == 1 and outputs[i, j] == 0: # FN
                A[j, 1, 0] += 1
            elif labels[i, j] == 0 and outputs[i, j] == 0: # TN
                A[j, 1, 1] += 1

    return A

# sample_test_labels = '/home/vinayaka/physionet/sample_test_labels/'
# sample_test_predictions = '/home/vinayaka/physionet/sample_test_predictions/'

sample_test_labels = '/Users/vinayaka/Desktop/Physionet-24/physionet/records100_images/Test_labels/02000'
sample_test_predictions = '/Users/vinayaka/Desktop/Physionet-24/physionet/records100_images/test_predictions_output/02000'

f_mes = evaluate_model(sample_test_labels, sample_test_predictions)

# Output the scores to screen and/or a file.
output_filename = "scores.txt"
output_string = f"F-score : {f_mes}"

save_text(output_filename, output_string) 

# Optionally, also print to the screen
print(output_string)

ValueError: setting an array element with a sequence. The requested array has an inhomogeneous shape after 1 dimensions. The detected shape was (998,) + inhomogeneous part.

In [None]:
# #!/usr/bin/env python

# # Do *not* edit this script. Changes will be discarded so that we can process the models consistently.

# # This file contains functions for evaluating models for the Challenge. You can run it as follows:
# #
# #   python evaluate_model.py -d labels -o outputs -s scores.csv
# #
# # where 'labels' is a folder containing files with the labels, 'outputs' is a folder containing files with the outputs from your
# # model, and 'scores.csv' (optional) is a collection of scores for the model outputs.
# #
# # Each label or output file must have the format described on the Challenge webpage. The scores for the algorithm outputs are also
# # described on the Challenge webpage.

# import argparse
# import numpy as np
# import os
# import os.path
# import sys

# from helper_code import *

# # Parse arguments.
# def get_parser():
#     description = 'Evaluate the Challenge model(s).'
#     parser = argparse.ArgumentParser(description=description)
#     parser.add_argument('-d', '--label_folder', type=str, required=True)
#     parser.add_argument('-o', '--output_folder', type=str, required=True)
#     parser.add_argument('-x', '--extra_scores', action='store_true')    
#     parser.add_argument('-s', '--score_file', type=str, required=False)
#     return parser

# # Evaluate the models.
# def evaluate_model(label_folder, output_folder, extra_scores=False):
#     # Find the records.
#     records = find_records(label_folder)
#     num_records = len(records)

#     # Compute the signal reconstruction metrics.
#     channels = list()
#     records_completed_signal_reconstruction = list()
#     snrs = list()
#     snrs_median = list()
#     ks_metric = list()
#     asci_metric = list()
#     weighted_absolute_difference_metric = list()

#     # Iterate over the records.
#     for record in records:
#         # Load the signals, if available.
#         label_record = os.path.join(label_folder, record)
#         label_signal, label_fields = load_signal(label_record)

#         if label_signal is not None:
#             label_channels = label_fields['sig_name']
#             label_sampling_frequency = label_fields['fs']
#             label_num_samples = label_fields['sig_len']
#             channels.append(label_channels) # Use this variable if computing aggregate statistics for each channel.

#             output_record = os.path.join(output_folder, record)
#             output_signal, output_fields = load_signal(output_record)

#             if output_signal is not None:
#                 output_channels = output_fields['sig_name']
#                 output_sampling_frequency = output_fields['fs']                
#                 output_num_samples = output_fields['sig_len']
#                 records_completed_signal_reconstruction.append(record)

#                 ###
#                 ### TO-DO: Perform checks, such as sampling frequency, units, etc.
#                 ###

#                 # Reorder and trim or pad the signal as needed.
#                 output_signal = reorder_signal(output_signal, output_channels, label_channels)
#                 output_signal = trim_signal(output_signal, label_num_samples)

#             else:
#                 output_signal = np.zeros(np.shape(label_signal), dtype=label_signal.dtype)

#             # Compute the signal reconstruction metrics.
#             channels = label_channels
#             sampling_frequency = label_sampling_frequency
#             num_channels = len(label_channels)

#             values = list()
#             for j in range(num_channels):
#                 value = compute_snr(label_signal[:, j], output_signal[:, j])
#                 values.append(value)
#             snrs.append(values)

#             if extra_scores:
#                 values = list()
#                 for j in range(num_channels):
#                     value = compute_snr_median(label_signal[:, j], output_signal[:, j])
#                     values.append(value)
#                 snrs_median.append(values) 

#                 values = list()
#                 for j in range(num_channels):
#                     value = compute_ks_metric(label_signal[:, j], output_signal[:, j])
#                     values.append(value)
#                 ks_metric.append(values)

#                 values = list()
#                 for j in range(num_channels):
#                     value = compute_asci_metric(label_signal[:, j], output_signal[:, j])
#                     values.append(value)
#                 asci_metric.append(values)             
    
#                 values = list()
#                 for j in range(num_channels):
#                     value = compute_weighted_absolute_difference(label_signal[:, j], output_signal[:, j], sampling_frequency)
#                     values.append(value)
#                 weighted_absolute_difference_metric.append(values)

#     if records_completed_signal_reconstruction:
#         snrs = np.concatenate(snrs)
#         mean_snr = np.nanmean(snrs)

#         if extra_scores:
#             snrs_median = np.concatenate(snrs_median)
#             mean_snr_median = np.nanmean(snrs_median)

#             ks_metric = np.concatenate(ks_metric)
#             mean_ks_metric = np.nanmean(ks_metric)

#             asci_metric = np.concatenate(asci_metric)
#             mean_asci_metric = np.nanmean(asci_metric)

#             weighted_absolute_difference_metric = np.concatenate(weighted_absolute_difference_metric)
#             mean_weighted_absolute_difference_metric = np.nanmean(weighted_absolute_difference_metric)
#         else:
#             mean_snr_median = float('nan')
#             mean_ks_metric = float('nan')
#             mean_asci_metric = float('nan')     
#             mean_weighted_absolute_difference_metric = float('nan')          

#     else:
#         mean_snr = float('nan')
#         mean_snr_median = float('nan')
#         mean_ks_metric = float('nan')
#         mean_asci_metric = float('nan')  
#         mean_weighted_absolute_difference_metric = float('nan')          

#     # Compute the classification metrics.
#     records_completed_classification = list()
#     label_dxs = list()
#     output_dxs = list()

#     # Iterate over the records.
#     for record in records:
#         # Load the classes, if available.
#         label_record = os.path.join(label_folder, record)
#         label_dx = load_dx(label_record)

#         if label_dx:
#             output_record = os.path.join(output_folder, record)
#             output_dx = load_dx(output_record)

#             if output_dx:
#                 records_completed_classification.append(record)

#             label_dxs.append(label_dx)
#             output_dxs.append(output_dx)

#     if records_completed_classification:
#         f_measure, _, _ = compute_f_measure(label_dxs, output_dxs)
#     else:
#         f_measure = float('nan')

#     # Return the results.
#     return mean_snr, mean_snr_median, mean_ks_metric, mean_asci_metric, mean_weighted_absolute_difference_metric, f_measure

# # Run the code.
# def run(args):
#     # Compute the scores for the model outputs.
#     scores = evaluate_model(args.label_folder, args.output_folder, args.extra_scores)

#     # Unpack the scores.
#     snr, snr_median, ks_metric, asci_metric, mean_weighted_absolute_difference_metric, f_measure = scores

#     # Construct a string with scores.
#     if not args.extra_scores:
#         output_string = \
#             f'SNR: {snr:.3f}\n' + \
#             f'F-measure: {f_measure:.3f}\n'
#     else:
#         output_string = \
#             f'SNR: {snr:.3f}\n' + \
#             f'SNR median: {snr_median:.3f}\n' \
#             f'KS metric: {ks_metric:.3f}\n' + \
#             f'ASCI metric: {asci_metric:.3f}\n' \
#             f'Weighted absolute difference metric: {mean_weighted_absolute_difference_metric:.3f}\n' \
#             f'F-measure: {f_measure:.3f}\n'              

#     # Output the scores to screen and/or a file.
#     if args.score_file:
#         save_text(args.score_file, output_string)
#     else:
#         print(output_string)

# if __name__ == '__main__':
#     run(get_parser().parse_args(sys.argv[1:]))

In [12]:
import cv2
import numpy as np
from scipy.signal import resample, find_peaks

def preprocess_ecg_image(image_path):
    # Load the ECG image
    image = cv2.imread(image_path)
    
    # Preprocess the image
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    
    # Remove gridlines
    kernel = np.ones((3, 3), np.uint8)
    opening = cv2.morphologyEx(gray, cv2.MORPH_OPEN, kernel, iterations=1)
    closing = cv2.morphologyEx(opening, cv2.MORPH_CLOSE, kernel, iterations=1)
    gridless = cv2.subtract(gray, closing)
    
    # Threshold the image
    _, thresh = cv2.threshold(gridless, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)
    
    # Detect and extract the ECG signal
    contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    ecg_signal = []
    for contour in contours:
        if cv2.contourArea(contour) > 100:  # Filter contours based on area
            for point in contour:
                ecg_signal.append(point[0][1])  # Extract y-coordinate of the ECG signal
    
    # Normalize and resample the ECG signal
    ecg_signal = np.array(ecg_signal)
    ecg_signal = (ecg_signal - np.min(ecg_signal)) / (np.max(ecg_signal) - np.min(ecg_signal))
    ecg_signal = resample(ecg_signal, num=1000)  # Resample to 1000 data points
    
    return ecg_signal

def extract_rr_intervals_and_heart_rate(ecg_signal, sampling_rate):
    # Detect R-peaks using a peak detection algorithm
    r_peaks, _ = find_peaks(ecg_signal, distance=sampling_rate // 2, prominence=0.5)
    
    # Calculate R-R intervals
    rr_intervals = np.diff(r_peaks) / sampling_rate
    
    # Calculate heart rate
    heart_rate = 60 / np.mean(rr_intervals)
    
    return rr_intervals, heart_rate

# Usage
image_path = '/home/vinayaka/physionet/records100_images/Test_labels/02000/02005_lr-0.png'
sampling_rate = 500  # Adjust the sampling rate according to your ECG image

# Preprocess the ECG image
ecg_signal = preprocess_ecg_image(image_path)

# Extract R-R intervals and heart rate
rr_intervals, heart_rate = extract_rr_intervals_and_heart_rate(ecg_signal, sampling_rate)

print("R-R Intervals (in seconds):")
print(rr_intervals)
print("Heart Rate (in beats per minute):")
print(heart_rate)

R-R Intervals (in seconds):
[]
Heart Rate (in beats per minute):
nan


In [2]:
# Feature extravtion from test data


from tensorflow.keras.applications.vgg16 import VGG16, preprocess_input
from tensorflow.keras.applications.vgg19 import VGG19, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
import numpy as np
from PIL import Image
import cv2
import os

# Set the path to your ECG image directory
ecg_dir = "/home/vinayaka/physionet/records100_images/Test"

# Set the target image size for VGG19
target_size = (224, 224)

# Load the pre-trained VGG16 model without the top classification layer
vgg_model = VGG19(weights='imagenet', include_top=False, input_shape=(224, 224, 3))

# Function to preprocess and extract features from an ECG image
def extract_features(record):
    images = load_image(record)
    features = []
    for image in images:
        image = np.asarray(image)
        image = cv2.resize(image, target_size)
        image = img_to_array(image)
        image = np.expand_dims(image, axis=0)
        image = preprocess_input(image)
        feature = vgg_model.predict(image)
        feature = feature.flatten()
        features.append(feature)
    features = np.concatenate(features)
    return features

# Load the image(s) for a record.
def load_image(record):
    path = os.path.split(record)[0]
    image_files = get_image_files(record)
    
    images = list()
    for image_file in image_files:
        image_file_path = os.path.join(path, image_file)
        if os.path.isfile(image_file_path):
            image = Image.open(image_file_path)
            images.append(image)
    
    return images

# Get the image file(s) for a record.
def get_image_files(record):
    header_file = get_header_file(record)
    header = load_text(header_file)
    image_files = get_image_files_from_header(header)
    return image_files

# Get the image file(s) from a header or a similar string.
def get_image_files_from_header(string):
    images, has_image = get_variables(string, '#Image:')
    if not has_image:
        raise Exception('No images available: did you forget to generate or include the images?')
    return images

# Load the header for a record.
def load_header(record):
    header_file = get_header_file(record)
    header = load_text(header_file)
    return header

# Get the header file for a record.
import os

# Get the header file for a record.
def get_header_file(record):
    if not record.endswith('.hea'):
        header_file = record + '.hea'
    else:
        header_file = record
    return header_file

# Get variables from a text file.
def get_variables(string, variable_name, sep=','):
    variables = list()
    has_variable = False
    for l in string.split('\n'):
        if l.startswith(variable_name):
            variables += [variable.strip() for variable in l[len(variable_name):].strip().split(sep)]
            has_variable = True
    return variables, has_variable

# Load a text file as a string.
def load_text(filename):
    with open(filename, 'r') as f:
        string = f.read()
    return string

# Find the records in a folder and its subfolders.
def find_records(folder):
    records = set()
    for root, directories, files in os.walk(folder):
        for file in files:
            extension = os.path.splitext(file)[1]
            if extension == '.hea':
                record = os.path.relpath(os.path.join(root, file), folder)[:-4]
                records.add(record)
    records = sorted(records)
    return records

# Load and preprocess the ECG images
test_features = list()
test_dxs = list()

records = find_records(ecg_dir)
num_records = len(records)

if num_records == 0:
    raise FileNotFoundError('No data was provided.')

for i in range(num_records):
    width = len(str(num_records))
    print(f'- {i+1:>{width}}/{num_records}: {records[i]}...')

    record = os.path.join(ecg_dir, records[i])
    
    # Extract the features from the image, but only if the image has one or more dx classes.
    test_dx = load_dx(record)
    if test_dx:
        current_features = extract_features(record)
        test_features.append(current_features)
        test_dxs.append(test_dx)

if not test_dxs:
    raise Exception('There are no labels for the data.')

test_features = np.vstack(test_features)
test_classes = sorted(set.union(*map(set, test_dxs)))
#test_classes = ['CD', 'HYP', 'MI', 'NORM', 'STTC']
test_dxs = compute_one_hot_encoding(test_dxs, test_classes)

-   1/998: 02000/02000_lr...
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 280ms/step
-   2/998: 02000/02001_lr...
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 140ms/step
-   3/998: 02000/02002_lr...
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 154ms/step
-   4/998: 02000/02003_lr...
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 137ms/step
-   5/998: 02000/02004_lr...
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 138ms/step
-   6/998: 02000/02005_lr...
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 141ms/step
-   7/998: 02000/02006_lr...
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 137ms/step
-   8/998: 02000/02007_lr...
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 139ms/step
-   9/998: 02000/02008_lr...
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 184ms/step
-  10/998: 02000/02009_lr...
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [

In [24]:
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import (
    precision_score,
    recall_score,
    f1_score,
    hamming_loss,
    roc_auc_score,
    accuracy_score,
    jaccard_score
)
#from sklearn.multioutput import MultiOutputClassifier
from sklearn.metrics import accuracy_score, hamming_loss
from sklearn.multiclass import OneVsRestClassifier

# Split the dataset into training and testing sets
#X_train, X_test, Y_train, Y_test = train_test_split(features, dxs, test_size=0.3, random_state=789)

X_train = features
Y_train = dxs

X_test = test_features
Y_test = test_dxs

# Initialize the base classifier
base_lr = RandomForestClassifier(n_estimators=123, max_leaf_nodes=456, random_state=789)

# Use the OneVsRestClassifier to handle multi-label cases. Uses binary relevance technique to handle multi-label classification scenario
ovr_classifier = OneVsRestClassifier(base_lr)

# Train the classifier on the dataset
ovr_classifier.fit(X_train, Y_train)
    
# Make predictions
Y_pred = ovr_classifier.predict(X_test)
    

# # Calculate evaluation metrics
precision = precision_score(Y_test, Y_pred, average='weighted')
print(precision)
recall = recall_score(Y_test, Y_pred, average='weighted')
print(recall)
f1 = f1_score(Y_test, Y_pred, average='weighted')
print(f1)
# hamming = hamming_loss(Y_test, Y_pred)
# subset_accuracy = accuracy_score(Y_test, Y_pred)
# jaccard = jaccard_score(Y_test, Y_pred, average='samples')   


0.38276553106212424
0.3739144956579826
0.37631930527722113


  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))


In [None]:
[0, 1, 0, 1, 1]