### Interpolation - Masks - Super Sampling (Trilineal interpolation)

Este código tiene como objetivo ser capaz de calcular las métricas DVH para un conjunto de estructuras en un intervalo de tiempo pequeño. Los pasos a seguir son:

1. Se extraen las coordenadas y tiempos del RT Plan total. También se extrae la dose grid final que se quiere calcular y la información del mismo.
2. Se extrae las coordenadas de la dwell position de referencia y su dose grid también. 
3. Se calculan las matrices de rotación de cada dwell position.
4. Se crea la clase "estructura" usando el módulo *RTStructBuilder* extaído de la librería RT-UTILS. Se define mediante el archivo DICOM del RT-STRUCTURE y con los CT/MR del caso a estudiar.
5. Se extraen las máscaras de las estructuras existentes en el RT-STRUCTURE. Se rellenan las slices vacías mediante interpolación lineal.
6. Se definen el spacing y la orientación de las imágenes CT/RM para convertir los índices de los voxels de cada máscara interpolada en coordenadas en el sistema paciente.
7. Se extrae la información de la dose grid final extraída el Oncentra Brachy
8. Se define el interpolador lineal que se usará sobre todas las dose grids que se quieren interpolar y las coordenadas centradas.
9. Se llama a la función que calcula la dose grid interpolada para una dwell position en concreto y se crea un interpolador para esa dose grid en concreto.
10. Se les aplica a las coordenadas de los voxels dentro de las estructuras (convertidas en el apartado 6) el interpolador para la dose grid interpolada y se obtienen las dosis en cada una de las coordenadas.
11. Se guardan las dosis para cada set de coordenadas en un archivo con formato .npz (para facilitar la posterior lectura y que sea lo más eficiente posible).
12. Se lee cada uno de los archivos .npz definidos para cada dwell position y se suman, multiplicando cada array por el tiempo asociado.
13. Se ordenan de mayor a menor y se calculan las métricas deseadas (Dmean, D98, D95, D90 y D2cc).

En el slicing que se hace en la función "obtener_coordenadas_slicing_interpolador", se escoge un radio de 100 mm ya que la mayoría de dwell positions tienen como distancia máxima a su target más lejano de 100 mm aproximadamente. En caso de tener un caso clínico con distancias mayores, se aconseja modificar el radio para que concuerde con el resultado deseado.

---------------

1. Se importan las librerías que hacen falta para el código. En caso de necesitar instalar librerías que no se tengan, se utiliza el comando puesto en cada línea en comentarios:

In [1]:
# Librería Numpy para manejo de arrays
import numpy as np

# Librería de Scipy
from scipy.spatial.transform import Rotation as R # para rotaciones espaciales
from scipy.interpolate import RegularGridInterpolator # para interpolaciones en una grid regular

# Librería para leer archivos DICOM y manejar DVHs
import pydicom as dicom # !pip install pydicom
from rt_utils import RTStructBuilder # !pip install rt-utils

# Librería para referenciar archivos en el sistema operativo
import os

# Librería para generar identificadores únicos universales (necesario referenciar documentos DICOM)
import uuid

# Librería para manejo de copias de objetos
import copy

# Librería que maneja la fecha y hora
from datetime import datetime

# Librería para rellenar los contornos 
import cv2 # !pip install opencv-python

------------------------

