<a href="https://colab.research.google.com/github/mariprati/notebooks/blob/main/main_.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install dicom2nifti
!pip install nibabel
!pip install totalsegmentator
!pip install shutil
!pip install pydicom
!pip install matplotlib
!pip install pandas

Collecting argparse (from unittest2->batchgenerators>=0.25->nnunetv2>=2.2.1->totalsegmentator)
  Using cached argparse-1.4.0-py2.py3-none-any.whl.metadata (2.8 kB)
Using cached argparse-1.4.0-py2.py3-none-any.whl (23 kB)
Installing collected packages: argparse
Successfully installed argparse-1.4.0


[31mERROR: Could not find a version that satisfies the requirement shutil (from versions: none)[0m[31m
[0m[31mERROR: No matching distribution found for shutil[0m[31m


In [2]:
import os
import dicom2nifti
import nibabel as nib
from totalsegmentator.python_api import totalsegmentator
import shutil
import pydicom
import numpy as np
import dicom2nifti
import matplotlib.pyplot as plt
import pandas as pd

In [3]:
# %% FUNCTIONS

def dicom_folder_to_nifti(dicom_folder, output_folder):
    """Converts a directory of DICOM files to a NIfTI volume and saves it using dicom2nifti."""
    folder_name = os.path.basename(os.path.normpath(dicom_folder))
    filename = f"{folder_name}_nifti_volume.nii"

    # Convert the DICOM series in the specified directory to NIfTI
    dicom2nifti.dicom_series_to_nifti(dicom_folder, os.path.join(output_folder, filename), reorient_nifti=True)

    print(f"Saved NIfTI volume at {os.path.join(output_folder, filename)}")



In [4]:
def get_nifti_info(nifti_path):
    """Extracts and prints the dimensions and metadata of a NIfTI file."""
    # Load the NIfTI image
    nifti_img = nib.load(nifti_path)

    # Get the data array dimensions
    dimensions = nifti_img.shape  # This gives the number of pixels along each axis

    # Get the voxel size
    voxel_sizes = nifti_img.header.get_zooms()  # Pixel size along each axis in millimeters

    # Print the information
    print(f"Dimensions (pixels along each axis): {dimensions}")
    print(f"Voxel sizes (mm per axis): {voxel_sizes}")
    print(f"Data type: {nifti_img.get_data_dtype()}")
    print(f"Affine transformation matrix:\n{nifti_img.affine}")

    return {
        "dimensions": dimensions,
        "voxel_sizes": voxel_sizes,
        "data_type": nifti_img.get_data_dtype(),
        "affine": nifti_img.affine
    }

In [5]:
def find_and_save_slices(dicom_folder, target_value):
    """
    Finds the DICOM slice with a specific InstanceNumber, extracts its
    ImagePatientPosition, and returns it as a vector (x, y, z).

    Parameters:
        dicom_folder (str): Path to the folder containing DICOM files.
        target_value (int): The target InstanceNumber to find.

    Returns:
        tuple: The ImagePatientPosition as a tuple (x, y, z).
    """
    # Load all DICOM files in the folder and sort by Instance Number
    dicom_files = sorted(
        [f for f in os.listdir(dicom_folder) if f.endswith(".dcm")],
        key=lambda x: pydicom.dcmread(os.path.join(dicom_folder, x)).InstanceNumber
    )

    target_index = None
    img_patient_position = None

    # Iterate over DICOM files to find the slice with InstanceNumber equal to target_value
    for i, filename in enumerate(dicom_files):
        filepath = os.path.join(dicom_folder, filename)
        dicom_data = pydicom.dcmread(filepath)

        # Check if the InstanceNumber matches the target value
        if dicom_data.InstanceNumber == target_value:
            # Extract the ImagePatientPosition of this slice
            img_patient_position = dicom_data.ImagePositionPatient  # DICOM tag (0020,0032)
            target_index = i
            break

    # If target slice is not found, exit
    if target_index is None:
        print(f"No slice found with InstanceNumber equal to {target_value}")
        return None

    # If the slice is found, return the ImagePatientPosition as a tuple
    x, y, z = img_patient_position
    print(f"Slice with InstanceNumber {target_value} found at index {target_index}.")
    print(f"ImagePatientPosition: x={x}, y={y}, z={z}")
    return x, y, z


In [6]:
def world_to_voxel_coordinates(header, x, y, z):
    """
    Convert world coordinates (x, y, z) to voxel indices (i, j, k) using the NIfTI header.

    Parameters:
    header: NIfTI header object (e.g., from nibabel)
    x, y, z: World coordinates (float)

    Returns:
    i, j, k: Voxel indices (int)
    """
    # Extract necessary fields from the header
    pixdim = header["pixdim"]
    qoffset_x = header["qoffset_x"]
    qoffset_y = header["qoffset_y"]
    qoffset_z = header["qoffset_z"]
    qfac = pixdim[0]  # Either 1 or -1

    # Define the fixed rotation matrix R
    R = np.array([
        [1, 0,  0],
        [0, -1, 0],
        [0, 0, -1]
    ])

    # Subtract offsets to account for translation
    world_vector = np.array([x, y, z]) - np.array([qoffset_x, qoffset_y, qoffset_z])

    # Invert the rotation matrix (which is simple since it's a diagonal matrix)
    R_inv = np.linalg.inv(R)

    # Apply the inverse rotation
    voxel_vector = R_inv @ world_vector

    # Divide by the scaling factors to get the voxel indices
    i = voxel_vector[0] / pixdim[1]
    j = voxel_vector[1] / pixdim[2]
    k = voxel_vector[2] / (qfac * pixdim[3])

    # Round to the nearest integer to get the voxel indices
    i = round(i)
    j = round(j)
    k = round(k)

    return i, j, k

In [7]:
def find_sct_slice_number(csv_file, pid, study_yr):
    """
    Finds the sct_slice_number for the given pid and study_yr from a CSV file.

    Args:
        csv_file (str): Path to the CSV file.
        pid (str): Patient ID.
        study_yr (str): Study year.

    Returns:
        int: The corresponding sct_slice_number.
    """
    # Load the CSV file into a DataFrame
    df = pd.read_csv(csv_file)

    # Filter the DataFrame for the specific pid and study_yr
    row = df[(df['pid'] == pid) & (df['study_yr'] == study_yr)]

    # Check if the row exists and retrieve sct_slice_number
    if not row.empty:
        print(f"Matching row from CSV:\n{row}\n")
        return int(row['sct_slice_num'].values[0])
    else:
        raise ValueError(f"No matching entry found for pid={pid} and study_yr={study_yr}")


In [8]:
def find_sct_epi_loc(csv_file, pid, study_yr):
    """
    Finds the sct_epi_loc for the given pid and study_yr from a CSV file.

    Args:
        csv_file (str): Path to the CSV file.
        pid (str): Patient ID.
        study_yr (str): Study year.

    Returns:
        int: The corresponding sct_epi_loc.
    """
    # Load the CSV file into a DataFrame
    df = pd.read_csv(csv_file)

    # Filter the DataFrame for the specific pid and study_yr
    row = df[(df['pid'] == pid) & (df['study_yr'] == study_yr)]

    # Check if the row exists and retrieve sct_epi_loc
    if not row.empty:
        sct_epi_loc = int(row['sct_epi_loc'].values[0])
        print(f"Matching row from CSV:\n{row}\n")
        print(f"sct_epi_loc value: {sct_epi_loc}")
        return sct_epi_loc
    else:
        raise ValueError(f"No matching entry found for pid={pid} and study_yr={study_yr}")

In [9]:
def load_lung_lobe_nifti(masked_output_dir, folder_name, sct_epi_loc):
    """
    Loads the NIfTI file corresponding to the lobe based on sct_epi_loc and returns the region name.

    Args:
        masked_output_dir (str): Directory containing the masked NIfTI files.
        folder_name (str): Name of the folder containing the DICOM data.
        sct_epi_loc (int): Value determining which lobe to load.

    Returns:
        tuple: A tuple containing:
            - nib.Nifti1Image: The loaded NIfTI file.
            - str: The name of the region (e.g., 'lung_upper_lobe_right').
    """
    nifti_file = None
    region = None

    # Load the appropriate file based on sct_epi_loc
    if sct_epi_loc == 1:
        region = 'lung_upper_lobe_right'
        nifti_file = nib.load(os.path.join(masked_output_dir, f'{region}_{folder_name}.nii'))
        print("Loaded Right Upper Lobe")
    elif sct_epi_loc == 2:
        region = 'lung_middle_lobe_right'
        nifti_file = nib.load(os.path.join(masked_output_dir, f'{region}_{folder_name}.nii'))
        print("Loaded Right Middle Lobe")
    elif sct_epi_loc == 3:
        region = 'lung_lower_lobe_right'
        nifti_file = nib.load(os.path.join(masked_output_dir, f'{region}_{folder_name}.nii'))
        print("Loaded Right Lower Lobe")
    elif sct_epi_loc == 4:
        region = 'lung_upper_lobe_left'
        nifti_file = nib.load(os.path.join(masked_output_dir, f'{region}_{folder_name}.nii'))
        print("Loaded Left Upper Lobe")
    elif sct_epi_loc in [5, 6]:  # Lingula (5) treated as part of Left Lower Lobe (6)
        region = 'lung_lower_lobe_left'
        nifti_file = nib.load(os.path.join(masked_output_dir, f'{region}_{folder_name}.nii'))
        print("Loaded Left Lower Lobe (including Lingula)")
    else:
        raise ValueError(f"Invalid sct_epi_loc value: {sct_epi_loc}")

    return nifti_file, region

In [10]:
# %% MAIN


def main():
    # %% Set environment variables at the start of the script
    os.environ['LC_ALL'] = 'C.UTF-8'
    os.environ['LANG'] = 'C.UTF-8'
    os.environ['LANGUAGE'] = 'C.UTF-8'


    # --------------------------------------------------------------------------------------------
    # %% SEGMENTAZIONE
    # --------------------------------------------------------------------------------------------
    dicom_folder = '/Users/mariaprati/Desktop/100621_t2' # UNICA DA LASCIARE
    output_folder = '/Users/mariaprati/Desktop/risultati'

    # Extract the last part of the dicom_folder path (the folder name)
    folder_name = os.path.basename(os.path.normpath(dicom_folder))

    # Create a new output folder path inside Segmentazione2
    output_folder = os.path.join(output_folder, folder_name)

    # Ensure the new output folder exists
    os.makedirs(output_folder, exist_ok=True)
    dicom_folder_to_nifti(dicom_folder, output_folder)

    # Define paths for the segmentation
    input_image_path = os.path.join(output_folder, f"{folder_name}_nifti_volume.nii")
    output_dir = "/Users/mariaprati/Desktop/risulatati/Lung_segmentations"

    # Now, create the 'segmentazione_vcs' folder inside the newly updated output folder
    output_dir = os.path.join(output_folder, 'Lung_segmentations')

    # Ensure output directory exists
    os.makedirs(output_dir, exist_ok=True)

    # Define the lung lobe names
    lobe_names = [
        'lung_upper_lobe_left',
        'lung_lower_lobe_left',
        'lung_upper_lobe_right',
        'lung_middle_lobe_right',
        'lung_lower_lobe_right'
    ]
    labels = {
        'lung_upper_lobe_left': 1,
        'lung_lower_lobe_left': 2,
        'lung_upper_lobe_right': 3,
        'lung_middle_lobe_right': 4,
        'lung_lower_lobe_right': 5
    }

    # Segment the lung lobes using TotalSegmentator
    totalsegmentator(
        input=input_image_path,
        output=output_dir,
        fast=True,
        device="cpu",
        task="total",
        roi_subset=lobe_names
    )

    # Load the original CT image
    ct_img = nib.load(input_image_path)
    ct_data = ct_img.get_fdata()

    # Output directory for masked images
    masked_output_dir = os.path.join(output_dir, 'Lung_segmented_masks')
    os.makedirs(masked_output_dir, exist_ok=True)

    # Create and save masked images for each region
    for region, label in labels.items():
        seg_path = os.path.join(output_dir, f'{region}.nii.gz')
        if os.path.exists(seg_path):
            # Load the specific segmentation file for the current region
            seg_img = nib.load(seg_path)
            seg_data = seg_img.get_fdata()

            # Create a mask for the current region
            mask = seg_data == 1  # Assuming the segmentation file has binary mask (1 for region, 0 for background)

            # Apply the mask to the original CT data, setting background to -1000
            masked_ct_data = np.where(mask, ct_data, -1000)

            # Create a new NIfTI image for the masked data
            masked_ct_img = nib.Nifti1Image(masked_ct_data, ct_img.affine, ct_img.header)

            # Save the masked image
            output_path = os.path.join(masked_output_dir, f'{region}_{folder_name}.nii')
            nib.save(masked_ct_img, output_path)
        else:
            print(f"Segmentation file for {region}_{folder_name} not found at {seg_path}")
    # Print the dimensions of the segmented NIfTI files
    for region in labels.keys():
        seg_path = os.path.join(masked_output_dir, f'{region}_{folder_name}.nii')
        if os.path.exists(seg_path):
            print(f"Dimensions for {region}_{folder_name}:")
            get_nifti_info(seg_path)
        else:
            print(f"Segmentation file for {region}_{folder_name} not found at {seg_path}")
    print("Masked NIfTI files have been saved.")

    # --------------------------------------------------------------------------------------------
    # %% SUBVOLUME
    # --------------------------------------------------------------------------------------------

    csv_file = "/Users/mariaprati/Desktop/TESI/NLST/new_csv/unique_filter_51_total.csv"  # Path to your CSV file
    pid = 100621
    study_yr = 2
    sct_epi_loc = find_sct_epi_loc(csv_file, pid, study_yr)
    nifti_file, region = load_lung_lobe_nifti(masked_output_dir, folder_name, sct_epi_loc)

    print(f"Loaded NIfTI file: {nifti_file}")
    print(f"Corresponding region: {region}")

    header = nifti_file.header
    affine = nifti_file.affine
    data = nifti_file.get_fdata()
    pixdim = header["pixdim"]
    dimen = header["dim"]
    sct_slice_number = find_sct_slice_number(csv_file, pid, study_yr)
    result = find_and_save_slices(dicom_folder, sct_slice_number)

    if result:
        x, y, z = result
        print(f"Target slice ImagePatientPosition: x={x}, y={y}, z={z}")
    else:
        print("No slice found.")
    print(x,y,z)

    i, j, k = world_to_voxel_coordinates(header, x, y, z)

    # Extract the slice from the data (for example, slice at 'k' in the z-direction)
    slice_at_z = data[:, :, k]

    # Plot the slice
    # plt.imshow(slice_at_z.T, cmap="gray")  # Transpose for correct orientation
    # plt.title(f"Slice at Z = {z}")
    # plt.show()

    # Define the window size around the slice
    window_size = 1  # Window of ±1 slices
    k_min = k - window_size
    k_max = k + window_size + 1

    # Extract the sub-volume
    sub_volume = data[:, :, k_min:k_max]

    # Adjust the affine matrix for the sub-volume
    new_affine = affine.copy()
    new_affine[:3, 3] += k_min * pixdim[3]  # Adjust the Z offset in the affine matrix

    # Create a new NIfTI object for the sub-volume
    new_nifti = nib.Nifti1Image(sub_volume, affine=new_affine, header=header)

    # Full path to the output file
    output_path = os.path.join(masked_output_dir, f"lung_subvolume_{folder_name}_{region}.nii")


    # Save the new NIfTI file
    #output_path = "/Users/mariaprati/Desktop/TESI/Segmentazione2/100176_t1/Lung_segmentations/Lung_segmented_masks/lung_subvolume.nii" #QUI POI METTERE NOME
    nib.save(new_nifti, output_path)
    print(f"Sottovolume salvato in: {output_path}")

    # Load the saved sub-volume NIfTI file
    subvolume_nifti = nib.load(output_path)
    subvolume_data = subvolume_nifti.get_fdata()

    # Print the dimensions of the sub-volume
    print("Dimensions of the lung sub-volume:", subvolume_data.shape)




In [11]:


if __name__ == '__main__':
    main()



FileNotFoundError: [Errno 2] No such file or directory: '/Users/mariaprati/Desktop/100621_t2'