In [1]:
import os
import re
import json
import nltk
import shutil

from nltk.corpus import stopwords
from collections import defaultdict

nltk.download('stopwords')

[nltk_data] Downloading package stopwords to
[nltk_data]     C:\Users\dever\AppData\Roaming\nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

In [2]:
data_dir = os.path.join("..", "annotated_dict")

In [3]:
entity_to_acronyms = {
    'Activity': 'ACT',
    'Administration': 'ADM',
    'Age': 'AGE',
    'Area': 'ARA',
    'Biological_attribute': 'BAT',
    'Biological_structure': 'BST',
    'Clinical_event': 'CLE',
    'Color': 'COL',
    'Coreference': 'COR',
    'Date': 'DAT',
    'Detailed_description': 'DET',
    'Diagnostic_procedure': 'DIA',
    'Disease_disorder': 'DIS',
    'Distance': 'DIS',
    'Dosage': 'DOS',
    'Duration': 'DUR',
    'Family_history': 'FAM',
    'Frequency': 'FRE',
    'Height': 'HEI',
    'History': 'HIS',
    'Lab_value': 'LAB',
    'Mass': 'MAS',
    'Medication': 'MED',
    'Nonbiological_location': 'NBL',
    'Occupation': 'OCC',
    'Other_entity': 'OTH',
    'Other_event': 'OTE',
    'Outcome': 'OUT',
    'Personal_background': 'PER',
    'Qualitative_concept': 'QUC',
    'Quantitative_concept': 'QUC',
    'Severity': 'SEV',
    'Sex': 'SEX',
    'Shape': 'SHA',
    'Sign_symptom': 'SIG',
    'Subject': 'SUB',
    'Texture': 'TEX',
    'Therapeutic_procedure': 'THP',
    'Time': 'TIM',
    'Volume': 'VOL',
    'Weight': 'WEI'
}
acronyms_to_entities = {v: k for k, v in entity_to_acronyms.items()}

In [4]:
json_file_path = os.path.join("..", "annotated_data.json")

