Environment Setup


In [None]:
!pip install ipywidgets
!pip install matplotlib ipywidgets
!pip install boto3


In [5]:
import csv
import glob
import io
import os
import sys
from collections import defaultdict

import boto3
import botocore
import ipywidgets as widgets
import matplotlib.pyplot as plt
import pandas as pd
import pydicom
from botocore.exceptions import ClientError
from ipywidgets import interact, fixed
from tqdm import tqdm

Get Directory 

In [6]:
def get_directory_path():
    """
    Prompt until the user enters a usable DICOM location.

    A value that exists as a local directory is accepted unchanged. Otherwise a
    value starting with 's3://' is accepted with the scheme removed, provided a
    bucket name follows it. Anything else prints a message and prompts again.

    Returns:
    - (directory, location) (tuple): For a local directory, the path as entered
      and 'local'. For S3, 'bucket' or 'bucket/prefix' (no 's3://') and 's3'.
    """
    s3_scheme = 's3://'
    while True:
        directory = input("Enter directory path (local directory or S3 bucket prefix): ").strip()
        if os.path.isdir(directory):
            return directory, 'local'
        if directory.startswith(s3_scheme):
            s3_path = directory[len(s3_scheme):]
            # 's3://' and 's3:///prefix' carry no bucket name; ask again instead
            # of handing an unusable location to the functions that list the bucket.
            if s3_path.partition('/')[0]:
                return s3_path, 's3'
        print("Invalid directory path. Please enter a valid local directory or S3 bucket prefix.")


Verify DICOM Data

In [10]:
import pandas as pd

def verify_dicom_files(directory, location='local', output_csv=None):
    """
    Try to parse every '.dcm' file under a local directory tree or an S3 prefix
    and report which ones pydicom can read.

    Args:
    - directory (str): Local root directory, or 'bucket/prefix' for S3 (the prefix may be omitted).
    - location (str): The location type ('local' or 's3'). Default is 'local'.
      Any other value prints a message and produces empty results.
    - output_csv (str): Optional. If provided, the invalid files are written to this
      CSV path with the columns 'File' and 'Error'.

    Returns:
    - valid_files (list): Paths (local) or object keys (S3) that were read successfully.
    - invalid_files (list): (path or key, error message) tuples for files that raised while being read.
    """
    valid_files = []
    invalid_files = []

    if location == 'local':
        # Walk the whole tree below the root directory.
        for root, _dirs, files in os.walk(directory):
            for filename in files:
                if not filename.endswith('.dcm'):
                    continue
                file_path = os.path.join(root, filename)
                try:
                    pydicom.dcmread(file_path)
                except Exception as e:
                    invalid_files.append((file_path, str(e)))
                else:
                    valid_files.append(file_path)
    elif location == 's3':
        # partition() also accepts a bare bucket name (empty prefix).
        bucket_name, _, prefix = directory.partition('/')
        s3 = boto3.client('s3')
        # One list_objects_v2 response holds at most 1000 keys, so page through the listing.
        paginator = s3.get_paginator('list_objects_v2')
        for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
            for obj in page.get('Contents', []):
                key = obj['Key']
                if not key.endswith('.dcm'):
                    continue
                try:
                    s3_object = s3.get_object(Bucket=bucket_name, Key=key)
                    # Buffer the download: pydicom needs a seekable file object,
                    # which the streaming response body is not.
                    pydicom.dcmread(io.BytesIO(s3_object['Body'].read()))
                except Exception as e:
                    invalid_files.append((key, str(e)))
                else:
                    valid_files.append(key)
    else:
        print("Invalid location type. Please provide 'local' or 's3'.")

    if output_csv:
        # Save invalid DICOM files to a CSV file
        df = pd.DataFrame(invalid_files, columns=['File', 'Error'])
        df.to_csv(output_csv, index=False)
        print(f"Invalid DICOM files saved to: {output_csv}")

    return valid_files, invalid_files


In [None]:
# Example usage: prompt for a location, verify it, then show both result lists.
directory, location = get_directory_path()
valid_files, invalid_files = verify_dicom_files(
    directory,
    location=location,
    output_csv=r'Your Path',
)
print("Valid DICOM files:", valid_files, sep="\n")
print("\nInvalid DICOM files:", invalid_files, sep="\n")

Run Count Checks

