In [None]:
import os
import pydicom
import cv2
import xml.etree.ElementTree as ET
from multiprocessing import Pool, cpu_count

# Define paths (NVMe SSD)
dicom_root = r"E:\Final Year Project\Code\dcm images"
annotation_root = r"E:\Final Year Project\Code\Annotation"
output_root = r"E:\Final Year Project\Code\Processed"

# Check if paths exist
if not os.path.exists(dicom_root):
    raise FileNotFoundError(f"The folder '{dicom_root}' does not exist.")
if not os.path.exists(annotation_root):
    raise FileNotFoundError(f"The folder '{annotation_root}' does not exist.")

# Create output folder if it doesn't exist
os.makedirs(output_root, exist_ok=True)

def load_dicom_image(dicom_path):
    """Load a DICOM image and return it as a NumPy array."""
    dicom = pydicom.dcmread(dicom_path)
    image = dicom.pixel_array
    return image

def parse_xml_annotation(xml_path):
    """Parse XML annotations and extract bounding boxes and labels."""
    tree = ET.parse(xml_path)
    root = tree.getroot()
    boxes = []
    labels = []
    for obj in root.findall('object'):
        label = obj.find('name').text
        bbox = obj.find('bndbox')
        xmin = int(bbox.find('xmin').text)
        ymin = int(bbox.find('ymin').text)
        xmax = int(bbox.find('xmax').text)
        ymax = int(bbox.find('ymax').text)
        boxes.append([xmin, ymin, xmax, ymax])
        labels.append(label)
    return boxes, labels

def convert_dicom_to_png(dicom_path, output_path):
    """Convert a DICOM image to PNG format and save it."""
    image = load_dicom_image(dicom_path)
    cv2.imwrite(output_path, image)

def find_xml_file(dicom_filename, annotation_root):
    """Recursively search for the corresponding XML file in subfolders."""
    for root, dirs, files in os.walk(annotation_root):
        for file in files:
            if file == dicom_filename.replace(".dcm", ".xml"):
                return os.path.join(root, file)
    return None

def process_file(args):
    dicom_path, annotation_root, output_root = args
    try:
        # Find the corresponding XML file
        xml_path = find_xml_file(os.path.basename(dicom_path), annotation_root)
        if not xml_path:
            print(f"Warning: No annotation found for {os.path.basename(dicom_path)}")
            return
        
        # Create output folder for the patient
        patient_folder = os.path.basename(os.path.dirname(os.path.dirname(dicom_path)))
        output_patient_folder = os.path.join(output_root, patient_folder)
        os.makedirs(output_patient_folder, exist_ok=True)
        
        # Convert DICOM to PNG
        output_image_path = os.path.join(output_patient_folder, os.path.basename(dicom_path).replace(".dcm", ".png"))
        convert_dicom_to_png(dicom_path, output_image_path)
        
        # Parse XML annotations
        boxes, labels = parse_xml_annotation(xml_path)
        print(f"Processed {os.path.basename(dicom_path)}: Boxes={boxes}, Labels={labels}")
    except Exception as e:
        print(f"Error processing {dicom_path}: {e}")

def find_dicom_files(dicom_root):
    """Recursively find all DICOM files in the given folder."""
    dicom_files = []
    for root, dirs, files in os.walk(dicom_root):
        for file in files:
            if file.endswith(".dcm"):
                dicom_files.append(os.path.join(root, file))
    return dicom_files

def process_dataset_parallel(dicom_root, annotation_root, output_root):
    """Process all DICOM images and XML annotations in parallel."""
    # Find all DICOM files
    dicom_files = find_dicom_files(dicom_root)
    print(f"Found {len(dicom_files)} DICOM files.")
    
    # Prepare tasks for parallel processing
    tasks = [(dicom_path, annotation_root, output_root) for dicom_path in dicom_files]
    
    # Use multiprocessing to process files in parallel
    with Pool(cpu_count()) as pool:
        pool.map(process_file, tasks)

# Run the parallel processing script
process_dataset_parallel(dicom_root, annotation_root, output_root)
print("Dataset processing complete!")

c:\Python311\Lib\site-packages\numpy\.libs\libopenblas64__v0.3.21-gcc_10_3_0.dll
c:\Python311\Lib\site-packages\numpy\.libs\libopenblas64__v0.3.23-gcc_10_3_0.dll


Found 188 DICOM files.


In [1]:
import time

def process_file(args):
    start_time = time.time()
    dicom_path, annotation_root, output_root = args
    try:
        # Find the corresponding XML file
        xml_path = find_xml_file(os.path.basename(dicom_path), annotation_root)
        if not xml_path:
            print(f"Warning: No annotation found for {os.path.basename(dicom_path)}")
            return
        
        # Create output folder for the patient
        patient_folder = os.path.basename(os.path.dirname(os.path.dirname(dicom_path)))
        output_patient_folder = os.path.join(output_root, patient_folder)
        os.makedirs(output_patient_folder, exist_ok=True)
        
        # Convert DICOM to PNG
        output_image_path = os.path.join(output_patient_folder, os.path.basename(dicom_path).replace(".dcm", ".png"))
        convert_dicom_to_png(dicom_path, output_image_path)
        
        # Parse XML annotations
        boxes, labels = parse_xml_annotation(xml_path)
        print(f"Processed {os.path.basename(dicom_path)}: Boxes={boxes}, Labels={labels}")
    except Exception as e:
        print(f"Error processing {dicom_path}: {e}")
    finally:
        print(f"Time taken for {os.path.basename(dicom_path)}: {time.time() - start_time:.2f} seconds")

