# DICOMs infos extractor
By Stephen Karl Larroque @ Coma Science Group, GIGA Research, University of Liege
Creation date: 2018-02-17
License: MIT
v1.5.3

DESCRIPTION:
This script extracts metadata (patient names, scan dates, etc.) from all DICOM files found recursively under the specified folder.
The script expects that the root folder contains either one folder per subject or one zip file per subject, because only one dicom file will be read. If there are multiple subjects in one folder or zip, then only the first will be included, the rest will be skipped (to save file walking time).
Note: it is possible to provide a list of folders where all dicoms reside, each rootfolder will be processed one after the other.

INSTALL NOTE:
You need to pip install pandas before launching this script.
Tested on Python 2.7.15

USAGE:
Modify the parameters below, the most important being the dicom root folder(s), where all your subjects' dicoms folders/zips can be found.

TODO:
* save as additional fields the patient birthdate + sex
* save as additional fields the experiment id, machine id and third id, these can be used to uniquely identify an exam and find duplicates.
* make a dicom checker script: each folder/zip should contain at least one readable dicom, else there is a problem + report if multiple patient names/date of acquisition in the same folder/zip!

In [None]:
# Forcefully autoreload all python modules
%load_ext autoreload
%autoreload 2

In [None]:
# IMPORT AUX FUNCTIONS

import collections
import os, sys
import shutil
import zipfile
import re

cur_path = os.path.realpath('.')
sys.path.append(os.path.join(cur_path, 'csg_fileutil_libs'))  # for unidecode and cleanup_name, because it does not support relative paths (yet?)

# For DB reorganization
from csg_fileutil_libs.aux_funcs import save_dict_as_csv, save_df_as_csv, _tqdm, df_to_unicode

# For Dicom reading
from csg_fileutil_libs.aux_funcs import cleanup_name, recwalk, _StringIO
import csg_fileutil_libs.pydicom as pydicom
from csg_fileutil_libs.pydicom import config as pydicomconfig
from csg_fileutil_libs.pydicom.filereader import InvalidDicomError

pydicomconfig.enforce_valid_values = False  # to allow more resilience against malformatted dicom fields

In [None]:
# PARAMETERS
# Edit the values below before running the notebook; every later cell reads them as globals.

# Dicoms database
# rootpath_to_dicoms is the folder containing one dicom folder or one zip file per subject
# (there can be lots of subjects, but one folder/zip per subject directly under the given path).
# It can be a single string, or a list of strings to process multiple folders one after the other.
rootpath_to_dicoms = [r'C:\git\datatest\output\PATIENTS\NON_SEDATED',
                      r'C:\git\datatest\output\PATIENTS\SEDATED',
                      r'C:\git\datatest\output\PATIENTS\UNKNOWN',
                      r'C:\git\datatest\output\CONTROLS\Controls',
                      r'C:\git\datatest\output\CONTROLS\Controls_new_dti',
                      ]
#rootpath_to_dicoms = [r'G:\Topreproc\ReportsTun\csg_fileutil_v2.2.0\dicomtest\a', r'G:\Topreproc\ReportsTun\csg_fileutil_v2.2.0\dicomtest\b']
# Where to save the list of subjects
csv_output = r'databases_output\dicoms_db_subjects_reorg.csv'
# Where to save the list of subjects AND the extracted additional fields infos
csv_output2 = r'databases_output\dicoms_db_infos_reorg.csv'
# Additional fields to extract from the dicoms headers
additional_fields = ['AcquisitionDate', 'PatientID', 'SeriesDescription', 'ProtocolName']
# If False, infos are extracted from the first readable dicom found for each subject.
# If True, all dicoms are read and the additional fields are collected for all of them
# (stored in sets to avoid duplication): nothing is missed, at the expense of (way) longer processing.
walk_all_dicoms = True
# Patterns to look for in the extracted fields; the result is saved in a separate csv (csv_output3).
# Format: {'DicomAttribute': ['first_pattern_to_match', 'second_pattern_to_match']}.
# It is an AND test: all the patterns of a dict must be found for the subject to match.
# Several independent tests can be given as a list of dicts (one boolean column per dict).
find_dicoms_matching = [{'ProtocolName': ['dti', 'repos']}, {'ProtocolName': ['repos']}]
# Where to save the list of subjects matching the find_dicoms_matching patterns
csv_output3 = r'databases_output\dicoms_db_infosmatch_reorg.csv'


