In [9]:
import importlib
import json
import os
import shutil
import subprocess
from collections import defaultdict
from pathlib import Path

import ants
import nibabel as nib
import numpy as np
from dipy.denoise.nlmeans import nlmeans
from dipy.denoise.noise_estimate import estimate_sigma
from nibabel.orientations import aff2axcodes
from nibabel.orientations import axcodes2ornt, io_orientation, inv_ornt_aff, apply_orientation

import config.config as cf

importlib.reload(cf)


2025-07-22 13:18:34,603 - logger - INFO - Logger initialized
2025-07-22 13:18:34,603 - logger - INFO - Logger initialized
2025-07-22 13:18:34,603 - logger - INFO - Logger initialized


<module 'config.config' from 'C:\\Users\\User\\OneDrive\\Uni\\6_semester\\Bachelorarbeit\\project\\config\\config.py'>

In [None]:
# Path string for the "Preprocessed/Training" folder below the MSSEG-2016 root taken from the config module.
# NOTE(review): no other cell visible in this notebook reads `train`, and the later cells
# build their paths as Path(cf.msseg2016) / "Training" instead - confirm this is still needed.
train = os.path.join(cf.msseg2016, "Preprocessed/Training")


In [7]:
def patients_affine(patient_id, center, set, file):
    """Print the affine matrix of a NIfTI file together with a label describing its origin.

    Args:
        patient_id: Identifier shown in the label line.
        center: Center identifier shown in the label line.
        set: Name of the data split shown in the label line.
        file: Path of the image to load with nibabel.
    """
    separator = "-" * 20
    # The label is printed first so it is visible even if loading the file fails.
    print(f"Patient {patient_id} from Center {center} of {set}:")
    print(nib.load(file).affine)
    print(separator)


In [10]:
def windows_to_wsl_path(win_path: str) -> str:
    p = Path(win_path).resolve()
    drive = p.drive[0].lower()  # 'C:' -> 'c'
    rest = p.parts[1:]  # skip the 'C:\'
    return f"/mnt/{drive}/" + "/".join(rest)


# MSSEG 2016

## Reorienting

In [None]:
def reorient_to_std(input, output):
    """Run FSL's ``fslreorient2std`` through WSL and report the orientation change.

    Args:
        input: Windows path of the image to reorient.
        output: Windows path where the reoriented image is written.

    Any failure (path conversion, a non-zero exit status of the FSL command,
    or loading either image) is printed instead of raised, so a batch loop
    calling this function keeps going with the next file.
    """
    try:
        input_wsl = windows_to_wsl_path(input)
        output_wsl = windows_to_wsl_path(output)

        # Plain interpolation: reusing the same quote character inside a nested
        # f-string is only valid syntax from Python 3.12 onwards.
        cmd = [
            "wsl", "bash", "-ic",
            f"fslreorient2std '{input_wsl}' '{output_wsl}'",
        ]
        # check=True surfaces a failing FSL call here instead of as a confusing
        # "file not found" when the output is loaded below.
        subprocess.run(cmd, check=True)

        before = nib.load(input)
        after = nib.load(output)
        orient_before = aff2axcodes(before.affine)
        orient_after = aff2axcodes(after.affine)

        print(f"Switching from {orient_before} to {orient_after}")
        print("Reoriented")
    except Exception as e:
        print(f"{e}")

# Reorient every NIfTI file of every MSSEG-2016 patient into a "reoriented"
# subfolder next to the original files.
for split in ["Training", "Testing"]:
    split_path = Path(cf.msseg2016) / split
    for center_path in split_path.iterdir():
        if not center_path.is_dir():
            continue
        for patient_path in center_path.iterdir():
            # Only announce real patient folders, not stray files inside a center folder.
            if not patient_path.is_dir():
                continue
            print(f"{patient_path.name} of {center_path.name} from {split}")
            for folder in ["Preprocessed_Data", "Raw_Data", "Masks"]:
                modality_path = patient_path / folder
                if not modality_path.exists():
                    continue
                for nii_file in modality_path.glob("*.nii*"):
                    print(f"Reorienting {nii_file.name}")
                    output = modality_path / "reoriented"
                    output.mkdir(parents=True, exist_ok=True)
                    # Strip ".nii.gz" as well as plain ".nii" so the result is always
                    # "<name>_reoriented.nii.gz", the name the later cells look for.
                    base_name = nii_file.name.removesuffix(".gz").removesuffix(".nii")
                    output_reoriented = output / f"{base_name}_reoriented.nii.gz"
                    reorient_to_std(nii_file, output_reoriented)


