In [3]:
from dotenv import load_dotenv
import os
import sys
import lightning as pl
load_dotenv()  # legge il file .env
repo_path = os.getenv("REPO_PATH")
sys.path.append(repo_path)
os.chdir(repo_path)
%load_ext autoreload
%autoreload 2

## Analyze experiment result

Unisce i risultati dai vari esperimenti unendoli in un unica tabella e creando una tabella di valori medi

In [4]:
import yaml

def dotdict_constructor(loader, node):
    """Costruisce in modo sicuro un dizionario da un nodo di mappatura YAML, ignorando il tag personalizzato."""
    return loader.construct_mapping(node, deep=True)

Loader = yaml.SafeLoader
Loader.add_constructor('tag:yaml.org,2002:python/object:utils.util.DotDict', dotdict_constructor)

def retrieve_hyperparameters(hparams_path: str) -> dict:
    """
    Legge un file hparams.yaml di Pytorch Lightning, estrae il tipo di generatore
    e la combinazione di loss utilizzate.

    Args:
        hparams_path (str): Il percorso del file hparams.yaml.

    Returns:
        dict: Un dizionario con le chiavi 'generator' e 'loss'.
              Restituisce un dizionario vuoto se il file non viene trovato,
              è vuoto, ha una struttura inattesa o si verifica un errore.
    """
    extracted_data = {}

    try:
        with open(hparams_path, 'r') as file:
            # Usa il loader personalizzato per analizzare correttamente il file senza errori.
            hparams = yaml.load(file, Loader=Loader)

        if not hparams or 'opt' not in hparams:
            print(f"Attenzione: Il file YAML '{hparams_path}' è vuoto o non ha la struttura attesa (manca la chiave 'opt').")
            return {}

        generator = hparams.get('opt', {}).get('model', {}).get('generator', {}).get('netG', 'Non specificato')

        losses_config = hparams.get('opt', {}).get('training', {}).get('losses', {})
        
        active_losses = [
            key.replace('lambda_', '').upper()
            for key, value in losses_config.items()
            if key.startswith('lambda_') and isinstance(value, (int, float)) and value > 0
        ]
        
        loss_str = '+'.join(sorted(active_losses)) if active_losses else 'Non specificata'

        extracted_data = {
            'generator': generator,
            'loss': loss_str
        }

    except FileNotFoundError:
        print(f"Errore: Il file non è stato trovato al percorso '{hparams_path}'")
    except (yaml.YAMLError, AttributeError) as e:
        print(f"Errore durante il parsing del file YAML '{hparams_path}': {e}")

    return extracted_data


In [5]:
import json
def read_metrics_from_json(file_path: str) -> dict:
    """
    Legge un file JSON contenente metriche di valutazione e le restituisce.

    Args:
        file_path (str): Il percorso del file JSON da leggere.  

    Returns:
        dict: Un dizionario contenente le metriche lette dal file.
              Restituisce un dizionario vuoto se il file non viene trovato,
              non è un JSON valido o si verifica un altro errore di lettura.
    """
    try:
        with open(file_path, 'r') as f:
            metrics = json.load(f)
        return metrics
    except FileNotFoundError:
        print(f"Errore: Il file non è stato trovato al percorso '{file_path}'")
        return {}
    except json.JSONDecodeError:
        print(f"Errore: Il file '{file_path}' non contiene un JSON valido o è corrotto.")
        return {}
    except Exception as e:
        print(f"Si è verificato un errore inatteso durante la lettura di '{file_path}': {e}")
        return {}