In [None]:
# MORE AUX FUNCTIONS
def get_list_of_folders(rootpath):
    """Return the names (not the full paths) of the directories located directly inside rootpath."""
    subfolders = []
    for entry in os.listdir(rootpath):
        # Only keep entries that are directories; plain files are ignored
        if os.path.isdir(os.path.join(rootpath, entry)):
            subfolders.append(entry)
    return subfolders

def get_list_of_zip(rootpath):
    """Return the names (not the full paths) of the regular files directly inside rootpath whose name ends with '.zip'.

    The extension test is case-sensitive: 'ARCHIVE.ZIP' is not returned.
    """
    archives = []
    for entry in os.listdir(rootpath):
        if not entry.endswith('.zip'):
            continue
        # A directory that happens to be named 'something.zip' must not be listed
        if os.path.isfile(os.path.join(rootpath, entry)):
            archives.append(entry)
    return archives

def get_dcm_names_from_dir(rootpath, dcm_subj_list=None, folder_to_name=None, add_fields=None, walk_all_dicoms=False, verbose=False):
    """Extract patient names (and optionally other fields) from the dicoms found in each subfolder of rootpath.

    Each direct subfolder of rootpath is treated as one subject and walked with recwalk().

    Args:
        rootpath: folder containing one subfolder per subject.
        dcm_subj_list: optional list of already collected patient names; it is extended in place.
        folder_to_name: optional dict mapping folder name -> patient name; it is updated in place.
        add_fields: optional list of dicom fields to extract for each dicom read (see add_dicom_fields).
        walk_all_dicoms: if False, stop at the first readable dicom of each subject; if True, read them all.
        verbose: if True, print the subject currently being processed.

    Returns:
        A tuple (dcm_subj_list, folder_to_name, additional_infos). additional_infos is a dict keyed by
        'name|AcquisitionDate'; it is empty when add_fields is None or empty.
        folder_to_name[subject] is None when no dicom could be read in that subject's folder.
    """
    if dcm_subj_list is None:
        dcm_subj_list = []  # store list of subjects names from dicom files (useful for csv filtering)
    if folder_to_name is None:
        folder_to_name = {}  # store the name of the patient stored in each root folder (useful for anonymization later on)
    # Always bound, so that the return statement works even when no additional fields are requested
    additional_infos = {}  # store all additional fields extracted from dicoms
    for subject in _tqdm(get_list_of_folders(rootpath), desc='DIR'):
        if verbose:
            # os.listdir() may return either bytes or unicode names: only decode when needed
            subject_label = subject if isinstance(subject, unicode) else unicode(subject, 'latin1')
            print('- Processing subject %s' % subject_label)
        fullpath = os.path.join(rootpath, subject)
        if not isinstance(fullpath, unicode):
            fullpath = unicode(fullpath, 'latin1')
        pts_name = None
        # NOTE(review): filetype=['.dcm', ''] presumably selects '.dcm' and extension-less files -- confirm against recwalk
        for dirpath, filename in recwalk(fullpath, filetype=['.dcm', '']):
            try:
                # stop_before_pixels allows for faster processing since only the header fields are needed here.
                # defer_size avoids reading everything into memory, which works around issues with some
                # malformatted fields that are too long (OverflowError: Python int too large to convert to C long)
                dcmdata = pydicom.read_file(os.path.join(dirpath, filename), stop_before_pixels=True, defer_size="2 MB", force=True)
                # Extract and cleanup the patient's name
                pts_name = cleanup_name(dcmdata.PatientName)
                # Add to the list of names; when walking all dicoms, skip a name identical to the last one added
                if not walk_all_dicoms or not dcm_subj_list or pts_name != dcm_subj_list[-1]:
                    dcm_subj_list.append(pts_name)
                # Extract additional fields
                if add_fields:
                    additional_infos = add_dicom_fields(additional_infos, dcmdata, pts_name, add_fields, walk_all_dicoms)  # add additional dicom fields given a list in add_fields
                    additional_infos = add_any_field(additional_infos, pts_name, dcmdata.AcquisitionDate, 'path', fullpath)  # add rootpath where each dicom was found, can then later be used for filtering
                # Stop here after the first valid dicom file found, except if we want to extract ALL data
                if not walk_all_dicoms:
                    break
            except (InvalidDicomError, AttributeError, OverflowError):
                # Best effort: a file that is not a dicom, or that lacks the expected fields, is skipped
                continue
        folder_to_name[subject] = pts_name
    return dcm_subj_list, folder_to_name, additional_infos

