In [24]:
# %% [markdown]
# # Prepare PTB-XL Data
# 
# This notebook prepares the PTB-XL database for use in the Challenge.

# %%
# Load libraries
import numpy as np
import os
import pandas as pd
import shutil
import ast

# %% [markdown]
# ## Helper Functions

# %%
def find_records(folder):
  """Return the sorted record names found under `folder`.

  A record name is the path of a '.hea' file relative to `folder`, with the
  four-character '.hea' suffix removed. Sub-folders are searched recursively.
  """
  header_suffix = '.hea'
  found = set()
  for dirpath, _dirnames, filenames in os.walk(folder):
    for name in filenames:
      if not name.endswith(header_suffix):
        continue
      relative = os.path.relpath(os.path.join(dirpath, name), folder)
      found.add(relative[:-len(header_suffix)])
  return sorted(found)

def load_text(filename):
  """Open `filename` in text mode and return its whole contents as one string."""
  with open(filename, 'r') as handle:
    contents = handle.read()
  return contents

def save_text(filename, string):
  """Write `string` to `filename` in text mode, replacing any previous contents."""
  with open(filename, 'w') as handle:
    handle.write(string)

def get_signal_files(record):
  """Return the signal file names referenced by the header of `record`.

  `record` is the header path without its '.hea' extension; the header text is
  read with load_text and parsed by get_signal_files_from_header.
  """
  return get_signal_files_from_header(load_text(record + '.hea'))

def get_signal_files_from_header(string):
  """Return the distinct signal file names listed in a WFDB-style header string.

  The first non-blank, non-comment line is treated as the record line; its
  second whitespace-separated field is the number of signal lines that follow.
  The first field of each of those signal lines is the signal file name.

  Args:
    string: Full text of a header file.

  Returns:
    Signal file names in order of first appearance, without duplicates. An
    empty list is returned when the header contains no record line.

  Raises:
    IndexError / ValueError: if the record line has no integer second field.
  """
  signal_files = []
  num_channels = None  # unknown until the record line has been read
  num_seen = 0
  for line in string.split('\n'):
    stripped = line.strip()
    if not stripped or stripped.startswith('#'):
      if num_channels is None:
        # Comments or blank lines ahead of the record line are skipped rather
        # than being mistaken for it.
        continue
      # The signal lines form one contiguous run directly after the record line.
      break
    fields = stripped.split()  # tolerates repeated spaces and tabs
    if num_channels is None:
      num_channels = int(fields[1])
      continue
    if num_seen >= num_channels:
      break
    num_seen += 1
    if fields[0] not in signal_files:
      signal_files.append(fields[0])
  return signal_files

def cast_int_float_unknown(x):
  """Convert `x` to an int or float for the header, or 'Unknown' if it is not a usable number.

  Args:
    x: A value such as a database cell (number, numeric string, NaN or None).

  Returns:
    An int when `x` is a finite whole number, a float when it is any other
    finite number, and the string 'Unknown' when `x` is missing, non-numeric,
    NaN or infinite.
  """
  import math

  try:
    value = float(x)
  except (ValueError, TypeError, OverflowError):
    return 'Unknown'
  # Missing values read by pandas arrive as NaN; float() accepts them, so they
  # have to be rejected explicitly instead of being written out as 'nan'.
  if not math.isfinite(value):
    return 'Unknown'
  # Convert from the parsed float so that strings such as '62.0' also become 62.
  return int(value) if value.is_integer() else value

# %% [markdown]
# ## Data Preparation

# %%
# Set file paths
input_folder = '/Users/vinayaka/Desktop/Physionet-24/physionet/records100/14000'
output_folder = '/Users/vinayaka/Desktop/Physionet-24/physionet/records100_images/Train/14000'

# The four metadata tables all sit in the same PTB-XL release folder.
_ptbxl_release_folder = '/Users/vinayaka/Desktop/Physionet-24/physionet/python-example-2024/physionet.org/files/ptb-xl/1.0.3'
ptbxl_database_file = f'{_ptbxl_release_folder}/ptbxl_database.csv'
ptbxl_mapping_file = f'{_ptbxl_release_folder}/scp_statements.csv'
sl_database_file = f'{_ptbxl_release_folder}/12sl_statements.csv'
sl_mapping_file = f'{_ptbxl_release_folder}/12slv23ToSNOMED.csv'

