In [1]:
import nibabel as nib
import numpy as np
import cv2
from skimage.measure import label, regionprops
import os

def is_path_string(input_obj):
    """
    Check if the input is a valid file path string.
    """
    return isinstance(input_obj, str) and os.path.isfile(input_obj)

def retain_threshold_range(volume, lower_val, upper_val, kernel_size=(15, 15)):
    """
    Retain pixels within a specified threshold range in a 3D image and apply morphological opening to remove noise.

    Parameters:
    volume -- Input 3D image (numpy array)
    lower_val -- Lower bound of the threshold range
    upper_val -- Upper bound of the threshold range
    kernel_size -- Kernel size for the morphological operation, default is (5, 5)

    Returns:
    mask -- Processed 3D image mask
    """
    # Apply the threshold range
    mask = np.zeros_like(volume, dtype=np.uint8)
    mask[(volume >= lower_val) & (volume <= upper_val)] = 255
    
    # Create the kernel for morphological operations
    kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, kernel_size)
    
    # Apply morphological opening to remove noise
    for i in range(mask.shape[2]):
        mask[:, :, i] = cv2.morphologyEx(mask[:, :, i], cv2.MORPH_OPEN, kernel)
    
    return mask

def get_largest_connected_component(volume):
    """
    Extract the largest connected component from a 3D image mask and fill in internal holes.

    Parameters:
    volume -- Input 3D mask (numpy array)

    Returns:
    largest_component_mask -- Mask of the largest connected component in the 3D image
    """
    # Find all connected components
    labeled_volume, num_labels = label(volume, connectivity=3, return_num=True)

    # Find the largest connected component, excluding the background (label 0)
    largest_label = 1
    max_area = 0
    for region in regionprops(labeled_volume):
        if region.area > max_area:
            max_area = region.area
            largest_label = region.label

    # Create a mask that only retains the largest connected component
    largest_component_mask = np.zeros_like(volume, dtype=np.uint8)
    largest_component_mask[labeled_volume == largest_label] = 255

    # Fill in holes within the largest connected component
    kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (50, 50))
    for i in range(largest_component_mask.shape[2]):
        largest_component_mask[:, :, i] = cv2.morphologyEx(largest_component_mask[:, :, i], cv2.MORPH_CLOSE, kernel)

    return largest_component_mask

def apply_mask(volume, mask):
    """
    Apply a mask to the original image. Non-masked areas retain their original grayscale values, while masked areas are set to 0.

    Parameters:
    volume -- Original 3D image (numpy array)
    mask -- Mask 3D image (numpy array)

    Returns:
    masked_volume -- 3D image after applying the mask
    """
    masked_volume = np.where(mask == 255, volume, 0)
    return masked_volume

def modify_nifti_origin(fixed_image_path, moving_image_path, output_file=None):
    """
    Modify the origin of a NIfTI image.

    Parameters:
    moving_image_path (str): Path to the moving NIfTI file.
    fixed_image_path (str): Path to the fixed NIfTI file.
    output_file (str): Path to save the modified NIfTI file.
    """
    # Load the moving NIfTI image to obtain its affine matrix
    moving_img = nib.load(moving_image_path)
    moving_affine = moving_img.affine
    
    # Load the fixed NIfTI image
    fixed_img = nib.load(fixed_image_path)

    # Get image data and header
    fixed_data = fixed_img.get_fdata()
    fixed_header = fixed_img.header

    # Create a new NIfTI image using the affine matrix from the moving image
    new_img = nib.Nifti1Image(fixed_data, moving_affine, fixed_header)

    # If an output file is provided, save the new NIfTI image
    if output_file:
        nib.save(new_img, output_file)
        print(f"Modified NIfTI image saved as {output_file}")
    
    return new_img