def get_dcm_names_from_zip(rootpath, dcm_subj_list=None, folder_to_name=None, add_fields=None, walk_all_dicoms=False, verbose=False):
    """Extract patient names (and optionally other fields) from the dicoms stored in each zip file of rootpath.

    Each '.zip' file located directly in rootpath is treated as one subject. Its members are read in memory
    (nothing is extracted to disk).

    Args:
        rootpath: folder containing one zip file per subject.
        dcm_subj_list: optional list of already collected patient names; it is extended in place.
        folder_to_name: optional dict mapping zip file name -> patient name; it is updated in place.
        add_fields: optional list of dicom fields to extract for each dicom read (see add_dicom_fields).
        walk_all_dicoms: if False, stop at the first readable dicom of each zip; if True, read them all.
        verbose: if True, print the zip file and members being processed.

    Returns:
        A tuple (dcm_subj_list, folder_to_name, additional_infos). additional_infos is a dict keyed by
        'name|AcquisitionDate'; it is empty when add_fields is None or empty.
        folder_to_name[zipfilename] is None when no dicom could be read inside the zip; unreadable zip
        files get no entry at all.

    Raises:
        IOError: re-raised when reading a member fails for a reason other than 'no tag to read'.
    """
    if dcm_subj_list is None:
        dcm_subj_list = []  # store list of subjects names from dicom files (useful for csv filtering)
    if folder_to_name is None:
        folder_to_name = {}  # store the name of the patient stored in each root folder (useful for anonymization later on)
    # Always bound, so that the return statement works even when no additional fields are requested
    additional_infos = {}  # store all additional fields extracted from dicoms
    for zipfilename in _tqdm(get_list_of_zip(rootpath), desc='ZIP'):
        zfilepath = os.path.join(rootpath, zipfilename)
        if verbose:
            print('- Processing file %s' % zipfilename)
        try:
            with zipfile.ZipFile(zfilepath, 'r') as zipfh:
                pts_name = None
                for zf in zipfh.namelist():
                    # Only files are of interest; directory entries end with '/' (standard detection in zipfile)
                    if zf.endswith('/'):
                        continue
                    # Load the member in a seekable in-memory buffer because pydicom needs seek().
                    # Do not use .extract(): the path can be anything and it does not support unicode
                    # (so it can easily extract to the root instead of the target folder!)
                    z = _StringIO(zipfh.read(zf))
                    try:
                        if verbose:
                            print('Try to decode dicom fields with file %s' % zf)
                        # stop_before_pixels allows for faster processing since only the header fields are needed here.
                        # defer_size avoids reading everything into memory, which works around issues with some
                        # malformatted fields that are too long (OverflowError: Python int too large to convert to C long)
                        dcmdata = pydicom.read_file(z, stop_before_pixels=True, defer_size="2 MB", force=True)
                        # Extract and cleanup the patient's name
                        pts_name = cleanup_name(dcmdata.PatientName)
                        # Add to the list of names; when walking all dicoms, skip a name identical to the last one added
                        if not walk_all_dicoms or not dcm_subj_list or pts_name != dcm_subj_list[-1]:
                            dcm_subj_list.append(pts_name)
                        # Extract additional fields
                        if add_fields:
                            additional_infos = add_dicom_fields(additional_infos, dcmdata, pts_name, add_fields, walk_all_dicoms)
                            additional_infos = add_any_field(additional_infos, pts_name, dcmdata.AcquisitionDate, 'path', zfilepath)  # add the zip path where each dicom was found, can then later be used for filtering
                        # Stop here after the first valid dicom file found, except if we want to extract ALL data
                        if not walk_all_dicoms:
                            break
                    except (InvalidDicomError, AttributeError, OverflowError):
                        # Best effort: a member that is not a dicom, or that lacks the expected fields, is skipped
                        continue
                    except IOError as exc:
                        # An empty/non-dicom member makes pydicom complain that there is no tag to read: skip it
                        if 'no tag to read' in str(exc).lower():
                            continue
                        raise
                # Add to the zip file name -> dicom patient name mapping
                folder_to_name[zipfilename] = pts_name
        except zipfile.BadZipfile:
            # If the zipfile is unreadable, just skip it
            continue
    return dcm_subj_list, folder_to_name, additional_infos