if os.path.exists(json_file_path):
    try:
        with open(json_file_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
        print("Data loaded successfully.")
        print(data)
    except json.JSONDecodeError as e:
        print(f"Error decoding JSON: {e}")
    except Exception as e:
        print(f"An error occurred: {e}")
else:
    print(f"The file {json_file_path} does not exist.")


Data loaded successfully.
{'15939911': {'text': "CASE: A 28-year-old previously healthy man presented with a 6-week history of palpitations.\nThe symptoms occurred during rest, 2–3 times per week, lasted up to 30 minutes at a time and were associated with dyspnea.\nExcept for a grade 2/6 holosystolic tricuspid regurgitation murmur (best heard at the left sternal border with inspiratory accentuation), physical examination yielded unremarkable findings.\nAn electrocardiogram (ECG) revealed normal sinus rhythm and a Wolff– Parkinson– White pre-excitation pattern (Fig.1: Top), produced by a right-sided accessory pathway.\nTransthoracic echocardiography demonstrated the presence of Ebstein's anomaly of the tricuspid valve, with apical displacement of the valve and formation of an “atrialized” right ventricle (a functional unit between the right atrium and the inlet [inflow] portion of the right ventricle) (Fig.2).\nThe anterior tricuspid valve leaflet was elongated (Fig.2C, arrow), whereas 

In [5]:
def remove_trailing_punctuation(token: str) -> str:
    """
    Removes trailing punctuation from a token.

    Args:
        token (str): The token from which to remove trailing punctuation.

    Returns:
        str: The token stripped of trailing punctuation.
    """
    while token and re.search(r'[^\w\s\']', token[-1]):
        token = token[:-1]
    return token

In [6]:
def split_text(text: str):
    """
    Splits the provided text into tokens, identifies the start and end indices of each token,
    and tracks where sentences break based on newline characters.

    Args:
        text (str): The text to be split.

    Returns:
        tuple: A tuple
    """
    # Defining regex pattern to match tokens
    regex_match = r'[^\s\u200a\-\u2010-\u2015\u2212\uff0d]+'

    tokens = list()
    start_end_ranges = list()
    sentence_breaks = list()

    start_idx = 0  # Starting index of each sentence in the overall text

    for sentence in text.split('\n'):
        # Find all matches in the sentence and process each match
        matches = list(re.finditer(regex_match, sentence))
        words = [match.group(0) for match in matches]
        processed_words = list(map(remove_trailing_punctuation, words))

        # Calculate start and end indices for each token
        sentence_indices = [(match.start(), match.start() + len(token)) 
                            for match, token in zip(matches, processed_words)]

        # Adjust indices based on the current position in the entire text
        sentence_indices = [(start_idx + start, start_idx + end) 
                            for start, end in sentence_indices]

        start_end_ranges.extend(sentence_indices)
        tokens.extend(processed_words)

        # Mark where each sentence ends in the list of tokens
        sentence_breaks.append(len(tokens))

        # Update start index to the position 
        start_idx += len(sentence) + 1 

    return tokens, start_end_ranges, sentence_breaks


In [7]:
for doc_id, doc in data.items():
    # Extracting the first 100 characters of the text for the current document
    text_preview = doc['text'][:100]
    
    tokens, indices, sentence_breaks = split_text(text_preview)
    
    print(f"Document ID: {doc_id}")
    print("Tokens:", tokens)
    print("Indices:", indices)
    print("Sentence Breaks:", sentence_breaks)
    break

Document ID: 15939911
Tokens: ['CASE', 'A', '28', 'year', 'old', 'previously', 'healthy', 'man', 'presented', 'with', 'a', '6', 'week', 'history', 'of', 'palpitations', 'The', 'symp']
Indices: [(0, 4), (6, 7), (8, 10), (11, 15), (16, 19), (20, 30), (31, 38), (39, 42), (43, 52), (53, 57), (58, 59), (60, 61), (62, 66), (67, 74), (75, 77), (78, 90), (92, 95), (96, 100)]
Sentence Breaks: [16, 18]


In [8]:
def tag_token(tokens: list, tags: list, token_pos: int, entity: str) -> list:
    """
    Tags a token based on its position in the sequence and its relationship to an entity using the BIO scheme.
    
    Args:
        tokens (list): The list of tokens from the text.
        tags (list): The current list of tags associated with these tokens.
        token_pos (int): The position index of the token to be tagged.
        entity (str): The named entity associated with the token.

    Returns:
        list: The updated list of tags after tagging the specified token.
    """
    # Convert stop words list to a set
    stop_words = set(stopwords.words('english'))

    tag = entity_to_acronyms[entity]

    # Tag as 'I-{tag}' if the previous token is part of the same entity
    if token_pos > 0 and f'I-{tag}' in tags[token_pos - 1]:
        tags[token_pos] = f'I-{tag}'
    # Tag as 'B-{tag}' if the token is not a stop word
    elif tokens[token_pos] not in stop_words:
        tags[token_pos] = f'B-{tag}'
    
    return tags

In [9]:
def write_bio_files(output_file_path: str, tokens: list, tags: list, sentence_breaks: list):
    """
    Writes tokens and their BIO tags to a file with each token and its tag separated by a tab.

    Args:
        output_file_path (str): The file path where the BIO formatted data will be written.
        tokens (list): A list of tokens to write.
        tags (list): A list of BIO tags corresponding to the tokens.
        sentence_breaks (list): A list of indices where sentences end.
    """
    with open(output_file_path, 'w', encoding='utf-8') as f:
        for i in range(len(tokens)):
            token = tokens[i].strip()  # Clean the token to remove unwanted spaces
            if token:  
                # Check if the current token is at the start of a new sentence
                if i in sentence_breaks and i != 0:  
                    f.write("\n")  
                f.write(f"{token}\t{tags[i]}\n") 

In [10]:
def convert_ann_to_bio(data, output_dir, filtered_entities=list()):
    """
    Converts annotations to the BIO tagging format and writes them to files.

    Args:
        data (dict): Dictionary containing texts and their annotations.
        output_dir (str): Directory to save the output BIO formatted files.
        filtered_entities (list): List of entity labels to ignore.
    """
    if os.path.exists(output_dir):
        shutil.rmtree(output_dir)
    os.makedirs(output_dir)

    for file_id, content in data.items():
        text = content['text']
        annotations = [ann for ann in content['annotations'] if not filtered_entities or ann['label'] in filtered_entities]
        
        tokens, token2text, sentence_breaks = split_text(text)
        tags = ['O'] * len(tokens)

        # Match annotations to tokens and tag them
        for ann in annotations:
            ann_start, ann_end, label = ann['start'], ann['end'], ann['label']
            # Iterate through token positions and indices
            for idx, (token_start, token_end) in enumerate(token2text):
                if token_start >= ann_start and token_end <= ann_end:
                    # Apply tagging logic here
                    prefix = 'B-' if token_start == ann_start else 'I-'
                    tags[idx] = f"{prefix}{label}"

        # Write to BIO formatted file
        bio_file_path = os.path.join(output_dir, f"{file_id}.bio")
        write_bio_files(bio_file_path, tokens, tags, sentence_breaks)

    print("Conversion complete.")

data_dir = os.path.join("..", "annotated_dict")
convert_ann_to_bio(data, data_dir)

Conversion complete.
