The main goal of this notebook is to handle inconsistent white space in the medical records.
Some records are formatted with a newline character after each line reaches a certain length, while
others only use newlines for paragraphs. We want to convert the first format into the second one in order to prevent long successions of white space in the data.

The output of this notebook is a JSON file `PHI_dataset.json` containing preprocessed medical records.

In [None]:
# Versions used were pandas==2.2.1 and xmltodict==0.13.0
%pip install pandas xmltodict

In [None]:
import pandas as pd
from os import listdir
import xml.etree.ElementTree as ET
import xmltodict
import re
import json
import html

In [None]:
# Update the path to point to the i2b2 dataset containing the xml files
PHI_folder = 'path_to/2014_PHI_gold_sets/'
PHI_dataset = f'{PHI_folder}all'
# Only the xml files of the folder are records
files = list(filter(lambda name: name.endswith('.xml'), listdir(PHI_dataset)))
print('Number of records:', len(files))

In [None]:
# The patient id is the part of the file name located before the first dash
patient_ids = {name.partition('-')[0] for name in files}
print('Number of patients:', len(patient_ids))

In [None]:
# One row per file: the file name is split on dashes into a patient id and a report id
split_names = [name.split('-') for name in files]
patient_reports = pd.DataFrame(split_names, columns=['patient_id', 'report_id'])
# Only keep the first two characters of the report part, which drops whatever follows them in the file name
patient_reports['report_id'] = patient_reports['report_id'].map(lambda report_part: report_part[:2])
print("Distribution of number of reports per patient:")
patient_reports.groupby('patient_id').count().describe()

In [None]:
# Define helper functions

def extractXML(directory, filename):
    """
    Read one xml record and split it into its text and its annotations.

    Returns a tuple (text, tags_dict, xmlstr): the content of the TEXT element, the content of the
    TAGS element as parsed by xmltodict, and the serialized xml of the whole document.
    """
    xml_path = directory + '/' + filename
    document_root = ET.parse(xml_path).getroot()
    # Serialize the parsed tree back to bytes, which is the input given to xmltodict
    xmlstr = ET.tostring(document_root, encoding='utf8', method='xml')

    # Everything of interest lives under the root element "deIdi2b2"
    record = xmltodict.parse(xmlstr, dict_constructor=dict)["deIdi2b2"]
    return record["TEXT"], record["TAGS"], xmlstr

def is_line_to_remove(prev_line, line):
    """
    Tell whether `line` must be dropped from the record.
    A line is dropped when it is empty, when it only contains separator characters (underscores and
    spaces, or dashes and spaces), when it contains six consecutive asterisks, or when the previous
    line is a lone newline character.
    xxx-yy: lines of dashes -
    """
    used_chars = set(line)
    return (
        line == ''
        or used_chars <= {'_', ' '}  # only chars are underscores and spaces e.g. ___ ______
        or used_chars <= {'-', ' '}  # only chars are dashes and spaces e.g. -- --
        or '******' in line
        or prev_line == '\n'
    )

def is_list_item(prev_line, curr_line):
    """
    Guess whether the newline between `prev_line` and `curr_line` is justified and must be kept.
    Arbitrary newlines (hard wraps) are removed from the text, but a newline that separates the items
    of a list is left as it is. The guess relies on the length of the previous line and on the way
    the current line starts.
    Patient records with interesting list patterns:
    xxx-yy: (1) formats with tab and spaces
    xxx-yy: only instance of #1: format
    xxx-yy: very short sentence formatting, which can be messed up by testing the line length
    xxx-yy: 1) a) i) formats
    xxx-yy: 1. a. formats interlaced and tables
    xxx-yy: 1.) format, items with ':', tables
    xxx-yy: only instance of #1 #2 format currently matched by the length < 50 but not the regexes
    """
    # Only the text located before the first space is tested against the list markers
    first_token = curr_line.partition(' ')[0]

    # A short previous line is probably a list item, a date, a name etc
    if len(prev_line) < 50:
        return True
    # Bullet-like markers
    if curr_line.startswith(('-', '#')):
        return True
    # Two consecutive "key: value" lines
    if ':' in prev_line and ':' in curr_line:
        return True

    list_markers = (
        # 1) or (1) or 1. or 1.) as well as a) b) or a. b.
        # numbered lists go up to 15) and lettered lists go up to i)
        r'^\(?(\d+|[a-i])(\.|\)|:)',
        # roman numerals i) ii) or (ii) or iii.
        # roman numeral lists go up to iii in this dataset (no iv or v)
        r'^\(?i+(\.|\))',
    )
    return any(re.match(marker, first_token) is not None for marker in list_markers)

# Character replacements applied by `clean_line` to the lines in which html entities were unescaped
mapping_table = str.maketrans({
    '’': "'",
    '·': '-',
    '–': ':',
    '“': '"',
    '”': '"',
    '‘': "'",
    '½': '1/2'
})

# Two or more consecutive white space characters (raw string: '\s' is not a valid str escape sequence)
_CONSECUTIVE_SPACES = re.compile(r'\s{2,}')


def clean_line(line):
    """
    Normalize a single line of a medical record.

    The following steps are applied in order:
    1. every run of two or more white space characters is replaced by a single space
    2. the html entity &#8211; is replaced by a plain dash
    3. if the line contains at least four consecutive dashes, all of its dashes are removed
    4. if the line contains at least five consecutive underscores, all of its underscores are removed
    5. if the line still contains a numeric html entity, entities are unescaped and the resulting
       characters are translated with `mapping_table`

    Returns the cleaned line, which can be empty.
    """
    line = _CONSECUTIVE_SPACES.sub(' ', line)
    # str.replace returns a new string, its result has to be assigned to take effect
    line = line.replace('&#8211;', '-')
    # remove dashes if there are too many consecutive dashes in the str
    # e.g. -----  ---- or -----
    if '----' in line:
        line = line.replace('-', '')
    if '_____' in line:
        line = line.replace('_', '')
    # Unescape html symbol entities like &#8220;
    if '&#' in line:
        line = html.unescape(line)
        line = line.translate(mapping_table)
    return line