In [None]:
def analyze_experiment_results( main_path: str, 
                                testing_datasets: list[str],
                                metrics_to_analyze: list[str] = ['MAE', 'PSNR', 'SSIM', 'MM-SSIM'], 
                                outlier_metric: str = 'SSIM' ) -> tuple[dict, pd.DataFrame]:
    """
    Args:
        main_path (str): Percorso principale contenente le directory delle versioni.
        testing_datasets (list[str]): Lista dei nomi dei dataset di test.
        metrics_to_analyze (list[str], optional): Lista delle metriche da analizzare.
        outlier_metric (str, optional): Metrica su cui basare la rimozione degli outlier.

    Returns:
        tuple[dict, pd.DataFrame]:
            - Dizionario con i risultati dettagliati (grezzi) per ogni esperimento.
            - DataFrame riassuntivo con le medie (senza outlier).
    """
    all_results = []

    # 1. Standardizza i parametri di input in MAIUSCOLO per coerenza
    metrics_to_analyze_upper = [m.upper() for m in metrics_to_analyze]
    outlier_metric_upper = outlier_metric.upper()

    if outlier_metric_upper not in metrics_to_analyze_upper:
        raise ValueError(f"La metrica per l'outlier '{outlier_metric}' deve essere in 'metrics_to_analyze'.")

    print(f"Avvio analisi in: {main_path}")
    for version_folder in sorted(os.listdir(main_path)):
        version_path = os.path.join(main_path, version_folder)
        if not os.path.isdir(version_path) or not version_folder.startswith('version_'):
            continue
        
        # ... (recupero hparams) ...
        hparams_path = os.path.join(version_path, "hparams.yaml")
        h_params = retrieve_hyperparameters(hparams_path)
        if not h_params: continue

        test_results_base_path = os.path.join(version_path, "test_results")
        for test_dataset in testing_datasets:
            dataset_path = os.path.join(test_results_base_path, test_dataset)
            if not os.path.isdir(dataset_path): continue

            for subject in os.listdir(dataset_path):
                subject_path = os.path.join(dataset_path, subject)
                if not os.path.isdir(subject_path): continue
                
                metrics_path = os.path.join(subject_path, "metrics.json")
                metrics = read_metrics_from_json(metrics_path)

                if metrics:
                    standardized_metrics = {key.upper(): value for key, value in metrics.items()}

                    record = {
                        'VERSION': version_folder,
                        'GENERATOR': h_params.get('generator', 'N/A'),
                        'LOSS': h_params.get('loss', 'N/A'),
                        'TEST_DATASET': test_dataset,
                        'SUBJECT': subject,
                        **standardized_metrics  # Usa il dizionario con le chiavi già corrette
                    }
                    all_results.append(record)

    if not all_results:
        print("Nessun risultato trovato. Restituisco strutture vuote.")
        return {}, pd.DataFrame()

    master_df = pd.DataFrame(all_results)


    # Blocco di codice corretto e completo per la rimozione outlier
    outlier_free_df = master_df.copy()
    if outlier_metric_upper in outlier_free_df.columns:
        print(f"\nRimozione outlier basata su '{outlier_metric_upper}'...")
        
        # Assicurati che queste due righe siano identiche nel tuo codice
        q1 = master_df.groupby(['VERSION', 'TEST_DATASET'])[outlier_metric_upper].transform('quantile', 0.25)
        q3 = master_df.groupby(['VERSION', 'TEST_DATASET'])[outlier_metric_upper].transform('quantile', 0.75)
        
        iqr = q3 - q1
        lower_bound = q1 - 1.5 * iqr
        
        is_not_outlier = master_df[outlier_metric_upper] >= lower_bound
        outlier_free_df = master_df[is_not_outlier]
        
        print(f"Analisi completata. Rimossi {len(master_df) - len(outlier_free_df)} record su {len(master_df)}.")
    else:
        print(f"\nAttenzione: La metrica per outlier '{outlier_metric_upper}' non trovata. Salto la rimozione.")
    summary_df = pd.DataFrame()
    final_metrics_to_analyze = [m for m in metrics_to_analyze_upper if m in outlier_free_df.columns]

    if not final_metrics_to_analyze:
        print("Nessuna delle metriche specificate è stata trovata nei dati.")
    else:
        grouping_cols = ['VERSION', 'GENERATOR', 'LOSS', 'TEST_DATASET']
        
        # --- INIZIO MODIFICA ---
        
        # 1. Calcola sia la media che la deviazione standard con .agg()
        # Questo crea un DataFrame con colonne multi-livello (es. 'MAE' -> 'mean', 'std')
        summary_stats = outlier_free_df.groupby(grouping_cols)[final_metrics_to_analyze].agg(['mean', 'std'])

        # 2. Formatta le colonne delle metriche per mostrare "media ± std"
        for metric in final_metrics_to_analyze:
            # Estrai le colonne di media e std per la metrica corrente
            mean_col = (metric, 'mean')
            std_col = (metric, 'std')
            
            # Crea una nuova colonna formattata (es. "0.1234 ± 0.0123")
            # Usiamo .map() per applicare la formattazione a tutta la serie
            summary_stats[metric] = (
                summary_stats[mean_col].map('{:.4f}'.format) + 
                ' ± ' + 
                summary_stats[std_col].map('{:.4f}'.format)
            )

        # 3. Seleziona solo le colonne finali (quelle formattate) e reimposta l'indice
        # Il risultato finale avrà le colonne originali delle metriche, ma con i valori formattati.
            summary_df = outlier_free_df.groupby(grouping_cols)[final_metrics_to_analyze].agg(
                lambda s: f"{s.mean():.4f} \u00B1 {s.std():.4f}"
            ).reset_index()


    individual_dfs = {version: group for version, group in master_df.groupby('VERSION')}

    return individual_dfs, summary_df

In [46]:
MAIN_EXPERIMENT_PATH = 'lightning_logs/MRtoCT'
TESTING_DATASETS = ["RF", "SynthRAD2023"]

detailed_results, summary_results = analyze_experiment_results(MAIN_EXPERIMENT_PATH, TESTING_DATASETS)

Avvio analisi in: lightning_logs/MRtoCT