In [17]:
def count_dicom_files(directory, location='local'):
    """
    Count the '.dcm' files under a local directory tree or an S3 prefix.

    Args:
    - directory (str): Local root directory, or 'bucket/prefix' for S3 (the prefix may be omitted).
    - location (str): The location type ('local' or 's3'). Default is 'local'.
      Any other value prints a message and produces empty results.

    Returns:
    - total_count (int): Total number of '.dcm' files found.
    - counts_by_subdirectory (dict): Number of '.dcm' files per directory. Local keys are
      the directory paths yielded by os.walk; S3 keys are the directory part of each
      object key relative to the prefix ('' for objects directly under it).
    """
    total_count = 0
    counts_by_subdirectory = {}

    if location == 'local':
        # Walk the whole tree; directories without any '.dcm' file get no entry.
        for root, _dirs, files in os.walk(directory):
            dicom_count = sum(1 for name in files if name.endswith('.dcm'))
            if dicom_count:
                counts_by_subdirectory[root] = dicom_count
                total_count += dicom_count
    elif location == 's3':
        # partition() also accepts a bare bucket name (empty prefix).
        bucket_name, _, prefix = directory.partition('/')
        s3 = boto3.client('s3')
        # One list_objects_v2 response holds at most 1000 keys, so page through the listing.
        paginator = s3.get_paginator('list_objects_v2')
        for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
            for obj in page.get('Contents', []):
                key = obj['Key']
                if not key.endswith('.dcm'):
                    continue
                total_count += 1
                # Drop the prefix and any separator left in front of the remainder, so the
                # result is the same whether or not the prefix was given with a trailing '/'.
                relative_key = key[len(prefix):].lstrip('/')
                subdirectory = os.path.dirname(relative_key)
                counts_by_subdirectory[subdirectory] = counts_by_subdirectory.get(subdirectory, 0) + 1
    else:
        print("Invalid location type. Please provide 'local' or 's3'.")

    return total_count, counts_by_subdirectory

In [None]:
# Example usage: prompt for a location and print the overall and per-directory counts.
directory, location = get_directory_path()
total_count, counts_by_subdirectory = count_dicom_files(directory, location=location)
for label, shown in (
    ("Total number of DICOM files:", total_count),
    ("Counts by subdirectory:", counts_by_subdirectory),
):
    print(label, shown)

Check for Duplicate SOP Instance UIDs

In [19]:
def check_duplicate_sop_uids(directory, location='local'):
    """
    Find SOP Instance UIDs that occur in more than one '.dcm' file under a local
    directory tree or an S3 prefix.

    Files that cannot be read, or that have no SOPInstanceUID, are reported with a
    printed message and left out of the comparison.

    Args:
    - directory (str): Local root directory, or 'bucket/prefix' for S3 (the prefix may be omitted).
    - location (str): The location type ('local' or 's3'). Default is 'local'.
      Any other value prints a message and produces an empty result.

    Returns:
    - duplicate_uids (dict): Maps each SOP Instance UID seen more than once to the list
      of file paths (local) or object keys (S3) that carry it.
    """
    sop_uids = defaultdict(list)

    if location == 'local':
        # Walk the whole tree below the root directory.
        for root, _dirs, files in os.walk(directory):
            for file in files:
                if not file.endswith('.dcm'):
                    continue
                file_path = os.path.join(root, file)
                try:
                    ds = pydicom.dcmread(file_path)
                    sop_uids[ds.SOPInstanceUID].append(file_path)
                except Exception as e:
                    print(f"Error reading DICOM file '{file_path}': {e}")
    elif location == 's3':
        # partition() also accepts a bare bucket name (empty prefix).
        bucket_name, _, prefix = directory.partition('/')
        s3 = boto3.client('s3')
        # One list_objects_v2 response holds at most 1000 keys, so page through the listing.
        paginator = s3.get_paginator('list_objects_v2')
        for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
            for obj in page.get('Contents', []):
                key = obj['Key']
                if not key.endswith('.dcm'):
                    continue
                try:
                    s3_object = s3.get_object(Bucket=bucket_name, Key=key)
                    # Buffer the download: pydicom needs a seekable file object,
                    # which the streaming response body is not.
                    ds = pydicom.dcmread(io.BytesIO(s3_object['Body'].read()))
                    sop_uids[ds.SOPInstanceUID].append(key)
                except Exception as e:
                    print(f"Error reading DICOM file '{key}' from S3: {e}")
    else:
        print("Invalid location type. Please provide 'local' or 's3'.")

    # Keep only the UIDs that were seen in more than one file.
    duplicate_uids = defaultdict(list)
    for uid, sources in sop_uids.items():
        if len(sources) > 1:
            duplicate_uids[uid] = sources

    return duplicate_uids

In [None]:
# Example usage: prompt for a location and list every SOP Instance UID shared by several files.
directory, location = get_directory_path()
duplicate_uids = check_duplicate_sop_uids(directory, location=location)
print("Duplicate SOP Instance UIDs:")
for uid in duplicate_uids:
    files = duplicate_uids[uid]
    print(f"SOP UID: {uid}, Files: {files}")

Run DICOM Consistency Check