# Assign each class to a superclass: only rows flagged diagnostic == 1 take part,
# keyed by the first CSV column (used as the index).
df_ptbxl_mapping = pd.read_csv(ptbxl_mapping_file, index_col=0)
subclass_to_superclass = {
  subclass: mapping_row['diagnostic_class']
  for subclass, mapping_row in df_ptbxl_mapping.iterrows()
  if mapping_row['diagnostic'] == 1
}

def assign_superclass(subclasses):
  """Return the distinct superclasses of the given subclasses.

  Each element obtained by iterating `subclasses` is looked up in the
  module-level `subclass_to_superclass` mapping; elements without an entry are
  ignored. The result is a list with no duplicates and no guaranteed order.
  """
  superclasses = set()
  for subclass in subclasses:
    if subclass in subclass_to_superclass:
      superclasses.add(subclass_to_superclass[subclass])
  return list(superclasses)

# Load the PTB-XL labels; scp_codes is stored as text and parsed into Python objects
df_ptbxl_database = pd.read_csv(ptbxl_database_file, index_col='ecg_id')
df_ptbxl_database['scp_codes'] = df_ptbxl_database['scp_codes'].apply(ast.literal_eval)

# Map the PTB-XL classes to superclasses
df_ptbxl_database['diagnostic_superclass'] = df_ptbxl_database['scp_codes'].apply(assign_superclass)

# Load the 12SL labels
df_sl_database = pd.read_csv(sl_database_file, index_col='ecg_id')

# Map the 12SL classes to the PTB-XL classes for acute myocardial infarction (MI) classes
df_sl_mapping = pd.read_csv(sl_mapping_file, index_col='StatementNumber')

# Statement numbers treated as acute MI; numbers missing from the mapping table are skipped.
acute_mi_statements = {821, 822, 823, 827, 829, 902, 903, 904, 963, 964, 965, 966, 967, 968}
acute_mi_classes = {
  df_sl_mapping.loc[statement, 'Acronym']
  for statement in acute_mi_statements
  if statement in df_sl_mapping.index
}

# %% [markdown]
# ## Process Records

# %%
# Identify the header files
records = find_records(input_folder)

# Normalise both folders before comparing them, so that equivalent spellings
# (for example a trailing slash) are still recognised as an in-place run and
# no file is copied onto itself.
_copy_signal_files = os.path.normpath(input_folder) != os.path.normpath(output_folder)

# Comment fields regenerated below; stale copies in the input header are dropped.
_managed_comment_prefixes = ('# Age:', '# Sex:', '# Height:', '# Weight:', '# Labels:')

