In [1]:
import numpy
import os

import dicom2nifti # to convert DICOM files to the NIftI format
import nibabel as nib # nibabel to handle nifti files
import SimpleITK as sitk
import json

In [7]:

import SimpleITK as sitk
import pandas as pd
import numpy as np
import os
import json

def analyze_images(folder_root, image_path, label_path):
    """
    Collect geometry and intensity statistics for one image/label pair.

    Parameters:
    - folder_root: str, directory that both relative paths are joined onto
    - image_path: str, path of the image file, relative to folder_root
    - label_path: str, path of the label file, relative to folder_root

    Returns:
    - result: dict with the keys 'Subfolder' (file name of the image),
      'Spacing', 'Size', 'Length' (spacing * size per axis), 'orientation',
      'label orientation', 'Pixel Range' ([min, max] of the image) and
      'label Unique Values'. All values are plain Python lists/scalars.

    Raises:
    - FileNotFoundError: if the image or the label file does not exist
    - ValueError: if image and label differ in spacing or in size
    """
    image_path = os.path.join(folder_root, image_path)
    label_path = os.path.join(folder_root, label_path)

    # Name the offending file(s) so a broken dataset entry can be located.
    missing = [path for path in (image_path, label_path) if not os.path.exists(path)]
    if missing:
        raise FileNotFoundError(
            f"One or more required files are missing: {', '.join(missing)}"
        )

    # Read the NIfTI files
    image = sitk.ReadImage(image_path)
    label = sitk.ReadImage(label_path)

    image_spacing = image.GetSpacing()
    image_size = image.GetSize()

    # Validate the geometry before the voxel data is copied into NumPy arrays,
    # so a mismatched pair fails without paying for the conversion.
    if image_spacing != label.GetSpacing():
        raise ValueError(f"Spacing mismatch in subfolder {image_path}")
    if image_size != label.GetSize():
        raise ValueError(f"Size mismatch in subfolder {image_path}")
    # Orientation is deliberately not compared; both direction matrices are
    # reported in the result instead.

    image_array = sitk.GetArrayFromImage(image)
    label_array = sitk.GetArrayFromImage(label)

    # .item() / .tolist() convert numpy scalars to native Python numbers so the
    # values stringify cleanly (NumPy >= 2 would otherwise show `np.int16(3)`).
    pixel_range = [np.min(image_array).item(), np.max(image_array).item()]
    label_unique_values = np.unique(label_array).tolist()

    # Physical extent covered along each axis
    length = [p * s for p, s in zip(image_spacing, image_size)]

    return {
        'Subfolder': os.path.basename(image_path),
        'Spacing': list(image_spacing),
        'Size': list(image_size),
        'Length': length,
        'orientation': list(image.GetDirection()),
        'label orientation': list(label.GetDirection()),
        'Pixel Range': pixel_range,
        'label Unique Values': label_unique_values,
    }
    
def analyze_and_save_images(root_folder, json_path, excel_filename):
    """
    Analyze every image/label pair listed in a dataset JSON file and save the
    collected statistics to an Excel file.

    Parameters:
    - root_folder: str, directory that the JSON's "root_dir" entry is joined onto
    - json_path: str, path of the dataset JSON; it must provide "root_dir" and,
      for each of 'training', 'valid' and 'test', the groups '0' and '1' as
      lists of {'image': ..., 'label': ...} entries
    - excel_filename: str, name of the output Excel file (written relative to
      the current working directory)

    Raises:
    - FileNotFoundError: if json_path does not exist
    """
    if not os.path.exists(json_path):
        raise FileNotFoundError(f"{json_path} does not exist.")

    with open(json_path, 'r') as f:
        data = json.load(f)

    # The dataset root is the same for every pair, so resolve it once.
    root_path = os.path.join(root_folder, data["root_dir"])

    results = [
        analyze_images(root_path, pair['image'], pair['label'])
        for split in ('training', 'valid', 'test')
        for group in ('0', '1')  # named `group` to avoid shadowing the builtin `set`
        for pair in data[split][group]
    ]

    # Create a DataFrame from the results
    df_results = pd.DataFrame(results)

    # Write the results to an Excel file
    output_excel_path = os.path.join("./", excel_filename)
    df_results.to_excel(output_excel_path, index=False)
    print(f"Results saved to {output_excel_path}")