In [None]:
def check_dicom_consistency(directory, location='local'):
    """
    Perform a consistency check on all '.dcm' files under a local directory tree or an S3 prefix.

    Args:
    - directory (str): Local root directory, or 'bucket/prefix' for S3 (the prefix may be omitted).
    - location (str): The location type ('local' or 's3'). Default is 'local'.
      Any other value prints a message; only the summary entries are then produced.

    Returns:
    - consistency_report (dict): Two kinds of entries share this dictionary:
      * per file (keyed by path or S3 key, present only when something was found):
        a dict with 'missing_PatientID', 'missing_StudyInstanceUID',
        'missing_SeriesInstanceUID', 'missing_SOPInstanceUID' set to True as applicable,
        or 'error' with the exception text when the file could not be processed;
      * summary booleans: 'PatientID_consistent' and 'StudyInstanceUID_consistent'
        (exactly one distinct value seen), 'SeriesInstanceUID_consistent_STUDY'
        (the study has exactly one series) and 'SOPInstanceUID_consistent_SERIES'
        (every file of the series has a distinct SOP Instance UID).
      A missing attribute is counted as the empty string in the summaries.
    """
    consistency_report = defaultdict(dict)
    patient_ids = set()
    study_instance_uids = set()
    series_instance_uids = defaultdict(set)
    sop_instance_uids = defaultdict(set)
    # Number of files read per series; the SOP uniqueness check compares against it.
    files_per_series = defaultdict(int)
    required_attributes = ('PatientID', 'StudyInstanceUID', 'SeriesInstanceUID', 'SOPInstanceUID')

    def record(ds, source):
        # Fold one dataset into the running tallies and flag its missing attributes.
        for attribute in required_attributes:
            if attribute not in ds:
                consistency_report[source][f'missing_{attribute}'] = True

        study_uid = ds.get('StudyInstanceUID', '')
        series_uid = ds.get('SeriesInstanceUID', '')
        patient_ids.add(ds.get('PatientID', ''))
        study_instance_uids.add(study_uid)
        series_instance_uids[study_uid].add(series_uid)
        sop_instance_uids[series_uid].add(ds.get('SOPInstanceUID', ''))
        files_per_series[series_uid] += 1

    if location == 'local':
        # Walk the whole tree below the root directory.
        for root, _dirs, files in os.walk(directory):
            for file in files:
                if not file.endswith('.dcm'):
                    continue
                file_path = os.path.join(root, file)
                try:
                    record(pydicom.dcmread(file_path), file_path)
                except Exception as e:
                    consistency_report[file_path]['error'] = str(e)
    elif location == 's3':
        # partition() also accepts a bare bucket name (empty prefix).
        bucket_name, _, prefix = directory.partition('/')
        s3 = boto3.client('s3')
        # One list_objects_v2 response holds at most 1000 keys, so page through the listing.
        paginator = s3.get_paginator('list_objects_v2')
        for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
            for obj in page.get('Contents', []):
                key = obj['Key']
                if not key.endswith('.dcm'):
                    continue
                try:
                    s3_object = s3.get_object(Bucket=bucket_name, Key=key)
                    # Buffer the download: pydicom needs a seekable file object,
                    # which the streaming response body is not.
                    record(pydicom.dcmread(io.BytesIO(s3_object['Body'].read())), key)
                except Exception as e:
                    consistency_report[key]['error'] = str(e)
    else:
        print("Invalid location type. Please provide 'local' or 's3'.")

    # Add consistency check results to the report
    consistency_report['PatientID_consistent'] = len(patient_ids) == 1
    consistency_report['StudyInstanceUID_consistent'] = len(study_instance_uids) == 1

    for study_uid, series_uids in series_instance_uids.items():
        consistency_report[f'SeriesInstanceUID_consistent_{study_uid}'] = len(series_uids) == 1

    for series_uid, sop_uids in sop_instance_uids.items():
        # Fewer distinct SOP UIDs than files in the series means at least one UID is repeated.
        consistency_report[f'SOPInstanceUID_consistent_{series_uid}'] = (
            len(sop_uids) == files_per_series[series_uid]
        )

    return consistency_report

In [None]:
# Example usage:
# get_directory_path() is the prompt helper defined earlier in this notebook.
directory, location = get_directory_path()
consistency_report = check_dicom_consistency(directory, location)
print("Consistency Report:")
for file_path, report in consistency_report.items():
    print(f"File: {file_path}, Report: {report}")

Verify DICOM IOD (Dciodvfy)

In [21]:
def verify_dicom_iod_data(directory, location='local'):
    """
    Group the '.dcm' files under a local directory tree or an S3 prefix by their SOP Class UID.

    Args:
    - directory (str): Local root directory, or 'bucket/prefix' for S3 (the prefix may be omitted).
    - location (str): The location type ('local' or 's3'). Default is 'local'.
      Any other value prints a message and produces an empty report.

    Returns:
    - iod_verification_report (dict): Maps each SOP Class UID to the list of files
      (paths or S3 keys) that declare it. Two extra keys may appear:
      'No SOPClassUID' lists files whose SOP Class UID is absent or empty, and
      'Error reading file' lists (file, error message) tuples for unreadable files.
    """
    iod_verification_report = defaultdict(list)

    def classify(ds, source):
        # Look the attribute up with a default: plain attribute access raises when the
        # element is absent, which would file the entry under 'Error reading file'.
        iod = getattr(ds, 'SOPClassUID', None)
        if iod:
            iod_verification_report[iod].append(source)
        else:
            iod_verification_report['No SOPClassUID'].append(source)

    if location == 'local':
        # Walk the whole tree below the root directory.
        for root, _dirs, files in os.walk(directory):
            for file in files:
                if not file.endswith('.dcm'):
                    continue
                file_path = os.path.join(root, file)
                try:
                    classify(pydicom.dcmread(file_path), file_path)
                except Exception as e:
                    iod_verification_report['Error reading file'].append((file_path, str(e)))
    elif location == 's3':
        # partition() also accepts a bare bucket name (empty prefix).
        bucket_name, _, prefix = directory.partition('/')
        s3 = boto3.client('s3')
        # One list_objects_v2 response holds at most 1000 keys, so page through the listing.
        paginator = s3.get_paginator('list_objects_v2')
        for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
            for obj in page.get('Contents', []):
                key = obj['Key']
                if not key.endswith('.dcm'):
                    continue
                try:
                    s3_object = s3.get_object(Bucket=bucket_name, Key=key)
                    # Buffer the download: pydicom needs a seekable file object,
                    # which the streaming response body is not.
                    classify(pydicom.dcmread(io.BytesIO(s3_object['Body'].read())), key)
                except Exception as e:
                    iod_verification_report['Error reading file'].append((key, str(e)))
    else:
        print("Invalid location type. Please provide 'local' or 's3'.")

    return iod_verification_report