def add_dicom_fields(additional_infos, dcmdata, pts_name, add_fields, walk_all_dicoms=False):
    """Add dicom fields in the provided additional_infos dict (can be an empty dict).

    Values are stored under the key 'pts_name|AcquisitionDate'.

    Args:
        additional_infos: dict to fill; it is modified in place and also returned.
        dcmdata: dicom dataset as returned by pydicom.read_file().
        pts_name: (cleaned up) patient name, used to build the entry key.
        add_fields: list of fields to extract; fields absent from dcmdata are silently skipped.
            A str is handled as a named field, anything else as a tag (like (0x0010, 0x2020)).
        walk_all_dicoms: if True, values are accumulated in one set per field (several dicoms can provide
            different values for the same field); if False, the value is stored as is, overwriting any
            previous value.

    Returns:
        The updated additional_infos dict.

    Raises:
        AttributeError: if dcmdata has no AcquisitionDate.
    """
    dictid = '%s|%s' % (pts_name, dcmdata.AcquisitionDate)
    for field in add_fields:
        # Check that the field is present in the dicom metadata
        if field not in dcmdata:
            continue
        # Create the subject/session entry on first use, in BOTH modes
        # (it used to be missing when walk_all_dicoms was False, which raised a KeyError)
        entry = additional_infos.setdefault(dictid, {})
        if isinstance(field, str):
            # If string (a named field)
            value = dcmdata[dcmdata.data_element(field).tag].value
        else:
            # Else it's a coordinate field (no name, like (0010, 2020))
            value = dcmdata[field].value
        if walk_all_dicoms:
            # If we walk all dicoms, we might get multiple values for the same field,
            # so a set is used to store the unique values
            entry.setdefault(field, set()).add(value)
        else:
            # Else we just read one file per folder, so we simply store the value
            entry[field] = value
    return additional_infos

def add_any_field(additional_infos, pts_name, acquisitiondate, field, fieldvalue):
    """Store an arbitrary, user-provided field (i.e. not read from the dicom) for a subject/session.

    The entry key is 'pts_name|acquisitiondate'. The first value seen for a field is stored as is;
    as soon as a different value is added for the same field, the stored value becomes a set holding
    all the distinct values seen so far.
    It is mostly indicated for demographics building, NOT for dicom reorganization.

    additional_infos is modified in place and returned.
    """
    dictid = '%s|%s' % (pts_name, acquisitiondate)
    # Fetch the subject/session entry, creating it if necessary
    entry = additional_infos.setdefault(dictid, {})
    if field not in entry:
        # First value for this field: store it directly
        entry[field] = fieldvalue
        return additional_infos
    current = entry[field]
    if current != fieldvalue:
        # A different value already exists: promote the stored value to a set, then add the new one
        if not isinstance(current, set):
            current = set([current])
            entry[field] = current
        current.add(fieldvalue)
    return additional_infos


try:
    from collections.abc import Mapping as _Mapping  # Python 3.3+ (the collections.Mapping alias was removed in 3.10)
except ImportError:
    from collections import Mapping as _Mapping  # Python 2


