# Step 1: paths + load data

In [21]:
## check paths exist  

import os 
# Path to the SNOMED CT gazetteer, a tab-separated file that load_snomed() reads below.
gazzetteer_snomed = "./gazzeteer_snomedct_full.tsv" # TODO: path to the .tsv of snomed

# Path to the folder with the .txt documents that load_data() reads below.
# NOTE: the empty-string placeholder must be replaced before running;
# os.path.exists("") is False, so the check below fails until it is set.
data_path = "" # TODO: path to folder with .txt files

# Sanity check: both lines should print True before continuing.
print(os.path.exists(gazzetteer_snomed))
print(os.path.exists(data_path))

True
True
True


In [22]:
## load snomed.tsv

import pandas as pd

def load_snomed(gazzetteer_snomed_path):
    """
    Read the SNOMED gazetteer into a DataFrame.

    Parameters
    ----------
    gazzetteer_snomed_path : str
        Path to the tab-separated gazetteer file.

    Returns
    -------
    pandas.DataFrame
        One row per line of the file, with the file's header as column names.
    """
    return pd.read_csv(gazzetteer_snomed_path, sep="\t")


# load data 
def load_data(data_folder_path):
    """
    Load every ``.txt`` file of a folder into a DataFrame.

    Parameters
    ----------
    data_folder_path : str
        Folder that contains the UTF-8 encoded ``.txt`` documents.

    Returns
    -------
    pandas.DataFrame
        Columns ``id`` (file name, including the ``.txt`` extension) and
        ``text`` (full file content), one row per file, sorted by file name.
        The two columns are present even when the folder has no ``.txt`` file.
    """
    records = []

    # os.listdir() returns entries in arbitrary order; sorting makes the row
    # order (and therefore the DataFrame index) reproducible across runs.
    for file_name in sorted(os.listdir(data_folder_path)):
        if not file_name.endswith(".txt"):
            continue

        file_path = os.path.join(data_folder_path, file_name)
        with open(file_path, "r", encoding="utf-8") as file:
            content = file.read()

        # One record (dict) per document
        records.append({"id": file_name, "text": content})

    # Explicit columns keep the schema stable for an empty folder
    df_data = pd.DataFrame(records, columns=["id", "text"])

    # Show a short preview of what was loaded
    print(df_data.head(2))

    return df_data


In [None]:
# Load the documents and the SNOMED gazetteer into DataFrames, using the paths set in Step 1.
df_data = load_data(data_path)
df_snomed =  load_snomed(gazzetteer_snomed)

In [None]:
# Number of documents loaded (load_data creates one row per .txt file).
len(df_data)

# Step 2: removal lists

In [25]:
# Removal lists for SNOMED: gazetteer entries whose semantic tag appears in the
# selected list are skipped by the dictionary lookup, so they are never annotated.

removal_list_1 = ['qualifier value', 'dose form', 'basic dose form', 'unit of presentation', 'intended site', 'record artifact',  'attribute',  'SNOMED RT+CTV3', 'navigational concept',  'foundation metadata concept', 'core metadata concept',  'administration method', 'link assertion',  'OWL metadata concept', 'release characteristic', 'namespace concept', 'nan', 'linkage concept', 'special concept',  'supplier',  'context-dependent category']

# 'person' and 'racial group' are two separate tags. Without the comma between
# them Python concatenates the adjacent literals into 'personracial group',
# which matches no semantic tag and silently disables both filters.
removal_list_2 =  [ 'occupation', 'environment', 'ethnic group', 'geographic location',  'religion/philosophy',  'person', 'racial group']

removal_list_3 = ['social concept',  'attribute', 'qualifier value', 'basic dose form', 'unit of presentation', 'intended site',  'attribute',  'SNOMED RT+CTV3', 'navigational concept',  'foundation metadata concept', 'core metadata concept',  'administration method', 'link assertion',  'OWL metadata concept', 'release characteristic', 'namespace concept', 'nan', 'linkage concept', 'special concept',  'supplier',  'context-dependent category']

# Union of lists 2 and 3 without duplicates (element order is not significant)
removal_list_4= list(set(removal_list_2 + removal_list_3))

# Step 3: code

In [26]:
import re
import os
import unicodedata

def normalize_text(text):
    """
    Normalize the text to ensure consistent handling of accented chars.

    Applies Unicode NFC normalization, so a base letter followed by a combining
    accent and the equivalent precomposed character become the same string.
    """
    return unicodedata.normalize('NFC', text)