In [None]:
# Example usage: prompt for a location and list the files found for each SOP Class UID.
directory, location = get_directory_path()
iod_verification_report = verify_dicom_iod_data(directory, location=location)
print("IOD Verification Report:")
for iod in iod_verification_report:
    files = iod_verification_report[iod]
    print(f"IOD: {iod}, Files: {files}")

Generate activity report

In [None]:
def generate_activity_report(directory, location='local'):
    """
    Generate an activity report for the '.dcm' files under a local directory tree or an S3 prefix.

    A file counts towards the totals only if it can be read and has PatientID,
    StudyInstanceUID, SeriesInstanceUID and Modality; otherwise it is counted
    under 'Error reading file'.

    Args:
    - directory (str): Local root directory, or 'bucket/prefix' for S3 (the prefix may be omitted).
    - location (str): The location type ('local' or 's3'). Default is 'local'.
      Any other value prints a message and produces zero totals.

    Returns:
    - activity_report (dict): Counts keyed by 'Total DICOM Files', 'Error reading file',
      'Total Patients', 'Total Studies', 'Total Series' and 'Total Modalities'.
      The first two keys are present only when at least one file fell into them.
    """
    activity_report = defaultdict(int)
    patients = set()
    studies = set()
    series = set()
    modalities = set()

    def tally(ds):
        # Read all four attributes before touching the sets, so a file that lacks one
        # of them is counted purely as an error and leaves no partial trace in the totals.
        patient_id = ds.PatientID
        study_uid = ds.StudyInstanceUID
        series_uid = ds.SeriesInstanceUID
        modality = ds.Modality

        patients.add(patient_id)
        studies.add(study_uid)
        series.add(series_uid)
        modalities.add(modality)
        activity_report['Total DICOM Files'] += 1

    if location == 'local':
        # Walk the whole tree below the root directory.
        for root, _dirs, files in os.walk(directory):
            for file in files:
                if not file.endswith('.dcm'):
                    continue
                try:
                    tally(pydicom.dcmread(os.path.join(root, file)))
                except Exception:
                    activity_report['Error reading file'] += 1
    elif location == 's3':
        # partition() also accepts a bare bucket name (empty prefix).
        bucket_name, _, prefix = directory.partition('/')
        s3 = boto3.client('s3')
        # One list_objects_v2 response holds at most 1000 keys, so page through the listing.
        paginator = s3.get_paginator('list_objects_v2')
        for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
            for obj in page.get('Contents', []):
                key = obj['Key']
                if not key.endswith('.dcm'):
                    continue
                try:
                    s3_object = s3.get_object(Bucket=bucket_name, Key=key)
                    # Buffer the download: pydicom needs a seekable file object,
                    # which the streaming response body is not.
                    tally(pydicom.dcmread(io.BytesIO(s3_object['Body'].read())))
                except Exception:
                    activity_report['Error reading file'] += 1
    else:
        print("Invalid location type. Please provide 'local' or 's3'.")

    # Update activity report with counts
    activity_report['Total Patients'] = len(patients)
    activity_report['Total Studies'] = len(studies)
    activity_report['Total Series'] = len(series)
    activity_report['Total Modalities'] = len(modalities)

    return activity_report

In [None]:
# Example usage:
directory, location = get_directory_path()
# Pass the detected location on; without it an S3 location would be walked as a local path.
activity_report = generate_activity_report(directory, location)

# Display the activity report
print("DICOM Activity Report:")
for key, value in activity_report.items():
    print(f"{key}: {value}")

Extract DICOM Attributes for PHI Review

