In [1]:
import io
import os
import numpy as np
import pandas as pd
import pydicom
import matplotlib.pyplot as plt

import SimpleITK as sitk

from dotenv import load_dotenv
load_dotenv();

In [2]:
data_path = os.getenv("MAIN_PATH", "")

DICOM_FOLDER = f"{data_path}/MRI_SEG_DICOM/Breast_MRI_086/01-01-1990-NA-BREASTROUTINE-86704/5.000000-ax dyn 1st pass-39884/"
ANNOTATION_BOX_FILE = f"{data_path}/Supplemental-Data/Annotation_Boxes.xlsx"

## Tumor masks

### convert DICOM to NRRD

In [3]:
def collect_patient_data(root_dir, max_patients=0):

    collected_paths = []
    patient_count = 0

    for patient_folder in os.listdir(root_dir):
        patient_path = os.path.join(root_dir, patient_folder)

        if not os.path.isdir(patient_path):
            continue

        if patient_count >= max_patients:
            break

        # get the random folder inside the patient folder
        random_folder = next((f for f in os.listdir(patient_path) if os.path.isdir(os.path.join(patient_path, f))), None)
        if not random_folder:
            continue

        random_folder_path = os.path.join(patient_path, random_folder)

        for modality_folder in os.listdir(random_folder_path):
            modality_path = os.path.join(random_folder_path, modality_folder)
            if os.path.isdir(modality_path) and "segment" not in modality_path.lower():
                idx = modality_path.find("Breast_MRI_")
                collected_paths.append(modality_path[idx:])

        patient_count += 1

    return collected_paths

MRI_DICOM_FOLDER = f"{data_path}/MRI_SEG_DICOM/"
max_patients_to_process = 2
valid_paths = collect_patient_data(MRI_DICOM_FOLDER, max_patients=max_patients_to_process)

In [4]:
mapping_paths = {}
for chunk in pd.read_csv("/media/pedro-lima/HDD/downloads/File_Path_Mapping_Tables.csv", chunksize=1_000):
    chunk["original_path_and_filename"] = chunk["original_path_and_filename"].apply(os.path.dirname).str.replace("DICOM_Images/", data_path+"/MRI_NRRD/") + ".nrrd"
    
    chunk["descriptive_path"] = chunk["descriptive_path"].apply(os.path.dirname)
    chunk["descriptive_path"] = chunk["descriptive_path"].str.replace(r'BreastMRI(\d+)', r'Breast_MRI_\1', regex=True)
    chunk["descriptive_path"] = chunk["descriptive_path"].apply(lambda x: x[x.find("Breast_MRI_"):])

    chunk.drop_duplicates(inplace=True)
    mapping_paths.update(chunk.set_index("descriptive_path")["original_path_and_filename"].to_dict())

mapping_paths = {k: v for k, v in mapping_paths.items() if list(mapping_paths.values()).count(v) == 1}

In [5]:
new_dict = {}

for p in valid_paths:
    for m,v in mapping_paths.items():
        if p.split("/")[0] == m.split("/")[0] and p.split("-")[-1] == m.split("-")[-1]:
            new_dict[f"{data_path}/MRI_SEG_DICOM/{p}"] = v

In [6]:
def dicom_to_nrrd(dicom_folder, nrrd_path):
    """load the DICOM images, stack them into a 3D array, and save them as an NRRD file"""
    dicom_reader = sitk.ImageSeriesReader()
    dicom_files = dicom_reader.GetGDCMSeriesFileNames(dicom_folder)
    dicom_reader.SetFileNames(dicom_files)
    image_3d = dicom_reader.Execute()
    
    os.makedirs(os.path.dirname(nrrd_path), exist_ok=True)
    sitk.WriteImage(image_3d, nrrd_path)


In [None]:
for d_folder, nrrd_file  in new_dict.items():
    dicom_to_nrrd(d_folder, nrrd_file)

### get 3D bounding boxes from annotation boxes

In [None]:
import numpy as np

def create_mask(image_shape, annotation):

    mask = np.zeros(image_shape, dtype=np.uint8)
    start_row, end_row = annotation["Start Row"], annotation["End Row"]
    start_col, end_col = annotation["Start Column"], annotation["End Column"]
    start_slice, end_slice = annotation["Start Slice"], annotation["End Slice"]

    mask[start_slice:end_slice + 1, start_row:end_row + 1, start_col:end_col + 1] = 1
    return mask

import SimpleITK as sitk

def save_mask(mask, reference_image_path, output_path):
    """
    Save the binary mask as a NIfTI file with reference metadata.
    :param mask: 3D numpy array (binary mask).
    :param reference_image_path: Path to the original DICOM or reference NIfTI.
    :param output_path: Path to save the output mask.
    """
    reference_image = sitk.ReadImage(reference_image_path)
    mask_image = sitk.GetImageFromArray(mask)
    mask_image.CopyInformation(reference_image)
    sitk.WriteImage(mask_image, output_path)

import os

def process_all_patients(patients_dir, annotations, output_dir):
    """
    Process all patients to generate segmentation masks.
    :param patients_dir: Directory containing patient DICOM folders.
    :param annotations: DataFrame or dict containing annotation boxes for all patients.
    :param output_dir: Directory to save the masks.
    """
    for patient_id, annotation in annotations.items():
        patient_folder = os.path.join(patients_dir, f"Breast_MRI_{patient_id}")
        dicom_files = load_dicom_files(patient_folder)  # Your existing `load_dicom_files` function
        reference_image_path = dicom_files[0].filename

        # Create mask
        image_shape = (len(dicom_files), dicom_files[0].Rows, dicom_files[0].Columns)
        mask = create_mask(image_shape, annotation)

        # Save mask
        output_path = os.path.join(output_dir, f"Breast_MRI_{patient_id}_mask.nii.gz")
        save_mask(mask, reference_image_path, output_path)

In [None]:
# Create a 3D Mask Based on Annotation Boxes
# create a mask where the tumor voxels are set to 1 and other voxels are set to 0
import numpy as np

def create_3d_mask(dicom_folder, annotation_df, nrrd_path):
    # Load the 3D DICOM volume
    dicom_reader = sitk.ImageSeriesReader()
    dicom_files = dicom_reader.GetGDCMSeriesFileNames(dicom_folder)
    dicom_reader.SetFileNames(dicom_files)
    image_3d = dicom_reader.Execute()
    
    # Convert the image to a numpy array
    image_array = sitk.GetArrayFromImage(image_3d)

    # Create an empty mask (same size as the image)
    mask_array = np.zeros_like(image_array)

    # Iterate through each patient's annotation box
    for _, annotation in annotation_df.iterrows():
        # Extract annotation box coordinates for each slice
        start_row, end_row = int(annotation["Start Row"]), int(annotation["End Row"])
        start_column, end_column = int(annotation["Start Column"]), int(annotation["End Column"])
        start_slice, end_slice = int(annotation["Start Slice"]), int(annotation["End Slice"])

        # Fill the mask array for the tumor region
        mask_array[start_slice:end_slice, start_row:end_row, start_column:end_column] = 1

    # Convert the mask array back to a SimpleITK image
    mask_image = sitk.GetImageFromArray(mask_array)

    # Save the mask as NRRD (same format as the image)
    mask_path = 'path_to_output_mask.nrrd'
    sitk.WriteImage(mask_image, mask_path)

# Example usage
annotation_df = pd.read_csv('annotation_boxes.csv')  # Replace with the path to your annotation CSV
create_3d_mask(dicom_folder, annotation_df, nrrd_path)