def _is_missing(value):
    """Return True for None and NaN-like cells (e.g. empty fields of the gazetteer)."""
    if value is None:
        return True
    try:
        # NaN is the value that does not compare equal to itself
        return bool(value != value)
    except (TypeError, ValueError):
        # Comparison result without a truth value (e.g. pandas.NA)
        return True


def _iter_gazetteer_entries(df_snomed, removal_list=None):
    """Yield (term, code, label, compiled_pattern) for every usable gazetteer row, in row order."""
    for _, snomed_row in df_snomed.iterrows():
        raw_term = snomed_row['term']
        code = snomed_row['code']
        raw_tag = snomed_row['semantic_tag']

        # A missing tag is handled as the string 'nan', so the 'nan' entry of a
        # removal list filters it instead of the float NaN slipping through.
        semantic_tag = 'nan' if _is_missing(raw_tag) else str(raw_tag)

        # Skip terms with semantic tags present in the removal list
        if removal_list and semantic_tag in removal_list:
            continue

        # Rows without a term cannot be matched
        if _is_missing(raw_term):
            continue

        # Normalize the SNOMED term to match the text properly
        term = normalize_text(str(raw_term))

        # An empty term would produce a pattern that matches every word
        if not term.strip():
            continue

        # Match base term and account for hyphenated extensions
        pattern = re.compile(r'\b' + re.escape(term) + r'([-\w]*)\b', flags=re.IGNORECASE)

        # Normalize semantic_tag by replacing spaces with underscores
        label = f"SNOMED_{semantic_tag.replace(' ', '_')}"

        yield term, code, label, pattern


def dictionary_lookup_snomed(df_snomed, df_data, removal_list=None):
    """
    Annotate every document of ``df_data`` with the terms of the SNOMED gazetteer.

    Matching is case-insensitive and starts at a word boundary; the match is
    extended over any word characters or hyphens that directly follow the term
    (e.g. "tobillo" also covers "tobillo-brazo"). Within one document:

    * gazetteer rows are applied in their DataFrame order, so an earlier row
      wins over a later row whose match overlaps an already annotated span;
    * each distinct matched surface string is annotated at most once.

    Rows with a missing or empty term are ignored. A missing semantic tag is
    treated as the string ``'nan'`` (filterable through ``removal_list``).

    Parameters
    ----------
    df_snomed : pandas.DataFrame
        Gazetteer with the columns ``term``, ``code`` and ``semantic_tag``.
    df_data : pandas.DataFrame
        Documents with the columns ``id`` and ``text``.
    removal_list : list of str, optional
        Semantic tags whose terms must not be annotated.

    Returns
    -------
    list of dict
        One dict per document with the keys ``index``, ``id`` (without
        ``.txt``), ``text`` (NFC-normalized) and ``annotations``. Every
        annotation has the keys ``start``, ``end``, ``term``, ``code``,
        ``label`` and ``original``; offsets refer to the normalized text.
    """
    annotations = []
    doc_states = []  # per document: (surface forms already annotated, occupied spans)

    for index, row in df_data.iterrows():
        annotations.append({
            'index': index,
            'id': row['id'].replace('.txt', ''),
            'text': normalize_text(row['text']),  # Normalize the text to handle accents properly
            'annotations': []
        })
        doc_states.append((set(), []))

    if not annotations:
        return annotations

    # The gazetteer is the outer loop: filtering, normalization and regex
    # compilation happen once per term instead of once per term and document.
    # Each document keeps its own state and still sees the terms in gazetteer
    # order, so the result is the same as processing document by document.
    for term, code, label, pattern in _iter_gazetteer_entries(df_snomed, removal_list):
        for document, (matched_terms, occupied_ranges) in zip(annotations, doc_states):
            text = document['text']

            for match in pattern.finditer(text):
                start_idx = match.start()
                end_idx = match.end()

                # Get the full term matched, including additional parts (like "-brazo")
                full_annotation = text[start_idx:end_idx].strip()

                # Re-locate the stripped string so the span excludes surrounding whitespace
                actual_start = text.find(full_annotation, start_idx)
                actual_end = actual_start + len(full_annotation)

                # Validate that the extracted span matches the term in the text
                if text[actual_start:actual_end] != full_annotation:
                    print(f"Warning: Span mismatch for term '{full_annotation}' at position {actual_start}:{actual_end}. Adjusting span.")
                    continue  # Skip mismatched annotations

                # Check for overlap with existing annotations
                if not is_overlap(actual_start, actual_end, occupied_ranges) and full_annotation not in matched_terms:
                    # Track the annotation and prevent overlaps
                    document['annotations'].append({
                        'start': actual_start,
                        'end': actual_end,
                        'term': full_annotation,
                        'code': code,
                        'label': label,
                        'original': term  # The gazetteer term that produced the match, like "tobillo"
                    })
                    occupied_ranges.append((actual_start, actual_end))
                    matched_terms.add(full_annotation)

    return annotations