In [24]:
def extract_dicom_attributes(directory, output_path, location='local'):
    """
    Record, for each '.dcm' file, the first data element whose value has not been seen
    in any earlier file, and save the collected rows to an Excel file.

    Files that cannot be read (or lack SeriesInstanceUID) are reported with a printed
    message and skipped.

    Args:
    - directory (str): Local root directory, or 'bucket/prefix' for S3 (the prefix may be omitted).
    - output_path (str): The path to save the Excel file.
    - location (str): The location type ('local' or 's3'). Default is 'local'.
      Any other value prints a message and writes nothing.

    Returns:
    - None. The rows written have the columns 'File', 'Element', 'VR', 'Q-value',
      'Description', 'Disp' and 'Num_series' (the number of files of the same series
      read so far; locally the count restarts in every directory).
    """
    unique_values = set()
    data = []

    def record_first_unseen(ds, source, series_count):
        # Add one row for the first element of `ds` whose value is new, if there is one.
        series_uid = ds.SeriesInstanceUID
        series_count[series_uid] = series_count.get(series_uid, 0) + 1

        for elem in ds:
            value = str(elem.value)
            if value in unique_values:
                continue
            unique_values.add(value)
            # 'Description' and 'Disp' carry the element's dictionary name as text;
            # 'Q-value' repeats the VR, as in the original column layout.
            data.append({'File': source, 'Element': elem.name, 'VR': elem.VR, 'Q-value': elem.VR,
                         'Description': elem.name, 'Disp': elem.name,
                         'Num_series': series_count[series_uid]})
            break  # Move to the next DICOM file after extracting one unique tag value

    if location == 'local':
        for root, _dirs, files in os.walk(directory):
            # Per-directory tally of how many files of each series have been read.
            series_count = {}
            for file in files:
                if not file.endswith('.dcm'):
                    continue
                file_path = os.path.join(root, file)
                try:
                    record_first_unseen(pydicom.dcmread(file_path), file_path, series_count)
                except Exception as e:
                    print(f"Error reading DICOM file '{file_path}': {e}")
    elif location == 's3':
        # partition() also accepts a bare bucket name (empty prefix).
        bucket_name, _, prefix = directory.partition('/')
        s3 = boto3.client('s3')
        # Must exist before the first file is processed.
        series_count = {}
        # One list_objects_v2 response holds at most 1000 keys, so page through the listing.
        paginator = s3.get_paginator('list_objects_v2')
        for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
            for obj in page.get('Contents', []):
                key = obj['Key']
                if not key.endswith('.dcm'):
                    continue
                try:
                    s3_object = s3.get_object(Bucket=bucket_name, Key=key)
                    # Buffer the download: pydicom needs a seekable file object,
                    # which the streaming response body is not.
                    ds = pydicom.dcmread(io.BytesIO(s3_object['Body'].read()))
                    record_first_unseen(ds, key, series_count)
                except Exception as e:
                    print(f"Error reading DICOM file '{key}' from S3: {e}")
    else:
        print("Invalid location type. Please provide 'local' or 's3'.")
        return

    df = pd.DataFrame(data)
    df.to_excel(output_path, index=False)

In [None]:
# Example usage:
directory, location = get_directory_path()
output_path = r'your path'
# Use the location that was detected from the input rather than assuming S3.
extract_dicom_attributes(directory, output_path, location)

In [7]:
def remove_phi(directory, substrings, location='local'):
    """
    Remove data elements whose keyword or name contains any of the given substrings
    (case-insensitive) and save the result as a new '*_no_phi.dcm' file.

    Only top-level elements of each dataset are examined. Blank substrings are ignored.

    Args:
    - directory (str): Local directory (not searched recursively), or 'bucket/prefix'
      for S3 (the prefix may be omitted).
    - substrings (list): List of substrings to search for in element keywords and names.
    - location (str): The location type ('local' or 's3'). Default is 'local'.
      Any other value prints a message and does nothing.

    Returns:
    - None. Local input 'x.dcm' is saved next to it as 'x_no_phi.dcm'. For S3 input the
      cleaned copy is written to the local filesystem at the object key with the
      '_no_phi.dcm' suffix, relative to the current working directory; nothing is uploaded.
    """
    # An empty string is contained in every name and would select every element.
    needles = [text.strip().lower() for text in substrings if text and text.strip()]

    def strip_matching(ds):
        # Work on a snapshot of the elements so deletions cannot disturb the iteration.
        for elem in list(ds):
            labels = (str(elem.keyword).lower(), str(elem.name).lower())
            if any(needle in label for needle in needles for label in labels):
                del ds[elem.tag]

    if location == 'local':
        dicom_files = sorted(
            os.path.join(directory, f) for f in os.listdir(directory) if f.endswith('.dcm')
        )

        for file_path in dicom_files:
            ds = pydicom.dcmread(file_path)
            strip_matching(ds)

            # Save modified DICOM
            output_path = os.path.splitext(file_path)[0] + '_no_phi.dcm'
            ds.save_as(output_path)

            print(f"Removed specified substrings from {file_path} and saved as {output_path}")
    elif location == 's3':
        # partition() also accepts a bare bucket name (empty prefix).
        bucket_name, _, prefix = directory.partition('/')
        s3 = boto3.client('s3')
        # One list_objects_v2 response holds at most 1000 keys, so page through the listing.
        paginator = s3.get_paginator('list_objects_v2')
        for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
            for obj in page.get('Contents', []):
                key = obj['Key']
                if not key.endswith('.dcm'):
                    continue
                try:
                    s3_object = s3.get_object(Bucket=bucket_name, Key=key)
                    # Buffer the download: pydicom needs a seekable file object,
                    # which the streaming response body is not.
                    ds = pydicom.dcmread(io.BytesIO(s3_object['Body'].read()))
                    strip_matching(ds)

                    # Save modified DICOM
                    output_key = os.path.splitext(key)[0] + '_no_phi.dcm'
                    # The key's directory part does not exist locally yet.
                    output_dir = os.path.dirname(output_key)
                    if output_dir:
                        os.makedirs(output_dir, exist_ok=True)
                    with open(output_key, 'wb') as f:
                        ds.save_as(f)

                    print(f"Removed specified substrings from {key} and saved as {output_key}")
                except Exception as e:
                    print(f"Error processing DICOM file '{key}' from S3: {e}")
    else:
        print("Invalid location type. Please provide 'local' or 's3'.")