In [None]:
# Print the affine of each preprocessed image next to the affine of its
# reoriented counterpart so both can be compared by eye.
for split in ("Training", "Testing"):
    split_path = Path(cf.msseg2016) / split
    for center_path in split_path.iterdir():
        if not center_path.is_dir():
            continue
        for patient_path in center_path.iterdir():
            if not patient_path.is_dir():
                continue
            print(f"{patient_path.name} of {center_path.name} from {split}")

            preprocessed_dir = patient_path / "Preprocessed_Data"
            if not preprocessed_dir.exists():
                continue
            reoriented_dir = preprocessed_dir / "reoriented"

            for original_file in preprocessed_dir.glob("*.nii*"):
                # "<name>.nii.gz" -> "<name>_reoriented.nii.gz"
                reoriented_name = original_file.stem.replace('.nii', '') + '_reoriented.nii.gz'
                affine_before = nib.load(original_file).affine
                affine_after = nib.load(reoriented_dir / reoriented_name).affine

                print(f"File: {original_file.name}")
                print(f"    Before:  {affine_before}")
                print(f"    After: {affine_after}")
                print("-" * 40)

## Denoising

In [None]:
# Denoise the reoriented raw images of every MSSEG-2016 patient with non-local
# means and write the results into a "denoised" folder.
for split in ["Training", "Testing"]:
    split_path = Path(cf.msseg2016) / split
    for center_path in split_path.iterdir():
        if not center_path.is_dir():
            continue
        for patient_path in center_path.iterdir():
            if not patient_path.is_dir():
                continue
            print(f"{patient_path.name} of {center_path.name} from {split}")

            raw_dir = patient_path / "Raw_Data"
            if not raw_dir.exists():
                continue
            # Descriptive names instead of `input`/`output`: rebinding `input` at
            # notebook scope would hide the builtin for the rest of the session.
            reoriented_dir = raw_dir / "reoriented"
            denoised_dir = raw_dir / "denoised"
            denoised_dir.mkdir(parents=True, exist_ok=True)

            for mod in ["FLAIR", "DP", "GADO", "T1", "T2"]:
                img = nib.load(reoriented_dir / f"{mod}_reoriented.nii.gz")
                data = img.get_fdata()

                # The noise level is estimated per image and handed to the filter.
                sigma = estimate_sigma(data, N=4)
                denoised = nlmeans(data, sigma=sigma)

                # The denoised volume is saved with the affine of its source image.
                nib.save(
                    nib.Nifti1Image(denoised, img.affine),
                    denoised_dir / f"{mod}_denoised.nii.gz",
                )
                print(f"Finished {mod} of {patient_path.name} of {center_path.name} from {split}")

## Registering