# Update the header files and copy signal files
for record in records:
  # Extract the demographics data. The part of the file name before the first
  # underscore is used as the ecg_id key into the PTB-XL database.
  record_path, record_basename = os.path.split(record)
  ecg_id = int(record_basename.split('_')[0])
  row = df_ptbxl_database.loc[ecg_id]

  # Split '<date> <time>' and rewrite the dash-separated date in reverse field
  # order with slashes (presumably yyyy-mm-dd -> dd/mm/yyyy).
  date_string, time_string = row['recording_date'].split(' ')
  yyyy, mm, dd = date_string.split('-')
  date_string = f'{dd}/{mm}/{yyyy}'

  age = cast_int_float_unknown(row['age'])
  sex = ['Male', 'Female', 'Unknown'][int(row['sex'])]
  height = cast_int_float_unknown(row['height'])
  weight = cast_int_float_unknown(row['weight'])

  scp_codes = [scp_code for scp_code, value in row['scp_codes'].items() if value >= 0]
  superclasses = row['diagnostic_superclass']

  # NOTE(review): 'statements' is used exactly as read from the CSV; if it is a
  # plain string, the membership test below is a substring match - confirm.
  sl_codes = df_sl_database.loc[ecg_id]['statements'] if ecg_id in df_sl_database.index else []
  has_acute_mi = any(c in sl_codes for c in acute_mi_classes)

  # Determine labels
  labels = []
  if 'NORM' in superclasses:
    labels.append('NORM')
  if has_acute_mi:
    labels.append('Acute MI')
  if 'MI' in superclasses and not has_acute_mi:
    labels.append('Old MI')
  if 'STTC' in superclasses:
    labels.append('STTC')
  if 'CD' in superclasses:
    labels.append('CD')
  if 'HYP' in superclasses:
    labels.append('HYP')
  if 'PAC' in scp_codes:
    labels.append('PAC')
  if 'PVC' in scp_codes:
    labels.append('PVC')
  if {'AFIB', 'AFLT'} & set(scp_codes):
    labels.append('AFIB/AFL')
  if {'STACH', 'SVTAC', 'PSVT'} & set(scp_codes):
    labels.append('TACHY')
  if 'SBRAD' in scp_codes:
    labels.append('BRADY')
  labels = ', '.join(labels)

  # Update the header file
  input_header_file = os.path.join(input_folder, record + '.hea')
  output_header_file = os.path.join(output_folder, record + '.hea')

  output_path = os.path.join(output_folder, record_path)
  os.makedirs(output_path, exist_ok=True)

  input_header = load_text(input_header_file)
  lines = input_header.split('\n')

  # Record line: keep the first four fields and append the recording time and date.
  record_line = ' '.join(lines[0].strip().split(' ')[:4]) + f' {time_string} {date_string}\n'
  signal_lines = '\n'.join(l.strip() for l in lines[1:] if l.strip() and not l.startswith('#')) + '\n'

  # Join kept and regenerated comments in one pass, so that a header without
  # any existing comments does not end up with an empty line before '# Age:'.
  kept_comments = [l.strip() for l in lines[1:]
      if l.startswith('#') and not l.startswith(_managed_comment_prefixes)]
  new_comments = [
    f'# Age: {age}',
    f'# Sex: {sex}',
    f'# Height: {height}',
    f'# Weight: {weight}',
    f'# Labels: {labels}',
  ]
  comment_lines = '\n'.join(kept_comments + new_comments) + '\n'

  output_header = record_line + signal_lines + comment_lines

  save_text(output_header_file, output_header)

  # Copy the signal files if the input and output folders are different
  if _copy_signal_files:
    for signal_file in get_signal_files(os.path.join(input_folder, record)):
      input_signal_file = os.path.join(input_folder, record_path, signal_file)
      output_signal_file = os.path.join(output_folder, record_path, signal_file)
      if os.path.isfile(input_signal_file):
        shutil.copy2(input_signal_file, output_signal_file)

print("Data preparation completed.")

Data preparation completed.


In [25]:
import os
import sys
import random
import csv
import multiprocessing as mp
from helper_functions import find_records
import warnings
from multiprocessing import Pool, cpu_count
from ecg_processing import process_file  # Import the updated process_file function

# Quieten the run: set the TF_CPP_MIN_LOG_LEVEL environment variable
# (presumably read by TensorFlow when it is imported) and ignore Python warnings.
os.environ.update({'TF_CPP_MIN_LOG_LEVEL': '2'})
warnings.filterwarnings(action="ignore")

# Force the use of 'forkserver' start method, replacing any method already chosen
mp.set_start_method(method="forkserver", force=True)

# Function to parse command-line style arguments
def parse_args(args):
  """Parse a flat list of command-line style tokens into a dictionary.

  Tokens starting with '--' or '-' are option names (stored without the
  leading dashes). An option followed by a value token maps to that value (as
  a string); an option followed by another option, or by nothing, maps to True.
  Value tokens that do not follow an option are ignored.

  A token such as '-5' or '-.5' is a negative number and therefore a value,
  not an option.

  Args:
    args: List of string tokens, e.g. ['-i', 'in', '--wrinkles', '-ca', '45'].

  Returns:
    Dict mapping option names to their string value or True.
  """
  def is_option(token):
    # A dash followed by a digit or a dot is a negative number, not an option name.
    if not token.startswith('-'):
      return False
    return not (len(token) > 1 and (token[1].isdigit() or token[1] == '.'))

  parsed_args = {}
  i = 0
  while i < len(args):
    token = args[i]
    if not is_option(token):
      i += 1
      continue
    key = token[2:] if token.startswith('--') else token[1:]
    # Any following option, short or long, ends the current one; a long flag
    # must not consume a following short option as its value.
    if i + 1 < len(args) and not is_option(args[i + 1]):
      parsed_args[key] = args[i + 1]
      i += 2
    else:
      parsed_args[key] = True
      i += 1
  return parsed_args

