# 01 - DICOM I/O and Anonymization

This notebook demonstrates how to use the `diffusemri` library for common DICOM (Digital Imaging and Communications in Medicine) operations. We will cover:

1.  Converting DICOM series (both anatomical and Diffusion-Weighted Imaging - DWI) to NIfTI format.
2.  Extracting b-values and b-vectors for DWI conversions.
3.  Anonymizing DICOM files to remove or modify personally identifiable information (PII) about the patient.

In [None]:
import os
import shutil
from datetime import datetime, time

import nibabel as nib
import numpy as np
import pydicom
from pydicom.dataset import Dataset, FileMetaDataset
from pydicom.uid import generate_uid, ExplicitVRLittleEndian, ImplicitVRLittleEndian

# diffusemri library imports
# Ensure your Python path is set up correctly if running this notebook outside the repo root, 
# or that 'diffusemri' is installed (e.g. pip install -e . from repo root)
from data_io.dicom_utils import (
    convert_dwi_dicom_to_nifti,
    convert_dicom_to_nifti_main,
    anonymize_dicom_directory,
    anonymize_dicom_file,
    DEFAULT_ANONYMIZATION_TAGS, # For inspection if desired
    _REMOVE_TAG_,               # Rule marker: delete the tag (custom-rules example)
    _EMPTY_STRING_,             # Rule marker: replace the value with an empty string (custom-rules example)
)

# Seed NumPy's global RNG so the random pixel data written into the dummy DICOM
# files is the same on every Restart & Run All.
RANDOM_SEED = 42
np.random.seed(RANDOM_SEED)

# Define temporary directories for our examples
BASE_TEMP_DIR = "temp_dicom_examples"
ANAT_DICOM_DIR = os.path.join(BASE_TEMP_DIR, "anat_dicom_series")
DWI_DICOM_DIR = os.path.join(BASE_TEMP_DIR, "dwi_dicom_series")
ANON_DWI_DICOM_DIR = os.path.join(BASE_TEMP_DIR, "dwi_dicom_series_anonymized")
ANON_DWI_DICOM_DIR_CUSTOM = os.path.join(BASE_TEMP_DIR, "dwi_dicom_series_anonymized_custom")

# Output paths for the converted volumes and the DWI gradient tables
ANAT_NIFTI_OUT = os.path.join(BASE_TEMP_DIR, "anatomical.nii.gz")
DWI_NIFTI_OUT = os.path.join(BASE_TEMP_DIR, "dwi.nii.gz")
DWI_BVAL_OUT = os.path.join(BASE_TEMP_DIR, "dwi.bval")
DWI_BVEC_OUT = os.path.join(BASE_TEMP_DIR, "dwi.bvec")

# Clean up any previous runs so stale files cannot leak into this one
if os.path.exists(BASE_TEMP_DIR):
    shutil.rmtree(BASE_TEMP_DIR)
# Only the two input series directories are created here; the anonymized
# output directories are left for the anonymization calls to create.
os.makedirs(BASE_TEMP_DIR, exist_ok=True)
os.makedirs(ANAT_DICOM_DIR, exist_ok=True)
os.makedirs(DWI_DICOM_DIR, exist_ok=True)