# Remove black edges from the image
def remove_black_edges(nib_obj, output_file):
    # Load the NIfTI image
    img = nib_obj
    data = nib_obj.get_fdata()

    # Find the boundaries of the non-black (non-zero) region
    non_zero_coords = np.argwhere(data)
    min_coords = non_zero_coords.min(axis=0)
    max_coords = non_zero_coords.max(axis=0)

    # Extract the non-black region
    cropped_data = data[min_coords[0]:max_coords[0]+1,
                        min_coords[1]:max_coords[1]+1,
                        min_coords[2]:max_coords[2]+1]

    # Create a new NIfTI image
    return nib.Nifti1Image(cropped_data, img.affine)

# Modify the affine matrix of a NIfTI image to match a target affine matrix.
def modify_nifti_affine(input_file, target_affine, output_file=None):
    """
    Modify the affine matrix of a NIfTI image to match a target affine matrix.

    Parameters:
    input_file (str): Path to the input NIfTI file.
    output_file (str): Path to save the modified NIfTI file.
    target_affine (numpy.ndarray): Target affine matrix.
    """
    # Load the NIfTI image
    img = nib.load(input_file)

    # Get image data and header
    data = img.get_fdata()
    header = img.header

    # Create a new NIfTI image with the target affine matrix
    new_img = nib.Nifti1Image(data, target_affine, header)

    # Save the new NIfTI image
    print(f"New affine matrix: \n{target_affine}")
    
    # Print the new affine matrix
    print(f"Modified NIfTI image saved as {output_file}")
    return new_img

def modify_nifti_affine_ants(ants_obj, target_affine_path, output_image_path=None):
    """
    Modify the affine matrix of a NIfTI image to match a target affine matrix.
    Modify the affine matrix of the input image to match the affine matrix of the target image.

    Parameters:
    ants_obj (str or nibabel object): Input NIfTI file path or nibabel object.
    target_affine_path (str): Path to the target affine matrix.
    output_image_path (str): Path to save the modified image.
    """
    input_image = {}
    if is_path_string(ants_obj):
        input_image = nib.load(ants_obj)
    else:
        input_image = ants_obj
    
    # Load the target affine matrix
    target_affine_image = nib.load(target_affine_path)
    target_affine = target_affine_image.affine
    
    # Modify the affine matrix of the input image to match the target affine matrix
    input_affine = input_image.get_affine()
    input_image.set_affine(target_affine)
    
    # Save the modified image
    if not output_image_path:
        output_image_path
    print(f"Modified affine matrix saved to {output_image_path}")

    return input_image

def removeBone(nii_path, resultPath=None, remove_black_edges=False):
    nii_img = {}
    try:
        if is_path_string(nii_path):
            nii_img = nib.load(nii_path)
        else:
            nii_img = nii_path
        
        volume = nii_img.get_fdata()
        # Set the threshold range
        lower_val = -10
        upper_val = 70
        # Apply thresholding
        mask = retain_threshold_range(volume, lower_val, upper_val)
        # Get the largest connected component
        largest_component_mask = get_largest_connected_component(mask)
        # Apply the mask
        masked_volume = apply_mask(volume, largest_component_mask)
        # Save the result
        masked_img = nib.Nifti1Image(masked_volume, nii_img.affine)
        nib.save(masked_img, "temp.nii.gz")
        
        print(f"removeBone finished!\n")
        return masked_img
        
    except FileNotFoundError:
        print(f"File not found: {nii_path}")
    except nib.filebasedimages.ImageFileError:
        print(f"Unable to load NIfTI file: {nii_path}")
    except Exception as e:
        print(f"An error occurred during processing: {e}")


In [2]:
# removeBone(r"C:\Users\ruxua\Desktop\normal\L40.nii.gz")

In [3]:
import nibabel as nib
import numpy as np
import ants

def convert_nibabel_obj_to_ants(nib_image):
    """
    Convert a NiBabel image object to an ANTsPy image object.

    Parameters:
    nib_image (nibabel.Nifti1Image): NiBabel image object.

    Returns:
    ants.ANTsImage: Converted ANTsPy image object.
    """
    # Extract image data and affine matrix
    nib_data = nib_image.get_fdata()
    nib_affine = nib_image.affine
    
    # Create an ANTsPy image object
    ants_image = ants.from_numpy(nib_data, origin=nib_affine[:3, 3], spacing=nib_image.header.get_zooms())
    
    return ants_image