In [None]:
def preprocess_raw_dataset():
    """
    Converts and process each xml file of `PHI_dataset` into a .txt file in a `clean` folder of `PHI_folder`.

    For each record, undesirable lines are dropped, the remaining lines are cleaned and the lines that
    were arbitrarily broken are merged back into a single line. The `clean` folder is created when it
    does not exist yet.

    A file that cannot be processed is reported on the standard output and does not stop the processing
    of the other files.

    Returns a list of each record represented as a dict with the keys `id`, `text` and `tags`,
    sorted by `id`.
    """
    # Local import: only `listdir` is imported from os at the top of the notebook
    from os import makedirs

    filenames = listdir(PHI_dataset)
    clean_folder = PHI_folder + 'clean/'
    # Without this the write of every record fails when the folder is missing
    makedirs(clean_folder, exist_ok=True)

    dataset = []
    for filename in filenames:
        if not filename.endswith('.xml'):
            continue
        patient_record = filename.split('.')[0]
        try:
            text, tags_dict, _ = extractXML(PHI_dataset, filename)
            # prepend the record id, e.g. xxx-yy at the very beginning to be used as identifier when injected in the training set
            lines = [f'{patient_record}\n']
            # Here we drop lines we don't want, remove empty space and inline text arbitrarily broken into multiple lines into a single line
            for line in (l.strip() for l in text.split('\n')):
                if is_line_to_remove(lines[-1], line):
                    continue
                line = clean_line(line)
                # Cleaning can leave nothing of the line: skip it instead of dropping the rest of the record
                if line == '':
                    continue
                # The first line directly follows the record id, which already ends with a newline
                if len(lines) == 1:
                    lines.append(line)
                    continue
                prev_line = lines[-1]
                # Prefix with newline when the previous line ends with a dot or if it is part of a list
                if prev_line.endswith('.') or is_list_item(prev_line, line):
                    lines.append('\n' + line)
                # O.w. assume it is the continuation of the same sentence and
                # prefix with a white space to inline sentences that are originally formatted on multiple lines
                else:
                    lines.append(' ' + line)
            clean_text = "".join(lines)
            dataset.append({'id': patient_record, 'text': clean_text, 'tags': tags_dict})

            # Explicit encoding: unescaped html entities can produce non-ASCII characters
            with open(clean_folder + patient_record + '.txt', 'w', encoding='utf-8') as f:
                f.write(clean_text)

        except Exception as e:
            # Best effort: report the failing file and carry on with the next one
            print(filename, e)
    return sorted(dataset, key=lambda x: x['id'])


In [None]:
def preprocess_tags(dataset):
    """
    Extracts PHI tags. We didn't make use of tags in our current work.

    The `tags` of each record of `dataset` are converted in place from a dict, whose keys are tag types
    and whose values are a tag or a list of tags, to a flat list of dicts sorted by tag `id`. Each dict
    has the keys `type`, `subtype`, `id`, `value` (the cleaned tag text) and `start_indices` (the
    [start, end] position of every occurrence of the value in the record text without newlines).

    Records whose tags are already a list are left untouched. A record whose tags are None ends up
    with an empty list. When a record cannot be processed, e.g. because one of its tags does not occur
    in the text, the error is printed and the tags of that record are left as they were.
    """
    for record in dataset:
        inlined_record_text = record['text'].replace('\n', '')  # remove newlines because tags don't include them
        try:
            raw_tags = record['tags']
            # Already converted, which happens when the function runs twice on the same dataset
            if isinstance(raw_tags, list):
                continue
            # No annotation at all for this record
            if raw_tags is None:
                raw_tags = {}

            inlined_tag_list = []
            for tag_type, tag_list in raw_tags.items():
                # A tag type with a single occurrence is not wrapped in a list
                if not isinstance(tag_list, list):
                    tag_list = [tag_list]
                for tag in tag_list:
                    new_tag = {
                        'type': tag_type,
                        'subtype': tag['@TYPE'],
                        'id': tag['@id'],
                        'value': clean_line(tag['@text'].strip()),
                    }

                    # Escape the pattern because some tags include parentheses like phone numbers
                    tag_start_idx = [[m.start(), m.end()] for m in re.finditer(re.escape(new_tag['value']), inlined_record_text)]
                    # Every tag should occur in the text
                    if not tag_start_idx:
                        raise ValueError(f'tag not found in the text: {new_tag}')
                    new_tag['start_indices'] = tag_start_idx
                    inlined_tag_list.append(new_tag)
            record['tags'] = sorted(inlined_tag_list, key=lambda x: x['id'])
        except Exception as e:
            # Best effort: report the failing record and carry on with the next one
            print(record['id'], e)

# Build the cleaned records, then convert the tags of each record in place
dataset = preprocess_raw_dataset()
preprocess_tags(dataset)

In our work, we experiment with duplicated notes to control for the effect of data duplication on memorization. Here we create a dataset without any data duplication. Data is duplicated for fine-tuning via the `create_dataset.py` script.

In [None]:
# Write all the preprocessed records into a single JSON file of the dataset folder
with open(PHI_folder + 'PHI_dataset.json', 'w') as json_file:
    serialized_dataset = json.dumps(dataset, indent=2)
    json_file.write(serialized_dataset)