Rimozione outlier basata su 'SSIM'...
Analisi completata. Rimossi 79 record su 2296.


In [47]:
summary_results

Unnamed: 0,VERSION,GENERATOR,LOSS,TEST_DATASET,MAE,PSNR,SSIM,MM-SSIM
0,version_1,swinunetr_128,L1+PERCEPTUAL+STRUCTURAL,RF,0.0241 ± 0.0094,26.0741 ± 3.3469,0.9340 ± 0.0231,0.9480 ± 0.0169
1,version_1,swinunetr_128,L1+PERCEPTUAL+STRUCTURAL,SynthRAD2023,0.0243 ± 0.0079,24.5501 ± 1.8223,0.8635 ± 0.0248,0.8409 ± 0.0350
2,version_2,unet_128,L1+PERCEPTUAL+STRUCTURAL,RF,0.0310 ± 0.0114,24.2366 ± 3.0074,0.8359 ± 0.0599,0.9201 ± 0.0198
3,version_2,unet_128,L1+PERCEPTUAL+STRUCTURAL,SynthRAD2023,0.0251 ± 0.0112,24.6157 ± 2.3800,0.8410 ± 0.0361,0.8418 ± 0.0358
4,version_3,swinunetr_128,L1+STRUCTURAL,RF,0.0287 ± 0.0127,25.1301 ± 4.1493,0.9387 ± 0.0215,0.9487 ± 0.0173
5,version_3,swinunetr_128,L1+STRUCTURAL,SynthRAD2023,0.0234 ± 0.0113,24.9389 ± 2.4717,0.8729 ± 0.0244,0.8566 ± 0.0364
6,version_4,unet_128,L1+STRUCTURAL,RF,0.0300 ± 0.0114,24.1359 ± 3.1221,0.8749 ± 0.0446,0.9220 ± 0.0202
7,version_4,unet_128,L1+STRUCTURAL,SynthRAD2023,0.0236 ± 0.0112,25.0319 ± 2.5552,0.8523 ± 0.0354,0.8485 ± 0.0354
8,version_5,swinunetr_128,L1+PERCEPTUAL,RF,0.0232 ± 0.0086,26.0256 ± 2.9827,0.9118 ± 0.0269,0.9358 ± 0.0192
9,version_5,swinunetr_128,L1+PERCEPTUAL,SynthRAD2023,0.0229 ± 0.0103,24.9720 ± 2.3123,0.8625 ± 0.0269,0.8486 ± 0.0370


## Produce Latex table 

Partendo dal dataframe vado a creare una tabella importabile direttamente in latex