def is_overlap(start, end, occupied_ranges):
    """
    Check if the half-open span [start, end) overlaps any existing range.

    ``occupied_ranges`` is an iterable of (range_start, range_end) pairs; spans
    that merely touch (one ends where the other starts) do not overlap.
    """
    return any((start < range_end and end > range_start) for range_start, range_end in occupied_ranges)

def create_brat_annotations(annotations, output_folder):
    """
    Export documents and their annotations as BRAT file pairs.

    For every entry of ``annotations`` (dict with the keys ``id``, ``text`` and
    ``annotations``) two files are written into ``output_folder``:
    ``<id>.txt`` with the text and ``<id>.ann`` with one text-bound annotation
    line plus one AnnotatorNotes line per annotation. The folder is created
    when it does not exist. A warning is printed, and the annotation is still
    written, when the stored term differs from ``text[start:end]``.
    """
    # Create the target folder on first use
    if not os.path.exists(output_folder):
        os.makedirs(output_folder)

    for document in annotations:
        doc_id = document['id'].replace('.txt', '')  # file stem without the '.txt' extension
        doc_text = document['text']
        doc_annotations = document['annotations']

        txt_path = os.path.join(output_folder, f'{doc_id}.txt')
        ann_path = os.path.join(output_folder, f'{doc_id}.ann')

        # The text file holds exactly the text the offsets refer to
        with open(txt_path, 'w', encoding='utf-8') as txt_out:
            txt_out.write(doc_text)

        with open(ann_path, 'w', encoding='utf-8') as ann_out:
            # BRAT identifiers are 1-based
            for number, entry in enumerate(doc_annotations, start=1):
                begin = entry['start']
                finish = entry['end']
                surface = entry['term']

                # Report spans that do not reproduce the stored term
                if doc_text[begin:finish] != surface:
                    print(f"Warning: Span mismatch in file {doc_id}. Expected '{surface}', found '{doc_text[begin:finish]}' at {begin}:{finish}")

                # Text-bound annotation line
                ann_out.write(f"T{number}\t{entry['label']} {begin} {finish}\t{surface}\n")

                # Annotator note with the gazetteer term and its SNOMED code
                ann_out.write(f"#{number}\tAnnotatorNotes T{number}\tSNOMED annotation: {entry['original']}\tSNOMED_CODE: {entry['code']}\n")


# Driver: run the dictionary lookup once per removal list and export each run as BRAT files
def run_annotation_process(df_snomed, df_data, removal_lists, output_base_folder):
    """
    Annotate the documents once per removal list and export every run to BRAT.

    ``removal_lists`` maps a list number to a list of semantic tags. For each
    entry the dictionary lookup is run with that list and the result is written
    to ``<output_base_folder>/gazzetteer_snomed_removal_list_<number>``.
    """
    for run_number, excluded_tags in removal_lists.items():
        # Each run gets its own folder, named after the key of its removal list
        run_folder = os.path.join(output_base_folder, f'gazzetteer_snomed_removal_list_{run_number}')

        # Look up the gazetteer terms with the current tags filtered out, then export
        run_annotations = dictionary_lookup_snomed(df_snomed, df_data, excluded_tags)
        create_brat_annotations(run_annotations, output_folder=run_folder)

# Removal lists to iterate over: the key is the list number used in the name of
# the output folder, the value is the list of semantic tags to skip in that run.

removal_lists = {
    1: removal_list_1 , 
    2: removal_list_2,
    3: removal_list_3,
    4: removal_list_4
}

# Base folder for the per-run output folders. With the empty-string placeholder
# os.path.join() yields a relative path, so the folders are created in the
# current working directory.
output_base_folder = "" # output path
# Runs the full lookup and BRAT export once for each entry of removal_lists.
run_annotation_process(df_snomed, df_data, removal_lists, output_base_folder)