# Command-line style arguments (modify as needed)
cmd_args = [
  '-i', '/Users/vinayaka/Desktop/Physionet-24/physionet/records100/14000',
  '-o', '/Users/vinayaka/Desktop/Physionet-24/physionet/records100_images/Train/14000',
  '--x_offset', '30',
  '--y_offset', '20',
  '-rot', '5',
  # The comma after '-noise' is required: without it Python concatenates the
  # two adjacent string literals into the single token '-noise40'.
  '-noise', '40',
  '--deterministic_rot',
  '--deterministic_noise',
#   '--hw_text', -- UnidentifiedImageError: cannot identify image file '/Users/vinayaka/Desktop/Physionet-24/physionet/ecg-image-kit/codes/ecg-image-generator/HandwrittenText.png'
  '-n', '4',
  '--wrinkles',
  '-ca', '45',
  '--print_header',
  '-se', '10',
  '--random_grid_color',
  '--add_qr_code'
]

# cmd_args = [
#   '-i', '/Users/vinayaka/Desktop/Physionet-24/physionet/records100/10000',
#   '-o', '/Users/vinayaka/Desktop/Physionet-24/physionet/records100_images/Train/10000',
#   '--augment',
#   '-rot', '5',
#   '-noise', '40',
#   '--deterministic_rot',
#   '--deterministic_noise',
#   '--hw_text',
#   '-n', '4',
#   '--x_offset', '30',
#   '--y_offset', '20',
#   '--wrinkles',
#   '-ca', '45',
#   '-se', '10',
#   '--print_header',
#   '--add_qr_code'
# ]

# Parse the arguments
args = parse_args(cmd_args)

# Set default values for required arguments if not provided. The long-form
# names fall back to their short options ('i', 'o', 'se'); keys that are
# already present in args are left untouched.
_long_form_defaults = {
  'input_directory': args.get('i'),
  'output_directory': args.get('o'),
  'seed': args.get('se', -1),
  'num_leads': 'twelve',
  'max_num_images': -1,
  'config_file': 'config.yaml',
}
args.update({name: default for name, default in _long_form_defaults.items() if name not in args})

# Convert numeric strings to integers/floats
for key in ('seed', 'n', 'x_offset', 'y_offset', 'max_num_images'):
  if args.get(key) is not None:
    args[key] = int(args[key])

def run(args):
  """Generate ECG images for every record found under the input directory.

  Args:
    args: Dict of options. Reads 'seed', 'input_directory', 'output_directory'
      and 'max_num_images' here; the whole dict is also passed to process_file.

  Side effects:
    Seeds the `random` module, sets the module-level names
    `abs_input_directory` and `abs_output_directory`, creates the output
    directory if needed, and prints the total number of generated images.

  Raises:
    Exception: if the input directory does not exist or is not a directory.
  """
  global abs_input_directory, abs_output_directory
  random.seed(args['seed'])

  def resolve(path):
    # Relative paths are anchored at the current working directory; absolute
    # paths are used exactly as given.
    if os.path.isabs(path):
      return path
    return os.path.normpath(os.path.join(os.getcwd(), path))

  abs_input_directory = resolve(args['input_directory'])
  abs_output_directory = resolve(args['output_directory'])

  if not os.path.isdir(abs_input_directory):
    raise Exception("The input directory does not exist. Please check the input directory path!")

  os.makedirs(abs_output_directory, exist_ok=True)

  full_header_files, full_recording_files = find_records(abs_input_directory, abs_output_directory)
  file_tuples = list(zip(full_header_files, full_recording_files))

  if args['max_num_images'] != -1:
    file_tuples = file_tuples[:args['max_num_images']]

  results = []
  if file_tuples:
    # Use all available CPU cores or the number of files, whichever is smaller.
    # The guard above matters: Pool() rejects a process count of zero, which is
    # what an empty record list would otherwise request.
    num_processes = min(cpu_count(), len(file_tuples))
    with Pool(num_processes) as pool:
      results = pool.starmap(
        process_file,
        [(file_tuple, args, abs_input_directory, abs_output_directory) for file_tuple in file_tuples])

  total_images = sum(results)
  print(f"Generated {total_images} ECG images.")