In [None]:
import os
import pydicom
import cv2
import xml.etree.ElementTree as ET
from multiprocessing import Pool
import time
import warnings

# Suppress NumPy warnings
warnings.filterwarnings("ignore", category=UserWarning, module="numpy")

# Define paths (NVMe SSD)
dicom_root = r"E:\Final Year Project\Code\dcm images"
annotation_root = r"E:\Final Year Project\Code\Annotation"
output_root = r"E:\Final Year Project\Code\Processed"

# Check if paths exist
if not os.path.exists(dicom_root):
    raise FileNotFoundError(f"The folder '{dicom_root}' does not exist.")
if not os.path.exists(annotation_root):
    raise FileNotFoundError(f"The folder '{annotation_root}' does not exist.")

# Create output folder if it doesn't exist
os.makedirs(output_root, exist_ok=True)

def load_dicom_image(dicom_path):
    """Load a DICOM image and return it as a NumPy array."""
    dicom = pydicom.dcmread(dicom_path)
    image = dicom.pixel_array
    return image

def parse_xml_annotation(xml_path):
    """Parse XML annotations and extract bounding boxes and labels."""
    tree = ET.parse(xml_path)
    root = tree.getroot()
    boxes = []
    labels = []
    for obj in root.findall('object'):
        label = obj.find('name').text
        bbox = obj.find('bndbox')
        xmin = int(bbox.find('xmin').text)
        ymin = int(bbox.find('ymin').text)
        xmax = int(bbox.find('xmax').text)
        ymax = int(bbox.find('ymax').text)
        boxes.append([xmin, ymin, xmax, ymax])
        labels.append(label)
    return boxes, labels

def convert_dicom_to_png(dicom_path, output_path, resize=(512, 512)):
    """Convert a DICOM image to PNG format and resize it."""
    image = load_dicom_image(dicom_path)
    image = cv2.resize(image, resize)
    cv2.imwrite(output_path, image)

def find_xml_file(dicom_filename, annotation_root):
    """Recursively search for the corresponding XML file in subfolders."""
    for root, dirs, files in os.walk(annotation_root):
        for file in files:
            if file == dicom_filename.replace(".dcm", ".xml"):
                return os.path.join(root, file)
    return None

def process_file(args):
    start_time = time.time()
    dicom_path, annotation_root, output_root = args
    try:
        # Find the corresponding XML file
        xml_path = find_xml_file(os.path.basename(dicom_path), annotation_root)
        if not xml_path:
            print(f"Warning: No annotation found for {os.path.basename(dicom_path)}")
            return
        
        # Create output folder for the patient
        patient_folder = os.path.basename(os.path.dirname(os.path.dirname(dicom_path)))
        output_patient_folder = os.path.join(output_root, patient_folder)
        os.makedirs(output_patient_folder, exist_ok=True)
        
        # Convert DICOM to PNG
        output_image_path = os.path.join(output_patient_folder, os.path.basename(dicom_path).replace(".dcm", ".png"))
        convert_dicom_to_png(dicom_path, output_image_path)
        
        # Parse XML annotations
        boxes, labels = parse_xml_annotation(xml_path)
        print(f"Processed {os.path.basename(dicom_path)}: Boxes={boxes}, Labels={labels}")
    except Exception as e:
        print(f"Error processing {dicom_path}: {e}")
    finally:
        print(f"Time taken for {os.path.basename(dicom_path)}: {time.time() - start_time:.2f} seconds")

def find_dicom_files(dicom_root):
    """Recursively find all DICOM files in the given folder."""
    dicom_files = []
    for root, dirs, files in os.walk(dicom_root):
        for file in files:
            if file.endswith(".dcm"):
                dicom_files.append(os.path.join(root, file))
    return dicom_files

def process_dataset_parallel(dicom_root, annotation_root, output_root):
    """Process all DICOM images and XML annotations in parallel."""
    # Find all DICOM files
    dicom_files = find_dicom_files(dicom_root)
    print(f"Found {len(dicom_files)} DICOM files.")
    
    # Prepare tasks for parallel processing
    tasks = [(dicom_path, annotation_root, output_root) for dicom_path in dicom_files]
    
    # Use multiprocessing to process files in parallel
    with Pool(2) as pool:  # Use 2 processes instead of cpu_count()
        pool.map(process_file, tasks)

# Run the parallel processing script
process_dataset_parallel(dicom_root, annotation_root, output_root)
print("Dataset processing complete!")

Found 60 DICOM files.