def dict_merge(dct, merge_dct, add_keys=True):
    """ Recursive dict merge. Inspired by :meth:``dict.update()``, instead of
    updating only top-level keys, dict_merge recurses down into dicts nested
    to an arbitrary depth, updating keys. The ``merge_dct`` is merged into
    ``dct``.

    This version returns a new dictionary and leaves the original arguments
    untouched: every dict that gets merged is copied first. Values that are
    not merged (non-dict values, or dicts present on one side only) are not
    copied, they are shared with the arguments.

    Where both sides hold a non-dict value for the same key (sets included),
    the value of ``merge_dct`` replaces the one of ``dct``.

    The optional argument ``add_keys``, determines whether keys which are
    present in ``merge_dict`` but not ``dct`` should be included in the
    new dict.

    By DomWeldon: https://gist.github.com/angstwad/bf22d1822c38a92ec0a9#gistcomment-2622319

    Args:
        dct (dict) onto which the merge is executed
        merge_dct (dict): dct merged into dct
        add_keys (bool): whether to add new keys

    Returns:
        dict: updated dict
    """
    try:
        dct = dct.copy()
    except Exception:
        # Show the offending object to ease debugging, then let the error propagate
        print(dct)
        raise
    if not add_keys:
        # Restrict the merge to the keys that already exist in dct
        merge_dct = {
            k: merge_dct[k]
            for k in set(dct).intersection(set(merge_dct))
        }

    for k, v in merge_dct.items():
        if k in dct and isinstance(dct[k], dict) and isinstance(v, _Mapping):
            # Both sides are mappings: merge them recursively
            dct[k] = dict_merge(dct[k], v, add_keys=add_keys)
        else:
            dct[k] = v

    return dct

In [None]:
# Extract subjects lists and infos from each rootpath
# A single path given as a string is wrapped in a list, so that the loop below always iterates over paths
rootpath_to_dicoms = rootpath_to_dicoms if isinstance(rootpath_to_dicoms, list) else [rootpath_to_dicoms]
# Accumulators shared by all the rootpaths
dcm_subj_list, folder_to_name, additional_infos, paths_list = [], {}, {}, []
for rootpath in rootpath_to_dicoms:
    # First the subjects stored as plain folders...
    dcm_subj_list, folder_to_name, additional_infos1 = get_dcm_names_from_dir(
        rootpath,
        dcm_subj_list=dcm_subj_list,
        folder_to_name=folder_to_name,
        add_fields=additional_fields,
        walk_all_dicoms=walk_all_dicoms,
    )
    # ...then the subjects stored as zip files
    dcm_subj_list, folder_to_name, additional_infos2 = get_dcm_names_from_zip(
        rootpath,
        dcm_subj_list=dcm_subj_list,
        folder_to_name=folder_to_name,
        add_fields=additional_fields,
        walk_all_dicoms=walk_all_dicoms,
    )
    # Fold the infos of this rootpath into the global dict (folders first, zips second)
    additional_infos = dict_merge(dict_merge(additional_infos, additional_infos1), additional_infos2)
# Save the names list
save_dict_as_csv([{'name': x} for x in dcm_subj_list], csv_output, csv_order_by='name')
# Display the result!
dcm_subj_list

In [None]:
additional_infos

In [None]:
# Number of distinct 'name|AcquisitionDate' entries collected
len(additional_infos)

In [None]:
# Convert the additional infos to a pandas DataFrame and save it as csv
# TODO: convert sets to lists
import pandas as pd
# additional_infos is a dict of dicts keyed by 'name|AcquisitionDate': the DataFrame constructor puts the
# outer keys in columns, hence the transpose to get one row per id and one column per field
df_additional_infos = pd.DataFrame(additional_infos).transpose()
df_additional_infos.index.set_names(['id'], inplace=True)  # add a name to the index (so that the csv column is named)
# Temporarily turn the index into a regular 'id' column so that the name can be derived from it
df_additional_infos.reset_index(inplace=True)
df_additional_infos['name'] = df_additional_infos['id'].apply(lambda x: x.split('|')[0])  # add a column name with only the name (the part of the id before the first '|')
df_additional_infos.set_index('id', inplace=True)
# 'name' was just appended as the last column: rebuild the column order as [last column] + [all the others]
df_additional_infos = df_additional_infos[df_additional_infos.columns[-1:].append(df_additional_infos.columns[:-1])]  # place 'name' column first
#df_additional_infos.drop_duplicates()
save_df_as_csv(df_to_unicode(df_additional_infos), csv_output2, keep_index=True, encoding='iso-8859-1')
# Display the resulting table
df_additional_infos

