# MetaCAT - Training biLSTM

In [1]:
import numpy as np
import pandas as pd
from medcat.tokenizers.meta_cat_tokenizers import TokenizerWrapperBPE
from medcat.config_meta_cat import ConfigMetaCAT
from medcat.meta_cat import MetaCAT
from pathlib import Path
import json

In [2]:
# All paths are resolved relative to the parent of the notebook's working
# directory, so the notebook contains no absolute, machine-specific paths.
project_root = Path.cwd().parents[0]

# Input
data_dir = project_root / 'data'
annotation_file = data_dir / 'emc-dcc_ann.json'
split_list_file = data_dir / 'split_list.json'
model_dir = project_root / 'models' / 'bilstm'
embeddings_file = model_dir / 'embeddings.npy'

# Output
annotations_split_dir = data_dir / 'annotations_split'
models_split_dir = model_dir / 'model_splits'
result_dir = project_root / 'results'
test_result_file = result_dir / 'bilstm_results_folds.csv.gz'

# Create every output dir up front (including the results dir, which the final
# CSV export writes into); parents=True also creates missing intermediate dirs.
for output_dir in (annotations_split_dir, models_split_dir, result_dir):
    output_dir.mkdir(parents=True, exist_ok=True)

# MetaCAT configuration shared by all folds: the category to train on and the
# value of the 'nepochs' training option.
config_metacat = ConfigMetaCAT()
config_metacat.general['category_name'] = 'Negation'
config_metacat.train['nepochs'] = 10

## Load Tokenizer and embeddings matrix

In [3]:
# Embedding matrix stored as a .npy file in the model directory.
# NOTE(review): `embeddings` is not referenced again in the visible cells —
# confirm whether it should be handed to MetaCAT.
embeddings = np.load(embeddings_file)

# BPE tokenizer wrapper restored from the same model directory.
tokenizer = TokenizerWrapperBPE.load(model_dir)

## Split annotation file

In [4]:
# Load annotated data.
# The encoding is given explicitly: without it, open() falls back to the
# platform's locale encoding, which can fail on non-ASCII text in the JSON.
with open(annotation_file, encoding='utf-8') as f:
    annotations = json.load(f)

# Load split lists (the per-split train/test document names)
with open(split_list_file, encoding='utf-8') as f:
    split_lists = json.load(f)

In [5]:
# Write one train and one test annotation file for every split in `split_lists`.
# Only the first project of the annotation export is used.
documents = annotations['projects'][0]['documents']

for split_list in split_lists:
    # Sets give constant-time membership checks on the document names
    train_names = set(split_list['train'])
    test_names = set(split_list['test'])

    # A document named in both lists is assigned to train only; documents named
    # in neither list are left out of this split.
    train_annotations = [doc for doc in documents if doc['name'] in train_names]
    test_annotations = [doc for doc in documents
                        if doc['name'] not in train_names and doc['name'] in test_names]

    # Create an annotation file for the split following MetaCAT's annotation format
    project_train_annotations = {'projects': [{'documents': train_annotations}]}
    project_test_annotations = {'projects': [{'documents': test_annotations}]}

    # Write output files
    train_output_file = annotations_split_dir / f'train_annotations_{split_list["split_id"]}.json'
    with open(train_output_file, "w") as fp:
        json.dump(project_train_annotations, fp)

    test_output_file = annotations_split_dir / f'test_annotations_{split_list["split_id"]}.json'
    with open(test_output_file, "w") as fp:
        json.dump(project_test_annotations, fp)

## Train and evaluate a biLSTM on each split

In [None]:
# List to store results of individual folds
test_result_list = []

train_file_prefix = 'train_annotations_'
test_file_prefix = 'test_annotations_'