def load_nifti(file_path):
    """Load a NIfTI file and return the data and affine matrix."""
    img = nib.load(file_path)
    data = img.get_fdata()
    affine = img.affine
    return data, affine

def save_nifti(data, affine, file_path):
    """Save a NIfTI file."""
    img = nib.Nifti1Image(data, affine)
    nib.save(img, file_path)

def register_images(fixed_image_path, moving_image_path):
    """Perform image registration using ANTs."""
    # Perform pre-registration tasks
    # Change the position
    # Remove bones
    try:
        print("Starting bone removal")
        removeBone(moving_image_path)
        print("Bone removal completed")
        
        # Convert nib object to ants object
        moving_image = ants.image_read("temp.nii.gz")
        fixed_image = ants.image_read(fixed_image_path)
        
        # Check if images were successfully loaded
        if fixed_image is None or moving_image is None:
            print("Failed to read one or both images. Exiting...")
            exit(1)
        
        # Print image information
        print(f"fixed_image: {fixed_image}")
        print(f"moving_image: {moving_image}")
        
        print("Starting registration")
        transform = ants.registration(fixed=fixed_image, moving=moving_image, type_of_transform='SyN')
        
        print(f"Type of registration_result: {type(transform)}")
        print(f"Length of registration_result: {len(transform)}")
        
        if transform:
            print("The returned dictionary is not empty. Contents:")
            for key, value in transform.items():
                print(f"{key}: {value}")
        else:
            print("The returned dictionary is empty.")
        
        print("Registration completed")
        
        # Return the transformation result
        return transform
    except Exception as e:
        print(f"Error during registration: {e}")
        exit(1)

# Base CT image
fixed_image_path = r"C:\SynologyDrive\code\brain-seg\images\CTbase\20240804_1\L40nobone.nii.gz"
# Base CT functional area mask
functional_area_path = r"C:\SynologyDrive\code\brain-seg\images\CTbase\20240804_1\L40nobone7mask.nii.gz"

globalTF = {}


In [4]:
# # Image to be registered
# moving_image_path = r"C:\Users\Desktop\pei\restored_ct_0\CQ500-CT-4.nii.gz"
# # Lesion label for the image to be registered
# lesion_path = r"C:\Users\Desktop\pei\nn_registration_1\4.nii.gz"




In [5]:

# registered
# transform = register_images(fixed_image_path, moving_image_path)
# transform['warpedmovout'].to_file("temp.nii.gz")

In [6]:
# transform['warpedmovout'].to_file("temp.nii.gz")

In [7]:
import ants
import numpy as np
import pandas as pd

# Lesion label mapping table
lesion_label_map = {
    0: "background",
    1: "chuxue",
    2: "naoshi",
    3: "shuizhong",
    4: "zhuxue",
    5: "xia",
    6: "wai"
}

# Functional area label mapping table (can be expanded or adjusted as needed)
functional_area_label_map = {
    0: "Clear Label",
    200: "Deep_R",
    201: "Deep_L",
    202: "BrainStem_R",
    203: "Lobar_R",
    204: "Lobar_L",
    205: "BrainStem_L",
    206: "Cerebellum_R",
    207: "Cerebellum_L",
    208: "3rd4th ventricles_L",
    209: "Lateral ventricle_R",
    210: "Lateral ventricle_L",
    211: "3rd4th ventricles_R"
}