In [30]:
def extraer_coordenadas_tiempos(plan, coords_canales, tiempo_final):
    """
    Esta función extra las coordenadas y los tiempos correspondientes a cada dwell position
    de cada canal para el caso de un plan general.

    Inputs:
    - plan: Dataset del RT Plan general del tratamiento

    Outputs:
    - coords_canales: Diccionario para almacenar las coordenadas de cada canal
    - tiempo_final: Diccionario para almacenar los tiempo finales de cada dwell position
    """

    # Variables auxiliares
    lista_coordenadas = []  # Lista para almacenar todas las coordenadas
    dic_ctw = {}  # Diccionario para almacenar los tiempos asociados a cada coordenada (Cumulative Time Weight)
    inicio_slicing = 0  # Índice para el inicio del slicing de las coordenadas

    # Se define el dataset de los canales del plan (suponiendo que solo hay un Application Setup, que es lo más común)
    canales = plan.ApplicationSetupSequence[0].ChannelSequence

    # Se recorre cada canal para extraer sus coordenadas y tiempos
    for i in range(len(canales)):

        # Se comprueba que el canal tiene la información necesaria
        if hasattr (canales[i],'ChannelNumber') and hasattr (canales[i],'ChannelTotalTime') and hasattr (canales[i],'FinalCumulativeTimeWeight'):

            # Se guarda el canal actual, su tiempo final acumulado y su tiempo total
            channel_number = canales[i].ChannelNumber
            tiempo_acumulado = canales[i].FinalCumulativeTimeWeight
            tiempo_total = canales[i].ChannelTotalTime

            # Se define el dataset de las dwell positions y sus tiempos del canal actual
            dataset_dwell = canales[i].BrachyControlPointSequence

            # Se recorre solo la mitad de puntos, ya que estos están siempre duplicados (por eso se hace //2 y no tiene riesgo de dar error ya que el número de dwell positions siempre es par)
            for j in range(len(dataset_dwell)//2):
                
                # Se extraen las coordenadas y los tiempos (siempre se extraen los impares, ya que los pares son idénticas)
                coordenadas = dataset_dwell[2 * j + 1].ControlPoint3DPosition
                tiempos = dataset_dwell[2 * j + 1].CumulativeTimeWeight

                # Si el tiempo es diferente de None o las coordenadas de 0.0, se guarda la información
                if  tiempos != tiempos != 'None' or coordenadas != '0.0':
                    coordenadas = np.round(np.array(coordenadas),5)
                    lista_coordenadas.append(coordenadas)
                    dic_ctw[tuple(coordenadas)] = float(tiempos)
                
                else:
                    continue

            # Despues de recorrer todos los puntos del canal, se indexan las coordenadas en el diccionario final con llave el número de canal
            # Se hace el slicing de la lista de coordenadas desde el índice de inicio hasta el final del diccionario de tiempos acumulados
            coords_canales[channel_number] = lista_coordenadas[inicio_slicing:len(dic_ctw)]
            inicio_slicing = len(dic_ctw)

            # Se calculan los tiempos finales asociados a cada dwell position del canal actual y se guardan en el diccionario de tiempos finales
            coord_actual = coords_canales[channel_number]

            if tiempo_acumulado == 0:
                for n in range(len(coord_actual)):
                    tiempo_final[tuple(coord_actual[n])] = 0
                continue

            # Si la lista contiene alguna coordenada, se calcula su tiempo asociado
            if len(coord_actual) > 0: 
                # Se recorren las coordenadas actuales para calcular sus tiempos finales
                for n in range(len(coord_actual)):

                    # Para la coordenada n-ésima, su tiempo final es la diferencia entre el tiempo actual y el anterior, multiplicado por el factor de conversión (tiempo_total/tiempo_acumulado)
                    if n > 0:
                        tiempo_final[tuple(coord_actual[n])] = (dic_ctw[tuple(coord_actual[n])] - dic_ctw[tuple(coord_actual[n-1])]) * (tiempo_total/tiempo_acumulado)
            
                    # Para la primera coordenada, su tiempo final es simplemente el tiempo actual multiplicado por el factor de conversión (tiempo_total/tiempo_acumulado)
                    else: 
                        tiempo_final[tuple(coord_actual[n])] = dic_ctw[tuple(coord_actual[n])] * (tiempo_total/tiempo_acumulado)
            else:
                continue

In [31]:
def obtener_vectores_directores(canales, vectores_directores):
    """
    Función que se encarga de obtener los vectores directores tangentes a cada uno de los dwell position.
    
    Inputs:
    - canales: Diccionario donde se almacenan las coordenadas de los puntos de cada canal

    Outputs:
    - vectores_directores: Diccionario donde se almacenan los vectores directores con su correspondiente llave (que son las coordenadas)
    """
    
    # Se recorre cada canal
    for i in canales.keys():
        
        # Si solo hay una posición de permanencia
        if len(canales[i]) < 2:
            vectores_directores[tuple(canales[i][0])] = canales[i][0] / np.linalg.norm(canales[i][0])

        # Si hay más de una posición de permanencia
        else:
            # Se recorre cada dwell position del canal actual
            for j in range(len(canales[i])):

                # Si es el primer elemento, se calcula como la diferencia del siguiente (j+1) con el actual (j)
                if j == 0:
                    vectores_directores[tuple(canales[i][j])] = canales[i][j+1] - canales[i][j]

                # Si es un elemento entre medio, se calcula como la diferencia del siguiente (j+1) con el anterior (j-1) respecto a (j)
                if (j != 0) and (j != (len(canales[i])-1)):
                    vectores_directores[tuple(canales[i][j])] = canales[i][j+1] - canales[i][j-1]

                # Si es el último elemento, se calcula como la diferencia entre el actual (j) con el anterior (j-1)
                if j == (len(canales[i])-1):
                    vectores_directores[tuple(canales[i][j])] = canales[i][j] - canales[i][j-1]

                # Se normalizan los vectores directores (se le agrega el menos para que apunten en el sentido hacia la primera posición de permanencia)
                vectores_directores[tuple(canales[i][j])] = -(vectores_directores[tuple(canales[i][j])]) / np.linalg.norm(vectores_directores[tuple(canales[i][j])])

In [32]:
def obtener_rotores(vectores_directores, rotores_finales):
    """
    Función que extrae los rotores para cada elemento en función de las coordenadas de referencia escogidas.
    Estos rotores sirven para saber cuanto deben rotar las coordenadas centradas para que sigan la anisotropía
    del dwell point que se quiere obtener.
    
    Input:
    - coords_ref: Coordenadas de la dwell point de referencia
    - vectores_directores: Diccionario donde se almacenan los vectores directores de cada dwell position

    Output:
    - rotores: Diccionario donde se almacenan los rotores de cada dwell position
    """

    # En todo este código se utiliza como vector de referencia el de la fuente de referencia (que siempre apunta en z)
    vector_director_ref = np.array([0,0,1])

    # Se recorre cada uno de los vectores directores para calcular su rotor asociado
    for elemento in vectores_directores:
    
        # Se calcula el vecetor perpendicular a los dos vectores tangentes 
        # (el de la posición de referencia y el vector seleccionado)    
        rot_vector = np.cross(vectores_directores[elemento],vector_director_ref)

        # Si el vector de rotación es distinto de cero, se calcula el rotor
        if np.linalg.norm(rot_vector) > 1e-8:

            # Se normaliza el vector perpendicular a la dirección de rotación
            rot_vector = rot_vector/np.linalg.norm(rot_vector)

            # Se calcula el ángulo que se tiene que rotar (se usa la función clip para evitar errores numéricos que den valores fuera del dominio de arccos)
            angle = np.arccos(np.clip(np.dot(vectores_directores[elemento], vector_director_ref), -1.0, 1.0))

            # Se aplica el elemento R.from_vector con el ángulo vector que se quiere rotar (el vector unitario de rotación por el ángulo)
            rotation_matrix = R.from_rotvec((angle)*rot_vector)

        else:
            # Si no hace falta rotación, se construye el vector de rotación nulo
            rotation_matrix = R.from_rotvec([0,0,0])
    
        # Se guarda el rotor en el diccionario con su correspondiente key (las coordenadas)
        rotores_finales[elemento] = rotation_matrix

In [33]:
def obtener_mascaras(estructuras):
    """"
    Función que extrae las máscaras 3D de cada una de las estructuras por separado.
    
    Input:
    - Estructuras: clase definida mediante la librería rt-utils

    Output:
    - masks_dict: Diccionario con las máscaras
    """

    # Obtener todos los nombres de las ROIs
    roi_names = estructuras.get_roi_names()

    # Diccionario para guardar las máscaras
    masks_dict = {}

    # Extraer la máscara de cada ROI
    for roi_name in roi_names:
            mask = estructuras.get_roi_mask_by_name(roi_name)
            if np.any(mask):  # Solo guardar si tiene contenido
                masks_dict[roi_name] = mask

    return masks_dict

In [34]:
def interpolar_contornos_faltantes(mask_3d):
    """
    Función que interpola los contornos faltantes en una máscara 3D. 
    Rellena slices vacíos entre slices con contornos mediante interpolación de formas (distance transform).
    
    Input:
    - mask_3d : Máscara binaria 3D.
                                 
    Output:
    - filled_mask: Máscara 3D con los slices intermedios interpolados.
    """
    from scipy.ndimage import distance_transform_edt

    # Detectar cuál es el eje de los slices (asumimos que es el que tiene menos dimensiones o el primero/último)
    # En DICOM y numpy suele ser (Z, Y, X) -> eje 0, o (Y, X, Z) -> eje 2
    # Aquí asumiremos que si la máscara viene de RTStructBuilder suele ser (Y, X, Z)
    # Pero para trabajar mejor, vamos a asegurarnos de trabajar en (Z, Y, X)
    
    # El eje Z suele ser el más corto en MRI/CT comparado con 512x512
    # O si viene de rt-utils, suele ser (Row, Col, Slice) -> (Y, X, Z)
    # Vamos a asumir formato (Y, X, Z) y lo transponemos a (Z, Y, X) para iterar fácil
    if mask_3d.shape[2] < mask_3d.shape[0] and mask_3d.shape[2] < mask_3d.shape[1]:

        # Probablemente (Y, X, Z)
        working_mask = mask_3d.transpose(2, 0, 1).copy() # (Z, Y, X)
        transposed = True
    else:

        # Probablemente ya es (Z, Y, X)
        working_mask = mask_3d.copy()
        transposed = False
        
    # Se definen cuántos slices se tienen y se hace una copia de la máscara con las dimensiones correctas
    num_slices = working_mask.shape[0]
    filled_mask = working_mask.copy()
    
    # Identificar qué slices tienen contornos
    has_contour = np.array([np.any(working_mask[z, :, :]) for z in range(num_slices)])
    
    # Encontrar índices de slices con contorno
    contour_indices = np.where(has_contour)[0]
    
    if len(contour_indices) < 2:
        # No hay suficientes slices con contorno para interpolar
        return mask_3d
    
    # Iterar sobre los huecos
    for i in range(len(contour_indices) - 1):
        z_start = contour_indices[i]
        z_end = contour_indices[i+1]
        
        # Si hay hueco entre z_start y z_end
        if z_end - z_start > 1:
            
            # Obtener las máscaras de inicio y fin
            mask_start = working_mask[z_start, :, :].astype(float)
            mask_end = working_mask[z_end, :, :].astype(float)
            
            # Calcular Signed Distance Transform (SDT)
            # Negativo dentro del contorno, positivo fuera
            # distance_transform_edt calcula distancia al fondo (0)
            
            # Para mask_start
            dist_out_start = distance_transform_edt(1 - mask_start)
            dist_in_start = distance_transform_edt(mask_start)
            sdt_start = dist_out_start - dist_in_start
            
            # Para mask_end
            dist_out_end = distance_transform_edt(1 - mask_end)
            dist_in_end = distance_transform_edt(mask_end)
            sdt_end = dist_out_end - dist_in_end
            
            # Interpolar para cada slice intermedio
            for z_curr in range(z_start + 1, z_end):
                
                # Factor de ponderación lineal (0.0 a 1.0)
                alpha = (z_curr - z_start) / (z_end - z_start)
                
                # Interpolación lineal de los mapas de distancia
                sdt_interp = (1 - alpha) * sdt_start + alpha * sdt_end
                
                # Generar nueva máscara: donde SDT < 0 es interior
                # Usamos un umbral de 0 para definir el borde
                new_mask = sdt_interp < 0
                
                filled_mask[z_curr, :, :] = new_mask
                
    # Devolver al formato original si fue transpuesto
    if transposed:
        return filled_mask.transpose(1, 2, 0)
    else:
        return filled_mask

In [35]:
def conversion_mascaras2coords(masks_dict_interpolated, origin, pixel_spacing, orientation):
    """Función que convierte los índices de los voxels dentro de las estructuras,
       definidas para cada máscara, en coordenadas reales
       
        Input:
        - masks_dict_interpolated:
        - origin: Origen de coordenadas
        - pixel_spacing: Espacido para las tres coordenadas (Y, X, Z)
        - orientation: Vectores directores de las imágenes RM/CT

        Output:
        - roi_coordinates: Coordenadas de los voxels que quedan dentro de las máscaras
       """
    
    # Diccionario para guardar las coordenadas reales de los puntos dentro de cada estructura
    roi_coordinates = {}

    for roi_name, mask in masks_dict_interpolated.items():
        # Obtener índices de los vóxeles que son parte de la máscara (valor > 0)
        # np.argwhere devuelve una matriz (N, 3) con índices [fila, columna, slice]
        indices = np.argwhere(mask)
        
        if len(indices) > 0:
            # Separar índices
            rows = indices[:, 0] # Índice Y (filas de la imagen)
            cols = indices[:, 1] # Índice X (columnas de la imagen)
            slcs = indices[:, 2] # Índice Z (slices)
            
            # Calcular coordenadas reales (mm)
            # Fórmula: P = Origen + (fila * espaciadoY * vec_col) + (col * espaciadoX * vec_fila) + (slice * espaciadoZ * vec_slice)
            # Nota: pixel_spacing suele ser [RowSpacing (Y), ColSpacing (X)]
            
            # Vectorización para eficiencia (evita bucles lentos)
            points = origin + \
                     (rows[:, None] * pixel_spacing[0] * orientation[0]) + \
                     (cols[:, None] * pixel_spacing[1] * orientation[1]) + \
                     (slcs[:, None] * pixel_spacing[2] * orientation[2])
            
            roi_coordinates[roi_name] = points

    return roi_coordinates

In [36]:
def obtener_coordenadas_slicing_interpolador(ds_ref, radio_slice):
    """
    Función que obtiene las coordenadas centradas (con el slicing aplicado) y crea el interpolador
    asociado a la dose grid recortada.

    Inputs:
    - ds_ref: Dataset del RT Dose de referencia
    - radio_slice: Radio en mm para hacer el slicing de la dose grid

    Outputs:
    - coords_slice: Coordenadas centradas con el slicing aplicado
    - interpolador: Interpolador RegularGridInterpolator asociado a la dose grid recortada
    """

    # Se extrae la dose grid de referencia
    dose_ref = ds_ref.pixel_array.astype(np.float32) * ds_ref.DoseGridScaling

    # Se obtienen las dimensiones de la dose grid
    nz, ny, nx = dose_ref.shape # número de slices, rows, columns

    # Se extrae la geometría del dose grid que se quiere calcular
    IPP_ref = np.array(ds_ref.ImagePositionPatient, dtype=float) # ImagenPositionPatient: Donde empieza la dose grid en mm
    spacing_ref = np.array(ds_ref.PixelSpacing, dtype=float) # PixelSpacing: Espaciado entre píxeles en mm
    z_offsets_ref = np.array(ds_ref.GridFrameOffsetVector, dtype = float) # Número de offsets (slices) para la coordenada z

    # Se calcula el IOP_ref
    #IOP_ref = np.array(ds_ref.ImageOrientationPatient, dtype=float) # ImageOrientationPatient: Orientación de la dose grid

    # Se calcula el vector director del eje Z
    #IOP_ref_z = np.cross(IOP_ref[0:3], IOP_ref[3:6])

    # Se crean los arrays de coordenadas para cada una de las dimensiones
    x = IPP_ref[0] + np.arange(nx) * spacing_ref[0] # Coordenadas en mm centradas en el origen
    y = IPP_ref[1] + np.arange(ny) * spacing_ref[1]  # Coordenadas en mm centradas en el origen
    z = IPP_ref[2] + np.arange(nz) * z_offsets_ref[1]  # Coordenadas en mm centradas en el origen

    # Se crean las coordenadas de cada uno de los voxels, haciéndolo en capas horizontales
    # El orden debe ser el expuesto aquí, ya que 'np.meshgrid' coge el último elemento 
    # en este caso (x) e itera sobre todos sus valores, después lo hace con (y) y finalmente con (z)
    Z, Y, X = np.meshgrid(z, y, x, indexing='ij')

    # Se usa ravel para aplanar los arrays y stack para juntarlos en un solo array de shape (N, 3). Axis = -1 lo pone en la última dimensión
    coords_final = np.stack([X.ravel(), Y.ravel(), Z.ravel()], axis=-1)

    # Índices para Z
    z0 = nz//2 - radio_slice
    z1 = nz//2 + radio_slice

    # Índices para Y
    y0 = ny//2 - radio_slice
    y1 = ny//2 + radio_slice

    # Índices para X
    x0 = nx//2 - radio_slice
    x1 = nx//2 + radio_slice

    # Slicing sobre la dose grid de referencia centrada
    dose_grid_radial = dose_ref[z0:z1+1, y0:y1+1, x0:x1+1]

    # Se crean los arrays de coordenadas únicos para cada una de las dimensiones
    x_rel_ref = np.unique(coords_final[:,0])
    y_rel_ref = np.unique(coords_final[:,1])
    z_rel_ref = np.unique(coords_final[:,2])

    # Se definen las coordenadas del slicing de la dose grid
    x_rel_ref_radial = x_rel_ref[x0:x1+1]
    y_rel_ref_radial = y_rel_ref[y0:y1+1]
    z_rel_ref_radial = z_rel_ref[z0:z1+1]

    del Z, Y, X
    del coords_final

    # Se define la precisión de los decimales a mostrar
    np.set_printoptions(suppress=True)

    # Se crea el interpolador TRILINEAL (lineal en 3D) usando las coordenadas centradas y la dose grid centrada
    # Se debe dar en ese orden las coordenadas de los ejes (z,y,x) para que concuerden con las dimensiones de la dose grid (nz,ny,nx)
    # Se debe usar 'bounds_error=False' y 'fill_value=0' para que no de error al pedir puntos fuera del rango
    interpolador = RegularGridInterpolator((z_rel_ref_radial, y_rel_ref_radial, x_rel_ref_radial), dose_grid_radial, method='linear', bounds_error=False, fill_value=0)

    # Matriz con todas las coordenadas 
    Z_radial, Y_radial, X_radial = np.meshgrid(z_rel_ref_radial, y_rel_ref_radial, x_rel_ref_radial, indexing='ij')
    
    # Stack de las coordenadas para que tengan la estructura correcta 
    coords_slice = np.stack([X_radial.ravel(), Y_radial.ravel(), Z_radial.ravel()], axis=-1)

    return coords_slice, interpolador

In [37]:
def obtener_dose_grid(rotor_prueba, coords_prueba, coords_slice, interpolador, dim_final, IPP_final, spacing_final, z_offset, max_dose_final, tiempo_total_ref, radio_slice):
    """
    Función que obtiene la dose grid interpolada final para dwell position.

    Inputs:
    - rotor_prueba: Elemento de scipy que indica la rotación necesaria para las coordenadas de prueba
    - coords_prueba: Coordenadas de la posición de permanencia que se quiere estudiar
    - coords_slice: Coordenadas reales de la dose grid que se quiere interpolar
    - interpolador: Objeto de interpolación creado a partir de la dose grid de referencia
    - dim_final: Dimensiones de la dose grid final (nz, ny, nx)
    - IPP_final: Coordenadas de la posición de la imagen del paciente desde donde empieza la dose grid final
    - spacing_final: Espaciado entre voxels de la dose grid final
    - z_offset: Distancia existente entre slices
    - max_dose_final: Dosis máxima en la dose grid final del RT-Dose
    - tiempo_total_ref: Tiempo total de la dose del RT-Plan de referencia
    - radio_slice: Distancia escogida para hacer el slicing sobre la dose grid de referencia

    Outputs:
    - dose_grid_interpolada: Dose grid interpolada en la posición de coords_prueba y con la orientación definida por rotor_prueba
    """

    # Se declaran la variable global del kerma_factor
    global kerma_factor

    # Se extraen las dimensiones de la dose grid final
    nz_final, ny_final, nx_final = dim_final

    # ---------------------------------- APLICACION DEL ROTOR A LAS COORDENADAS ----------------------------------

    # Se aplican las rotaciones a las coordenadas centradas para tener en cuenta la anisotropía de los aplicadores
    coords_prueba_center = rotor_prueba.apply(coords_slice)

    # Se reorganizan las coordenadas para que tengan la disposicion correcta según lo que pide el objeto "interpolador"
    coords_prueba_center = coords_prueba_center[:, [2,1,0]]

    # ----------------------------------- OBTENCION DE LA DOSE GRID INTERPOLADA -----------------------------------

    # Se calcula la dosis interpolada usando el objeto "interpolador" sobre las coordenadas rotadas
    dosis_interpolada = interpolador(coords_prueba_center)

    # Se obtienen las dimensiones de la dosis interpolada
    x_shape = int(2*radio_slice + 1)
    y_shape = int(2*radio_slice + 1)
    z_shape = int(2*radio_slice + 1)
    
    # Se redefine el shape de la matriz para que tenga el shape adecuado
    dose_grid_final_radial = dosis_interpolada.reshape((z_shape, y_shape, x_shape))

    # -------------------------- IMPLEMENTACION DEL SLICING EN UNA DOSE GRID DE SHAPE ORIGINAL --------------------------
    
    # Se inicializa una grid con las dimensiones que la dose grid final
    dose_grid_final = np.zeros((nz_final, ny_final, nx_final))

    # Radio inferior y superior en el eje Z
    z_0 = nz_final//2 - (z_shape//2)
    z_1 = nz_final//2 + (z_shape//2)

    # Radio inferior y superior en el eje Y
    y_0 = ny_final//2 - (y_shape//2)
    y_1 = ny_final//2 + (y_shape//2)

    # Radio inferior y superior en el eje X
    x_0 = nx_final//2 - (x_shape//2)
    x_1 = nx_final//2 + (x_shape//2)

    # Se inserta la dose grid calculada en la dose grid inicializada con dimensiones de la dose grid final
    dose_grid_final[z_0:z_1+1, y_0:y_1+1, x_0:x_1+1] = dose_grid_final_radial

    # -------------------------- TRASLACION DOSE GRID INTERPOLADA A LA POSICION ORIGINAL --------------------------

    # Se calculan los índices del dwell position de prueba
    i_prueba = int(np.round((coords_prueba[0] - IPP_final[0]) / spacing_final[0])) 
    j_prueba = int(np.round((coords_prueba[1] - IPP_final[1]) / spacing_final[1])) 
    k_prueba = int(np.round((coords_prueba[2] - IPP_final[2]) / z_offset)) 

    # Se guardan las coordenadas
    indices_coords_prueba = np.array([i_prueba, j_prueba, k_prueba])

    # Se calculan los índices de la posición central
    indices_centro = np.array([nx_final//2, ny_final//2, nz_final//2])

    # Se calcula el desplazamiento que hay entre las coordenadas de la posición de prueba y el centro de la dose grid
    x_tras, y_tras, z_tras = indices_coords_prueba - indices_centro

    # ------------------------------------ DOSE GRID INTERPOLADA FINAL       --------------------------------

    # Se inicializa una dose grid con las mismas dimensiones que la dose grid final (esta será la dose grid final que se devolverá)
    dose_grid_interpolada = np.zeros((nz_final, ny_final, nx_final))

    # {X|Y|Z}{0|1}_{src|dst}, indica el eje en cuestión (X|Y|Z), si es el inicio o el final de la matriz (0|1) y si es la matriz de origen o de destino (src|dst)
    # Este código calcula los índices de inicio y fin de la matriz de origen y destino para cada eje

    # Eje X
    x0_src = max(0, -x_tras)
    x1_src = min(nx_final, nx_final - x_tras)
    x0_dst = max(0, x_tras)
    x1_dst = x0_dst + (x1_src - x0_src)

    # Eje Y
    y0_src = max(0, -y_tras)
    y1_src = min(ny_final, ny_final - y_tras)
    y0_dst = max(0, y_tras)
    y1_dst = y0_dst + (y1_src - y0_src)

    # Eje Z
    z0_src = max(0, -z_tras)
    z1_src = min(nz_final, nz_final - z_tras)
    z0_dst = max(0, z_tras)
    z1_dst = z0_dst + (z1_src - z0_src)

    # Se traslada la posición de la dose grid interpolada del centro a la posición que le corresponde del dwell position
    dose_grid_interpolada[z0_dst:z1_dst, y0_dst:y1_dst, x0_dst:x1_dst] = dose_grid_final[z0_src:z1_src, y0_src:y1_src, x0_src:x1_src]

    # Factor para cortar dosis por encima del máximo permitido
    #mask_cut = dose_grid_interpolada >= max_dose_final
    #dose_grid_interpolada [mask_cut] = max_dose_final

    # Reescalado de la dose grid interpolada debido al cambio del Reference Kerma Factor que se aplicó
    #mask_reescalar = dose_grid_interpolada < max_dose_final
    #dose_grid_interpolada[mask_reescalar] = dose_grid_interpolada[mask_reescalar] * kerma_factor

    dose_grid_interpolada = dose_grid_interpolada * kerma_factor

    # Se divide por el tiempo total de referencia para obtener la dosis en Gy/s
    dose_grid_interpolada = dose_grid_interpolada / tiempo_total_ref

    return dose_grid_interpolada

In [38]:
def extraer_coords_contornos_raw(rtstructure):
    """
    Función que extra las coordenadas de los contornos de un archivo RT-Structure determinado.
    Las coordenadas se presentan en (mm) y relativas al paciente.

    Input:
    - rtstructure: Data set del RT-Structure a estudiar
    
    Output:
    - contornos_roi: Diccionario con las coordenadas de los contornos de las estructuras.
    Formato de ejemplo: {'11': [[contorno 1], [contorno 2], ...]}
    """

    # Se definen la lista donde se guardaran los contornos y el diccionario output
    lista_contorno_data = []
    contornos_roi = {}

    # Se recorre cada una de las secuencias de contornos
    for sec_contorno in rtstructure.ROIContourSequence:

        # Se extrae el ROI number asociado a la estructura estudiada
        roi_number = str(sec_contorno.ReferencedROINumber)

        # Se extrae cada uno de los data sets de los contornos
        for contorno in sec_contorno.ContourSequence:

            # Se les cambia la forma para que sean [x,y,z] y se guardan en una lista
            lista_contorno_data.append(np.array(contorno.ContourData).reshape(-1,3))

        # Una vez leídos todos los contornos para el ROI number en concretos
        # se guardan en el diccionario de output
        contornos_roi[roi_number] = lista_contorno_data.copy()
        lista_contorno_data.clear()

    return contornos_roi 

In [39]:
def calcular_dvh_metrics(sorted_doses_array, spacing, supersampling_factor=1):
    """
    Función que calcula las métricas más importantes (D98, D95, D90, D2cc, Dmean) para un array de valores ordenados
    de forma decreciente.
    
    Input:
    - Sorted_doses_array: Array con las dosis de los vóxels contenidos en las estructuras.
                          ordenado de forma decreciente.
    - Spacing: Espaciado de los vóxels en el eje X, Y, Z
    - supersampling_factor: Factor de subdivisión usado en el super-sampling (default=1, sin super-sampling)

    Output:  
    - Diccionario con las métricas calculadas.    
    """

    dict_dose_metrics = {}

    # --- Calcular métricas DVH (D90, D95, D98, D2cc, Dmean) ---

    # Se calcula la dosis media
    dict_dose_metrics['Dmean'] = float(np.mean(sorted_doses_array))

    # Total de sub-vóxeles en la ROI
    total_voxels = len(sorted_doses_array)

    if total_voxels > 0:
        # Calcular D90, D95, D98
        # Dx es la dosis mínima que cubre x% del volumen.
        # Como el array está ordenado de mayor a menor, el índice nos da el valor directamente.
        idx_d90 = int(round(0.90 * total_voxels))
        idx_d95 = int(round(0.95 * total_voxels))
        idx_d98 = int(round(0.98 * total_voxels))

        # Asegurarse de que los índices no se salgan de los límites
        d90 = sorted_doses_array[min(idx_d90, total_voxels - 1)]
        d95 = sorted_doses_array[min(idx_d95, total_voxels - 1)]
        d98 = sorted_doses_array[min(idx_d98, total_voxels - 1)]

        # Se guardan las metricas en el diccionario
        dict_dose_metrics['D98'] = float(d98)
        dict_dose_metrics['D95'] = float(d95)
        dict_dose_metrics['D90'] = float(d90)
        
        # Calcular D2cc (dosis mínima en los 2cc con mayor dosis)
        # Calcular el volumen de un vóxel en cm^3
        voxel_vol_mm3 = spacing[0] * spacing[1] * abs(spacing[2])
        voxel_vol_cc = voxel_vol_mm3/1000
        
        # Si hay super-sampling, cada punto representa un sub-vóxel con volumen reducido
        # El volumen de cada sub-vóxel es el volumen del vóxel original dividido por supersampling_factor^3
        subvoxel_volume_cc = voxel_vol_cc / (supersampling_factor ** 3)
    
        # Calcular cuántos sub-vóxeles corresponden a 2cc
        voxels_in_2cc = int(2.0 / subvoxel_volume_cc)

        # Si el volumen de la estructura es mayor que los 2cc, se busca el D2cc
        if total_voxels >= voxels_in_2cc:
            # El índice para D2cc es (número de sub-vóxeles en 2cc) - 1
            d2cc = sorted_doses_array[voxels_in_2cc]

            dict_dose_metrics['D2cc'] = float(d2cc)

        # Si el volumen es menor, se guarda 0
        else:
            dict_dose_metrics['D2cc'] = float(np.min(sorted_doses_array))
    else:
        print("\n No se pueden calcular las métricas DVH porque la máscara está vacía.")

    return dict_dose_metrics

In [40]:
def conversion_mascaras2coords_supersampled(masks_dict_interpolated, origin, pixel_spacing, orientation, supersampling_factor):
    """
    Función que convierte los índices de los voxels dentro de las estructuras en coordenadas reales
    con super-sampling para aumentar la densidad de puntos.
    
    Input:
    - masks_dict_interpolated: Diccionario con las máscaras interpoladas
    - origin: Origen de coordenadas
    - pixel_spacing: Espaciado para las tres coordenadas (X, Y, Z)
    - orientation: Vectores directores de las imágenes RM/CT
    - supersampling_factor: Factor de subdivisión por dimensión (default=2, genera 2³=8 puntos por voxel)
    
    Output:
    - roi_coordinates: Coordenadas de los sub-voxels dentro de las máscaras
    """
    
    # Diccionario para guardar las coordenadas reales de los puntos dentro de cada estructura
    roi_coordinates = {}

    for roi_name, mask in masks_dict_interpolated.items():
        # Obtener índices de los vóxeles que son parte de la máscara
        indices = np.argwhere(mask)
        
        if len(indices) > 0:
            # Separar índices
            rows = indices[:, 0]  # Índice Y
            cols = indices[:, 1]  # Índice X
            slcs = indices[:, 2]  # Índice Z
            
            # Crear offsets para super-sampling dentro de cada voxel
            # Dividimos cada voxel en 'supersampling_factor' sub-divisiones por eje
            offsets = np.linspace(0, 1, supersampling_factor + 1)[:-1] + 0.5 / supersampling_factor
            
            # Generar todas las combinaciones de offsets
            offset_grid = np.meshgrid(offsets, offsets, offsets, indexing='ij')
            offsets_flat = np.stack([offset_grid[0].ravel(), 
                                    offset_grid[1].ravel(), 
                                    offset_grid[2].ravel()], axis=1)
            
            # Número de sub-puntos por voxel
            num_subpoints = len(offsets_flat)
            
            # Expandir los índices para cada sub-punto
            rows_expanded = np.repeat(rows, num_subpoints)
            cols_expanded = np.repeat(cols, num_subpoints)
            slcs_expanded = np.repeat(slcs, num_subpoints)
            
            # Añadir los offsets a los índices
            offsets_tiled = np.tile(offsets_flat, (len(indices), 1))
            rows_subsampled = rows_expanded + offsets_tiled[:, 0]
            cols_subsampled = cols_expanded + offsets_tiled[:, 1]
            slcs_subsampled = slcs_expanded + offsets_tiled[:, 2]
            
            # Calcular coordenadas reales (mm)
            points = origin + \
                     (rows_subsampled[:, None] * pixel_spacing[0] * orientation[0]) + \
                     (cols_subsampled[:, None] * pixel_spacing[1] * orientation[1]) + \
                     (slcs_subsampled[:, None] * pixel_spacing[2] * orientation[2])
            
            roi_coordinates[roi_name] = points
            
            print(f"ROI '{roi_name}': {len(indices)} voxels originales → {len(points)} puntos super-sampled "
                  f"(factor {supersampling_factor}³ = {num_subpoints}x)")

    return roi_coordinates

----------

3. Se extraen las coordenadas y tiempos adecuados del plan. También se extran los vectores directores

In [67]:
# Se definen los file path para el directorio de estudio general y para los archivos totales DICOM descargados del Brachy Oncentra
directorio_estudio = os.path.join('Muestra poblacional','IC', '4749720','1')

In [68]:
# Se hace una lista de los elemenos del directorio a estudiar
for file_path in os.listdir(directorio_estudio):

    # Se analizan los archivos DICOM
    if file_path.endswith('dcm'):
        ds = dicom.dcmread(os.path.join(directorio_estudio, file_path))
        
        # Si es de tipo RTDOSE, se guarda
        if ds.Modality == 'RTDOSE':
            directorio_rd = os.path.join(directorio_estudio, file_path)
            
        # Si es de tipo RTPLAN, se guarda
        if ds.Modality == 'RTPLAN':
            directorio_rp = os.path.join(directorio_estudio, file_path)
        
        # Si es de tipo RTSTRUCTURE, se guarda
        if ds.Modality == 'RTSTRUCT':
            directorio_rs = os.path.join(directorio_estudio, file_path)

    if file_path == 'CT' or file_path == 'MR':
        series_path = os.path.join(directorio_estudio, file_path)

In [69]:
# Se extrae el dataset del RT Dose y del RT Plan final
rtdose_final = dicom.dcmread(directorio_rd)
rtplan_final = dicom.dcmread(directorio_rp)
rtstructure_final = dicom.dcmread(directorio_rs)

# Se calcula la dose grid del caso final
dose_final = rtdose_final.pixel_array * rtdose_final.DoseGridScaling

# Se inicializan las variables finales
coords_canales = {}  # Diccionario para almacenar las coordenadas de cada canal
tiempo_total = {}  # Diccionario para almacenar los tiempos totales de cada canal

# Se llama a la función para extraer las coordenadas y los tiempos de cada canal del plan
extraer_coordenadas_tiempos(rtplan_final, coords_canales, tiempo_total)

# Ahora se calcularán los vectores directores de cada dwell position
vectores_directores = {}  # Diccionario para almacenar los vectores directores
obtener_vectores_directores(coords_canales, vectores_directores)

4. Extraer coordenadas y dose grid de la posicion de referencia. También se extraen los rotores para cada dwell position.

In [70]:
# Introducir el path del archivo RP y RD que se quiere leer
RP_ref = os.path.join('Reference Source','RP.1.3.6.1.4.1.2452.6.3170520193.1192519936.4020589228.1667488529.dcm')
RD_ref = os.path.join('Reference Source','RD.1.3.6.1.4.1.2452.6.1073808979.1236671370.1718468265.2341769518.dcm')

# Leer archivos DICOMs del caso de referencia
rtplan_ref = dicom.dcmread(RP_ref)
rtdose_ref = dicom.dcmread(RD_ref)

# Coordenadas de referencia (saca las coordenadas de referencia indistintamente del canal que se haya escogido para ello)
channel_sequence = rtplan_ref.ApplicationSetupSequence[0].ChannelSequence

# Se recorre cada canal hasta encontrar las coordenadas de referencia
for n in range(len(channel_sequence)):
    for elem in channel_sequence[n].BrachyControlPointSequence:
        if hasattr(elem, 'ControlPoint3DPosition'):
            coords_ref = elem.ControlPoint3DPosition
            coords_ref = np.round(np.array(coords_ref),5)
            break

# Se extrae la dose grid de referencia
dose_grid_ref = rtdose_ref.pixel_array * rtdose_ref.DoseGridScaling

rotores_finales = {}  # Diccionario para almacenar los rotores de cada dwell position
obtener_rotores(vectores_directores, rotores_finales)

5. Se define la clase "estructuras" y se extraen las máscaras de cada estructura. Se rellenan las slices vacías mediante interpolación lineal.

In [80]:
# Se define la clase estructuras
estructuras = RTStructBuilder.create_from(series_path, directorio_rs)

# Se extraen las mascaras de cada organo
masks_dict = obtener_mascaras(estructuras)

# Se interpolan para obtener los volumenes rellenos
masks_dict_interpolated = {}
for key, mask in masks_dict.items():

    # Se saca la máscara que se quiera analizar
    original_mask = masks_dict[str(key)]

    # Aplicar interpolación
    interpolated_mask = interpolar_contornos_faltantes(original_mask)

    # Se guarda la máscara interpolada
    masks_dict_interpolated[str(key)] = interpolated_mask.copy()

6. Se definen el spacing y orientation de las imágenes CT/RM para pasar de los índices de los vóxels de las máscaras a coordenadas en el sistema paciente.

In [78]:
# Listar archivos DICOM
dicom_files = [os.path.join(series_path, f) for f in os.listdir(series_path) if f.endswith('.dcm')]

# Leer cabeceras para ordenar
slices = [dicom.dcmread(f) for f in dicom_files]

# Calcular vector normal al plano para ordenamiento
iop = np.array(slices[0].ImageOrientationPatient)
r_vec = iop[:3] # Dirección de filas (X imagen, incrementa columna) (row vector)
c_vec = iop[3:] # Dirección de columnas (Y imagen, incrementa fila) (col vector)
s_vec_calc = np.cross(r_vec, c_vec) # Dirección perpendicular a filas y columnas (meramente para hacer el calculo correcto)

# Ordenar slices por posición en Z
slices.sort(key=lambda x: np.dot(np.array(x.ImagePositionPatient), s_vec_calc))

# Parámetros geométricos del primer slice (origen del volumen)
origin = np.array(slices[0].ImagePositionPatient)
pixel_spacing = np.array(slices[0].PixelSpacing) # [RowSpacing (Y), ColSpacing (X)]

# Calcular espaciado en Z y vector de dirección de slice
if len(slices) > 1:
    p1 = np.array(slices[0].ImagePositionPatient)
    p2 = np.array(slices[1].ImagePositionPatient)

    # Proyección sobre la normal
    z_spacing = np.dot(p2 - p1, s_vec_calc)
    
    # Vector dirección real entre slices
    if abs(z_spacing) > 1e-5:
        s_vec = (p2 - p1) / np.linalg.norm(p2 - p1) * np.sign(z_spacing)
        z_spacing = np.linalg.norm(p2 - p1) * np.sign(z_spacing)
    else:
        s_vec = s_vec_calc
else:
    z_spacing = getattr(slices[0], 'SliceThickness', 1.0)
    s_vec = s_vec_calc

# Se definen los espaciados en cada direccón y los vectores directores
pixel_spacing_total = [pixel_spacing[0], pixel_spacing[1], z_spacing] # (Y,X,Z)
orientation = [c_vec, r_vec, s_vec] # (Y,X,Z)

# Se llama a la función conversión de índices en máscaras a coordenadas reales
coordenadas_roi = conversion_mascaras2coords(masks_dict_interpolated, origin, pixel_spacing_total, orientation)

In [73]:
# Aplicar super-sampling con subdivisión regular
supersampling_factor = 1  # Ajusta este valor según necesites más o menos densidad

7. Se extrae la información de la dose grid final.

In [48]:
# Se extraen las dimensiones y geometría de la dose grid final
dim_final = dose_final.shape
IPP_final = np.array(rtdose_final.ImagePositionPatient, dtype=float) # (X,Y,Z)
spacing_final = np.array(rtdose_final.PixelSpacing, dtype=float) # (X,Y,Z)
z_offset = np.array(rtdose_final.GridFrameOffsetVector, dtype = float)[1]
max_dose_final = np.max(dose_final)

# Se crean los arrays de coordenadas para cada una de las dimensiones
x = IPP_final[0] + np.arange(dim_final[2]) * spacing_final[0] # Coordenadas en mm centradas en el origen (X)
y = IPP_final[1] + np.arange(dim_final[1]) * spacing_final[1]  # Coordenadas en mm centradas en el origen (Y)
z = IPP_final[2] + np.arange(dim_final[0]) * z_offset  # Coordenadas en mm centradas en el origen (Z)

8. Se define el interpolador de referencia y las coordenadas centradas.

In [None]:
# Radio para slicing
radio_slice = 100
coords_slice, interpolador_ref = obtener_coordenadas_slicing_interpolador(rtdose_ref, radio_slice)

9. Se interpola la dose grid para la dwell position de estudio y se define su interpolador. Se les aplica el interpolador a las coordenadas de los voxels dentro de las estructuras.
Se guardan los arrays de dosis en un archivo de tipo .npz.

In [None]:
# Se extrae el tiempo total del plan de referencia
tiempo_total_ref = rtplan_ref.ApplicationSetupSequence[0].ChannelSequence[0].ChannelTotalTime

# Se globalizan las variables kerma_factor
global kerma_factor

# Ahora se calcula el kerma_factor para el reescalado de la dose grid interpolada
kerma_ref = rtplan_ref.SourceSequence[0].ReferenceAirKermaRate
kerma_final = rtplan_final.SourceSequence[0].ReferenceAirKermaRate
kerma_factor = kerma_final / kerma_ref

# Se crea el directorio donde se guardan los archivos
dosis_voxels = os.path.join(directorio_estudio, 'Dosis voxels')
os.makedirs(dosis_voxels, exist_ok=True)

# Se recorre los diferentes canales
for i in coords_canales.keys():

    # Se recorre cada dwell position del canal actual
    for j in range(len(coords_canales[i])):

        # Se extraen las coordenadas y el rotor asociado a la dwell position actual
        coords_prueba = coords_canales[i][j]
        rotor_prueba = rotores_finales[tuple(coords_prueba)]

        # Se obtiene la dose grid interpolada para la dwell position actual
        dose_grid_interpolada = obtener_dose_grid(rotor_prueba, coords_prueba, coords_slice, interpolador_ref, dim_final, IPP_final, spacing_final, z_offset, max_dose_final, tiempo_total_ref, radio_slice)

        # Se crea el interpolador para la dose grid interpolada calculada
        interpolador_dg_inter = RegularGridInterpolator((z,y,x), dose_grid_interpolada, bounds_error=False, fill_value=0)

        # Diccionario para guardar voxels de cada ROI
        voxels_dict = {}        

        # Se leen las coordenadas de los voxels dentro de las estructuras y se obtienen sus dosis interpoladas
        for roi_name, points in coordenadas_roi.items():
            if len(points) > 0:
                points_zyx = points[:, ::-1] 
                doses = interpolador_dg_inter(points_zyx)
                voxels_dict[f'{roi_name}'] = doses

        # Guardar con np.savezd (más eficiente en tiempo)
        filename = f"dwell_{i}_{j+1}.npz"
        filepath = os.path.join(dosis_voxels, filename)
        np.savez(filepath, **voxels_dict)

        # Se borra la dose grid interpolada para liberar memoria
        del dose_grid_interpolada
        del interpolador_dg_inter
        del voxels_dict

        # Se indica con un checkpoint la posición en la que se va  
        print(f'Elemento{i}{j+1} hecho', flush=True)

Elemento11 hecho
Elemento12 hecho
Elemento13 hecho
Elemento14 hecho
Elemento21 hecho
Elemento22 hecho
Elemento23 hecho
Elemento24 hecho
Elemento31 hecho
Elemento32 hecho
Elemento33 hecho
Elemento34 hecho
Elemento35 hecho
Elemento36 hecho
Elemento37 hecho


---------------

#### Caso de estudio de métricas para dose grid interpolada (Interpolación trilineal)

In [157]:
import time

# Se inicia el contador de tiempo
tiempo_inicio = time.time()

# Se ordena el directorio para que se lean los archivos en el orden adecuado
directorio_npz = sorted(
    os.listdir(os.path.join(directorio_estudio, 'Dosis voxels')),
    key=lambda x: [int(num) for num in x.replace('.npz', '').split('_')[1:]])

# Se extraen los tiempos de permanencia en el orden adecuado
tiempos_posiciones = list(tiempo_total.values())
rois_amount = len(coordenadas_roi)
metricas_finales = {}

# Se leen cada uno de los archivos npz
for i in range(len(directorio_npz)):
    
    # Se extrae el tiempo asociado a la posición de permanencia de estudio
    tiempo_i = tiempos_posiciones[i]

    # Se lee la información del archivo
    data = np.load(os.path.join(directorio_estudio, 'Dosis voxels', directorio_npz[i]))

    # Se lee cada uno de los arrays para cada ROI
    for roi_name in data.files:
        
        # Si es el primer elemento, se guarda
        if i == 0:
            metricas_finales[str(roi_name)] = data[str(roi_name)]*tiempo_i

        # Si es el siguiente elemento, se suma al existente 
        else:
            metricas_finales[str(roi_name)] += data[str(roi_name)]*tiempo_i

    # Se cierra la carpeta de los archivos npz
    data.close()

# Se crea el diccionario que guardará las métricas finales
DVH_mask_interpolado = {}

for roi_name, array_dosis in metricas_finales.items():

    # Se organiza el array de forma decreciente
    dose_sorted_desc = np.sort(array_dosis)[::-1] 

    # Se calculan las métricas y se guardan por ROI number
    DVH_mask_interpolado[str(roi_name)] = calcular_dvh_metrics(dose_sorted_desc, pixel_spacing_total, supersampling_factor) 

# Se calcula el tiempo total de ejecución
tiempo_fin = time.time()
tiempo = tiempo_fin - tiempo_inicio
print(f'\nTiempo total de ejecución: {tiempo:.4f} segundos')


Tiempo total de ejecución: 8.3205 segundos


In [158]:
DVH_mask_interpolado

{'HR CTV TC': {'Dmean': 15.007148127873615,
  'D98': 7.126792107113662,
  'D95': 7.629125316686703,
  'D90': 8.155113195737158,
  'D2cc': 24.84050014691022},
 'HR CTV2 TC': {'Dmean': 15.83729339244457,
  'D98': 7.793039619451906,
  'D95': 8.326288280415094,
  'D90': 8.958067443032176,
  'D2cc': 24.075402904910824},
 'RECTUM': {'Dmean': 2.5823073548169493,
  'D98': 1.125006608806746,
  'D95': 1.2145943821556011,
  'D90': 1.3257622997968206,
  'D2cc': 6.159615150741601},
 'BLADDER': {'Dmean': 3.756009471915288,
  'D98': 1.7796494254260635,
  'D95': 1.9436307089884477,
  'D90': 2.1581091537806163,
  'D2cc': 6.52405702797179},
 'BOWEL': {'Dmean': 1.6923203134340628,
  'D98': 0.5008223651637868,
  'D95': 0.5573587833853269,
  'D90': 0.6249593051306082,
  'D2cc': 6.11448778243798},
 'SIGMA': {'Dmean': 2.2091064421803135,
  'D98': 1.1334712709628616,
  'D95': 1.1937963442373405,
  'D90': 1.2662169696760062,
  'D2cc': 3.578428951877497}}

------------

#### Caso de estudio de métricas para dose grid real (RT-Dose)

In [26]:
import time

# Se inicia el contador de tiempo
tiempo_inicio = time.time()


interpolador_dg_real = RegularGridInterpolator((z,y,x), dose_final, bounds_error=False, fill_value=0)

voxels_dict_real = {}

for roi_name, points in coordenadas_roi.items():
    if len(points) > 0:
                points_zyx = points[:, ::-1] 
                doses = interpolador_dg_real(points_zyx)
                voxels_dict_real[f'{roi_name}'] = doses

# Se crea el diccionario que guardará las métricas finales
DVH_mask_real = {}

for roi_name, array_dosis in voxels_dict_real.items():

    # Se organiza el array de forma decreciente
    dose_sorted_desc = np.sort(array_dosis)[::-1] 

    # Se calculan las métricas y se guardan por ROI number
    DVH_mask_real[str(roi_name)] = calcular_dvh_metrics(dose_sorted_desc, pixel_spacing_total, supersampling_factor) 

# Se calcula el tiempo total de ejecución
tiempo_fin = time.time()
tiempo = tiempo_fin - tiempo_inicio
print(f'\nTiempo total de ejecución: {tiempo:.4f} segundos')


Tiempo total de ejecución: 0.1442 segundos


In [27]:
DVH_mask_real

{'HR CTV': {'Dmean': 19.698106440150397,
  'D98': 6.448850674266291,
  'D95': 7.625221859517975,
  'D90': 8.860728283723459,
  'D2cc': 31.39196768508008},
 'RECTUM': {'Dmean': 1.4591250679326855,
  'D98': 0.6905339930808383,
  'D95': 0.7576440720264171,
  'D90': 0.8280145081656743,
  'D2cc': 2.4436997418351947},
 'BLADDER': {'Dmean': 1.8827251777150529,
  'D98': 0.8196010288881648,
  'D95': 0.8894791458301065,
  'D90': 0.9852459528949352,
  'D2cc': 4.224405502774266},
 'BOWEL': {'Dmean': 1.4666409352807797,
  'D98': 0.33354766742864905,
  'D95': 0.4516050847551043,
  'D90': 0.5629545190905173,
  'D2cc': 4.615393547178714},
 'SIGMA': {'Dmean': 2.3432085882725167,
  'D98': 1.2826867478961939,
  'D95': 1.3750724189910184,
  'D90': 1.493668779672452,
  'D2cc': 2.99009791699482},
 'GTV': {'Dmean': 42.30656892620842,
  'D98': 20.789781204259764,
  'D95': 22.47308675367281,
  'D90': 24.72267579243317,
  'D2cc': 17.25786648679272}}

-------------

### Comparación métricas con dose grid real y dose grid interpolada

In [28]:
# Código de comparación de DVH metrics
for key in DVH_mask_real.keys():

        for metric in DVH_mask_real[key]:
            DVH_real = DVH_mask_real[key][metric]
            DVH_calculado = DVH_mask_interpolado[key][metric]

            # Si el caso real da 0, se evita de escribirlo para evitar la división por 0
            if DVH_real == 0:
                continue
            
            # Se calcula el error relativo que existe entre el valor calculado y el de referencia
            d_metric = ((DVH_calculado - DVH_real)/DVH_real)*100
            print('La diferencia del', metric ,'para', key, 'es de:', np.round(d_metric, 3), '%')
    
        print('------------------------------------------------------------')

La diferencia del Dmean para HR CTV es de: -4.384 %
La diferencia del D98 para HR CTV es de: -0.796 %
La diferencia del D95 para HR CTV es de: -0.416 %
La diferencia del D90 para HR CTV es de: 0.161 %
La diferencia del D2cc para HR CTV es de: -0.615 %
------------------------------------------------------------
La diferencia del Dmean para RECTUM es de: -0.701 %
La diferencia del D98 para RECTUM es de: -0.846 %
La diferencia del D95 para RECTUM es de: -0.85 %
La diferencia del D90 para RECTUM es de: -0.889 %
La diferencia del D2cc para RECTUM es de: -0.646 %
------------------------------------------------------------
La diferencia del Dmean para BLADDER es de: -0.779 %
La diferencia del D98 para BLADDER es de: -0.782 %
La diferencia del D95 para BLADDER es de: -0.897 %
La diferencia del D90 para BLADDER es de: -0.932 %
La diferencia del D2cc para BLADDER es de: -0.879 %
------------------------------------------------------------
La diferencia del Dmean para BOWEL es de: 0.371 %
La di

--------

In [74]:
# Se define un pequeño código que crea un diccionario de correspondencia entre el ROI Number y el ROI Name
roi_name = {str(elem.ROINumber) : str(elem.ROIName) for elem in rtstructure_final.StructureSetROISequence}

# Se extraen los contornos de las estructuras
contornos_roi = extraer_coords_contornos_raw(rtstructure_final)

# Se crea el directorio de los GIFs si no existe
os.makedirs(os.path.join(directorio_estudio, 'GIFs'), exist_ok=True)

In [None]:
# Visualizar todos los contornos del diccionario en 3D con selector de ROI
import plotly.graph_objects as go
import plotly.express as px

fig = go.Figure()

# Generar una paleta de colores (un color por ROI)
colors_roi = px.colors.qualitative.Plotly + px.colors.qualitative.Set3

# Crear traces para cada ROI
for roi_idx, (roi_key, contours_list) in enumerate(contornos_roi.items()):
    roi_color = colors_roi[roi_idx % len(colors_roi)]
    
    # Añadir cada contorno de esta ROI con el mismo color
    for i, contour in enumerate(contours_list):
        # Cerrar el contorno conectando el último punto con el primero
        closed_contour = np.vstack([contour, contour[0]])
        
        # Se grafica la traza de cada contorno
        fig.add_trace(go.Scatter3d(
            x=closed_contour[:, 0],
            y=closed_contour[:, 1],
            z=closed_contour[:, 2],
            mode='lines',
            name=f'{roi_key} - {roi_name[str(roi_key)]}',
            line=dict(width=3, color=roi_color),
            legendgroup=f'{roi_key} - {roi_name[str(roi_key)]}',
            showlegend=(i == 0),  # Solo mostrar en la leyenda el primer contorno de cada ROI
            visible=True  # Todas visibles por defecto
        ))

# Añadir las posiciones de permanencia de los catéteres
# Paleta de colores oscuros para mejor contraste con los contornos
colors_canales = ['#00008B', '#8B0000', '#000000', '#006400', '#8B008B', 
                  '#FF8C00', '#4B0082', '#8B4513', '#2F4F4F', '#800000',
                  '#191970', '#556B2F', '#483D8B', "#8B3D3D", "#578B3D"]

for canal_idx, (canal_num, coordenadas) in enumerate(coords_canales.items()):
    coords_array = np.array(coordenadas)

    # Se define el color para el canal
    canal_color = colors_canales[canal_idx % len(colors_canales)]
    
    # Se grafica la traza de cada catéter
    fig.add_trace(go.Scatter3d(
        x=coords_array[:, 0],
        y=coords_array[:, 1],
        z=coords_array[:, 2],
        mode='lines+markers',
        name=f'Channel {canal_num}',
        line=dict(width=5, color=canal_color),
        marker=dict(size=4, color=canal_color),
        showlegend=True,
        visible=True
    ))

# Crear botones para cada ROI + opción "Todas"
buttons = []

# Botón para mostrar todas las ROIs
buttons.append(
    dict(
        label="All ROIs",
        method="update",
        args=[{"visible": [True] * len(fig.data)}]
    )
)

# Calcular índices donde empiezan los catéteres
num_roi_traces = sum(len(contours_list) for contours_list in contornos_roi.values())
num_canal_traces = len(coords_canales)

# Botón para cada ROI individual
trace_idx = 0
for roi_idx, (roi_key, contours_list) in enumerate(contornos_roi.items()):
    num_contours = len(contours_list)
    
    # Crear lista de visibilidad: True para los contornos de esta ROI y para los catéteres
    visibility = [False] * len(fig.data)
    for i in range(trace_idx, trace_idx + num_contours):
        visibility[i] = True
    
    # Mantener visibles los catéteres (últimas trazas)
    for i in range(num_roi_traces, num_roi_traces + num_canal_traces):
        visibility[i] = True
    
    buttons.append(
        dict(
            label=f"{roi_key} - {roi_name[str(roi_key)]}",
            method="update",
            args=[{"visible": visibility}]
        )
    )
    
    trace_idx += num_contours

fig.update_layout(
    title=dict(
        text=f'RT-STRUCTURE contours',
        x=0.5,
        xanchor='center'
    ),
    scene=dict(
    xaxis_title='X (mm)',
    yaxis_title='Y (mm)',
    zaxis_title='Z (mm)',
    aspectmode='data',
    ),
    width=900,
    height=700,
    legend=dict(
        x=1.02,
        y=0.5,
        xanchor='left',
        yanchor='middle'
    ),
    updatemenus=[
        dict(
            type="dropdown",
            direction="down",
            x=-0.05,
            y=1.14,
            xanchor='left',
            yanchor='top',
            showactive=True,
            buttons=buttons
        )
    ]
)

fig.show()

# Guardar el gráfico en formato HTML
html_path = os.path.join(directorio_estudio, 'GIFs', 'RT-Structure contours.html')
fig.write_html(html_path)

In [None]:
# Visualización 3D de todas las máscaras con selector interactivo (Coordenadas Reales)
import plotly.graph_objects as go
from skimage import measure

# Listar archivos DICOM
dicom_files = [os.path.join(series_path, f) for f in os.listdir(series_path) if f.endswith('.dcm')]
    
# Leer cabeceras para ordenar
slices = [dicom.dcmread(f) for f in dicom_files]

# Calcular vector normal al plano para ordenamiento
s0 = slices[0]
iop = np.array(s0.ImageOrientationPatient)
r_vec = iop[:3] # Dirección de filas (X imagen, incrementa columna)
c_vec = iop[3:] # Dirección de columnas (Y imagen, incrementa fila)
s_vec_calc = np.cross(r_vec, c_vec)

# Ordenar slices por posición en Z
slices.sort(key=lambda x: np.dot(np.array(x.ImagePositionPatient), s_vec_calc))

# Parámetros geométricos del primer slice (origen del volumen)
origin = np.array(slices[0].ImagePositionPatient)
pixel_spacing = np.array(slices[0].PixelSpacing) # [RowSpacing (Y), ColSpacing (X)]

# Calcular espaciado en Z y vector de dirección de slice
if len(slices) > 1:
    p1 = np.array(slices[0].ImagePositionPatient)
    p2 = np.array(slices[1].ImagePositionPatient)
    # Proyección sobre la normal
    z_spacing = np.dot(p2 - p1, s_vec_calc)
    # Vector dirección real entre slices
    if abs(z_spacing) > 1e-5:
        s_vec = (p2 - p1) / np.linalg.norm(p2 - p1) * np.sign(z_spacing)
        z_spacing = np.linalg.norm(p2 - p1) * np.sign(z_spacing)
    else:
        s_vec = s_vec_calc
else:
    z_spacing = getattr(slices[0], 'SliceThickness', 1.0)
    s_vec = s_vec_calc

# Colores para cada ROI
colors_roi = [
    'rgba(40, 142, 186, 0.7)',   # Azul
    'rgba(230, 57, 70, 0.7)',     # Rojo
    'rgba(42, 157, 143, 0.7)',    # Verde azulado
    'rgba(247, 127, 0, 0.7)',     # Naranja
    'rgba(6, 255, 165, 0.7)',     # Verde brillante
    'rgba(188, 71, 73, 0.7)',     # Rojo oscuro
    'rgba(78, 205, 196, 0.7)',    # Turquesa
    'rgba(255, 107, 107, 0.7)',   # Rosa
    'rgba(149, 225, 211, 0.7)',   # Verde claro
    'rgba(243, 129, 129, 0.7)'    # Rosa claro
]

# Crear figura
fig = go.Figure()

# Lista para guardar los nombres de las ROIs para los botones
roi_names_list = []

# Procesar cada máscara
for idx, (roi_name, mask) in enumerate(masks_dict_interpolated.items()):
        roi_names_list.append(roi_name)
        
        # Extraer superficie 3D usando marching cubes
        verts, faces, normals, values = measure.marching_cubes(
            mask.astype(float), 
            level=0.5
        )
                
        # Transformar vértices de índices a coordenadas reales (mm)
        # P = Origin + (row_idx * dy * c_vec) + (col_idx * dx * r_vec) + (slice_idx * dz * s_vec)
        # verts[:, 0] -> row index (Y)
        # verts[:, 1] -> col index (X)
        # verts[:, 2] -> slice index (Z)
                
        real_verts = origin + \
                    np.outer(verts[:, 0], pixel_spacing[0] * c_vec) + \
                    np.outer(verts[:, 1], pixel_spacing[1] * r_vec) + \
                    np.outer(verts[:, 2], z_spacing * s_vec)
                
        # Color para esta ROI
        roi_color = colors_roi[idx % len(colors_roi)]
                
        # Añadir la malla 3D
        fig.add_trace(go.Mesh3d(
            x=real_verts[:, 0],
            y=real_verts[:, 1],
            z=real_verts[:, 2],
            i=faces[:, 0],
            j=faces[:, 1],
            k=faces[:, 2],
            color=roi_color,
            opacity=0.7,
            name=roi_name,
            hovertemplate=f'<b>{roi_name}</b><br>X: %{{x:.1f}} mm<br>Y: %{{y:.1f}} mm<br>Z: %{{z:.1f}} mm<extra></extra>',
            showlegend=True
        ))

# Añadir las posiciones de permanencia de los catéteres
# Paleta de colores oscuros para mejor contraste con los contornos
colors_canales = ['#00008B', '#8B0000', '#000000', '#006400', '#8B008B', 
                  '#FF8C00', '#4B0082', '#8B4513', '#2F4F4F', '#800000',
                  '#191970', '#556B2F', '#483D8B']

for canal_idx, (canal_num, coordenadas) in enumerate(coords_canales.items()):
    coords_array = np.array(coordenadas)

    # Se define el color para el canal
    canal_color = colors_canales[canal_idx % len(colors_canales)]
    
    # Se grafica la traza de cada catéter
    fig.add_trace(go.Scatter3d(
        x=coords_array[:, 0],
        y=coords_array[:, 1],
        z=coords_array[:, 2],
        mode='lines+markers',
        name=f'Channel {canal_num}',
        line=dict(width=5, color=canal_color),
        marker=dict(size=4, color=canal_color),
        showlegend=True,
        visible=True
    ))

# Crear botones para cada ROI + opción "Todas"
buttons = []
num_rois = len(masks_dict_interpolated)
num_canales = len(coords_canales)
total_traces = num_rois + num_canales

# Botón para mostrar todas las ROIs
buttons.append(
    dict(
        label="All ROIs",
        method="update",
        args=[{"visible": [True] * total_traces}]
    )
)

# Botón para cada ROI individual
for i, r_name in enumerate(roi_names_list):
    # Visibility: False for all ROIs except current i. True for all catheters.
    # Las trazas de ROI van de 0 a num_rois-1
    # Las trazas de catéteres van de num_rois a total_traces-1
    
    vis = [False] * total_traces
    vis[i] = True # Mostrar ROI actual
    
    # Mantener visibles los catéteres
    for j in range(num_rois, total_traces):
        vis[j] = True
    
    buttons.append(
        dict(
            label=r_name,
            method="update",
            args=[{"visible": vis}]
        )
    )

# Configurar layout
fig.update_layout(
    title=dict(
        text='3D interpolated masks',
        x=0.5,
        xanchor='center',
        font=dict(size=18, color='#333')
    ),
    scene=dict(
        xaxis=dict(
            title='X (mm)',
            backgroundcolor="rgb(230, 230, 230)",
            gridcolor="white",
            showbackground=True
        ),
        yaxis=dict(
            title='Y (mm)',
            backgroundcolor="rgb(230, 230, 230)",
            gridcolor="white",
            showbackground=True
        ),
        zaxis=dict(
            title='Z (mm)',
            backgroundcolor="rgb(230, 230, 230)",
            gridcolor="white",
            showbackground=True
        ),
        aspectmode='data',
        camera=dict(
            eye=dict(x=1.5, y=1.5, z=1.3)
        )
    ),
    width=1000,
    height=800,
    legend=dict(
        title=dict(text='<b>ROIs</b>'),
        x=1.02,
        y=0.5,
        xanchor='left',
        yanchor='middle',
        bgcolor='rgba(255, 255, 255, 0.8)',
        bordercolor='#000',
        borderwidth=1
    ),
    updatemenus=[
        dict(
            type="dropdown",
            direction="down",
            x=-0.05,
            y=1.14,
            xanchor='left',
            yanchor='top',
            showactive=True,
            buttons=buttons
        )
    ],
    hovermode='closest'
)

fig.show()

# Opcional: guardar como HTML
html_path = os.path.join(directorio_estudio, 'GIFs', '3D interpolated masks.html')
fig.write_html(html_path)