# Regex/Rule Based Parsing - Baseline
Here we present ways to parse the data based on regular expressions, with manually defined rules.

How rules are created:
For each category (respiratory, abdominal, skin and soft tissues, etc) repeat the following:
1. Filter for indications associated with this category
2. Sort by occurrence of the indication (descending)
3. Pick the top k (e.g. 10) indications that look different
4. Assemble rules based on the words (just look for occurrences)

How to filter:
1. For each category, repeat matching
2. Match using fuzzy matching to allow for spelling mistakes (e.g. allow for a string distance to be > 0)

Install and load libraries

In [1]:
import math
import regex
import os
import pandas as pd

from pathlib import Path
from pprint import pprint

from sklearn.metrics import f1_score, roc_auc_score, average_precision_score

# Show the working directory: the data paths configured below are relative to it.
print(os.getcwd())

/home/kevin/DPhil/Projects/EHR-Indication-Processing/02_Models/01_Baseline/Regex


## Specify parameters and import data

Set default parameters and data paths

In [2]:
# --- Model parameters
model_name = "Regex"

# Global regex parameters
# Fraction of the indication length that may be mismatched when a rule does not
# specify its own error budget (0.1 = 10%)
default_error = 0.1
# Upper limit on the number of errors allowed for any single rule
default_error_max = 2
# Boundaries placed left/right of a rule when the rule sheet leaves them empty
default_l_boundary = r"\b"
default_r_boundary = r""

# --- Paths
# Base data path (relative to the notebook's working directory)
base_data_path = Path("../../../00_Data/")
# Dataset Path (training, testing, etc.)
dataset_path = base_data_path / "publication_ready"
# Export Path (model checkpoints, predictions, etc.)
export_path = base_data_path / "model_output" / model_name

assert base_data_path.is_dir(),\
  f"{base_data_path} either doesn't exist or is not a directory."
# `parents=True` also creates the intermediate "model_output" directory on a
# fresh checkout instead of raising FileNotFoundError.
export_path.mkdir(parents=True, exist_ok=True)

# --- Misc settings
# Print dataframes and keys to debug
debug = True

Import the data and join the labelled training data with number of occurrences

In [3]:
# Load the labelled training data, the per-indication occurrence counts and the
# two test sets. Every file is read with the same missing-value handling: the
# default NA markers are disabled and only the literal string "NA" counts as missing.
csv_na_options = {"keep_default_na": False, "na_values": ["NA"]}
indication_dtype = {"Indication": str}

train_eval_df = pd.read_csv(
    dataset_path / "training_oxford_2023-08-23.csv",
    dtype=indication_dtype,
    **csv_na_options,
)

train_eval_counts_df = pd.read_csv(
    dataset_path / "training_oxford_counts_2023-08-23.csv",
    dtype={**indication_dtype, "Count": int},
    **csv_na_options,
)

test_oxford_df = pd.read_csv(
    dataset_path / "testing_oxford_2023-08-23.csv",
    dtype=indication_dtype,
    **csv_na_options,
)

test_banbury_df = pd.read_csv(
    dataset_path / "testing_banbury_2023-08-23.csv",
    dtype=indication_dtype,
    **csv_na_options,
)

# Test sets keyed by the name used in printed results and exported file names
test_set_raw = dict(Oxford=test_oxford_df, Banbury=test_banbury_df)

In [4]:
# Attach the annotations to the occurrence count of each indication.
# `validate` makes the merge fail if an indication is duplicated in either table.
counts_only = train_eval_counts_df[["Indication", "Count"]]
indication_annotated_counts = counts_only.merge(
    train_eval_df,
    on="Indication",
    how="left",
    validate="one_to_one",
)

## Find the vocabulary

For each category, get the top k most frequent indications (k is set by `top_k_indications`)

In [5]:
categories = ['urinary', 'respiratory', 'abdominal',
       'neurological', 'skin_soft_tissue', 'ent', 'orthopaedic',
       'other_specific', 'no_specific_source', 'prophylaxis']

# Number of most frequent indications kept per category
top_k_indications = 30

# Get the top k indications for each category.
# Each column is wrapped in a Series so that a category with fewer than
# `top_k_indications` distinct indications is padded with NaN; building the
# frame from raw arrays of different lengths would raise a ValueError.
top_indication_by_category = pd.DataFrame({
    single_category: pd.Series(
        indication_annotated_counts[indication_annotated_counts[single_category] == 1]
        .sort_values(by="Count", ascending=False)
        .Indication
        .apply(lambda x: x.replace('?', '').strip()) # Remove ? from all cells
        .apply(lambda x: x.replace('/', ' ').strip()) # Split words by "/"
        .drop_duplicates(keep="first") # Drop duplicates, keep the first (most frequent)
        .head(top_k_indications)
        .values
    )
    for single_category in categories
})

if debug:
    display(top_indication_by_category)

Unnamed: 0,urinary,respiratory,abdominal,neurological,skin_soft_tissue,ent,orthopaedic,other_specific,no_specific_source,prophylaxis
0,uti,lrti,cholecystitis,cns infection,cellulitis,tonsilitis quinsy,osteomyelitis,dental abscess,perioperative prophylaxis,perioperative prophylaxis
1,urosepsis,cap,diverticulitis,meningitis,wound infection,tonsillitis,pji,oral thrush,sepsis,prophylaxis
2,pyelonephritis,chest infection,appendicitis,encephalitis,dog bite,quinsy,bone infection,endometritis,prophylaxis,intra-partum prophylaxis
3,cauti,hap,pid,brain abscess,diabetic foot infection,tonsilitis,om,chorioamnionitis,infection,post op
4,uti prophylaxis,pneumonia,biliary sepsis,cerebral abscess,3rd degree tear,sinusitis,open fracture,pprom,intra-partum prophylaxis,transplant prophylaxis
5,epididymo-orchitis,iecopd,cholangitis,ventriculitis,foot infection,supraglottitis,septic arthritis,dental infection,post op,pcp prophylaxis
6,catheter associated uti,chest sepsis,h pylori eradication,csf infection,cat bite,pinna cellulitis,joint infection,ie,transplant prophylaxis,post-op
7,e coli uti,pcp prophylaxis,abdo sepsis,subdural empyema,breast abscess,perichondritis,discitis,pre ppm,neutropenic sepsis,surgical prophylaxis
8,e.coli uti,aspiration pneumonia,perianal abscess,empiric cns neurosurg,mastitis,post tonsillectomy bleed,bone inf,ppm,post-op,cmv prophylaxis
9,urinary tract infection,empyema,abdominal sepsis,meningoencephalitis,facial cellulitis,noe,infected tkr,endocarditis,sepsis source,post op prophylaxis


Export the table as XLSX file with one category per sheet

In [6]:
# Create a Pandas Excel writer using XlsxWriter as the engine.
rule_sheet_generated_path = export_path / "regex_rules_sheet.xlsx"

with pd.ExcelWriter(rule_sheet_generated_path, engine="xlsxwriter") as writer:
    # Write each category to a different worksheet.
    for category_name in top_indication_by_category.columns:
        # Extract and add extra columns.
        #   1. Extract the category, drop missing entries, rename the series to
        #      "Indication" and convert to a dataframe
        #   2. Add extra (empty) columns to be filled in by hand
        #       - "Error" to indicate the amount of allowed errors
        #       - "L_Boundary" and "R_Boundary" to indicate the left and right word boundaries
        #       - "Exclude" to indicate if the row should be excluded from the regex
        category_sheet = (
            top_indication_by_category[category_name]
            .dropna()  # a missing indication has no text to split below
            .rename("Indication")
            .to_frame()
            .assign(Error="", L_Boundary="", R_Boundary="", Exclude="")
        )

        # Pre-populate the "Exclude" column.
        # A row is marked when one of its whitespace-separated words is identical
        # to a complete indication seen earlier in the same sheet
        # (e.g. "uti prophylaxis" after "uti").
        previous_indications = set()
        for index, indication in category_sheet["Indication"].items():
            exclude = any(word in previous_indications for word in indication.split())

            # Set the 'Exclude' column value for the current row
            category_sheet.at[index, 'Exclude'] = 1 if exclude else None

            # Remember the complete indication (not its individual words)
            previous_indications.add(indication)

        category_sheet.to_excel(
            writer, sheet_name=category_name, index=False
        )

## Annotate the strings
Annotate the exported file and save the changes into `regex_rules_sheet_annotated.xlsx`

Things to incorporate:
- Fuzzy matches, but for abbreviations don't allow for fuzziness
- Partial matches, in case they were truncated
- Full word matches, or they can't be in the middle of the word (e.g. UTI can't be part of a word)

Notes:
- Export the table, make one file for each indication
- Columns should be:
    - Indication
    - \b start and \b end
    - Fuzziness flags

Then import the files, and create one big regex query based on them


## Build Regex String from annotated file

Load the excel file, returns a dict of dataframes with the individual sheets.

In [7]:
# Read every worksheet of the manually annotated rule file.
# `sheet_name=None` makes pandas return a dict: sheet name -> DataFrame.
rule_sheet_annotated_path = export_path / "regex_rules_sheet_annotated.xlsx"
regex_rule_sheets = pd.read_excel(
    rule_sheet_annotated_path,
    index_col=None,
    sheet_name=None,
)

if debug:
    display(regex_rule_sheets.keys())

dict_keys(['urinary', 'respiratory', 'abdominal', 'neurological', 'skin_soft_tissue', 'ent', 'orthopaedic', 'other_specific', 'no_specific_source', 'prophylaxis'])

Convert the rules to regex strings and then compile them into patterns (allows for faster matching).

Create a dictionary with patterns for each category

In [8]:
# Compiled fuzzy pattern per category: category name -> compiled regex
regex_pattern_dict = dict()

for individual_category, rule_sheet_individual in regex_rule_sheets.items():
    regex_pattern_list = []
    for _, row in rule_sheet_individual.iterrows():
        # Skip the row if it is marked to be excluded, or if it has no
        # indication text (blank row in the sheet)
        if row['Exclude'] == 1 or pd.isna(row['Indication']):
            continue

        # Populate with individual regex patterns
        # NOTE(review): the indication text is inserted into the pattern unescaped,
        # so regex metacharacters in it (e.g. the "." in "c.diff") act as regex
        # syntax - confirm whether this is intended.
        str_indication = row['Indication']

        # Error budget: the annotated value if present, otherwise a fraction of the
        # indication length; rounded up and capped at `default_error_max`
        num_error = (default_error * len(str_indication)) if pd.isna(row['Error']) else row['Error']
        num_error = min(math.ceil(num_error), default_error_max)

        # Any non-empty annotation in a boundary column enables a `\b` word
        # boundary on that side; an empty cell falls back to the default
        pat_l_boundary = default_l_boundary if pd.isna(row['L_Boundary']) else r'\b'
        pat_r_boundary = default_r_boundary if pd.isna(row['R_Boundary']) else r'\b'

        # `{e<=N}` is the fuzzy-matching constraint of the `regex` module
        regex_pattern_list += [fr'(?:{pat_l_boundary}{str_indication}{pat_r_boundary}){{e<={num_error}}}']

    # Joining zero rules would give the empty pattern, which matches every string
    # and would flag every indication for this category. Use a pattern that can
    # never match instead.
    combined_pattern = "|".join(regex_pattern_list) if regex_pattern_list else r"(?!)"
    regex_pattern_dict[individual_category] = regex.compile(combined_pattern)

if debug:
    pprint(regex_pattern_dict)

{'abdominal': regex.Regex('(?:\\bcholecystitis){e<=2}|(?:\\bdiverticulitis){e<=2}|(?:\\bappendicitis){e<=2}|(?:\\bpid){e<=1}|(?:\\bbiliary sepsis){e<=2}|(?:\\bcholangitis){e<=2}|(?:\\bh pylori eradication){e<=2}|(?:\\babdo sepsis){e<=2}|(?:\\bperianal abscess){e<=2}|(?:\\babdominal sepsis){e<=2}|(?:\\bintra-abdominal sepsis){e<=2}|(?:\\bcolitis){e<=1}|(?:\\bsplenectomy){e<=2}|(?:\\bc diff){e<=1}|(?:\\bsuspected c. diff){e<=2}|(?:\\bintra-abdominal infection){e<=2}|(?:\\bliver abscess){e<=2}|(?:\\bpd peritonitis){e<=2}|(?:\\bintraabdominal sepsis){e<=2}|(?:\\bh pylori){e<=1}|(?:\\bsbp){e<=1}|(?:\\bintrabdominal sepsis){e<=2}|(?:\\bpelvic collection){e<=2}|(?:\\babdominal collection){e<=2}|(?:\\bintra-abdo sepsis){e<=2}|(?:\\babdominal infection){e<=2}|(?:\\bc.diff){e<=1}|(?:\\bintra abdo sepsis){e<=2}|(?:\\bperitonitis){e<=2}|(?:\\boesophageal candidiasis){e<=2}|(?:\\basplenia){e<=1}|(?:\\bintra-abdominal collectio){e<=2}|(?:\\bpancreatitis){e<=2}|(?:\\bintraabdominal infection){e<=2}|(

## Additional Rules
Additional rules for non-source categories.
These include `uncertainty` and `not informative`.

Uncertainty:
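- Presence of "?" or "/", or of a hedging word ("suspected", "possible", "probable", "likely"); note that "," is not matched by the pattern below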
- Multiple sources listed

Not informative:
- If no source has been detected, we classify it as "not informative"

N.B: Some of these rules have to be performed before the Regex string matching, some after

In [9]:
# Markers of an uncertain indication; the adjacent literals are concatenated
# into a single alternation.
uncertainty_pattern = regex.compile(
    r"\?|/"                                  # punctuation markers
    r"|suspected|possible|probable|likely"   # hedging words
)
uncertainty_pattern

regex.Regex('\\?|/|suspected|possible|probable|likely', flags=regex.V0)

## Evaluate the model per class and overall
Test the performance of these Regex rules on the test-set.
Run all rules and processing steps

In [10]:
# Helper function to extract the matched pattern
def match_pattern(x, regex_pattern):
    """Return the text of the first match of `regex_pattern` in `x`.

    Args:
        x: String to search.
        regex_pattern: Compiled pattern exposing `.search()`.

    Returns:
        The matched substring, or None when the pattern does not match.
    """
    match_obj = regex_pattern.search(x)
    return None if match_obj is None else match_obj.group()

# All categories except 'no_specific_source'; an indication matching more than
# one of these is flagged as "multiple entries" in `run_regex`.
categories_specific = [
    "urinary", "respiratory", "abdominal",
    "neurological", "skin_soft_tissue", "ent",
    "orthopaedic", "other_specific", "prophylaxis",
]

def run_regex(input_indications: pd.Series):
    """Apply the uncertainty check and all category patterns to the indications.

    Relies on the notebook-level `categories`, `categories_specific`,
    `regex_pattern_dict` and `uncertainty_pattern`.

    Args:
        input_indications: Series of free-text indication strings.

    Returns:
        DataFrame with the cleaned "Indication", one column per category and an
        "uncertainty" column holding the matched text (None when nothing matched),
        plus "not_informative" (True when no category matched, otherwise None).
    """
    def strip_uncertainty_markers(text):
        # Remove "?" and turn "/" into a word separator
        return text.replace('?', '').strip().replace('/', ' ').strip()

    prediction_df = pd.DataFrame({"Indication": input_indications})

    # --- Uncertainty is checked on the raw text, before the markers are stripped
    prediction_df["uncertainty"] = prediction_df.Indication.apply(
        lambda text: match_pattern(text, uncertainty_pattern)
    )
    prediction_df["Indication"] = prediction_df["Indication"].apply(strip_uncertainty_markers)

    # --- One column per category, holding the matched text
    for category_name in categories:
        category_pattern = regex_pattern_dict[category_name]
        prediction_df[category_name] = prediction_df.Indication.apply(
            lambda text: match_pattern(text, category_pattern)
        )

    # Reorder columns
    prediction_df = prediction_df[['Indication'] + categories + ['uncertainty']]

    # --- Final `uncertainty` and `not informative` rules
    # More than one specific category matched -> flag as "multiple entries"
    n_specific_matches = prediction_df[categories_specific].notna().sum(axis=1)
    prediction_df.loc[n_specific_matches > 1, "uncertainty"] = "multiple entries"

    # No category matched at all -> not informative
    prediction_df["not_informative"] = prediction_df[categories].apply(
        lambda row: None if row.notna().any() else True,
        axis=1,
    )

    return prediction_df

In [11]:
# Metrics function
def calculate_metrics(y_true, predictions_probs,
                      predictions_binarised,
                      labels, 
                      result_precision=2, 
                      averaging_method = "weighted",
    ):
    """Compute per-class and averaged F1 scores for multi-label predictions.

    Args:
        y_true: True binary labels, one column per class.
        predictions_probs: Currently unused; only referenced by the disabled
            ROC AUC / PR AUC computations.
        predictions_binarised: Predicted binary labels, columns in the same
            order as `y_true`.
        labels: Class names used as column names of the per-class table.
        result_precision: Number of decimals used in the summary string.
        averaging_method: `average` argument passed to `f1_score` for the
            overall score.

    Returns:
        Tuple `(scores_per_class, scores_average, metrics_string)`: a DataFrame
        with one row per metric and one column per class, a dict of averaged
        scores, and a printable "name: average (min-max)" summary.
    """
    # Calculate per class metrics
    scores_per_class = {}
    scores_per_class["F1-Score"] = f1_score(y_true=y_true, y_pred=predictions_binarised, average=None)
    # scores_per_class["ROC AUC"] = roc_auc_score(y_true=y_true, y_score=predictions_probs, average=None)
    # scores_per_class["PR AUC"] = average_precision_score(y_true=y_true, y_score=predictions_probs, average=None)

    scores_per_class = pd.DataFrame.from_dict(scores_per_class, orient='index', columns=labels)

    # Calculate average metrics
    scores_average = {}
    scores_average["F1-Score"] = f1_score(y_true=y_true, y_pred=predictions_binarised, average=averaging_method)
    # scores_average["ROC AUC"] = roc_auc_score(y_true=y_true, y_score=predictions_probs, average=averaging_method)
    # scores_average["PR AUC"] = average_precision_score(y_true=y_true, y_score=predictions_probs, average=averaging_method)

    # Format into printable string
    metrics_string = ""
    for score_name, avg_score_value in scores_average.items():
        class_scores = scores_per_class.loc[score_name]
        # Built-in round() works for both NumPy scalars and plain Python floats,
        # whereas the `.round()` method only exists on NumPy scalars.
        avg_score = round(avg_score_value, result_precision)
        min_score = round(class_scores.min(), result_precision)
        max_score = round(class_scores.max(), result_precision)
        metrics_string += f"{score_name}: {avg_score} ({min_score}-{max_score})\n"

    return scores_per_class, scores_average, metrics_string

In [12]:
# Label columns: every column of the training data except the free text
output_cols = [label for label in train_eval_df if not label=="Indication"]

# Display setting for the printed score tables
pd.set_option('display.precision', 2)

for test_location, test_dataset in test_set_raw.items():
    print("Test set:", test_location)

    #--- Run inference on test set
    predictions_words = run_regex(test_dataset["Indication"])
    # Binarise the predictions (1 = something matched).
    # Columns are selected explicitly in label order: the metrics compare
    # y_true and y_pred column by position, so a differing column order would
    # silently score the wrong classes against each other. A missing label
    # column raises a KeyError here.
    predictions_binarised = predictions_words[output_cols].notna().astype(int)

    # Get true labels
    y_test_true = test_dataset[output_cols]

    # Calculate metrics (the binary predictions are also passed in the
    # probability slot)
    scores_per_class, scores_average, metrics_string = \
        calculate_metrics(y_test_true, predictions_binarised, predictions_binarised, output_cols, 
                          averaging_method="weighted")

    # Print metrics
    print(scores_per_class)
    print(metrics_string)

    # Save predictions for further analysis
    predictions_binarised.to_csv(export_path/f"predictions_{model_name}_{test_location}.csv", index=False)

Test set: Oxford
          urinary  respiratory  abdominal  neurological  skin_soft_tissue  \
F1-Score     0.87         0.63       0.63          0.68              0.76   

           ent  orthopaedic  other_specific  no_specific_source  prophylaxis  \
F1-Score  0.54          0.3            0.11                0.78         0.93   

          uncertainty  not_informative  
F1-Score          0.2              0.0  
F1-Score: 0.71 (0.0-0.93)

Test set: Banbury
          urinary  respiratory  abdominal  neurological  skin_soft_tissue  \
F1-Score     0.96         0.73       0.35          0.78               0.9   

          ent  orthopaedic  other_specific  no_specific_source  prophylaxis  \
F1-Score  0.3         0.25            0.03                0.81         0.95   

          uncertainty  not_informative  
F1-Score          0.3              0.0  
F1-Score: 0.74 (0.0-0.96)