In [None]:
def create_dummy_dicom_file(filename, study_uid, series_uid, sop_instance_uid, instance_number, 
                             image_type="ORIGINAL\\PRIMARY\\AXIAL", patient_name="Test^Patient", patient_id="TestID001",
                             rows=64, cols=64, pixel_spacing=[1.0, 1.0], slice_thickness=5.0,
                             image_position=[0.0, 0.0, 0.0], image_orientation=[1,0,0,0,1,0],
                             dwi_tags=None, series_description="Test Series"):
    """Create and write a single dummy MR DICOM file filled with random pixels.

    Args:
        filename: Path the DICOM file is written to.
        study_uid: Value for StudyInstanceUID.
        series_uid: Value for SeriesInstanceUID.
        sop_instance_uid: Value for SOPInstanceUID (also used as the file-meta
            MediaStorageSOPInstanceUID).
        instance_number: Value for InstanceNumber.
        image_type: Value for ImageType.
        patient_name: Value for PatientName.
        patient_id: Value for PatientID.
        rows, cols: Image dimensions; also the shape of the random pixel array.
        pixel_spacing: Value for PixelSpacing.
        slice_thickness: Value for SliceThickness.
        image_position: Value for ImagePositionPatient.
        image_orientation: Value for ImageOrientationPatient.
        dwi_tags: Optional dict. If it contains "DiffusionBValue" and/or
            "DiffusionGradientOrientation", the matching elements
            (0018,9087) / (0018,9089) are added with VR 'FD'.
        series_description: Value for SeriesDescription.

    Returns:
        The dataset object that was written to ``filename``.

    Note:
        The list defaults are only read by this function, never mutated in place.
    """
    ds = Dataset()
    ds.file_meta = FileMetaDataset()
    ds.file_meta.MediaStorageSOPClassUID = pydicom.uid.MRImageStorage # Standard MR Image
    ds.file_meta.MediaStorageSOPInstanceUID = sop_instance_uid
    ds.file_meta.TransferSyntaxUID = ImplicitVRLittleEndian # Common default

    # Patient and Study Information
    ds.PatientName = patient_name
    ds.PatientID = patient_id
    ds.StudyInstanceUID = study_uid
    ds.SeriesInstanceUID = series_uid
    ds.SOPInstanceUID = sop_instance_uid
    ds.SOPClassUID = pydicom.uid.MRImageStorage
    ds.InstanceNumber = instance_number
    ds.SeriesNumber = 1
    ds.AcquisitionNumber = 1
    ds.ImageType = image_type
    ds.SeriesDescription = series_description

    # Take ONE timestamp and derive every date/time tag from it. Calling
    # datetime.now() per tag could yield mismatched values inside a single file
    # when the clock ticks over a second (or midnight) between calls.
    now = datetime.now()
    date_str = now.strftime('%Y%m%d')
    time_str = now.strftime('%H%M%S')
    ds.StudyDate = date_str
    ds.SeriesDate = date_str
    ds.AcquisitionDate = date_str
    ds.StudyTime = time_str
    ds.SeriesTime = time_str
    ds.AcquisitionTime = time_str
    
    # Image Pixel Data
    ds.Rows = rows
    ds.Columns = cols
    ds.PixelSpacing = pixel_spacing
    ds.SliceThickness = slice_thickness
    ds.ImagePositionPatient = image_position
    ds.ImageOrientationPatient = image_orientation
    ds.SamplesPerPixel = 1
    ds.PhotometricInterpretation = "MONOCHROME2"
    ds.BitsAllocated = 16
    ds.BitsStored = 16
    ds.HighBit = 15
    ds.PixelRepresentation = 0 # Unsigned integer
    
    # Create dummy pixel data (random uint16, values in [0, 4096))
    pixel_array = np.random.randint(0, 4096, size=(rows, cols), dtype=np.uint16)
    ds.PixelData = pixel_array.tobytes()

    # Add DWI tags if provided
    if dwi_tags:
        # Siemens CSA header for bval (0029,1010) - often not directly used by converters, prefer standard tags
        if "DiffusionBValue" in dwi_tags:
            # Standard DICOM tag for b-value (0018,9087)
            ds.add_new(0x00189087, 'FD', float(dwi_tags["DiffusionBValue"])) 
        if "DiffusionGradientOrientation" in dwi_tags:
            # Standard DICOM tag for b-vector (0018,9089)
            ds.add_new(0x00189089, 'FD', [float(x) for x in dwi_tags["DiffusionGradientOrientation"]])
    
    pydicom.dcmwrite(filename, ds, write_like_original=False)
    return ds

## Part 1: DICOM to NIfTI Conversion

We'll start by creating some dummy DICOM series, first for a simple anatomical (non-DWI) scan, and then for a DWI scan.

### Creating Dummy Anatomical DICOM Data