In [48]:
def create_latex_table_from_dataframe(
    summary_results: pd.DataFrame,
    metrics=None,
    metric_headers=None,
    dataset_order=None,
    caption="Confronto delle metriche di performance per diversi generatori e combinazioni di loss.",
    label="tab:generation_metrics",
    max_loss_len=20
) -> str:
    import textwrap
    import re
    from collections import OrderedDict

    # --- Parametri di default ---
    if metrics is None:
        metrics = ["MAE", "MM-SSIM", "PSNR", "SSIM"]

    if metric_headers is None:
        metric_headers = OrderedDict([
            ('MAE', 'MAE $\\downarrow$'),
            ('MM-SSIM', 'MM-SSIM $\\uparrow$'),
            ('PSNR', 'PSNR $\\uparrow$'),
            ('SSIM', 'SSIM $\\uparrow$')
        ])

    df = summary_results.copy()

    # Normalizza i nomi delle colonne
    col_map = {c.lower().strip(): c for c in df.columns}
    rename_dict = {}
    if 'generator' not in col_map:
        raise ValueError(f"Colonna 'generator' non trovata. Colonne disponibili: {list(df.columns)}")
    if 'loss' not in col_map:
        raise ValueError(f"Colonna 'loss' non trovata. Colonne disponibili: {list(df.columns)}")
    if 'test_dataset' not in col_map and 'dataset' not in col_map:
        raise ValueError(f"Colonna 'test_dataset' o 'dataset' non trovata. Colonne disponibili: {list(df.columns)}")

    rename_dict[col_map['generator']] = 'generator'
    rename_dict[col_map['loss']] = 'loss'
    if 'test_dataset' in col_map:
        rename_dict[col_map['test_dataset']] = 'test_dataset'
    else:
        rename_dict[col_map['dataset']] = 'test_dataset'

    df = df.rename(columns=rename_dict)

    # Normalizzo generator e loss
    df['generator'] = df['generator'].astype(str).str.upper().str.strip()
    df['loss'] = df['loss'].astype(str).str.upper().str.strip()

    # Assicuro metriche in uppercase
    metrics = [m.upper() for m in metrics]
    metric_headers = {m.upper(): v for m, v in metric_headers.items()}

    # Ordine dataset
    if dataset_order:
        lower_map = {d.lower(): d for d in dataset_order}
        df['dataset_order'] = pd.Categorical(
            df['test_dataset'].astype(str).str.lower(),
            categories=[d.lower() for d in dataset_order],
            ordered=True
        )
        dataset_display_map = {d.lower(): lower_map[d.lower()] for d in lower_map}
    else:
        df['dataset_order'] = df['test_dataset'].astype(str).str.lower()
        dataset_display_map = {d.lower(): d for d in df['test_dataset'].unique()}

    df['loss_order'] = df['loss'].str.count(r'\+')
    df = df.sort_values(by=['dataset_order', 'loss_order']).reset_index(drop=True)

    # Pivot senza calcolo media
    df_pivot = df.pivot_table(
        index=['test_dataset', 'loss'],
        columns='generator',
        values=metrics,
        aggfunc=lambda x: x.iloc[0]
    ).swaplevel(0, 1, axis=1).sort_index(axis=1)

    generators = df_pivot.columns.get_level_values(0).unique().tolist()

    latex = []
    latex.append("\\begin{table*}[htbp]")
    latex.append("    \\centering")
    latex.append("    \\small")  # riduce la dimensione del font
    latex.append("    \\renewcommand{\\arraystretch}{1.2}")
    latex.append("    \\setlength{\\tabcolsep}{4pt}")
    latex.append(f"    \\caption{{{caption}}}")
    latex.append(f"    \\label{{{label}}}")
    latex.append("    \\begin{tabular}{l" + "c" * (len(generators) * len(metrics)) + "}")
    latex.append("        \\toprule")

    # Header generatori
    latex.append("        \\multirow{2}{*}{Loss Function}" + "".join(
        [f" & \\multicolumn{{{len(metrics)}}}{{c}}{{{gen}}}" for gen in generators]
    ) + " \\\\")
    cmid = []
    for i in range(len(generators)):
        start = 2 + i * len(metrics)
        end = start + len(metrics) - 1
        cmid.append(f"\\cmidrule(lr){{{start}-{end}}}")
    latex.append("        " + " ".join(cmid))

    # Header metriche
    latex.append("         " + "".join(
        [f" & {metric_headers[m]}" for _ in generators for m in metrics]
    ) + " \\\\")
    latex.append("        \\midrule")

    # Corpo tabella
    last_dataset = None
    for (dataset, loss), row in df_pivot.iterrows():
        dataset_lower = str(dataset).lower()
        if dataset_lower != last_dataset:
            if last_dataset is not None:
                latex.append("        \\midrule")
            latex.append(f"        \\\\[ -0.8em ]")
            latex.append(f"        \\multicolumn{{{1 + len(generators)*len(metrics)}}}{{c}}{{\\textbf{{{dataset_display_map[dataset_lower]}}}}} \\\\")
            latex.append(f"        \\\\[ -0.5em ]")
            last_dataset = dataset_lower

        # Spezzatura nome loss
        loss_fmt = loss.replace('+', ' + ')
        if len(loss_fmt) > max_loss_len:
            loss_fmt = "\\makecell[l]{" + " \\\\ ".join(textwrap.wrap(loss_fmt, max_loss_len)) + "}"

        # Funzione di formattazione valori con std su riga separata
        def format_val(val):
            if pd.isnull(val):
                return "-"
            if isinstance(val, (int, float)):
                return f"{val:.3f}"
            if isinstance(val, str):
                match = re.match(r"\s*([+-]?\d*\.?\d+)\s*(?:±|\+?-)\s*([+-]?\d*\.?\d+)\s*", val)
                if match:
                    mean, std = match.groups()
                    try:
                        return f"\\makecell{{{float(mean):.3f} \\\\ {float(std):.3f}}}"
                    except ValueError:
                        return val
                return val
            return str(val)

        values = [format_val(val) for val in row.values]
        latex.append(f"        {loss_fmt} & " + " & ".join(values) + " \\\\")

    latex.append("        \\bottomrule")
    latex.append("    \\end{tabular}")
    latex.append("\\end{table*}")

    return "\n".join(latex)


In [49]:
latex_code = create_latex_table_from_dataframe(summary_results)
print(latex_code)