In [8]:
# Analyze every image/label pair listed in the adjusted AMOS dataset JSON and
# write the per-volume statistics to 'amos_results.xlsx' in the working directory.
# NOTE(review): 'amos22_dataset_adjusted.json' is the file written by the
# update_json(...) call in the last cell of this notebook, so on a fresh kernel
# that cell has to run before this one.
root_folder = os.path.join(os.getcwd(), "..")
json_path = os.path.join(root_folder, 'dataset/amos/amos22_dataset_adjusted.json')
excel_filename = 'amos_results.xlsx'
analyze_and_save_images(root_folder, json_path, excel_filename)


Results saved to ./amos_results.xlsx


In [3]:
def clear_all(path_json):
    """
    Delete every image and label file referenced by a dataset JSON file.

    Parameters:
    - path_json: str, path of the dataset JSON; it must provide "root_dir" and,
      for each of "training", "valid" and "test", the groups "0" and "1" as
      lists of {"image": ..., "label": ...} entries

    Paths are built by joining each entry onto "root_dir" exactly as stored in
    the JSON. os.remove errors (e.g. a file that is already gone) propagate.
    """
    with open(path_json, 'r') as handle:
        dataset = json.load(handle)

    base_dir = dataset["root_dir"]

    # Lazily walk the entries in split -> group -> list order.
    entries = (
        entry
        for split in ("training", "valid", "test")
        for group in ("0", "1")
        for entry in dataset[split][group]
    )

    for entry in entries:
        # Resolve both paths first, then delete the image followed by its label.
        targets = [os.path.join(base_dir, entry[field]) for field in ("image", "label")]
        for target in targets:
            os.remove(target)


In [4]:
def display_voxel_spacing(path_json):
    """
    Print the geometry and intensity extremes of every image in a dataset JSON file.

    For each image one line is printed with:
    - image_filed: size * spacing per axis, rounded to the nearest integer
    - spacing: spacing per axis multiplied by 100 and rounded to an integer
    - size: the image size as reported by SimpleITK
    - max / min: largest and smallest voxel value

    Parameters:
    - path_json: str, path of the dataset JSON; it must provide "root_dir" and,
      for each of "training", "valid" and "test", the groups "0" and "1" as
      lists of entries with an "image" path

    NOTE(review): "root_dir" is used exactly as stored in the JSON, whereas
    resample_all resolves it relative to the parent of the working directory —
    confirm which one is intended.
    """
    with open(path_json, 'r') as f:
        info = json.load(f)

    root_dir = info["root_dir"]

    for key in ["training", "valid", "test"]:
        for fix_move in ["0", "1"]:
            for image_label in info[key][fix_move]:
                # Only the image is inspected; the label path is not needed here.
                image = os.path.join(root_dir, image_label["image"])

                volume = sitk.ReadImage(image)  # pixel type is left as stored in the file
                original_spacing = volume.GetSpacing()
                original_size = volume.GetSize()
                image_data = sitk.GetArrayFromImage(volume)
                image_filed = [int(round(osz*ospc)) for osz,ospc in zip(original_size, original_spacing)]
                image_spacing = [int(round(i*100)) for i in original_spacing]
                print(f"image_filed: {image_filed} spacing: {image_spacing} size: {original_size} max: {image_data.max()}  min: {image_data.min()}")