# Run the function: generates the images for the parsed options and prints the
# total number of images reported by process_file.
run(args)

Generated 1000 ECG images.


In [26]:
import argparse
import json
import os
import os.path
import shutil
import sys
from collections import defaultdict

# Helper functions used by run() below
def find_records(folder):
    """
    Collect the records stored under a folder.

    A record is any file whose extension is '.hea'; its name is the file path
    relative to `folder` with the last four characters (the extension) removed.

    Args:
        folder: Path to the folder containing the records (searched recursively).

    Returns:
        A sorted list of record names.
    """
    header_paths = (
        os.path.relpath(os.path.join(dirpath, name), folder)
        for dirpath, _subdirs, names in os.walk(folder)
        for name in names
        if os.path.splitext(name)[1] == '.hea'
    )
    return sorted({path[:-4] for path in header_paths})

def load_text(filename):
    """
    Read a whole text file.

    Args:
        filename: Path to the file.

    Returns:
        The file's contents as a single string.
    """
    with open(filename, 'r') as handle:
        return handle.read()

def is_number(s):
    """
    Tell whether a value can be converted with float().

    Args:
        s: The value to check (typically a string).

    Returns:
        True if float(s) succeeds, False if it raises ValueError or TypeError.
    """
    try:
        float(s)
    except (ValueError, TypeError):
        return False
    else:
        return True

def get_signal_files(header_file):
    """
    Read a header file and extract its signal file names.

    Args:
        header_file: Path to the header file.

    Returns:
        The list produced by get_signal_files_from_header for the file's text.
    """
    with open(header_file, 'r') as handle:
        return get_signal_files_from_header(handle.read())

def get_image_files(header_file):
    """
    Read a header file and extract its image file names.

    Args:
        header_file: Path to the header file.

    Returns:
        The list produced by get_image_files_from_header for the file's text.
    """
    with open(header_file, 'r') as handle:
        return get_image_files_from_header(handle.read())

def get_signal_files_from_header(header):
    """
    Get signal files from a header string.

    Two sources are supported:

    1. A '# Signals:' comment holding a comma-separated list. When present it
       is used as-is (this was the only form recognised previously).
    2. Otherwise the WFDB-style layout: the first non-blank, non-comment line
       is the record line, whose second field is the number of signal lines
       that follow; the first field of each signal line is a file name.

    Args:
        header: The header string.

    Returns:
        A list of signal file names in order of first appearance, without
        duplicates. Empty when neither source yields a name; malformed headers
        never raise.
    """
    marker = '# Signals:'
    lines = header.splitlines()

    for line in lines:
        if line.startswith(marker):
            listed = line[len(marker):]
            return [name.strip() for name in listed.split(',') if name.strip()]

    signal_files = []
    remaining = None  # signal lines still expected; None until the record line is read
    for line in lines:
        stripped = line.strip()
        if not stripped or stripped.startswith('#'):
            if remaining is None:
                continue  # comments/blank lines ahead of the record line
            break  # signal lines are contiguous after the record line
        fields = stripped.split()
        if remaining is None:
            if len(fields) < 2 or not fields[1].isdigit():
                return []  # not a recognisable record line
            remaining = int(fields[1])
            continue
        if remaining == 0:
            break
        remaining -= 1
        if fields[0] not in signal_files:
            signal_files.append(fields[0])
    return signal_files

def get_image_files_from_header(header):
    """
    Get image files from a header string.

    The first line starting with '#Images:' is used; the text after the marker
    is a comma-separated list of image file names.

    Args:
        header: The header string.

    Returns:
        A list of image file names. Empty when there is no '#Images:' line or
        when that line lists no files (an empty list no longer yields ['']).
    """
    marker = '#Images:'
    for line in header.splitlines():
        if line.startswith(marker):
            # Slice off the marker instead of splitting on ': ', so a marker
            # with nothing (or no space) after it cannot raise IndexError.
            listed = line[len(marker):]
            return [name.strip() for name in listed.split(',') if name.strip()]
    return []

