In [13]:
import os
import re
import json
import shutil

In [14]:
import string

import nltk
from nltk.corpus import stopwords
from collections import Counter
nltk.download('stopwords')

[nltk_data] Downloading package stopwords to
[nltk_data]     /home/tiagolima/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

In [15]:
# Directory holding the annotated JSON corpus; the path is relative, so it is
# resolved against the notebook's current working directory.
data_dir = "../data/annotated_json_data/"

In [16]:
import sys
sys.path.append('../')
from config import entity_to_acronyms, acronyms_to_entities, colors

In [17]:
# Load the annotated corpus into a dictionary.
# The file is decoded explicitly as UTF-8 (the encoding JSON documents are
# exchanged in) so that accented characters are read the same way regardless
# of the platform's default locale encoding.
with open(os.path.join(data_dir, "annotated_data.json"), 'r', encoding='utf-8') as f:
    data = json.load(f)

In [18]:
def remove_trailing_punctuation(token):
    """
    Strip punctuation from the right-hand end of a token.

    Characters are removed from the end for as long as the last character is
    neither a word character, a whitespace character, nor an apostrophe.
    Punctuation at the start or in the middle of the token is left untouched.

    Args:
        token (str): The token to clean.

    Returns:
        str: The token without its trailing punctuation; this is an empty
        string when the token consists of punctuation only.
    """
    # Falsy input (e.g. an empty string) is handed back unchanged.
    if not token:
        return token

    # \Z (rather than $) anchors at the true end of the string, so a token
    # ending in a newline is left alone, exactly like a char-by-char scan.
    return re.sub(r'[^\w\s\']+\Z', '', token)

In [19]:
def split_text(text):
    """
    Tokenize a text and record where every token sits in the original string.

    The text is handled one line at a time (lines are separated by the newline
    character). Inside a line, a token is a maximal run of characters that are
    neither whitespace nor a hyphen/dash (ASCII hyphen, U+2010 to U+2015,
    U+2212, U+FF0D). Each raw token is then passed through
    ``remove_trailing_punctuation``; a token made of punctuation only therefore
    ends up as an empty string with a zero-length range.

    Args:
        text (str): The full document text.

    Returns:
        tuple: ``(tokens, start_end_ranges, sentence_breaks)`` where
        - tokens (list of str): the cleaned tokens, in reading order;
        - start_end_ranges (list of tuple): one ``(start, end)`` pair per token,
          expressed as character offsets into ``text`` (``end`` already
          excludes the stripped trailing punctuation);
        - sentence_breaks (list of int): for every line, the number of tokens
          collected up to and including that line, i.e. the index at which the
          next line's tokens begin.
    """
    token_pattern = r'[^\s\u200a\-\u2010-\u2015\u2212\uff0d]+'  # r'[^\s\u200a\-\—\–]+'

    tokens = []
    start_end_ranges = []
    sentence_breaks = []

    # Offset of the current line's first character within the whole text.
    line_offset = 0

    for line in text.split('\n'):
        for match in re.finditer(token_pattern, line):
            cleaned = remove_trailing_punctuation(match.group(0))
            begin = line_offset + match.start()
            tokens.append(cleaned)
            start_end_ranges.append((begin, begin + len(cleaned)))

        sentence_breaks.append(len(tokens))

        # +1 accounts for the newline character consumed by split('\n').
        line_offset += len(line) + 1

    return tokens, start_end_ranges, sentence_breaks

In [20]:
# for doc_id, doc in data.items():
#     print(f"text: {doc['text'][:200]} split: {split_text(doc['text'][:200])} \n {'*' * 30}")

In [21]:
def _portuguese_stop_words():
    """Return the NLTK Portuguese stop words as a frozenset, loading them only once."""
    cached = getattr(_portuguese_stop_words, '_cache', None)
    if cached is None:
        cached = frozenset(stopwords.words('portuguese'))
        _portuguese_stop_words._cache = cached
    return cached


def tag_token(tokens, tags, token_pos, entity):
    """
    Assign a BIO tag for ``entity`` to the token at ``token_pos``.

    The tag written depends on the tag of the preceding token:
    - if the preceding token already carries a ``B-`` or ``I-`` tag of the same
      entity, the token continues that entity and receives ``I-<acronym>``;
    - otherwise, if the token is not a Portuguese stop word, it opens a new
      entity and receives ``B-<acronym>``;
    - otherwise (a stop word that does not continue an entity) the existing tag
      is left unchanged.

    Args:
    - tokens (list): A list of tokens in a sequence.
    - tags (list): A list of tag labels corresponding to the tokens. It is
      modified in place.
    - token_pos (int): The position of the token to tag.
    - entity (str): The entity label; it is translated to its acronym through
      ``entity_to_acronyms``.

    Returns:
    - tags (list): The same list object that was passed in, after modification.

    Raises:
    - KeyError: if ``entity`` has no entry in ``entity_to_acronyms``.
    """
    tag = entity_to_acronyms[entity]

    # Compare against the complete tag strings. A plain substring test would
    # treat e.g. acronym "A" as a continuation of a preceding "B-DA".
    continues_entity = token_pos > 0 and tags[token_pos - 1] in (f'B-{tag}', f'I-{tag}')

    if continues_entity:
        tags[token_pos] = f'I-{tag}'
    elif tokens[token_pos] not in _portuguese_stop_words():
        # The comparison is case-sensitive, exactly as the token appears.
        tags[token_pos] = f'B-{tag}'

    return tags