\begin{table*}[htbp]
    \centering
    \small
    \renewcommand{\arraystretch}{1.2}
    \setlength{\tabcolsep}{4pt}
    \caption{Confronto delle metriche di performance per diversi generatori e combinazioni di loss.}
    \label{tab:generation_metrics}
    \begin{tabular}{lcccccccc}
        \toprule
        \multirow{2}{*}{Loss Function} & \multicolumn{4}{c}{SWINUNETR_128} & \multicolumn{4}{c}{UNET_128} \\
        \cmidrule(lr){2-5} \cmidrule(lr){6-9}
          & MAE $\downarrow$ & MM-SSIM $\uparrow$ & PSNR $\uparrow$ & SSIM $\uparrow$ & MAE $\downarrow$ & MM-SSIM $\uparrow$ & PSNR $\uparrow$ & SSIM $\uparrow$ \\
        \midrule
        \\[ -0.8em ]
        \multicolumn{9}{c}{\textbf{RF}} \\
        \\[ -0.5em ]
        L1 & \makecell{0.023 \\ 0.008} & \makecell{0.931 \\ 0.020} & \makecell{26.064 \\ 3.050} & \makecell{0.925 \\ 0.019} & \makecell{0.030 \\ 0.012} & \makecell{0.927 \\ 0.020} & \makecell{24.655 \\ 3.423} & \makecell{0.840 \\ 0.064} \\
        L1 + PERCEPTUAL & \makecell{0

--- 

# Image Generation

Vado a produrre le immagini per il paper. In particolare ci sono differenti blocchi : 

- Comparison Image
- 3D stuck visualization
- Visualization of predict example

## Comparison image

Creazione di pannel  per comparazioni immagini

In [None]:
import os
import torch
import numpy as np
import matplotlib.pyplot as plt
from monai.transforms import Compose, LoadImage, EnsureChannelFirst, Orientation, Spacing
import SimpleITK as sitk



def plot_image_comparison(
    image_paths: list[str],
    image_names: list[str],
    output_path: str,
    views_to_show: list[str] = ["Assiale", "Coronale", "Sagittale"],
    title: str = '',
    reorientation: bool = True,
    ct_window_level: int = 40,    # NUOVO: Livello della finestra per CT (Soft Tissue)
    ct_window_width: int = 60,
    windowing : bool = False    # NUOVO: Ampiezza della finestra per CT (Soft Tissue)
):
    """
    Carica immagini NIfTI, le preprocessa e genera un confronto.
    Applica un windowing specifico per le immagini identificate come 'CT'.
    """
    if len(image_paths) != len(image_names):
        raise ValueError("La lunghezza di 'image_paths' e 'image_names' deve essere la stessa.")

    transform_list = [
        LoadImage(image_only=True, reader="nibabelreader"),
        EnsureChannelFirst()
    ]
    if reorientation:
        transform_list.extend([
            Orientation(axcodes="RAS"),
            Spacing(pixdim=(1.0, 1.0, 1.0), mode="bilinear")
        ])
    
    transforms = Compose(transform_list)

    loaded_images = []
    for path in image_paths:
        try:
            img = transforms(path)
            if img.dim() == 4:
                img = img.squeeze(0)
            loaded_images.append(img)
            print(f"Caricata e processata: {os.path.basename(path)}, shape finale: {img.shape}")
        except Exception as e:
            print(f"Errore durante il caricamento di {path}: {e}")
            return

    n_rows = len(loaded_images)
    n_cols = len(views_to_show)
    fig, axes = plt.subplots(n_rows, n_cols, figsize=(5 * n_cols, 5 * n_rows), facecolor='black', squeeze=False)
    fig.suptitle(title, fontsize=20, color='white')
    view_to_axis = {"Sagittale": 0, "Coronale": 1, "Assiale": 2}

    for i, (img_tensor, img_name) in enumerate(zip(loaded_images, image_names)):
        for j, view_name in enumerate(views_to_show):
            ax = axes[i, j]
            axis_idx = view_to_axis.get(view_name)
            if axis_idx is None:
                ax.axis('off')
                continue

            mid_slice_idx = img_tensor.shape[axis_idx] // 2
            slice_data = torch.index_select(img_tensor, axis_idx, torch.tensor([mid_slice_idx])).squeeze()
            slice_np = slice_data.cpu().numpy()
            
            if 'ct' in img_name.lower() and windowing :
                # Applica il windowing per la CT
                min_val = ct_window_level - (ct_window_width / 2)
                max_val = ct_window_level + (ct_window_width / 2)
                slice_np = np.clip(slice_np, min_val, max_val)
            else:
                # Applica la normalizzazione con percentile per altre modalità (es. MR)
                p_min, p_max = np.percentile(slice_np, [1, 99])
                slice_np = np.clip(slice_np, p_min, p_max)
            # -----------------------------------------------

            ax.imshow(np.rot90(slice_np), cmap='gray', aspect='equal')
            ax.axis('off')

            if j == 0:
                ax.text(-0.1, 0.5, img_name, transform=ax.transAxes, ha='right', va='center', fontsize=16, color='white', rotation=90)
            #if i == 0:
                #ax.set_title(view_name, fontsize=16, color='white')

    plt.tight_layout(rect=[0, 0, 1, 0.95])
    
    output_dir = os.path.dirname(output_path)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)
    try:
        plt.savefig(output_path, bbox_inches='tight', pad_inches=0.1, facecolor='black', dpi=300)
        print(f"Immagine di confronto salvata in: {output_path}")
    except Exception as e:
        print(f"Errore durante il salvataggio dell'immagine: {e}")
    
    plt.close(fig)