# rglob yields files in arbitrary order; sorting the paths makes the order in
# which folds are trained (and reported) reproducible between runs.
for train_file in sorted(annotations_split_dir.rglob(f'{train_file_prefix}*.json')):
    print(train_file)

    # Everything after the prefix is the split id, so ids containing '_' stay intact
    split_id = train_file.stem[len(train_file_prefix):]
    split_dir = models_split_dir / split_id
    split_dir.mkdir(parents=True, exist_ok=True)

    # Initiate MetaCAT
    # NOTE(review): the embeddings loaded earlier in the notebook are not passed
    # in here — confirm that this is intended.
    meta_cat = MetaCAT(tokenizer=tokenizer, config=config_metacat)

    # Train model, saving into the fold-specific directory so that one fold's
    # model does not overwrite another's.
    train_results = meta_cat.train(json_path=train_file, save_dir_path=str(split_dir))

    # Test model on the matching test file of the same split
    test_file = train_file.with_name(f'{test_file_prefix}{split_id}.json')
    fold_eval = meta_cat.eval(json_path=test_file)

    # Save test results: number of TP / FP / FN examples for the 'negated' value
    negated_counts = [len(fold_eval['examples'][outcome]['negated'])
                      for outcome in ('TP', 'FP', 'FN')]
    test_result_list.append([split_id, *negated_counts])

In [7]:
# One row per fold: the split id followed by its TP / FP / FN counts
result_columns = ['split_id', 'tp', 'fp', 'fn']
test_results = pd.DataFrame(data=test_result_list, columns=result_columns)
test_results

Unnamed: 0,split_id,tp,fp,fn
0,6,129,4,10
1,7,170,8,18
2,0,172,17,17
3,1,159,9,15
4,2,158,10,17
5,3,150,13,17
6,8,165,9,18
7,4,158,11,16
8,5,149,4,35
9,9,160,9,27


In [18]:
def calculate_recall(row):
    """Return the recall, TP / (TP + FN), of one result row.

    Parameters
    ----------
    row : object exposing numeric ``tp`` and ``fn`` attributes
        (for example a row of the results DataFrame).

    Returns
    -------
    float
        Recall rounded to two decimals, or NaN when TP + FN == 0.
    """
    tp = row.tp
    fn = row.fn
    denominator = tp + fn
    if denominator == 0:
        # Recall is undefined without any positive cases; return NaN instead
        # of raising ZeroDivisionError.
        return float('nan')
    return round(tp / denominator, 2)

def calculate_precision(row):
    """Return the precision, TP / (TP + FP), of one result row.

    Parameters
    ----------
    row : object exposing numeric ``tp`` and ``fp`` attributes
        (for example a row of the results DataFrame).

    Returns
    -------
    float
        Precision rounded to two decimals, or NaN when TP + FP == 0.
    """
    tp = row.tp
    fp = row.fp
    denominator = tp + fp
    if denominator == 0:
        # Precision is undefined without any positive predictions; return NaN
        # instead of raising ZeroDivisionError.
        return float('nan')
    return round(tp / denominator, 2)

def calculate_f1(row):
    """Return the F1 score, 2*TP / (2*TP + FP + FN), of one result row.

    Parameters
    ----------
    row : object exposing numeric ``tp``, ``fp`` and ``fn`` attributes
        (for example a row of the results DataFrame).

    Returns
    -------
    float
        F1 score rounded to two decimals, or NaN when all three counts are 0.
    """
    tp = row.tp
    fp = row.fp
    fn = row.fn
    denominator = (2 * tp) + fp + fn
    if denominator == 0:
        # F1 is undefined when there are no positives and no predictions;
        # return NaN instead of raising ZeroDivisionError.
        return float('nan')
    return round((2 * tp) / denominator, 2)

# Add one metric column per scoring function, each computed row by row.
# The dict order fixes the column order: recall, precision, f1.
metric_functions = {
    'recall': calculate_recall,
    'precision': calculate_precision,
    'f1': calculate_f1,
}
for metric_name, metric_function in metric_functions.items():
    test_results[metric_name] = test_results.apply(metric_function, axis=1)

test_results

Unnamed: 0,split_id,tp,fp,fn,recall,precision,f1
0,6,129,4,10,0.93,0.97,0.95
1,7,170,8,18,0.9,0.96,0.93
2,0,172,17,17,0.91,0.91,0.91
3,1,159,9,15,0.91,0.95,0.93
4,2,158,10,17,0.9,0.94,0.92
5,3,150,13,17,0.9,0.92,0.91
6,8,165,9,18,0.9,0.95,0.92
7,4,158,11,16,0.91,0.93,0.92
8,5,149,4,35,0.81,0.97,0.88
9,9,160,9,27,0.86,0.95,0.9


In [19]:
# to_csv does not create missing directories, so make sure the target
# directory exists before writing the gzip-compressed per-fold results.
test_result_file.parent.mkdir(parents=True, exist_ok=True)
test_results.to_csv(test_result_file, index=False, compression='gzip')