In [None]:
def create_dummy_anatomical_dicom_series(output_dir, num_slices=5, rows=64, cols=64):
    """Write a dummy anatomical series of ``num_slices`` DICOM files into ``output_dir``.

    All slices share one freshly generated study UID and series UID; each slice
    gets its own SOP instance UID, a 1-based instance number, and an
    ImagePositionPatient stepped by 5 along the third coordinate.

    Args:
        output_dir: Directory to write into (created if missing).
        num_slices: Number of slice files to create.
        rows, cols: In-plane image size, forwarded to ``create_dummy_dicom_file``.
    """
    os.makedirs(output_dir, exist_ok=True)
    study_uid = generate_uid()
    series_uid = generate_uid()
    print(f"Creating anatomical series in {output_dir}")
    for i in range(num_slices):
        instance_number = i + 1
        sop_instance_uid = generate_uid()
        # Vary ImagePositionPatient for each slice
        image_position = [0.0, 0.0, float(i * 5.0)] # 5mm slice increment
        filepath = os.path.join(output_dir, f"slice{instance_number:03d}.dcm")
        # rows/cols must be forwarded explicitly; otherwise the helper silently
        # falls back to its own defaults and these parameters have no effect.
        create_dummy_dicom_file(filepath, study_uid, series_uid, sop_instance_uid, instance_number,
                                image_position=image_position, rows=rows, cols=cols,
                                series_description="T1w Anatomical")
    print(f"Created {num_slices} anatomical DICOM files.")

# Populate ANAT_DICOM_DIR with a 5-slice series; the conversion cell below reads from it.
create_dummy_anatomical_dicom_series(ANAT_DICOM_DIR, num_slices=5)

### Running Anatomical DICOM to NIfTI Conversion

Now, let's convert this anatomical series to a NIfTI file using `convert_dicom_to_nifti_main`.

In [None]:
# Run the conversion; the return value is treated as a success flag.
print(f"Converting anatomical DICOMs from {ANAT_DICOM_DIR} to {ANAT_NIFTI_OUT}")
success_anat = convert_dicom_to_nifti_main(ANAT_DICOM_DIR, ANAT_NIFTI_OUT)

if not success_anat:
    print("Anatomical DICOM conversion failed.")
else:
    print(f"Anatomical DICOM series converted successfully to {ANAT_NIFTI_OUT}")
    try:
        # Round-trip check: the written NIfTI must load with nibabel.
        img = nib.load(ANAT_NIFTI_OUT)
        print(f"NIfTI image loaded: shape={img.shape}, affine=\n{img.affine}")
        # Look for a JSON sidecar next to the NIfTI (same path, .nii.gz -> .json).
        json_sidecar = ANAT_NIFTI_OUT.replace('.nii.gz', '.json')
        print(
            f"JSON sidecar created: {json_sidecar}"
            if os.path.exists(json_sidecar)
            else f"JSON sidecar MISSING: {json_sidecar}"
        )
    except Exception as exc:
        # Report instead of raising so the rest of the notebook can still run.
        print(f"Error loading or inspecting NIfTI file: {exc}")

### Creating Dummy DWI DICOM Data

Next, we create a dummy DWI series. This is more complex as it involves DWI-specific tags like b-values and gradient directions.