def plot_ct_mr_comparison(ct_path: str, mr_path: str, output_path: str):
    """
    Wrapper per la funzione generalizzata per mantenere la retrocompatibilità.
    Carica una CT e una MR e genera un'immagine di confronto.

    Args:
        ct_path (str): Percorso del file NIfTI della CT.
        mr_path (str): Percorso del file NIfTI della MR.
        output_path (str): Percorso dove salvare l'immagine PNG generata.
    """
    plot_image_comparison(
        image_paths=[ct_path, mr_path],
        image_names=["CT", "MR"],
        output_path=output_path,
        title='Paired CT and MR sample'
    )


### CTandMR

In [None]:
ct_path = "/home/jovyan/work/data/CTMR/DATASET/RF/sub-1126260/sub-1126260_ct.nii.gz"
mr_path = "/home/jovyan/work/data/CTMR/DATASET/RF/sub-1126260/sub-1126260_t1w.nii.gz"
plot_ct_mr_comparison(ct_path, mr_path,"./CTandMR.jpeg")


NameError: name 'plot_ct_mr_comparison' is not defined

### Generated Comparison

In [17]:
ct_path = "/home/jovyan/work/data/CTMR/DATASET/RF/sub-1126260/processed/CT/CTonTemplate/sub-1126260ctTemplatespace.nii.gz"
mr_path = "/home/jovyan/work/data/CTMR/DATASET/RF/sub-1126260/processed/MR/MRonTemplate/sub-1126260mrTemplateSpaceNormalized.nii.gz"
swin_gen = "/home/jovyan/work/repository/CT2MR/Generation_clean/lightning_logs/CTtoMR/version_1/test_results/RF/sub-1126260/sub-1126260ctTemplatespace_generated_mr.nii.gz"
unet_gen = "/home/jovyan/work/repository/CT2MR/Generation_clean/lightning_logs/CTtoMR/version_2/test_results/RF/sub-1126260/sub-1126260-ctTemplatespace_generated_mr.nii.gz"
plot_image_comparison([ct_path,mr_path,swin_gen,unet_gen],["CT","MR","SwinGen","UnetGen"],"/home/jovyan/work/repository/CT2MR/Generation_clean/ctTomr_RF.jpeg",windowing = True)

Caricata e processata: sub-1126260ctTemplatespace.nii.gz, shape finale: torch.Size([193, 229, 193])
Caricata e processata: sub-1126260mrTemplateSpaceNormalized.nii.gz, shape finale: torch.Size([193, 229, 193])
Caricata e processata: sub-1126260ctTemplatespace_generated_mr.nii.gz, shape finale: torch.Size([193, 229, 193])
Caricata e processata: sub-1126260-ctTemplatespace_generated_mr.nii.gz, shape finale: torch.Size([193, 229, 193])
Immagine di confronto salvata in: /home/jovyan/work/repository/CT2MR/Generation_clean/ctTomr_RF.jpeg


In [18]:
ct_path = "/home/jovyan/work/data/CTMR/DATASET/RF/sub-1126260/processed/CT/CTonMR/sub-1126260ctMRspaceMRCTmasked.nii.gz"
mr_path = "/home/jovyan/work/data/CTMR/DATASET/RF/sub-1126260/processed/MR/sub-1126260mrMRCTmasked.nii.gz"
swin_gen = "/home/jovyan/work/repository/CT2MR/Generation_clean/lightning_logs/MRtoCT/version_1/test_results/RF/sub-1126260/sub-1126260mrMRCTmasked_generated_mr.nii.gz"
unet_gen = "/home/jovyan/work/repository/CT2MR/Generation_clean/lightning_logs/MRtoCT/version_2/test_results/RF/sub-1126260/sub-1126260mrMRCTmasked_generated_mr.nii.gz"
plot_image_comparison([mr_path,ct_path,swin_gen,unet_gen],["MR","CT","SwinGen","UnetGen"],"/home/jovyan/work/repository/CT2MR/Generation_clean/mrToct_RF.jpeg",windowing = False)

Caricata e processata: sub-1126260mrMRCTmasked.nii.gz, shape finale: torch.Size([180, 240, 240])
Caricata e processata: sub-1126260ctMRspaceMRCTmasked.nii.gz, shape finale: torch.Size([180, 240, 240])
Caricata e processata: sub-1126260mrMRCTmasked_generated_mr.nii.gz, shape finale: torch.Size([180, 240, 240])
Caricata e processata: sub-1126260mrMRCTmasked_generated_mr.nii.gz, shape finale: torch.Size([180, 240, 240])
Immagine di confronto salvata in: /home/jovyan/work/repository/CT2MR/Generation_clean/mrToct_RF.jpeg