def register_images(fixed_image_path, moving_image_path):
    """Perform image registration using ANTs."""
    try:
        # Pre-registration preparation
        # Remove bones
        print("Starting bone removal")
        removeBone(moving_image_path)
        print("Bone removal completed")
        
        # Convert nib object to ants object
        moving_image = ants.image_read("temp.nii.gz")
        fixed_image = ants.image_read(fixed_image_path)
        
        # Check if images were successfully loaded
        if fixed_image is None or moving_image is None:
            print("Failed to read one or both images. Exiting...")
            exit(1)
        
        # Print image information
        print(f"fixed_image: {fixed_image}")
        print(f"moving_image: {moving_image}")
        print("Starting registration")
        
        transform = ants.registration(fixed=fixed_image, moving=moving_image, type_of_transform='SyN') 
        
        print(f"Type of registration_result: {type(transform)}")
        print(f"Length of registration_result: {len(transform)}")
        
        if transform:
            print("The returned dictionary is not empty. Contents:")
            for key, value in transform.items():
                print(f"{key}: {value}")
        else:
            print("The returned dictionary is empty.")
        
        print("Registration completed")   
        return transform 
    except Exception as e:
        print(f"Error during registration: {e}")
        exit(1)

def apply_transform(moving_image, reference_image, transform):
    """Apply transformation to the moving image."""
    transformed_image = ants.apply_transforms(fixed=reference_image, moving=moving_image, transformlist=transform['fwdtransforms'], interpolator='nearestNeighbor')
    return transformed_image

def map_lesion_to_functional_areas(lesion_path, functional_area_path, transform, output_path):
    """Map lesions to functional areas."""
    lesion_image = ants.image_read(lesion_path)
    functional_area_image = ants.image_read(functional_area_path)
    
    # Convert to rounded integer type
    original_lesion_data = np.round(lesion_image.numpy()).astype(int)
    original_unique_labels = np.unique(original_lesion_data)
    print(f"Unique labels before mapping: {len(original_unique_labels)}")
    print(f"Unique labels before mapping: {original_unique_labels}")

    transformed_lesion = apply_transform(lesion_image, functional_area_image, transform)
    lesion_data = np.round(transformed_lesion.numpy()).astype(int)
    
    functional_area_data = np.round(functional_area_image.numpy()).astype(int)

    # Print unique labels after mapping
    transformed_unique_labels = np.unique(lesion_data)
    print(f"Unique labels after mapping: {len(transformed_unique_labels)}")
    print(f"Unique labels after mapping: {transformed_unique_labels}")

    # Print unique labels of functional areas
    functional_area_labels = np.unique(functional_area_data)
    print(f"Unique labels in functional areas: {len(functional_area_labels)}")
    print(f"Unique labels in functional areas: {functional_area_labels}")

    results = []

    for lesion_label in original_unique_labels:
        if lesion_label == 0:
            continue
        
        # Calculate total area of the specific lesion label
        lesion_area = np.sum(lesion_data == lesion_label)
        if lesion_area == 0:
            continue
        
        # Initialize all functional area ratios to 0
        ratios = {f"func_{func_label}": 0 for func_label in functional_area_labels if func_label != 0}
        
        # Calculate ratios within functional areas
        for func_label in functional_area_labels:
            overlap = np.logical_and(lesion_data == lesion_label, functional_area_data == func_label)
            overlap_area = np.sum(overlap)
            
            ratio = overlap_area / lesion_area if lesion_area > 0 else 0
            ratios[f"func_{func_label}"] = ratio
        
        # Save results
        result = {
            "file_name": os.path.basename(lesion_path),
            "lesion": lesion_label,
            "total_area": lesion_area
        }
        result.update(ratios)
        results.append(result)
    
    if results:
        # Save results as DataFrame
        df = pd.DataFrame(results)
    else:
        # If results are empty, create a DataFrame with specific columns and one empty row
        columns = ["file_name", "lesion", "total_area"] + [f"func_{label}" for label in functional_area_labels if label != 0]
        df = pd.DataFrame(columns=columns)
        empty_row = {col: 0 for col in columns}
        empty_row["file_name"] = os.path.basename(lesion_path)
        df = pd.concat([df, pd.DataFrame([empty_row])], ignore_index=True)
        print("No valid lesion labels found, generated an empty DataFrame with specific columns.")
    
    # Map functional area labels
    df.rename(columns=functional_area_label_map, inplace=True)
    
    # Map lesion labels
    df['lesion'] = df['lesion'].map(lesion_label_map)
    
    # Save as Excel file
    output_excel_path = output_path.replace('.nii.gz', '.xlsx')
    df.to_excel(output_excel_path, index=False)
    print(f"Results saved as Excel file: {output_excel_path}")
    
    ants.image_write(transformed_lesion, output_path)
    print(f"Transformed lesion image saved as: {output_path}")

    return df