In [None]:
def create_dummy_dwi_dicom_series(output_dir, num_volumes=4, num_slices_per_vol=3, rows=32, cols=32,
                                  b_values=None, b_vectors=None):
    """Write a dummy DWI series (``num_volumes`` x ``num_slices_per_vol`` files) into ``output_dir``.

    Every file shares one study UID and one series UID. Files are named
    ``vol{VV}_slice{SSS}.dcm``, carry a running 1-based instance number, and
    all slices of a volume carry that volume's b-value and gradient direction.

    Args:
        output_dir: Directory to write into (created if missing).
        num_volumes: Number of volumes. If it disagrees with ``len(b_values)``
            a warning is printed and the b-value table wins.
        num_slices_per_vol: Slices written per volume.
        rows, cols: In-plane image size forwarded to ``create_dummy_dicom_file``.
        b_values: Optional per-volume b-values. Defaults to one b0 followed by
            three b=1000 volumes.
        b_vectors: Optional per-volume gradient directions (3 components each).
            Defaults to a zero vector followed by the x, y and z unit vectors.

    Raises:
        ValueError: If ``b_values`` and ``b_vectors`` differ in length.
    """
    # Define b-values and b-vectors for the volumes
    # Default example: 1 b0, 3 diffusion directions
    if b_values is None:
        b_values = [0, 1000, 1000, 1000]
    if b_vectors is None:
        b_vectors = [[0,0,0], [1,0,0], [0,1,0], [0,0,1]]
    if len(b_values) != len(b_vectors):
        # Each volume needs exactly one b-value and one direction.
        raise ValueError(
            f"b_values ({len(b_values)}) and b_vectors ({len(b_vectors)}) must have the same length."
        )

    os.makedirs(output_dir, exist_ok=True)
    study_uid = generate_uid()
    series_uid = generate_uid()
    print(f"Creating DWI series in {output_dir}")

    if num_volumes != len(b_values):
        print(f"Warning: num_volumes ({num_volumes}) does not match b_values length ({len(b_values)}). Adjusting num_volumes.")
        num_volumes = len(b_values)

    instance_counter = 0
    for vol_idx in range(num_volumes):
        dwi_specific_tags = {
            "DiffusionBValue": b_values[vol_idx],
            "DiffusionGradientOrientation": b_vectors[vol_idx]
        }
        for slice_idx in range(num_slices_per_vol):
            instance_counter += 1
            sop_instance_uid = generate_uid()
            # Slice positions repeat for every volume: same geometry, different weighting.
            image_position = [0.0, 0.0, float(slice_idx * 5.0)] # 5mm slice increment
            filepath = os.path.join(output_dir, f"vol{vol_idx:02d}_slice{slice_idx:03d}.dcm")
            create_dummy_dicom_file(filepath, study_uid, series_uid, sop_instance_uid, instance_counter,
                                    image_position=image_position, dwi_tags=dwi_specific_tags,
                                    rows=rows, cols=cols, series_description="DWI Series")
    print(f"Created {instance_counter} DWI DICOM files ({num_volumes} volumes, {num_slices_per_vol} slices/vol).")

# Populate DWI_DICOM_DIR (4 volumes x 3 slices); the conversion and anonymization cells below read from it.
create_dummy_dwi_dicom_series(DWI_DICOM_DIR, num_volumes=4, num_slices_per_vol=3)

### Running DWI DICOM to NIfTI Conversion

This conversion uses `convert_dwi_dicom_to_nifti` and will also produce `.bval` and `.bvec` files.

In [None]:
# Run the DWI conversion; the return value is treated as a success flag.
print(f"Converting DWI DICOMs from {DWI_DICOM_DIR} to NIfTI/bval/bvec...")
success_dwi = convert_dwi_dicom_to_nifti(DWI_DICOM_DIR, DWI_NIFTI_OUT, DWI_BVAL_OUT, DWI_BVEC_OUT)

if not success_dwi:
    print("DWI DICOM conversion failed.")
else:
    print("DWI DICOM series converted successfully:")
    print(f"  NIfTI: {DWI_NIFTI_OUT}")
    print(f"  bval:  {DWI_BVAL_OUT}")
    print(f"  bvec:  {DWI_BVEC_OUT}")

    # Sanity-check each output: load the volume, then read back the gradient tables.
    try:
        img_dwi = nib.load(DWI_NIFTI_OUT)
        print(f"DWI NIfTI loaded: shape={img_dwi.shape}, affine=\n{img_dwi.affine}")

        if not os.path.exists(DWI_BVAL_OUT):
            print(f"bval file MISSING: {DWI_BVAL_OUT}")
        else:
            bvals = np.loadtxt(DWI_BVAL_OUT)
            print(f"bvals loaded: {bvals}")

        if not os.path.exists(DWI_BVEC_OUT):
            print(f"bvec file MISSING: {DWI_BVEC_OUT}")
        else:
            bvecs = np.loadtxt(DWI_BVEC_OUT)
            print(f"bvecs loaded: shape={bvecs.shape}\n{bvecs}")

        # Look for a JSON sidecar next to the NIfTI (same path, .nii.gz -> .json).
        json_sidecar_dwi = DWI_NIFTI_OUT.replace('.nii.gz', '.json')
        print(
            f"JSON sidecar created: {json_sidecar_dwi}"
            if os.path.exists(json_sidecar_dwi)
            else f"JSON sidecar MISSING: {json_sidecar_dwi}"
        )

    except Exception as exc:
        # Report instead of raising so the rest of the notebook can still run.
        print(f"Error loading or inspecting DWI NIfTI/bval/bvec: {exc}")