In [None]:
# Rigidly register every denoised modality to the patient's FLAIR image with
# FSL FLIRT (run through WSL) and collect the results in a "registered" folder.
for split in ["Testing", "Training"]:
    split_path = Path(cf.msseg2016) / split
    for center_path in split_path.iterdir():
        if not center_path.is_dir():
            continue
        for patient_path in center_path.iterdir():
            if not patient_path.is_dir():
                continue
            print(f"{patient_path.name} of {center_path.name} from {split}")

            raw_dir = patient_path / "Raw_Data"
            if not raw_dir.exists():
                continue
            # Descriptive names instead of `input`/`output`: rebinding `input` at
            # notebook scope would hide the builtin for the rest of the session.
            denoised_dir = raw_dir / "denoised"
            registered_dir = raw_dir / "registered"
            registered_dir.mkdir(parents=True, exist_ok=True)

            # FLAIR is the registration reference, so it is only copied over.
            # A byte-for-byte copy avoids re-encoding the image.
            flair_reference = registered_dir / "FLAIR_denoised.nii.gz"
            shutil.copyfile(denoised_dir / "FLAIR_denoised.nii.gz", flair_reference)
            flair_path_wsl = windows_to_wsl_path(flair_reference)

            for mod in ["DP", "GADO", "T1", "T2"]:
                input_wsl = windows_to_wsl_path(denoised_dir / f"{mod}_denoised.nii.gz")
                output_wsl = windows_to_wsl_path(registered_dir / f"{mod}_rigid_to_FLAIR.nii.gz")
                # -dof 6 restricts FLIRT to a rigid-body transform.
                cmd_str = f"flirt -in '{input_wsl}' -ref '{flair_path_wsl}' -out '{output_wsl}' -dof 6 -interp trilinear"
                cmd = ["wsl", "bash", "-ic", cmd_str]
                result = subprocess.run(cmd, capture_output=True, text=True)
                # Report a failed run instead of printing the success message anyway;
                # only the exit code is trusted because stderr may contain shell noise.
                if result.returncode != 0:
                    print(
                        f"    FAILED rigid registration of {mod} to FLAIR "
                        f"(exit code {result.returncode}): {result.stderr.strip()}"
                    )
                    continue
                print(f"    Rigid Registration of {mod} to FLAIR")

                    

## N4 Bias correction

In [None]:
# Apply ANTs N4 bias field correction to every registered modality of every
# MSSEG-2016 patient and write the results into a "bias_corrected" folder.
for split in ["Training", "Testing"]:
    split_path = Path(cf.msseg2016) / split
    for center_path in split_path.iterdir():
        if not center_path.is_dir():
            continue
        for patient_path in center_path.iterdir():
            if not patient_path.is_dir():
                continue
            print(f"{patient_path.name} of {center_path.name} from {split}")

            raw_dir = patient_path / "Raw_Data"
            if not raw_dir.exists():
                continue
            # Descriptive names instead of `input`/`output`: rebinding `input` at
            # notebook scope would hide the builtin for the rest of the session.
            registered_dir = raw_dir / "registered"
            corrected_dir = raw_dir / "bias_corrected"
            corrected_dir.mkdir(parents=True, exist_ok=True)

            for mod in ["FLAIR", "DP", "GADO", "T1", "T2"]:
                print(f"    n4 Bias Correction {mod}")
                # FLAIR was the registration reference and kept its "denoised" name;
                # every other modality carries the "rigid_to_FLAIR" suffix.
                if mod == "FLAIR":
                    source_name = f"{mod}_denoised.nii.gz"
                else:
                    source_name = f"{mod}_rigid_to_FLAIR.nii.gz"

                img = ants.image_read(str(registered_dir / source_name))
                corrected = ants.n4_bias_field_correction(img)
                ants.image_write(
                    corrected,
                    str(corrected_dir / f"{mod}_preprocessed_withskull.nii.gz"),
                )
                                

# MSLesSeg

In [11]:
def reorient_to_ras(input_path, output_path):
    """Reorient a NIfTI image to the closest RAS orientation and save it.

    Uses nibabel's ``as_closest_canonical``, which permutes and flips the stored
    data array and updates the affine accordingly. Unlike a manual reorientation
    via ``get_fdata()``, the voxel data is not converted to float64 first.

    Args:
        input_path: Path of the image to read.
        output_path: Path the reoriented image is written to.

    Errors are printed rather than raised so a batch loop calling this function
    continues with the next file.
    """
    try:
        img = nib.load(str(input_path))
        reoriented_img = nib.as_closest_canonical(img)
        print(f"Reoriented from {aff2axcodes(img.affine)} to {aff2axcodes(reoriented_img.affine)}")
        nib.save(reoriented_img, str(output_path))

    except Exception as e:
        print(f"Error: {e}")