def print_label_ratios(df):
    """Print the ratios of lesions in each functional area."""
    for _, row in df.iterrows():
        lesion_name = row['lesion']
        print(f'Lesion label {lesion_name} ratio in each functional area:')
        for col in df.columns:
            if col not in ['lesion', 'total_overlap_area', 'total_area', 'file_name'] and not pd.isna(row[col]):
                print(f'  Functional area {col}: {row[col]:.4f}')

# Example call
# output_path = 'path_to_save_transformed_lesion.nii.gz'

# df = map_lesion_to_functional_areas(lesion_path, functional_area_path, transform, output_path)
# print_label_ratios(df)


# "Multithreading with Progress Bar"

In [None]:
import os
import ants
import pandas as pd
import numpy as np
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm

def process_lesion(lesion_file, moving_image_folder, lesion_folder, functional_area_path, output_folder, fixed_image_path):
    try:
        lesion_path = os.path.join(lesion_folder, lesion_file)
        moving_image_file = "CQ500-CT-" + lesion_file
        moving_image_path = os.path.join(moving_image_folder, moving_image_file)

        if not os.path.isfile(moving_image_path):
            print(f"Moving image {moving_image_path} does not exist, skipping.")
            return None

        # Register images
        transform = register_images(fixed_image_path, moving_image_path)

        # Map lesion labels to functional areas
        output_path = os.path.join(output_folder, lesion_file.replace('.nii.gz', '_transformed.nii.gz'))
        df = map_lesion_to_functional_areas(lesion_path, functional_area_path, transform, output_path)
        print("Completed one")
        return df

    except Exception as e:
        print(f"Processing failed for {lesion_file}: {e}")
        return None

def process_all_lesions(moving_image_folder, lesion_folder, functional_area_path, output_folder, fixed_image_path):
    lesion_files = [f for f in os.listdir(lesion_folder) if f.endswith('.nii.gz')]
    os.makedirs(output_folder, exist_ok=True)

    all_results = []

    with ThreadPoolExecutor(max_workers=8) as executor:  # You can adjust the maximum number of threads as needed
        futures = {executor.submit(process_lesion, lesion_file, moving_image_folder, lesion_folder, functional_area_path, output_folder, fixed_image_path): lesion_file for lesion_file in lesion_files}

        for future in tqdm(as_completed(futures), total=len(futures), desc="Processing lesions"):
            result = future.result()
            if result is not None:
                all_results.append(result)

    if all_results:
        combined_df = pd.concat(all_results, ignore_index=True)
        excel_output_path = os.path.join(output_folder, 'combined_results.xlsx')
        combined_df.to_excel(excel_output_path, index=False)
        print(f"All results have been saved as an Excel file: {excel_output_path}")

# Example call
# Fixed CT image
fixed_image_path = r"C:\SynologyDrive\code\brain-seg\images\CTbase\20240804_0\L40nobone.nii.gz"
# Fixed CT segmentation
functional_area_path = r"C:\SynologyDrive\code\brain-seg\images\CTbase\20240804_0\L40nobone7mask.nii.gz"

moving_image_folder = r"C:\Users\Desktop\pei\restored_ct_0"
lesion_folder = r"C:\Users\Desktop\pei\nn_registration_0"
output_folder = r"C:\Users\Desktop\pei\processed_results"

process_all_lesions(moving_image_folder, lesion_folder, functional_area_path, output_folder, fixed_image_path)