In [8]:
def generate_summary(directory, output_path, location='local'):
    """
    Generate a spreadsheet summary of DICOM tags for review.

    Each '.dcm' file becomes one row: a 'File' column plus one column per element
    name holding the element value as text. Files that cannot be processed are
    reported with a printed message and left out.

    Args:
    - directory (str): Local directory (not searched recursively), or 'bucket/prefix'
      for S3 (the prefix may be omitted).
    - output_path (str): The path to save the Excel file.
    - location (str): The location type ('local' or 's3'). Default is 'local'.
      Any other value prints a message and writes nothing.

    Returns:
    - None. For S3, nothing is written when the listing contains no objects at all.
    """
    def tag_row(ds, source):
        # One spreadsheet row for a dataset: its source plus every element as text.
        row = {'File': source}
        for elem in ds:
            row[elem.name] = str(elem.value)
        return row

    def write_summary(rows):
        pd.DataFrame(rows).to_excel(output_path, index=False)
        print(f"Summary of DICOM tags saved to {output_path}")

    if location == 'local':
        dicom_files = sorted(
            os.path.join(directory, f) for f in os.listdir(directory) if f.endswith('.dcm')
        )

        dicom_data = []
        for file_path in dicom_files:
            # Skip unreadable files instead of aborting, as the S3 branch does.
            try:
                dicom_data.append(tag_row(pydicom.dcmread(file_path), file_path))
            except Exception as e:
                print(f"Error processing DICOM file '{file_path}': {e}")

        write_summary(dicom_data)
    elif location == 's3':
        # partition() also accepts a bare bucket name (empty prefix).
        bucket_name, _, prefix = directory.partition('/')
        s3 = boto3.client('s3')

        dicom_data = []
        listed_any = False
        # One list_objects_v2 response holds at most 1000 keys, so page through the listing.
        paginator = s3.get_paginator('list_objects_v2')
        for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
            for obj in page.get('Contents', []):
                listed_any = True
                key = obj['Key']
                if not key.endswith('.dcm'):
                    continue
                try:
                    s3_object = s3.get_object(Bucket=bucket_name, Key=key)
                    # Buffer the download: pydicom needs a seekable file object,
                    # which the streaming response body is not.
                    ds = pydicom.dcmread(io.BytesIO(s3_object['Body'].read()))
                    dicom_data.append(tag_row(ds, key))
                except Exception as e:
                    print(f"Error processing DICOM file '{key}' from S3: {e}")

        if listed_any:
            write_summary(dicom_data)
        else:
            print("No DICOM files found in the specified S3 bucket location.")
    else:
        print("Invalid location type. Please provide 'local' or 's3'.")

In [None]:
# Provide the directory containing DICOM files
#directory = '/path/to/your/dicom/directory/'

directory, location = get_directory_path()

# Enter substrings to remove from DICOM tags
raw_substrings = input("Enter substrings to remove from DICOM tags (comma-separated): ")
# Trim each entry and drop blanks: an empty string is a substring of every tag name,
# so a blank entry would select every tag for removal.
substrings = [part.strip() for part in raw_substrings.split(',') if part.strip()]

# Remove specified substrings and save modified DICOM files
# (pass the detected location on so S3 input is not treated as a local directory)
remove_phi(directory, substrings, location)

# Generate a spreadsheet summary of DICOM tags for review
output_path = r'your path'
generate_summary(directory, output_path, location)

Rename Files & Check Age

In [None]:
def _age_in_years(raw_age):
    """
    Convert a DICOM Age String (e.g. '045Y', '018M', '003W', '010D') to years.

    Args:
    - raw_age: The PatientAge value; it is converted with str() before parsing.

    Returns:
    - years (float or None): Age expressed in years, or None when the numeric
      part cannot be parsed.
    """
    text = str(raw_age).strip().upper()
    unit = 'Y'  # a bare number carries no unit and is read as years
    if text and text[-1].isalpha():
        unit, text = text[-1], text[:-1]
    try:
        count = int(text)
    except ValueError:
        return None
    # Unknown unit letters fall back to years, which is the largest possible
    # reading and therefore errs on the side of blanking the tag.
    units_per_year = {'Y': 1, 'M': 12, 'W': 52, 'D': 365}
    return count / units_per_year.get(unit, 1)