In [19]:
ct_path = "/home/jovyan/work/data/CTMR/DATASET/SynthRAD2023/sub-1BB026/processed/CT/CTonTemplate/sub-1BB026-ctTemplatespace.nii.gz"
mr_path = "/home/jovyan/work/data/CTMR/DATASET/SynthRAD2023/sub-1BB026/processed/MR/MRonTemplate/sub-1BB026-mrTemplateSpaceNormalized.nii.gz"
swin_gen = "/home/jovyan/work/repository/CT2MR/Generation_clean/lightning_logs/CTtoMR/version_1/test_results/SynthRAD2023/sub-1BB026/sub-1BB026-ctTemplatespace_generated_mr.nii.gz"
unet_gen = "/home/jovyan/work/repository/CT2MR/Generation_clean/lightning_logs/CTtoMR/version_2/test_results/SynthRAD2023/sub-1BB026/sub-1BB026-ctTemplatespace_generated_mr.nii.gz"
plot_image_comparison([ct_path,mr_path,swin_gen,unet_gen],["CT","MR","SwinGen","UnetGen"],"/home/jovyan/work/repository/CT2MR/Generation_clean/ctTOmr_synthrad.jpeg",windowing = True)

Caricata e processata: sub-1BB026-ctTemplatespace.nii.gz, shape finale: torch.Size([193, 229, 193])
Caricata e processata: sub-1BB026-mrTemplateSpaceNormalized.nii.gz, shape finale: torch.Size([193, 229, 193])
Caricata e processata: sub-1BB026-ctTemplatespace_generated_mr.nii.gz, shape finale: torch.Size([193, 229, 193])
Caricata e processata: sub-1BB026-ctTemplatespace_generated_mr.nii.gz, shape finale: torch.Size([193, 229, 193])
Immagine di confronto salvata in: /home/jovyan/work/repository/CT2MR/Generation_clean/ctTOmr_synthrad.jpeg


In [31]:
ct_path = "/home/jovyan/work/data/CTMR/DATASET/SynthRAD2023/sub-1BA141/processed/CT/CTonMR/sub-1BA141-ctMRspaceMRCTmasked.nii.gz"
mr_path = "/home/jovyan/work/data/CTMR/DATASET/SynthRAD2023/sub-1BA141/processed/MR/sub-1BA141mrMRCTmasked.nii.gz"
swin_gen = "/home/jovyan/work/repository/CT2MR/Generation_clean/lightning_logs/MRtoCT/version_1/test_results/SynthRAD2023/sub-1BA141/sub-1BA141mrMRCTmasked_generated_mr.nii.gz"
unet_gen = "/home/jovyan/work/repository/CT2MR/Generation_clean/lightning_logs/MRtoCT/version_2/test_results/SynthRAD2023/sub-1BA141/sub-1BA141mrMRCTmasked_generated_mr.nii.gz"
plot_image_comparison([mr_path,ct_path,swin_gen,unet_gen],["MR","CT","SwinGen","UnetGen"],"/home/jovyan/work/repository/CT2MR/Generation_clean/mrToct_synthrad.jpeg",windowing = False)

Caricata e processata: sub-1BA141mrMRCTmasked.nii.gz, shape finale: torch.Size([183, 257, 190])
Caricata e processata: sub-1BA141-ctMRspaceMRCTmasked.nii.gz, shape finale: torch.Size([183, 257, 190])
Caricata e processata: sub-1BA141mrMRCTmasked_generated_mr.nii.gz, shape finale: torch.Size([183, 257, 190])
Caricata e processata: sub-1BA141mrMRCTmasked_generated_mr.nii.gz, shape finale: torch.Size([183, 257, 190])
Immagine di confronto salvata in: /home/jovyan/work/repository/CT2MR/Generation_clean/mrToct_synthrad.jpeg


## 3D stuck visualization 

Visualizzazione a "pila" di volumi per grafico modello

In [None]:
import SimpleITK as sitk
import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D  
from matplotlib.colors import Normalize  
from typing import Union, Tuple, Optional

from data.datautils import window_image, get_window_viewing_value