In [2]:
def resample_volume(input_path, output_path, new_size, new_spacing, min=0, max=800, is_mask=False, filed_clip=(0, 0, 0)):
    """
    Normalise the intensities of a volume, resample it onto a new grid and write it to disk.

    Parameters:
    - input_path: str, file to read (it is read as float32)
    - output_path: str, file the resampled volume is written to
    - new_size: sequence of int, size of the output grid per axis
    - new_spacing: sequence of float, spacing of the output grid per axis
    - min, max: intensity window for images. With max > min the data is clipped
      to [min, max] and scaled to [0, 1]; with max == min the data's own
      minimum/maximum are used instead. Ignored when is_mask is True.
    - is_mask: bool, if True the voxel values are left untouched and
      nearest-neighbour interpolation is used (linear otherwise)
    - filed_clip: per-axis factor; the output origin is shifted by
      filed_clip * offset on top of the centring shift (see below)

    Raises:
    - ValueError: if max < min for an image
    - AssertionError: if max == min and the image itself has no intensity range
    """
    volume = sitk.ReadImage(input_path, sitk.sitkFloat32) # read and cast to float32

    volume_data = sitk.GetArrayFromImage(volume)

    if not is_mask:
        if max > min:
            lo, hi = min, max
            volume_data = numpy.clip(volume_data, lo, hi)
        elif max == min:
            # An empty window means "use the data's own range".
            hi = numpy.max(volume_data)
            lo = numpy.min(volume_data)
            # Raised explicitly (instead of `assert`) so the guard against a
            # zero-width range survives `python -O`; the exception type is
            # kept for backward compatibility.
            if not hi > lo:
                raise AssertionError(
                    f"Cannot normalise {input_path}: all voxels share the value {lo}"
                )
        else:
            raise ValueError(
                f"Invalid intensity window for {input_path}: max ({max}) is smaller than min ({min})"
            )
        volume_data = (volume_data - lo)/(hi - lo)

    img_range_adjusted = sitk.GetImageFromArray(volume_data)
    img_range_adjusted.CopyInformation(volume)

    original_spacing = img_range_adjusted.GetSpacing()
    original_size = img_range_adjusted.GetSize()
    original_origin = img_range_adjusted.GetOrigin()

    # "filed" = extent covered along each axis (spacing * size).
    original_filed = [a*b for a,b in zip(original_spacing, original_size)]
    new_filed = [a*b for a,b in zip(new_spacing, new_size)]
    # Half of the extent difference between the input and the output grid.
    offset = [(b- a)/2 for a,b in zip(new_filed, original_filed)]
    # Shift the origin by the offset, subtracting it on the second axis.
    # NOTE(review): the fixed signs (1, -1, 1) presumably match the axis
    # directions of the datasets in use — confirm before reusing elsewhere.
    new_origin = [c + b*a for a,b,c in zip([1, -1, 1], offset, original_origin)]

    # Optional extra shift: filed_clip scales the same offset per axis.
    moved_origin = [c + b*a for a,b,c in zip(filed_clip, offset, new_origin)]

    interpolator =  sitk.sitkNearestNeighbor if is_mask  else sitk.sitkLinear
    # Identity transform; voxels outside the input are filled with 0 and the
    # output keeps the pixel type of the normalised image.
    img_resampled = sitk.Resample(img_range_adjusted, new_size, sitk.Transform(), interpolator,
                         moved_origin, new_spacing, img_range_adjusted.GetDirection(), 0,
                         img_range_adjusted.GetPixelID())

    sitk.WriteImage(img_resampled, output_path)