## Part 2: DICOM Anonymization

DICOM anonymization is crucial for protecting patient privacy when sharing or archiving data. The `diffusemri` library provides tools to anonymize DICOM files by removing or modifying identifying tags.

### Anonymizing a Directory of DICOM Files

We'll use the dummy DWI series we created earlier (`DWI_DICOM_DIR`) as input for anonymization.

In [None]:
print(f"Anonymizing DICOM directory: {DWI_DICOM_DIR}")
print(f"Outputting anonymized files to: {ANON_DWI_DICOM_DIR}")

# The anonymize_dicom_directory function should create the output directory if it doesn't exist.
processed_count, failed_count = anonymize_dicom_directory(DWI_DICOM_DIR, ANON_DWI_DICOM_DIR)

print(f"Anonymization complete: {processed_count} files processed, {failed_count} files failed.")

# Verify anonymization by checking some tags from an anonymized file
if processed_count > 0 and os.path.exists(ANON_DWI_DICOM_DIR):
    try:
        # Keep only .dcm files in case other files exist. Sort the names because
        # os.listdir returns entries in arbitrary order; without sorting, the
        # "first" file inspected below could differ between runs and platforms.
        anon_files = sorted(f for f in os.listdir(ANON_DWI_DICOM_DIR) if f.endswith('.dcm'))
        if anon_files:
            first_anon_file_path = os.path.join(ANON_DWI_DICOM_DIR, anon_files[0])
            ds_anon = pydicom.dcmread(first_anon_file_path)
            print(f"--- Tags from first anonymized file ({anon_files[0]}) ---")
            print(f"PatientName: {ds_anon.get('PatientName', 'Not Present')}")
            print(f"PatientID: {ds_anon.get('PatientID', 'Not Present')}")
            print(f"PatientBirthDate: {ds_anon.get('PatientBirthDate', 'Not Present')}")
            # Check a tag that should be preserved
            print(f"SeriesDescription: {ds_anon.get('SeriesDescription', 'Not Present')}") 
            # Access by tag number.
            # NOTE(review): with a numeric tag, pydicom's Dataset.get appears to return the
            # whole data element rather than the bare value, so this line prints differently
            # from the keyword lookups above - confirm against the pydicom docs.
            print(f"DiffusionBValue: {ds_anon.get(0x00189087, 'Not Present')}")
        else:
            print("No .dcm files found in the anonymized directory for verification.")
    except Exception as e:
        print(f"Error reading or checking anonymized file: {e}")
else:
    print("No files were processed or output directory not found, skipping verification.")

### (Optional) Anonymizing a Single DICOM File

You can also anonymize a single DICOM file using `anonymize_dicom_file`.

In [None]:
# isdir (rather than exists) so a stray non-directory at this path cannot make listdir raise.
if os.path.isdir(DWI_DICOM_DIR) and len(os.listdir(DWI_DICOM_DIR)) > 0:
    # Pick the first file from the original DWI series for this example.
    # Sorted, because os.listdir order is arbitrary and the example should
    # anonymize the same file on every run.
    dwi_files = sorted(f for f in os.listdir(DWI_DICOM_DIR) if f.endswith('.dcm'))
    if dwi_files:
        single_file_to_anonymize = os.path.join(DWI_DICOM_DIR, dwi_files[0])
        anonymized_single_file_path = os.path.join(BASE_TEMP_DIR, "single_anonymized.dcm")
        
        print(f"\nAnonymizing single file: {single_file_to_anonymize}")
        success_single = anonymize_dicom_file(single_file_to_anonymize, anonymized_single_file_path)
        
        # Require both the success flag and the output file on disk before reading it back.
        if success_single and os.path.exists(anonymized_single_file_path):
            print(f"Single file anonymized successfully to: {anonymized_single_file_path}")
            ds_single_anon = pydicom.dcmread(anonymized_single_file_path)
            print(f"Anonymized PatientName (single file): {ds_single_anon.get('PatientName', 'Not Present')}")
            os.remove(anonymized_single_file_path) # Clean up this specific file
        else:
            print("Single file anonymization failed or output file not found.")
    else:
        print("No .dcm files found in DWI_DICOM_DIR for single file anonymization example.")