def rename_dicom_files(input_directory, location='local', max_age=89):
    """
    Rename DICOM files according to their SOP Instance UID and empty Age tag if age > 89.

    For 's3', each object is downloaded into the system temp directory and
    processed there; nothing is written back to the bucket by this function.

    Args:
    - input_directory (str): The input directory containing DICOM files
      (local path, or "bucket/prefix" for S3).
    - location (str): Location type, either 'local' or 's3'. Default is 'local'.
    - max_age (int): Ages strictly greater than this many years are blanked. Default is 89.

    Returns:
    - None

    Raises:
    - ValueError: If location is neither 'local' nor 's3'.
    """
    # Imported here because the notebook's import cell does not provide them.
    import glob
    import tempfile

    if location == 'local':
        dcm_files = glob.glob(os.path.join(input_directory, "**/*.dcm"), recursive=True)
    elif location == 's3':
        s3 = boto3.resource('s3')
        # partition() tolerates a bare bucket name (no '/'), giving an empty prefix.
        bucket_name, _, prefix = input_directory.partition('/')
        bucket = s3.Bucket(bucket_name)
        dcm_files = [obj.key for obj in bucket.objects.filter(Prefix=prefix) if obj.key.endswith('.dcm')]
    else:
        raise ValueError("Invalid location type. Use 'local' or 's3'.")

    print(f"{len(dcm_files)} DICOM files found in the directory.\n")

    for dcm_file in tqdm(dcm_files):
        if location == 'local':
            complete_f_path = dcm_file
        else:
            complete_f_path = os.path.join(tempfile.gettempdir(), os.path.basename(dcm_file))
            try:
                bucket.download_file(dcm_file, complete_f_path)
            except botocore.exceptions.ClientError as e:
                if e.response['Error']['Code'] == "404":
                    print(f"The object {dcm_file} does not exist.")
                    # Nothing was downloaded, so there is no file to read.
                    continue
                raise

        ds = pydicom.dcmread(complete_f_path)

        # (0008,0018) is the SOP Instance UID; without it there is no target name.
        uid_element = ds.get((0x0008, 0x0018))
        sop_instance_uid = str(uid_element.value) if uid_element is not None else ""
        if not sop_instance_uid:
            print(f"Skipping '{complete_f_path}': SOP Instance UID is missing.")
            continue

        # (0010,1010) is Patient's Age.
        if (0x0010, 0x1010) in ds:
            age_value = ds.PatientAge
            if age_value:
                years = _age_in_years(age_value)
                if years is None:
                    print(f"Could not parse PatientAge '{age_value}' in '{complete_f_path}'; tag left unchanged.")
                elif years > max_age:
                    ds.PatientAge = None

        folder = os.path.dirname(complete_f_path)
        destination = os.path.join(folder, f"{sop_instance_uid}.dcm")

        # Write the new file first and only then drop the old one, so a failed
        # save never loses the original. samefile() covers the already-renamed case.
        ds.save_as(destination)
        if not os.path.samefile(complete_f_path, destination):
            os.remove(complete_f_path)

    print("\nProcessing complete.")

In [None]:
# Example usage (replace the placeholder path with a real directory before running):
# For local directory:
rename_dicom_files("/path/to/local/directory", location='local')

# For S3 directory (pass "bucket/prefix" without the s3:// scheme):
# rename_dicom_files("your_bucket_name/your_prefix", location='s3')

Generate DME

In [None]:
def _dme_first_component(value):
    """Return the first component of a spacing value as text, or "" when the tag is absent or empty."""
    if value is None:
        return ""
    if isinstance(value, str):
        return value
    if hasattr(value, "__len__"):
        # Multi-valued element: keep only the first component.
        if len(value) == 0:
            return ""
        value = value[0]
    # Prefer the text as stored in the file; fall back to str() when the
    # value object does not carry an original_string attribute.
    return getattr(value, "original_string", str(value))


def _dme_join_values(value, separator="_"):
    """Join a multi-valued tag into one string; pass strings through; "" for a missing/empty tag."""
    if value is None:
        return ""
    if isinstance(value, str):
        return value
    return separator.join(str(item) for item in value)


def get_dicom_metadata(file):
    """
    Extracts DICOM metadata from a DICOM file.

    Tags that are absent, or present with an empty value, are reported as ""
    for the derived columns (pixel spacing, image type, convolution kernel,
    exposure modulation type) instead of raising.

    Args:
    - file (str): Path to the DICOM file.

    Returns:
    - data (dict): Dictionary containing DICOM metadata.
    """
    dcm = pydicom.dcmread(file)

    # Only the first spacing component is exported.
    pixel_spacing = _dme_first_component(dcm.get("PixelSpacing", ""))
    imager_pixel_spacing = _dme_first_component(dcm.get("ImagerPixelSpacing", ""))

    # These tags may hold several values; flatten them to "A_B_C".
    image_type = _dme_join_values(dcm.get("ImageType", ""))
    convolution_kernel = _dme_join_values(dcm.get("ConvolutionKernel", ""))
    exposure_modulation_type = _dme_join_values(dcm.get("ExposureModulationType", ""))

    data = {
        'file_name': os.path.basename(file),
        'accession_number': dcm.get("AccessionNumber", ""),
        'acquisition_type': dcm.get("AcquisitionType", ""),
        'body_part_examined': dcm.get("BodyPartExamined", ""),
        'case_ids': dcm.get("PatientID", ""),
        'contrast_bolus_agent': dcm.get("ContrastBolusAgent", ""),
        'patient_position': dcm.get("PatientPosition", ""),
        'convolution_kernel': convolution_kernel,
        'detector_type': dcm.get("DetectorType", ""),
        'exposure_modulation_type': exposure_modulation_type,
        'image_type': image_type,
        'imager_pixel_spacing': imager_pixel_spacing,
        'lossy_image_compression': dcm.get("LossyImageCompression", ""),
        'manufacturer': dcm.get("Manufacturer", ""),
        'manufacturer_model_name': dcm.get("ManufacturerModelName", ""),
        'modality': dcm.get("Modality", ""),
        'sop_instance_uid': dcm.get("SOPInstanceUID", ""),
        'pixel_spacing': pixel_spacing,
        'series_description': dcm.get("SeriesDescription", ""),
        'series_uid': dcm.get("SeriesInstanceUID", ""),
        'slice_thickness': dcm.get("SliceThickness", ""),
        'spacing_between_slices': dcm.get("SpacingBetweenSlices", ""),
        'spatial_resolution': dcm.get("SpatialResolution", ""),
        'study_description': dcm.get("StudyDescription", ""),
        'study_uid': dcm.get("StudyInstanceUID", ""),
        'view_position': dcm.get("ViewPosition", ""),
        'study_date': dcm.get("StudyDate", "")
    }
    return data