In [None]:
# Training set layout: <root>/train/<patient>/<timepoint>/*.nii*
# The split name is spelled out here; the previous `{split}` placeholder only
# resolved to whatever value another cell happened to leave behind.
split_path = Path(cf.mslesseg) / "train"
for patient_path in split_path.iterdir():
    if not patient_path.is_dir():
        continue
    for timepoint_path in patient_path.iterdir():
        # Only announce and process real timepoint folders.
        if not timepoint_path.is_dir():
            continue
        print(f"{timepoint_path.name} of {patient_path.name} from train")
        for nii_file in timepoint_path.glob("*.nii*"):
            print(f"    Reorienting {nii_file.name}")
            output = timepoint_path / "reoriented"
            output.mkdir(parents=True, exist_ok=True)
            # Strip ".nii.gz" as well as plain ".nii" before appending the suffix.
            base_name = nii_file.name.removesuffix(".gz").removesuffix(".nii")
            output_reoriented = output / f"{base_name}_reoriented.nii.gz"
            reorient_to_ras(nii_file, output_reoriented)

# Test set layout: <root>/test/<patient>/*.nii* (no timepoint level)
split_path = Path(cf.mslesseg) / "test"
for patient_path in split_path.iterdir():
    if not patient_path.is_dir():
        continue
    print(f"{patient_path.name} from test")
    for nii_file in patient_path.glob("*.nii*"):
        print(f"    Reorienting {nii_file.name}")
        output = patient_path / "reoriented"
        output.mkdir(parents=True, exist_ok=True)
        base_name = nii_file.name.removesuffix(".gz").removesuffix(".nii")
        output_reoriented = output / f"{base_name}_reoriented.nii.gz"
        reorient_to_ras(nii_file, output_reoriented)


### Recreating splits for MSLesSeg

In [27]:
# Build the case list from the MSLesSeg training folder explicitly. Relying on a
# `split_path` left over from another cell makes the result depend on execution
# order (after the reorientation cell it points at the test set).
train_path = Path(cf.mslesseg) / "train"

# One case per (patient, timepoint) folder, named "<patient>_<timepoint>".
all_cases = [
    f"{patient_path.name}_{timepoint_path.name}"
    for patient_path in train_path.iterdir() if patient_path.is_dir()
    for timepoint_path in patient_path.iterdir() if timepoint_path.is_dir()
]
if not all_cases:
    # Refuse to overwrite the splits file with empty folds.
    raise RuntimeError(f"No patient/timepoint folders found below {train_path}")

# Group the cases by patient number so that all timepoints of a patient
# always end up on the same side of a fold.
patient_dict = defaultdict(list)
for case in all_cases:
    # The patient folder name is one prefix character followed by the number.
    num = int(case.split("_")[0][1:])
    patient_dict[num].append(case)


def get_cases_for_ranges(ranges):
    """Return the sorted case names of all patients selected by ``ranges``.

    Args:
        ranges: Iterable whose items are either a single patient number or an
            inclusive ``(start, end)`` tuple of patient numbers.

    Returns:
        Sorted list of case names; patient numbers without cases are skipped.
    """
    cases = []
    for r in ranges:
        if isinstance(r, int):
            cases += patient_dict.get(r, [])
        else:
            start, end = r
            for i in range(start, end + 1):
                cases += patient_dict.get(i, [])
    return sorted(cases)


# Patient-number ranges (inclusive) of the five folds.
# NOTE(review): patients 11-13 are part of every training range and of no
# validation range - confirm that this is intended.
folds = [
    {"train": [(1, 43)], "val": [(44, 53)]},
    {"train": [(1, 33), (44, 53)], "val": [(34, 43)]},
    {"train": [(1, 23), (34, 53)], "val": [(24, 33)]},
    {"train": [(1, 13), (24, 53)], "val": [(14, 23)]},
    {"train": [(11, 53)], "val": [(1, 10)]},
]

# `fold_cases` instead of `split`, which other cells use for the split name.
splits = []
for fold in folds:
    fold_cases = {
        "train": get_cases_for_ranges(fold["train"]),
        "val": get_cases_for_ranges(fold["val"])
    }
    splits.append(fold_cases)

with open(f"{cf.nnUNet_preprocessed}/{cf.dataset_211}/splits_final.json", "w") as f:
    json.dump(splits, f, indent=4)

print(f"splits_final.json for {cf.dataset_211} created")


splits_final.json erfolgreich erstellt.