else:
    print("Original DWI DICOM directory does not exist or is empty, skipping single file anonymization example.")

### (Optional) Custom Anonymization Rules

The anonymization functions use a default set of rules (`DEFAULT_ANONYMIZATION_TAGS` from `dicom_utils`). You can provide custom rules as a dictionary. 
The keys can be DICOM tag keywords (e.g., 'PatientName') or (group, element) tuples (e.g., (0x0010, 0x0010)).
The values can be:
*   A new string/value to replace the tag's current value.
*   A callable function that takes the original value and returns a new value.
*   Special markers from `dicom_utils` like `_REMOVE_TAG_` (to delete the tag) or `_EMPTY_STRING_`.

In [None]:
# _REMOVE_TAG_ and _EMPTY_STRING_ come from data_io.dicom_utils and are imported
# in the notebook's import cell together with the other dicom_utils names.
custom_rules = {
    "PatientName": "ANON_PatientXYZ", # Replace with a specific string
    "PatientID": _EMPTY_STRING_,      # Replace with an empty string
    "SeriesDescription": lambda desc: f"Processed_{desc if desc else 'Series'}", # Modify existing
    (0x0010, 0x0030): None,             # PatientBirthDate - same as _REMOVE_TAG_ if value is None
    "InstitutionName": _REMOVE_TAG_    # Remove this tag entirely
}

print(f"\nAnonymizing DICOM directory {DWI_DICOM_DIR} with CUSTOM rules...")
print(f"Outputting to: {ANON_DWI_DICOM_DIR_CUSTOM}")

processed_custom, failed_custom = anonymize_dicom_directory(
    DWI_DICOM_DIR, 
    ANON_DWI_DICOM_DIR_CUSTOM, 
    anonymization_rules=custom_rules
)

print(f"Custom anonymization complete: {processed_custom} files processed, {failed_custom} files failed.")

if processed_custom > 0 and os.path.exists(ANON_DWI_DICOM_DIR_CUSTOM):
    try:
        # Sorted, because os.listdir order is arbitrary and the file inspected
        # below should be the same one on every run.
        custom_anon_files = sorted(f for f in os.listdir(ANON_DWI_DICOM_DIR_CUSTOM) if f.endswith('.dcm'))
        if custom_anon_files:
            first_custom_anon_file_path = os.path.join(ANON_DWI_DICOM_DIR_CUSTOM, custom_anon_files[0])
            ds_custom_anon = pydicom.dcmread(first_custom_anon_file_path)
            print(f"--- Tags from first custom anonymized file ({custom_anon_files[0]}) ---")
            print(f"PatientName: {ds_custom_anon.get('PatientName', 'Not Present')}")
            print(f"PatientID: {ds_custom_anon.get('PatientID', 'Not Present')}")
            print(f"SeriesDescription: {ds_custom_anon.get('SeriesDescription', 'Not Present')}")
            print(f"InstitutionName: {ds_custom_anon.get('InstitutionName', 'Not Present (Good!)')}")
        else:
            print("No .dcm files found in the custom anonymized directory for verification.")
    except Exception as e:
        print(f"Error reading or checking custom anonymized file: {e}")
else:
    print("No files were processed (custom) or output directory not found, skipping verification.")

## Cleanup

Finally, let's remove the temporary directories and files we created.

In [None]:
# Remove the whole scratch tree: every generated DICOM, converted output and
# anonymized copy in this notebook lives under BASE_TEMP_DIR.
print(f"Cleaning up temporary directory: {BASE_TEMP_DIR}")
# Guarded so re-running this cell after the directory is already gone does not raise.
if os.path.exists(BASE_TEMP_DIR):
    shutil.rmtree(BASE_TEMP_DIR)
print("Cleanup complete.")