In [22]:
def write_bio_files(output_file_path, tokens, tags, sentence_breaks):
    """
    Write tokens and their tags to a tab-separated .bio file.

    Each non-blank token is written on its own line as ``token<TAB>tag``. A
    blank line is written before the first token written at or after every
    index listed in ``sentence_breaks``. Tokens that are empty (or whitespace
    only) are not written; a break that falls on such a token is carried over
    to the next written token instead of being lost.

    Args:
        output_file_path (str): Path of the file to create or overwrite.
        tokens (list of str): The tokens, in order.
        tags (list of str): The tag of each token, aligned with ``tokens``.
        sentence_breaks (list of int): Token indices at which a new sentence
            starts.
    """
    # Set membership keeps the per-token check constant-time.
    break_positions = set(sentence_breaks)
    pending_break = False

    # Explicit UTF-8 so accented tokens are written identically on every platform.
    with open(output_file_path, 'w', encoding='utf-8') as f:
        for i, raw_token in enumerate(tokens):
            if i in break_positions:
                pending_break = True

            if not raw_token.strip():
                # Skipped token: keep any pending break for the next real token.
                continue

            if pending_break:
                f.write("\n")
                pending_break = False

            f.write(f"{raw_token}\t{tags[i]}\n")

In [23]:
def _annotation_matches_token(ann_word, token):
    """Return True if the annotated text occurs in ``token``, verbatim or once non-word characters are removed from both."""
    if ann_word in token:
        return True
    return re.sub(r'\W+', '', ann_word) in re.sub(r'\W+', '', token)


def convert_ann_to_bio(data, output_dir, filtered_entities=None):
    """
    Convert annotated documents to BIO-tagged files, one ``<file_id>.bio`` per document.

    For every document the text is tokenized with ``split_text``, all tags start
    as ``'O'``, and each annotation is aligned to a token: the token cursor is
    advanced to the first token whose start offset is not before the
    annotation's start, and the annotation is applied (through ``tag_token``)
    to that token if it contains the annotated text, or else to the token just
    before it. Annotations that match neither token are ignored. The cursor
    only moves forward, so annotations are assumed to be ordered by start
    offset (TODO confirm against the annotation export). Processing of a
    document stops once the cursor has moved past the last token.

    WARNING: ``output_dir`` is deleted (with everything in it) and recreated
    before any file is written.

    Args:
        data (dict): Maps file IDs to dictionaries with a 'text' key (str) and
            an 'annotations' key (list of dicts with 'label', 'start', 'end').
        output_dir (str): Directory that receives the .bio files.
        filtered_entities (list, optional): Entity labels to include. When it is
            None or empty, every annotation is used. Defaults to None.

    Returns:
        None. The result is written to disk and "Conversion complete" is printed.
    """
    if os.path.exists(output_dir):
        # Delete the directory together with its contents
        shutil.rmtree(output_dir)
    # Recreate the directory
    os.makedirs(output_dir)

    for file_id in data:
        text = data[file_id]['text']
        annotations = data[file_id]['annotations']

        # Tokenizing
        tokens, token2text, sentence_breaks = split_text(text)

        # Initialize the tags
        tags = ['O'] * len(tokens)

        ann_pos = 0
        token_pos = 0
        while ann_pos < len(annotations) and token_pos < len(tokens):
            annotation = annotations[ann_pos]
            # Always move on to the next annotation, whatever happens below.
            ann_pos += 1

            label = annotation['label']
            start = annotation['start']
            end = annotation['end']

            if filtered_entities and label not in filtered_entities:
                continue

            ann_word = text[start:end]

            # Advance to the first token that does not start before the annotation.
            while token_pos < len(tokens) and token2text[token_pos][0] < start:
                token_pos += 1

            # The cursor may now be one past the last token (annotation starting
            # inside the final token), so the current-token test is guarded.
            if token_pos < len(tokens) and _annotation_matches_token(ann_word, tokens[token_pos]):
                tag_token(tokens, tags, token_pos, label)
            # Fall back to the preceding token only if there is one; index -1
            # would otherwise silently address the last token of the document.
            elif token_pos > 0 and _annotation_matches_token(ann_word, tokens[token_pos - 1]):
                tag_token(tokens, tags, token_pos - 1, label)

        # write to bio file
        write_bio_files(os.path.join(output_dir, f"{file_id}.bio"), tokens, tags, sentence_breaks)
    print("Conversion complete")

In [25]:
# Destination folder for the generated .bio files; it is created up front when missing.
output_dir = '../data/bio_json_data'
if not os.path.exists(output_dir):
    os.makedirs(output_dir, mode=0o777, exist_ok=True)

# Entity labels to export. Annotations carrying any other label are skipped;
# trim this list to generate BIO files for a subset of the entity types.
entities_to_export = [
    'Sign_symptom', 'Diagnostic_procedure',
    'Lab_value', 'Disease_disorder',
    'Pregnancy', 'Pregnancy_history',
    'Color', 'Subject',
    'Age', 'Clinical_event',
    'Date', 'Time',
    'Therapeutic_procedure', 'Medication',
    'Biological_attribute',
]

convert_ann_to_bio(data, output_dir=output_dir, filtered_entities=entities_to_export)

Conversion complete
