# LESSON 6: DICOM Anonymization
## Biomedical Image Processing - DICOM Module

In this lesson:
- Why anonymization is important (HIPAA, GDPR)
- Which tags contain PHI (Protected Health Information)
- Different anonymization strategies
- Implementing a DICOM anonymizer

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import pydicom
from pydicom.dataset import Dataset, FileMetaDataset
from pydicom.uid import generate_uid
import os
import hashlib
from datetime import datetime, timedelta
import random
import string

print("Libraries imported successfully!")

## 1. Why Anonymization?

### Legal Requirements:
- **HIPAA** (USA): Health Insurance Portability and Accountability Act
- **GDPR** (EU): General Data Protection Regulation
- **KVKK** (Turkey): Kişisel Verilerin Korunması Kanunu (Personal Data Protection Law)

### Use Cases:
1. **Research**: Sharing data for academic research
2. **AI/ML Training**: Training machine learning models
3. **Education**: Teaching and demonstrations
4. **Multi-site Studies**: Collaborative research

### Protected Health Information (PHI):
- Patient name, ID, birth date
- Dates (study, admission, etc.)
- Phone numbers, addresses
- Device identifiers
- Biometric data
- Full-face photographs

## 2. PHI Tags in DICOM

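DICOM Supplement 142 (now part of PS3.15, Annex E) defines the **Basic Application Level Confidentiality Profile**, or **Basic Profile**, for de-identification. The dictionary below covers a commonly needed subset of its attributes, not the full profile.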

In [None]:
# Tags containing PHI (Protected Health Information)
PHI_TAGS = {
    # Patient Information
    (0x0010, 0x0010): 'PatientName',
    (0x0010, 0x0020): 'PatientID',
    (0x0010, 0x0030): 'PatientBirthDate',
    (0x0010, 0x0032): 'PatientBirthTime',
    (0x0010, 0x0040): 'PatientSex',
    (0x0010, 0x1000): 'OtherPatientIDs',
    (0x0010, 0x1001): 'OtherPatientNames',
    (0x0010, 0x1010): 'PatientAge',
    (0x0010, 0x1020): 'PatientSize',
    (0x0010, 0x1030): 'PatientWeight',
    (0x0010, 0x1040): 'PatientAddress',
    (0x0010, 0x2154): 'PatientTelephoneNumbers',
    
    # Study Information
    (0x0008, 0x0020): 'StudyDate',
    (0x0008, 0x0021): 'SeriesDate',
    (0x0008, 0x0022): 'AcquisitionDate',
    (0x0008, 0x0023): 'ContentDate',
    (0x0008, 0x0030): 'StudyTime',
    (0x0008, 0x0031): 'SeriesTime',
    (0x0008, 0x0032): 'AcquisitionTime',
    (0x0008, 0x0033): 'ContentTime',
    (0x0008, 0x0050): 'AccessionNumber',
    (0x0008, 0x0080): 'InstitutionName',
    (0x0008, 0x0081): 'InstitutionAddress',
    (0x0008, 0x0090): 'ReferringPhysicianName',
    (0x0008, 0x1010): 'StationName',
    (0x0008, 0x1030): 'StudyDescription',
    (0x0008, 0x1048): 'PhysiciansOfRecord',
    (0x0008, 0x1050): 'PerformingPhysicianName',
    (0x0008, 0x1070): 'OperatorsName',
    
    # UIDs (can be used to link back to patient)
    (0x0020, 0x000D): 'StudyInstanceUID',
    (0x0020, 0x000E): 'SeriesInstanceUID',
    (0x0008, 0x0018): 'SOPInstanceUID',
    (0x0020, 0x0052): 'FrameOfReferenceUID',
}

print("PHI TAGS TO CONSIDER FOR ANONYMIZATION")
print("=" * 60)
for tag, name in PHI_TAGS.items():
    print(f"({tag[0]:04X},{tag[1]:04X}) - {name}")

## 3. Create Sample DICOM with PHI

In [None]:
def create_sample_with_phi():
    """Create a sample DICOM with realistic PHI for testing."""
    
    file_meta = FileMetaDataset()
    file_meta.MediaStorageSOPClassUID = pydicom.uid.CTImageStorage
    file_meta.MediaStorageSOPInstanceUID = generate_uid()
    file_meta.TransferSyntaxUID = pydicom.uid.ExplicitVRLittleEndian
    file_meta.ImplementationClassUID = generate_uid()
    
    ds = Dataset()
    ds.file_meta = file_meta
    ds.is_little_endian = True
    ds.is_implicit_VR = False
    ds.preamble = b'\x00' * 128
    
    # Patient PHI
    ds.PatientName = "Yilmaz^Ahmet^Bay"
    ds.PatientID = "TC12345678901"
    ds.PatientBirthDate = "19850315"
    ds.PatientSex = "M"
    ds.PatientAge = "038Y"  # age on the study date (born 1985-03-15, scanned 2024-01-15)
    ds.PatientWeight = 78.5
    ds.PatientAddress = "Ataturk Cad. No:123 Ankara"
    ds.PatientTelephoneNumbers = "+90 532 123 4567"
    
    # Study PHI
    ds.StudyDate = "20240115"
    ds.StudyTime = "143052.123456"
    ds.SeriesDate = "20240115"
    ds.SeriesTime = "143125.789"
    ds.AcquisitionDate = "20240115"
    ds.AcquisitionTime = "143130.456"
    # Turkish for "Chest CT - for Ahmet Yilmaz": free text that leaks the patient's name
    ds.StudyDescription = "BT Toraks - Ahmet Yilmaz icin"
    ds.AccessionNumber = "ACC2024011501"
    
    # Institution PHI
    ds.InstitutionName = "Ankara Universitesi Tip Fakultesi"
    ds.InstitutionAddress = "Sihhiye, Ankara, Turkiye"
    ds.StationName = "CT_SCANNER_ROOM1"
    ds.ReferringPhysicianName = "Dr^Mehmet^Demir"
    ds.PerformingPhysicianName = "Teknisyen^Ayse^Kaya"
    ds.OperatorsName = "Operator1"
    
    # UIDs
    ds.StudyInstanceUID = generate_uid()
    ds.SeriesInstanceUID = generate_uid()
    ds.SOPClassUID = pydicom.uid.CTImageStorage
    ds.SOPInstanceUID = file_meta.MediaStorageSOPInstanceUID
    ds.FrameOfReferenceUID = generate_uid()
    
    # Other metadata
    ds.Modality = "CT"
    ds.SeriesDescription = "Axial 2.5mm"
    ds.SeriesNumber = 1
    ds.InstanceNumber = 42
    ds.Manufacturer = "Medical Imaging Co."
    ds.ManufacturerModelName = "UltraCT 5000"
    
    # Image parameters
    ds.Rows = 64
    ds.Columns = 64
    ds.BitsAllocated = 16
    ds.BitsStored = 12
    ds.HighBit = 11
    ds.PixelRepresentation = 1
    ds.SamplesPerPixel = 1
    ds.PhotometricInterpretation = "MONOCHROME2"
    ds.PixelSpacing = [1.0, 1.0]
    ds.RescaleIntercept = -1024
    ds.RescaleSlope = 1
    ds.WindowCenter = 40
    ds.WindowWidth = 400
    ds.SliceThickness = 2.5
    ds.SliceLocation = 105.0
    
    # Create pixel data
    np.random.seed(42)
    pixel_array = np.random.randint(900, 1100, (64, 64), dtype=np.int16)
    ds.PixelData = pixel_array.tobytes()
    
    return ds

# Create sample
original_ds = create_sample_with_phi()

print("ORIGINAL DICOM WITH PHI")
print("=" * 60)
print(f"Patient Name: {original_ds.PatientName}")
print(f"Patient ID: {original_ds.PatientID}")
print(f"Birth Date: {original_ds.PatientBirthDate}")
print(f"Address: {original_ds.PatientAddress}")
print(f"Phone: {original_ds.PatientTelephoneNumbers}")
print(f"Study Date: {original_ds.StudyDate}")
print(f"Institution: {original_ds.InstitutionName}")
print(f"Referring Physician: {original_ds.ReferringPhysicianName}")

## 4. Anonymization Strategies

1. **Remove**: Delete the tag entirely
2. **Empty**: Keep the tag but set its value to an empty string
3. **Replace**: Substitute a dummy value
4. **Hash**: Substitute a hash of the original value, so the same input always maps to the same output
5. **Shift**: Move dates by a random offset (applied by default; pass `keep_dates=True` to skip it)

In [None]:
class DicomAnonymizer:
    """
    DICOM Anonymization utility.
    
    Supports multiple strategies:
    - remove: Delete the tag
    - empty: Set to empty string
    - replace: Replace with dummy value
    - hash: Replace with hash (maintains consistency)
    - shift: Shift dates by random offset (applied unless keep_dates=True)
    """
    
    def __init__(self, seed=None):
        """Initialize anonymizer with optional random seed for reproducibility."""
        # Use a private generator so seeding does not disturb the global random state
        rng = random.Random(seed)
        
        # Date shift (random days between -365 and +365)
        self.date_shift_days = rng.randint(-365, 365)
        
        # Hash cache for consistent hashing
        self.hash_cache = {}
        
        # Counter for anonymous IDs
        self.anon_counter = 0
    
    def _hash_value(self, value, length=8):
        """Create a hash of a value."""
        str_value = str(value)
        if str_value not in self.hash_cache:
            hash_obj = hashlib.sha256(str_value.encode())
            self.hash_cache[str_value] = hash_obj.hexdigest()[:length].upper()
        return self.hash_cache[str_value]
    
    def _shift_date(self, date_str):
        """Shift a DICOM date by the configured offset."""
        if not date_str or len(date_str) < 8:
            return date_str
        try:
            date_obj = datetime.strptime(date_str[:8], '%Y%m%d')
            shifted = date_obj + timedelta(days=self.date_shift_days)
            return shifted.strftime('%Y%m%d')
        except (ValueError, TypeError):
            # Not a parsable YYYYMMDD value; leave it unchanged
            return date_str
    
    def _generate_anon_id(self):
        """Generate an anonymous ID."""
        self.anon_counter += 1
        return f"ANON{self.anon_counter:06d}"
    
    def anonymize(self, ds, strategy='replace', keep_dates=False):
        """
        Anonymize a DICOM dataset.
        
        Parameters:
        -----------
        ds : Dataset
            DICOM dataset to anonymize
        strategy : str
            'remove', 'empty', 'replace', or 'hash'
        keep_dates : bool
            If False, dates are shifted. If True, dates are preserved.
        
        Returns:
        --------
        Dataset : Anonymized copy of the dataset
        """
        # Deep copy so the caller's dataset (including its file_meta) is not modified
        import copy
        anon_ds = copy.deepcopy(ds)
        
        # Patient Information
        if strategy == 'remove':
            self._remove_tags(anon_ds)
        elif strategy == 'empty':
            self._empty_tags(anon_ds)
        elif strategy == 'hash':
            self._hash_tags(anon_ds)
        else:  # replace
            self._replace_tags(anon_ds)
        
        # Handle dates
        if not keep_dates:
            self._shift_dates(anon_ds)
        
        # Always regenerate UIDs for safety
        self._regenerate_uids(anon_ds)
        
        return anon_ds
    
    def _remove_tags(self, ds):
        """Remove PHI tags."""
        tags_to_remove = [
            'PatientName', 'PatientID', 'PatientBirthDate', 'PatientBirthTime',
            'PatientAddress', 'PatientTelephoneNumbers', 'PatientAge',
            'OtherPatientIDs', 'OtherPatientNames',
            'ReferringPhysicianName', 'PerformingPhysicianName', 'OperatorsName',
            'InstitutionName', 'InstitutionAddress', 'StationName',
            'AccessionNumber', 'PhysiciansOfRecord',
            'StudyDescription'  # free text that may contain the patient's name
        ]
        for tag in tags_to_remove:
            if hasattr(ds, tag):
                delattr(ds, tag)
    
    def _empty_tags(self, ds):
        """Set PHI tags to empty."""
        tags_to_empty = [
            'PatientName', 'PatientID', 'PatientBirthDate',
            'PatientAddress', 'PatientTelephoneNumbers',
            'ReferringPhysicianName', 'PerformingPhysicianName', 'OperatorsName',
            'InstitutionName', 'InstitutionAddress', 'StationName',
            'AccessionNumber',
            'StudyDescription'  # free text that may contain the patient's name
        ]
        for tag in tags_to_empty:
            if hasattr(ds, tag):
                setattr(ds, tag, '')
    
    def _replace_tags(self, ds):
        """Replace PHI tags with dummy values."""
        anon_id = self._generate_anon_id()
        
        if hasattr(ds, 'PatientName'): ds.PatientName = f"Anonymous^Patient^{anon_id}"
        if hasattr(ds, 'PatientID'): ds.PatientID = anon_id
        if hasattr(ds, 'PatientBirthDate'): ds.PatientBirthDate = '19000101'
        if hasattr(ds, 'PatientAge'): ds.PatientAge = '000Y'
        if hasattr(ds, 'PatientAddress'): ds.PatientAddress = 'ANONYMIZED'
        if hasattr(ds, 'PatientTelephoneNumbers'): ds.PatientTelephoneNumbers = ''
        if hasattr(ds, 'ReferringPhysicianName'): ds.ReferringPhysicianName = 'ANONYMIZED'
        if hasattr(ds, 'PerformingPhysicianName'): ds.PerformingPhysicianName = 'ANONYMIZED'
        if hasattr(ds, 'OperatorsName'): ds.OperatorsName = 'ANONYMIZED'
        if hasattr(ds, 'InstitutionName'): ds.InstitutionName = 'ANONYMIZED'
        if hasattr(ds, 'InstitutionAddress'): ds.InstitutionAddress = 'ANONYMIZED'
        if hasattr(ds, 'StationName'): ds.StationName = 'ANON_STATION'
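        if hasattr(ds, 'AccessionNumber'): ds.AccessionNumber = anon_id
        # Free text may contain the patient's name, so replace it as well
        if hasattr(ds, 'StudyDescription'): ds.StudyDescription = 'ANONYMIZED'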
    
    def _hash_tags(self, ds):
        """Replace PHI tags with hashed values (maintains consistency)."""
        if hasattr(ds, 'PatientName'):
            hash_val = self._hash_value(ds.PatientName)
            ds.PatientName = f"HASH^{hash_val}"
        
        if hasattr(ds, 'PatientID'):
            ds.PatientID = self._hash_value(ds.PatientID, 12)
        
        if hasattr(ds, 'PatientBirthDate'): ds.PatientBirthDate = '19000101'
        if hasattr(ds, 'PatientAge'): ds.PatientAge = '000Y'
        if hasattr(ds, 'PatientAddress'): ds.PatientAddress = ''
        if hasattr(ds, 'PatientTelephoneNumbers'): ds.PatientTelephoneNumbers = ''
        
        # Hash values that are useful for grouping records
        for tag in ('ReferringPhysicianName', 'InstitutionName', 'AccessionNumber'):
            if hasattr(ds, tag):
                setattr(ds, tag, self._hash_value(getattr(ds, tag)))
        
        # Blank the remaining identifying fields, including free text
        for tag in ('PerformingPhysicianName', 'OperatorsName',
                    'InstitutionAddress', 'StationName', 'StudyDescription'):
            if hasattr(ds, tag):
                setattr(ds, tag, '')
    
    def _shift_dates(self, ds):
        """Shift all dates by the configured offset."""
        date_tags = [
            'StudyDate', 'SeriesDate', 'AcquisitionDate', 'ContentDate'
        ]
        for tag in date_tags:
            if hasattr(ds, tag):
                original = getattr(ds, tag)
                setattr(ds, tag, self._shift_date(original))
    
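    def _regenerate_uids(self, ds):
        """Regenerate all UIDs.
        
        Each call draws new random UIDs, so files from the same study or series
        will no longer share a StudyInstanceUID or SeriesInstanceUID.
        """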
        if hasattr(ds, 'StudyInstanceUID'):
            ds.StudyInstanceUID = generate_uid()
        if hasattr(ds, 'SeriesInstanceUID'):
            ds.SeriesInstanceUID = generate_uid()
        if hasattr(ds, 'SOPInstanceUID'):
            ds.SOPInstanceUID = generate_uid()
            ds.file_meta.MediaStorageSOPInstanceUID = ds.SOPInstanceUID
        if hasattr(ds, 'FrameOfReferenceUID'):
            ds.FrameOfReferenceUID = generate_uid()

print("DicomAnonymizer class created!")
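
One caveat with the `hash` strategy: `_hash_value` uses an unsalted, truncated SHA-256, so anyone who can guess candidate names or IDs can hash them and compare the results. A keyed hash avoids this. The following is a minimal sketch using the standard library's `hmac` module; `SECRET_KEY` and `keyed_pseudonym` are hypothetical names that are not part of `DicomAnonymizer`, and the key would have to be stored securely, away from the shared data.

```python
import hashlib
import hmac

# Hypothetical secret; in practice load it from a secure store, never hard-code it
SECRET_KEY = b"replace-with-a-long-random-secret"

def keyed_pseudonym(value, key=SECRET_KEY, length=12):
    """Return a stable pseudonym for `value` that cannot be recomputed without `key`."""
    digest = hmac.new(key, str(value).encode("utf-8"), hashlib.sha256).hexdigest()
    return digest[:length].upper()

# Same input and key give the same pseudonym; a different key gives a different one
p1 = keyed_pseudonym("TC12345678901")
p2 = keyed_pseudonym("TC12345678901")
p3 = keyed_pseudonym("TC12345678901", key=b"another-key")
print(p1 == p2, p1 == p3)
```

Because the output depends on the key, the mapping stays consistent across files and sessions as long as the same key is used, without keeping a lookup table.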

## 5. Testing Different Strategies

In [None]:
def compare_anonymization(original, strategy):
    """Compare original and anonymized datasets."""
    anonymizer = DicomAnonymizer(seed=42)
    anon = anonymizer.anonymize(original, strategy=strategy)
    
    print(f"\n{'='*60}")
    print(f"STRATEGY: {strategy.upper()}")
    print(f"{'='*60}")
    
    tags_to_check = [
        'PatientName', 'PatientID', 'PatientBirthDate',
        'StudyDate', 'InstitutionName', 'ReferringPhysicianName'
    ]
    
    print(f"{'Tag':<25} {'Original':<25} {'Anonymized'}")
    print("-" * 80)
    
    for tag in tags_to_check:
        orig_val = str(getattr(original, tag, 'N/A'))[:24]
        anon_val = str(getattr(anon, tag, '[REMOVED]'))[:24]
        print(f"{tag:<25} {orig_val:<25} {anon_val}")
    
    return anon

# Test all strategies
for strategy in ['remove', 'empty', 'replace', 'hash']:
    compare_anonymization(original_ds, strategy)
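
The shifted `StudyDate` in the tables above comes from one offset applied to every date in the dataset. The sketch below illustrates why a single offset keeps relative timing intact; `shift_dicom_date`, `days_between`, and the sample dates are illustrative and not part of the class above.

```python
from datetime import datetime, timedelta

def shift_dicom_date(date_str, days):
    """Shift a DICOM DA string (YYYYMMDD) by `days`; return it unchanged if unparsable."""
    try:
        parsed = datetime.strptime(date_str[:8], "%Y%m%d")
    except (ValueError, TypeError):
        return date_str
    return (parsed + timedelta(days=days)).strftime("%Y%m%d")

def days_between(a, b):
    """Number of days from DA string `a` to DA string `b`."""
    fmt = "%Y%m%d"
    return (datetime.strptime(b, fmt) - datetime.strptime(a, fmt)).days

# Two hypothetical studies of one patient, 30 days apart
baseline, follow_up = "20240115", "20240214"
offset = -100  # one offset per patient, reused for all of that patient's dates

shifted_baseline = shift_dicom_date(baseline, offset)
shifted_follow_up = shift_dicom_date(follow_up, offset)

print(shifted_baseline, shifted_follow_up)
print(days_between(baseline, follow_up),
      days_between(shifted_baseline, shifted_follow_up))
```

The interval between the two studies is the same before and after shifting, which is what longitudinal analyses need. Using a different offset per study would destroy it.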

## 6. Batch Anonymization

In [None]:
def anonymize_directory(input_dir, output_dir, strategy='replace'):
    """
    Anonymize all DICOM files in a directory.
    
    Parameters:
    -----------
    input_dir : str
        Directory containing original DICOM files
    output_dir : str
        Directory for anonymized files
    strategy : str
        Anonymization strategy
    """
    os.makedirs(output_dir, exist_ok=True)
    
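    # Reuse one anonymizer so every file gets the same date shift and hash values.
    # Note: 'replace' assigns a new ANON ID per file; use 'hash' to keep patient IDs consistent.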
    anonymizer = DicomAnonymizer(seed=42)
    
    processed = 0
    errors = 0
    
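    for filename in sorted(os.listdir(input_dir)):
        input_path = os.path.join(input_dir, filename)
        if not os.path.isfile(input_path):
            continue  # skip subdirectories
        
        try:
            ds = pydicom.dcmread(input_path)
            anon_ds = anonymizer.anonymize(ds, strategy=strategy)
            
            output_path = os.path.join(output_dir, f"anon_{filename}")
            pydicom.dcmwrite(output_path, anon_ds)
            
            processed += 1
        except Exception as e:
            # Report the failure instead of silently dropping the file
            print(f"Failed to anonymize {filename}: {e}")
            errors += 1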
    
    print(f"Processed: {processed} files")
    print(f"Errors: {errors} files")
    
    return processed, errors

print("Batch anonymization function ready!")
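
With `strategy='replace'`, `_replace_tags` calls `_generate_anon_id()` once per file, so two files of the same patient receive different `ANON` IDs. One way to keep IDs consistent across a batch is a lookup table keyed by the original `PatientID`. The following is a minimal sketch; the `PseudonymMap` class is hypothetical and is not wired into `DicomAnonymizer` here.

```python
class PseudonymMap:
    """Assign each original patient ID one stable anonymous ID."""

    def __init__(self, prefix="ANON"):
        self.prefix = prefix
        self._table = {}  # original ID -> anonymous ID

    def get(self, original_id):
        """Return the anonymous ID for `original_id`, creating one on first use."""
        key = str(original_id)
        if key not in self._table:
            self._table[key] = f"{self.prefix}{len(self._table) + 1:06d}"
        return self._table[key]

# Hypothetical stream of PatientID values read from a batch of files
pseudonyms = PseudonymMap()
batch_ids = ["TC111", "TC222", "TC111", "TC333", "TC222"]
anon_ids = [pseudonyms.get(pid) for pid in batch_ids]
print(anon_ids)
```

The table links anonymous IDs back to real ones, so it is itself sensitive and should never be shared alongside the anonymized files.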

## 7. Verification

In [None]:
def verify_anonymization(ds, verbose=True):
    """
    Verify that a DICOM has been properly anonymized.
    
    Returns:
    --------
    tuple: (is_safe, warnings)
    """
    warnings = []
    
    def is_hex(text, length):
        """True if `text` looks like a hash produced by DicomAnonymizer._hash_value."""
        return len(text) == length and all(c in '0123456789ABCDEF' for c in text)
    
    # Heuristic checks: each lambda returns a truthy value when the tag still looks like PHI
    phi_checks = [
        ('PatientName', lambda v: v and 'ANON' not in str(v).upper() and 'HASH' not in str(v).upper()),
        ('PatientID', lambda v: v and not str(v).startswith('ANON') and not is_hex(str(v), 12)),
        ('PatientBirthDate', lambda v: v and v != '19000101'),
        ('PatientAddress', lambda v: v and v != 'ANONYMIZED'),
        ('PatientTelephoneNumbers', lambda v: bool(v)),
        ('ReferringPhysicianName', lambda v: v and 'ANON' not in str(v).upper() and not is_hex(str(v), 8)),
        ('InstitutionName', lambda v: v and 'ANON' not in str(v).upper() and not is_hex(str(v), 8)),
    ]
    
    for tag, check_func in phi_checks:
        if hasattr(ds, tag):
            value = getattr(ds, tag)
            if check_func(value):
                warnings.append(f"Potential PHI in {tag}: {value}")
    
    is_safe = len(warnings) == 0
    
    if verbose:
        print("ANONYMIZATION VERIFICATION")
        print("=" * 40)
        if is_safe:
            print("[PASS] No obvious PHI detected")
        else:
            print("[WARN] Potential PHI found:")
            for w in warnings:
                print(f"  - {w}")
    
    return is_safe, warnings

# Verify original (should fail)
print("Checking ORIGINAL dataset:")
verify_anonymization(original_ds)

# Verify anonymized (should pass)
print("\nChecking ANONYMIZED dataset:")
anonymizer = DicomAnonymizer(seed=42)
anon_ds = anonymizer.anonymize(original_ds, strategy='replace')
verify_anonymization(anon_ds)
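
The checks above only look at dedicated PHI tags. Free-text fields can still leak identifiers: the sample's `StudyDescription`, for instance, was created with the patient's name in it. The following is a minimal sketch of a complementary check, assuming the original name is still available at verification time; `find_name_leaks` and the plain dictionary of fields are illustrative.

```python
import re

def find_name_leaks(patient_name, text_fields, min_len=3):
    """Return {field: [matched name parts]} for text values containing parts of the name."""
    # DICOM PN components are separated by '^'; component groups by '='
    parts = [p for p in re.split(r"[\^\s=]+", str(patient_name)) if len(p) >= min_len]
    leaks = {}
    for field, text in text_fields.items():
        hits = [p for p in parts if re.search(re.escape(p), str(text), re.IGNORECASE)]
        if hits:
            leaks[field] = hits
    return leaks

# Values as set by create_sample_with_phi() earlier in this lesson
fields = {
    "StudyDescription": "BT Toraks - Ahmet Yilmaz icin",
    "SeriesDescription": "Axial 2.5mm",
}
leaks = find_name_leaks("Yilmaz^Ahmet^Bay", fields)
print(leaks)
```

This is plain substring matching, so short name parts can produce false positives. Treat hits as prompts for manual review rather than proof of a leak.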

## 8. Complete Anonymization Example

In [None]:
# Create, anonymize, save, and verify

# 1. Create original
original = create_sample_with_phi()

# 2. Anonymize
anonymizer = DicomAnonymizer(seed=123)
anonymized = anonymizer.anonymize(original, strategy='replace')

# 3. Save both
pydicom.dcmwrite('original_phi.dcm', original)
pydicom.dcmwrite('anonymized.dcm', anonymized)

print("Files saved:")
print("  - original_phi.dcm (contains PHI - DO NOT SHARE!)")
print("  - anonymized.dcm (header PHI replaced; still check free text, private tags, and pixels)")

# 4. Verify
print("\n" + "="*60)
print("SIDE-BY-SIDE COMPARISON")
print("="*60)

tags = ['PatientName', 'PatientID', 'PatientBirthDate', 'PatientAddress',
        'StudyDate', 'InstitutionName', 'StudyInstanceUID']

print(f"{'Tag':<25} {'Original':<30} {'Anonymized'}")
print("-" * 90)

for tag in tags:
    orig_val = str(getattr(original, tag, 'N/A'))[:28]
    anon_val = str(getattr(anonymized, tag, 'N/A'))[:28]
    print(f"{tag:<25} {orig_val:<30} {anon_val}")

In [None]:
# Clean up
for f in ['original_phi.dcm', 'anonymized.dcm']:
    if os.path.exists(f):
        os.remove(f)
print("Test files cleaned up.")

## 9. Best Practices

### Do:
1. **Always verify** anonymization before sharing
2. **Reuse one anonymizer** (or the same seed) for all files of a patient, so date shifts and hashes stay consistent
3. **Regenerate UIDs** to prevent linking
4. **Shift dates** rather than removing them (preserves relative timing)
5. **Document** your anonymization process
6. **Test** with verification tools
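
One caveat on item 3: drawing a fresh random UID per file, as `_regenerate_uids` does, also breaks grouping, because slices of one study no longer share a `StudyInstanceUID`. The following is a minimal sketch of a deterministic alternative, assuming a secret key is available. `remap_uid` and `UID_KEY` are hypothetical names, and the `2.25.` prefix is the root DICOM uses for UUID-derived UIDs, here filled with 128 bits of a keyed digest.

```python
import hashlib
import hmac

UID_KEY = b"replace-with-a-long-random-secret"  # hypothetical; keep it private

def remap_uid(original_uid, key=UID_KEY):
    """Map a UID to a new one deterministically, so equal inputs stay equal."""
    digest = hmac.new(key, str(original_uid).encode("ascii"), hashlib.sha256).digest()
    # 128 bits rendered as a decimal integer under the 2.25 root (at most 44 characters)
    return "2.25." + str(int.from_bytes(digest[:16], "big"))

# Made-up UIDs for illustration
study_uid = "1.2.840.99999.1.1"
slice_uids = ["1.2.840.99999.1.1.1", "1.2.840.99999.1.1.2"]

new_study_a = remap_uid(study_uid)  # computed while processing slice 1
new_study_b = remap_uid(study_uid)  # computed while processing slice 2
new_slices = [remap_uid(u) for u in slice_uids]

print(new_study_a == new_study_b)      # study grouping survives
print(new_slices[0] != new_slices[1])  # instances stay distinct
```

Without the key, the new UIDs cannot be recomputed from the originals, while files processed separately still end up in the same study and series.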

### Don't:
1. **Don't just remove visible tags** - check for PHI in all tags
2. **Don't forget private tags** - vendor-specific tags may contain PHI
3. **Don't share original UIDs** - they can be used to link data
4. **Don't assume pixel data is safe** - burned-in text may contain PHI
5. **Don't rely on weak anonymization** for truly sensitive data - unsalted, truncated hashes of names or IDs can be recovered by hashing candidate values

### Burned-in Text Warning:
Some DICOM images have patient information burned into the pixel data.
Editing tags does not touch it; removing it requires image processing (OCR + redaction).
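
Locating burned-in text generally needs OCR or a modality-specific rule, which is beyond this lesson. Once a region is known, though, the redaction step itself is simple. The following is a minimal sketch with NumPy, assuming a hypothetical banner in the top 8 rows of a 64x64 image; `redact_region` is an illustrative helper.

```python
import numpy as np

def redact_region(pixels, top, left, height, width, fill=0):
    """Return a copy of `pixels` with the given rectangle overwritten by `fill`."""
    redacted = pixels.copy()
    redacted[top:top + height, left:left + width] = fill
    return redacted

# Synthetic image standing in for a decoded pixel array
rng = np.random.default_rng(42)
image = rng.integers(900, 1100, size=(64, 64), dtype=np.int16)

# Hypothetical banner location; real coordinates must come from OCR or inspection
clean = redact_region(image, top=0, left=0, height=8, width=64)

print(clean[:8].max(), np.array_equal(clean[8:], image[8:]))
```

For uncompressed data, the redacted array could then be written back with `ds.PixelData = clean.tobytes()`, as in the sample-creation code earlier in this lesson.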

## Summary

What we learned:
1. **PHI** must be removed/modified before sharing DICOM data
2. **Multiple strategies**: remove, empty, replace, hash
3. **Date shifting** preserves temporal relationships while hiding actual dates
4. **UID regeneration** prevents linking anonymized data back to originals
5. **Verification** is essential - always check your work
6. **Batch processing** for large datasets