# Define the substring_images variable: the header-line prefix that run() strips
# from existing headers and writes in front of the image list. It must stay
# identical to the '#Images:' literal matched in get_image_files_from_header.
substring_images = '#Images:'  # Adjust this if needed

# Find files function
def find_files(folder, extensions, remove_extension=False, sort=False):
    """
    Find files under a folder by extension.

    Args:
        folder: Root folder, searched recursively.
        extensions: Collection of extensions to accept, each including the dot.
        remove_extension: If True, strip the extension from each returned path.
        sort: If True, return a sorted list instead of a set.

    Returns:
        Paths relative to `folder`, as a set (default) or a sorted list.
    """
    matches = set()
    for dirpath, _subdirs, names in os.walk(folder):
        for name in names:
            if os.path.splitext(name)[1] not in extensions:
                continue
            relative = os.path.relpath(os.path.join(dirpath, name), folder)
            matches.add(os.path.splitext(relative)[0] if remove_extension else relative)
    return sorted(matches) if sort else matches

# Main function
def run(input_folder, output_folder):
    """
    Record each record's image file names in its header.

    For every '.hea' record under `input_folder`, the header is rewritten to
    `output_folder` without empty lines or earlier image lines, followed by one
    new line listing the record's images. Images belong to a record when their
    relative path, minus extension and minus the final '-<suffix>' part, equals
    the record name. When the two folders differ, the signal and image files
    named in the written header are copied as well.

    Args:
        input_folder: Path to the input folder.
        output_folder: Path to the output folder.
    """
    image_file_types = ['.png', '.jpg', '.jpeg']

    def image_suffix(image_file):
        # Text after the last '-' of the file name, extension removed.
        return os.path.splitext(image_file)[0].split('-')[-1]

    # Find the header files
    records = find_records(input_folder)

    # Group the image base names by the record they belong to
    record_to_image_files = defaultdict(set)
    for image_path in find_files(input_folder, image_file_types):
        stem = os.path.splitext(image_path)[0]
        owner = stem.rpartition('-')[0]
        record_to_image_files[owner].add(os.path.basename(image_path))

    folders_differ = os.path.normpath(input_folder) != os.path.normpath(output_folder)

    # Update the header files and copy signal files
    for record in records:
        record_path = os.path.split(record)[0]
        image_names = record_to_image_files[record]

        # Sort numerically when every suffix is a number, alphabetically otherwise
        if all(is_number(image_suffix(name)) for name in image_names):
            ordered_images = sorted(image_names, key=lambda name: float(image_suffix(name)))
        else:
            ordered_images = sorted(image_names)

        input_header_file = os.path.join(input_folder, record + '.hea')
        output_header_file = os.path.join(output_folder, record + '.hea')

        # Keep every non-empty line except previous image lines, then append the new one
        kept_lines = [
            line for line in load_text(input_header_file).split('\n')
            if line and not line.startswith(substring_images)
        ]
        record_image_string = ', '.join(ordered_images)
        output_header = ''.join(line + '\n' for line in kept_lines)
        output_header += f'{substring_images} {record_image_string}\n'

        os.makedirs(os.path.join(output_folder, record_path), exist_ok=True)

        with open(output_header_file, 'w') as handle:
            handle.write(output_header)

        if not folders_differ:
            continue

        # Copy the signal and image files if available
        for signal_file in get_signal_files(output_header_file):
            source = os.path.join(input_folder, record_path, signal_file)
            if os.path.isfile(source):
                shutil.copy2(source, os.path.join(output_folder, record_path, signal_file))

        for image_file in get_image_files(output_header_file):
            source = os.path.join(input_folder, record_path, image_file)
            if os.path.isfile(source):
                shutil.copy2(source, os.path.join(output_folder, record_path, image_file))

# Run the script. Input and output point at the same folder here, so the
# headers are rewritten in place and nothing is copied; replace the path(s)
# with your own input/output folders as needed.
input_folder = output_folder = '/Users/vinayaka/Desktop/Physionet-24/physionet/records100_images/Train/14000'

run(input_folder, output_folder)