def write_csv(data, filename):
    """
    Writes DICOM metadata to a CSV file.

    The header is taken from the keys of the first row. When data is empty
    there is no header to derive, so an empty file is written instead of
    raising IndexError.

    Args:
    - data (list): List of dictionaries containing DICOM metadata.
    - filename (str): Path to the output CSV file.

    Returns:
    - None
    """
    # newline='' lets the csv module control line endings itself.
    with open(filename, 'w', newline='') as csv_file:
        if not data:
            return
        writer = csv.DictWriter(csv_file, fieldnames=list(data[0].keys()))
        writer.writeheader()
        writer.writerows(data)


def loc_dicom_files(directory):
    """
    Collects every file ending in ".dcm" beneath a local directory.

    Args:
    - directory (str): Root folder to walk (subfolders included).

    Returns:
    - dicom_files (list): Full paths of the matching files, in os.walk order.
    """
    return [
        os.path.join(folder, name)
        for folder, _subfolders, names in os.walk(directory)
        for name in names
        if name.endswith(".dcm")
    ]


def s3_dicom_files(bucket_name, prefix):
    """
    Locates DICOM files in an S3 bucket.

    Uses the list_objects_v2 paginator so that listings spanning more than one
    response page are returned in full rather than being cut off after the
    first page.

    Args:
    - bucket_name (str): Name of the S3 bucket.
    - prefix (str): Prefix to search within the bucket.

    Returns:
    - dicom_files (list): List of object keys ending in ".dcm".
    """
    s3 = boto3.client('s3')
    paginator = s3.get_paginator('list_objects_v2')

    dicom_files = []
    for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
        # A page has no 'Contents' entry when nothing matches the prefix.
        for obj in page.get('Contents', []):
            key = obj['Key']
            if key.endswith('.dcm'):
                dicom_files.append(key)

    return dicom_files


def DME_main(directory, location='local'):
    """
    Generates DICOM metadata CSV file.

    The CSV is written to the current working directory and named after the
    last component of `directory` plus "_dcm_metadata.csv". For 's3', each
    object is downloaded to a temporary folder, read, and deleted again,
    because get_dicom_metadata expects a local file path.

    Args:
    - directory (str): Path to the directory (local directory path or S3 bucket prefix).
    - location (str): The location type ('local' or 's3'). Default is 'local'.

    Returns:
    - None
    """
    # Imported here because the notebook's import cell does not provide it.
    import tempfile

    if location == 'local':
        dicom_files = loc_dicom_files(directory)
        data = [get_dicom_metadata(file) for file in tqdm(dicom_files, desc='Progress')]
    elif location == 's3':
        # partition() tolerates a bare bucket name (no '/'), giving an empty prefix.
        bucket_name, _, prefix = directory.partition('/')
        dicom_files = s3_dicom_files(bucket_name, prefix)
        s3 = boto3.client('s3')
        data = []
        with tempfile.TemporaryDirectory() as scratch_dir:
            for key in tqdm(dicom_files, desc='Progress'):
                # Keep the key's base name so the 'file_name' column stays meaningful.
                local_copy = os.path.join(scratch_dir, os.path.basename(key))
                s3.download_file(bucket_name, key, local_copy)
                try:
                    data.append(get_dicom_metadata(local_copy))
                finally:
                    # Remove each copy right away so only one file is held on disk at a time.
                    os.remove(local_copy)
    else:
        print("Invalid location type. Please provide 'local' or 's3'.")
        return

    if not data:
        # The CSV header comes from the first row, so there is nothing to write.
        print(f"No DICOM files found in '{directory}'. No metadata file was written.")
        return

    output_file = os.path.basename(os.path.normpath(directory)) + "_dcm_metadata" + ".csv"
    write_csv(data, output_file)
    print()
    print('DICOM metadata exported to ' + output_file)

In [None]:
# Example usage
# NOTE(review): `bucket_name` and `prefix` must already be defined at notebook
# level before this cell runs — confirm where they are meant to come from.
directory = f'{bucket_name}/{prefix}'
DME_main(directory, location='s3')