In [3]:
def resample_all(path_json):
    """
    Resample every image/label pair listed in a dataset JSON file.

    Each '<name>.nii.gz' input is written next to itself as
    '<name>_adjust.nii.gz' via resample_volume.

    Parameters:
    - path_json: str, path of the dataset JSON. Besides "root_dir" and the
      "training" / "valid" / "test" splits (each with groups "0" and "1"), it
      must provide per-group "voxelSpacing", "tensorImageShape", "voxelRange"
      ([min, max]) and "filedClip" settings.

    "root_dir" is resolved relative to the parent of the current working directory.

    Raises:
    - ValueError: if an image or label file name does not end in '.nii.gz'
    """
    def adjusted_path(path):
        # '<dir>/<name>.nii.gz' -> '<dir>/<name>_adjust.nii.gz'.
        # <name> is everything before the FIRST dot of the file name; this
        # must stay in step with the naming used by update_json.
        parts = os.path.basename(path).split('.')
        if parts[-2:] != ['nii','gz']:
            raise ValueError(f"Expected a '.nii.gz' file, got: {path}")
        return os.path.join(os.path.dirname(path), parts[0] + "_adjust.nii.gz")

    with open(path_json, 'r') as f:
        info = json.load(f)

    root_dir = os.path.join(os.getcwd(), "../", info["root_dir"])

    for key in ["training", "valid", "test"]:
        for fix_move in ["0", "1"]:
            # Target grid and intensity window are configured per group.
            spacing = info["voxelSpacing"][fix_move]
            size = info["tensorImageShape"][fix_move]
            range_min,range_max = info["voxelRange"][fix_move]
            filed_clip = info["filedClip"][fix_move]

            for image_label in info[key][fix_move]:
                image = os.path.join(root_dir, image_label["image"])
                label = os.path.join(root_dir, image_label["label"])

                # Validate both names before touching either file.
                output_image = adjusted_path(image)
                output_label = adjusted_path(label)

                resample_volume(image, output_image, size, spacing, range_min, range_max, filed_clip=filed_clip)
                resample_volume(label, output_label, size, spacing, range_min, range_max, is_mask=True, filed_clip=filed_clip)

In [4]:
def update_json(origin_json_path, new_json_path):
    """
    Write a copy of a dataset JSON file whose image/label entries point at the
    '_adjust.nii.gz' files.

    Every '<dir>/<name>.nii.gz' path in the "training" / "valid" / "test"
    splits (groups "0" and "1") becomes '<dir>/<name>_adjust.nii.gz'; all other
    content is copied unchanged.

    Parameters:
    - origin_json_path: str, dataset JSON to read
    - new_json_path: str, file the updated JSON is written to (indent of 4)

    Raises:
    - ValueError: if an image or label file name does not end in '.nii.gz'
      (nothing is written in that case)
    """
    def adjusted_path(path):
        # '<dir>/<name>.nii.gz' -> '<dir>/<name>_adjust.nii.gz'.
        # <name> is everything before the FIRST dot of the file name; this
        # must stay in step with the naming used by resample_all.
        parts = os.path.basename(path).split('.')
        if parts[-2:] != ['nii','gz']:
            raise ValueError(f"Expected a '.nii.gz' file, got: {path}")
        return os.path.join(os.path.dirname(path), parts[0] + "_adjust.nii.gz")

    with open(origin_json_path, 'r') as f:
        info = json.load(f)

    for key in ["training", "valid", "test"]:
        for fix_move in ["0", "1"]:
            for image_label in info[key][fix_move]:
                # Compute both new paths before mutating the entry.
                output_image = adjusted_path(image_label["image"])
                output_label = adjusted_path(image_label["label"])

                image_label["image"] = output_image
                image_label["label"] = output_label

    with open(new_json_path, "w") as outfile:
        json.dump(info, outfile, indent=4)

                

In [5]:
# CHAOS: resample every volume listed in the dataset JSON to '*_adjust.nii.gz',
# then write a copy of the JSON whose entries point at those adjusted files.
resample_all("../dataset/chaos/chaos_dataset.json")
update_json("../dataset/chaos/chaos_dataset.json", "../dataset/chaos/chaos_dataset_adjusted.json")

In [11]:
# AMOS: resample every volume listed in the reoriented dataset JSON to
# '*_adjust.nii.gz', then write 'amos22_dataset_adjusted.json' pointing at the
# adjusted files (that JSON is the input of the analysis cell near the top).
resample_all("../dataset/amos/amos22_dataset_reorient.json")
update_json("../dataset/amos/amos22_dataset_reorient.json", "../dataset/amos/amos22_dataset_adjusted.json")