In [None]:
# Save a CSV file with filtered infos from dicoms (eg, looking for all DICOMs containing DTI sequences)
import re
try:
    _STRING_TYPES = (basestring,)  # Python 2: covers both str and unicode
except NameError:
    _STRING_TYPES = (str,)  # Python 3


def find_in_dicoms(df, find=None, partialmatch=True):
    """Tell whether a record matches ALL the patterns of a whitelist.

    This allows to find what dicoms match the parameters you are looking for (eg, having both a dti and bold series).
    To be used in a lambda apply: eg, df.apply(lambda x: find_in_dicoms(x, find={'ProtocolName': ['dti', 'bold']}), axis=1)

    Args:
        df: one record (a pandas Series, or any mapping) giving the value of each field. A value can be
            a single string (one dicom read per subject) or a collection such as a set (all dicoms read).
        find: dict {field: [pattern1, pattern2, ...]}. None or an empty dict matches everything.
        partialmatch: if True, each pattern is a regular expression searched, case-insensitively, in the
            space-joined values of the field. If False, each pattern must be equal to one of the values.

    Returns:
        True if every pattern of every field is found, False otherwise (including when a field is
        missing or null).
    """
    if not find:
        return True
    for key, vals in find.items():
        if key not in df:
            return False
        value = df[key]
        # Normalize the field to a list of values. A lone string must NOT be iterated over,
        # else it would be split into its characters.
        if isinstance(value, _STRING_TYPES):
            candidates = [value]
        elif hasattr(value, '__iter__'):
            candidates = list(value)
        elif pd.isnull(value):
            return False
        else:
            candidates = [value]
        if partialmatch:
            # Non-string values (numbers, ...) are converted so that they can be joined and searched
            haystack = ' '.join(c if isinstance(c, _STRING_TYPES) else str(c) for c in candidates).lower()
            for val in vals:
                if not re.search(val, haystack, re.I):
                    return False
        else:
            for val in vals:
                if val not in candidates:
                    return False
    return True

if find_dicoms_matching:
    # A single pattern dict is accepted too: wrap it so that the loop below always iterates over dicts
    if not isinstance(find_dicoms_matching, list):
        find_dicoms_matching = [find_dicoms_matching]
    # The match columns are added to a copy, the original table is left untouched
    df_match_full = df_additional_infos.copy()
    for dicompattern in find_dicoms_matching:
        # Boolean mask: one True/False per row, telling whether the row matches the current pattern
        df_match = df_additional_infos.apply(lambda row: find_in_dicoms(row, find=dicompattern), axis=1)
        # Join the mask to the full table as a new column named after the pattern
        df_match_full = df_match_full.assign(**{'match_%s' % str(dicompattern): df_match})
    # Save as a CSV!
    if save_df_as_csv(df_to_unicode(df_match_full), csv_output3, keep_index=True, fields_order=['name']):
        print('The list of dicoms matching the search pattern was saved in the csv file: %s.' % csv_output3)
    df_match_full

In [None]:
# TODO: detect conflicts in dicom path (multiple dicom paths)

Debug code
------------------

In [None]:
import pickle
def save_object(obj, filename):
    """Pickle obj into the file filename, using the highest pickle protocol available.

    Any existing file with the same name is overwritten.
    """
    with open(filename, 'wb') as pickle_file:
        pickle.dump(obj, pickle_file, protocol=pickle.HIGHEST_PROTOCOL)

# sample usage
save_object(additional_infos, 'additional_infos.pkl')

In [None]:
import pickle
# Reload additional_infos from 'additional_infos.pkl' (the file name used by the save_object() sample call),
# replacing the in-memory value.
# WARNING: unpickling can execute arbitrary code, only load a pickle file that you created yourself.
with open('additional_infos.pkl','rb') as f:
    additional_infos = pickle.load(f)
# Display the reloaded dict
additional_infos