def plot_slice_stack(volume_3d: np.ndarray, num_slices: int = 20, output_filename: str = "pila_di_slice.png", stride: int = 2, windowing: Optional[Union[str, Tuple[float, float]]] = None):
    """
    Crea una visualizzazione 3D di una pila di slice da un volume NumPy 3D.
    Questa funzione è CPU-bound a causa di Matplotlib.

    Args:
        volume_3d (np.ndarray): L'array NumPy 3D da visualizzare. Può anche essere un tensore PyTorch.
        num_slices (int): Numero di slice da visualizzare.
        output_filename (str): Nome del file per l'immagine di output.
        stride (int): Il passo per il campionamento della superficie (qualità vs velocità).
        windowing (Optional[Union[str, Tuple[float, float]]]): Applica il windowing.
            Può essere una stringa di preset (es. 'brain', 'bone') o una tupla (width, level).
            Se None, usa la normalizzazione con percentili.
    """

    if hasattr(volume_3d, 'cpu'): # Controlla se è un tensore PyTorch
        print("Rilevato tensore PyTorch, spostamento su CPU e conversione in NumPy...")
        volume_3d = volume_3d.cpu().numpy()
    

    if volume_3d.ndim > 3:
        volume_3d = volume_3d.squeeze()
        print(f"Array ridotto a 3 dimensioni, nuova shape: {volume_3d.shape}")


    depth = volume_3d.shape[0]
    central_slice_index = depth // 2

    slice_indices = np.linspace(0, central_slice_index, num_slices, dtype=int)
    selected_slices = volume_3d[slice_indices]

    fig = plt.figure(figsize=(8, 8))
    ax = fig.add_subplot(111, projection='3d')

    windowing_applied = False
    if windowing:
        print(f"Applicando windowing: {windowing}")
        if isinstance(windowing, str):
            try:
                width, level = get_window_viewing_value(windowing)
                windowing_applied = True
            except ValueError as e:
                print(f"Attenzione: {e}. Ritorno alla normalizzazione con percentili.")
        elif isinstance(windowing, (tuple, list)) and len(windowing) == 2:
            width, level = windowing
            windowing_applied = True
        else:
            print(f"Attenzione: formato 'windowing' non valido. Ritorno alla normalizzazione con percentili.")
        
        if windowing_applied:

             volume_3d = window_image(volume_3d, window_width=width, window_level=level)
             vmin, vmax = np.min(volume_3d), np.max(volume_3d)
    
    if not windowing_applied:
        print("Nessun windowing specificato, uso la normalizzazione con percentili.")
        vmin = np.percentile(volume_3d, 1)
        vmax = np.percentile(volume_3d, 99)

    if vmin == vmax: 
        vmin, vmax = vmin - 1, vmax + 1
    norm = Normalize(vmin=vmin, vmax=vmax)

    for i, slice_2d in enumerate(selected_slices):
        height, width = slice_2d.shape
        x, y = np.meshgrid(np.arange(width), np.arange(height))
        z_position = slice_indices[i]
        
        facecolors = plt.cm.gray(norm(slice_2d))
        
        ax.plot_surface(x, y, np.full_like(x, z_position),
                        facecolors=facecolors, rstride=stride, cstride=stride, shade=False)

    ax.set_axis_off()

    # 'elev=85' guarda la pila quasi dall'alto.
    # 'azim=-90' ruota la vista per allinearla con l'asse.
    ax.view_init(elev=85, azim=-90)
    ax.dist = 7
    plt.savefig(output_filename, dpi=150, transparent=True, bbox_inches='tight', pad_inches=0)
    plt.close(fig)
    print(f"✅ Immagine salvata come '{output_filename}'")

def plot_slice_stack_from_file_sitk(image_path: str, num_slices: int = 20, output_filename: str = "pila_di_slice_sitk.png", stride: int = 4, windowing: Optional[Union[str, Tuple[float, float]]] = None):
    """
    Funzione wrapper per caricare un'immagine da file con SimpleITK e visualizzarla.

    Args:
        image_path (str): Percorso del file dell'immagine (es. NIfTI, DICOM).
        num_slices (int): Numero di slice da visualizzare.
        output_filename (str): Nome del file per l'immagine di output.
        stride (int): Il passo per il campionamento della superficie.
        windowing (Optional[Union[str, Tuple[float, float]]]): Preset o valori (width, level) per il windowing.
    """
    try:
        image = sitk.ReadImage(image_path)
        print(f"Immagine caricata con successo da: '{image_path}'")
        volume_3d = sitk.GetArrayFromImage(image)

        plot_slice_stack(volume_3d, num_slices, output_filename, stride, windowing)
    except Exception as e:
        print(f" Errore durante il caricamento o la visualizzazione dell'immagine: {e}")
        return

In [23]:
ct_path = "/home/jovyan/work/data/CTMR/DATASET/RF/sub-1126260/processed/CT/CTonTemplate/sub-1126260ctTemplatespace.nii.gz"
mr_path = "/home/jovyan/work/data/CTMR/DATASET/RF/sub-1126260/processed/MR/MRonTemplate/sub-1126260mrTemplateSpaceNormalized.nii.gz"



plot_slice_stack_from_file_sitk(
    image_path=mr_path, 
    num_slices=10, 
    output_filename="pila_di_slice_da_sitk.png",
    stride = 1
)


✅ Immagine caricata con successo da: '/home/jovyan/work/data/CTMR/DATASET/RF/sub-1126260/processed/MR/MRonTemplate/sub-1126260mrTemplateSpaceNormalized.nii.gz'
Nessun windowing specificato, uso la normalizzazione con percentili.
✅ Immagine salvata come 'pila_di_slice_da_sitk.png'
