In [None]:
import os
import random
import math
import pandas as pd
from tqdm import tqdm
import nltk
import spacy
nlp = spacy.load("es_core_news_lg")
import tensorflow as tf
from transformers import AutoTokenizer, AutoConfig, TFAutoModelForTokenClassification, TFTrainer, TFTrainingArguments
from transformers.utils import logging as hf_logging
from seqeval.metrics.sequence_labeling import performance_measure, precision_recall_fscore_support, accuracy_score, classification_report

## Parameters

In [None]:
bioes = True # Tagging scheme: True -> O/B/I/E/S tags, False -> O/B/I tags
task1 = True # True -> train and predict with the Task 1 labels and folders, False -> Task 2
plus = True # Add the ProfNER files to the training set (only applied when task1 is True)
positives = True # Train only on files that contain the label; files without it are then used for evaluation only

## BERT model

In [None]:
model_name = "dccuchile/bert-base-spanish-wwm-uncased"
tokenizer = AutoTokenizer.from_pretrained(model_name)
# Sequence length used both to chunk the documents and to pad every example.
# When a checkpoint declares no limit, model_max_length is a huge sentinel
# (about 1e30); padding to that length is impossible, so fall back to the
# usual 512 positions of a BERT-base encoder in that case.
FALLBACK_MAX_SEQ_LENGTH = 512
max_seq_length = tokenizer.model_max_length
if max_seq_length > 100000:
    max_seq_length = FALLBACK_MAX_SEQ_LENGTH

### Convert BRAT annotations to the BIO/BIOES tagging scheme

In [None]:
def get_labels(files):
    """Collect the entity labels used in the BRAT annotations of a set of documents.

    Args:
        files: Iterable of document paths without extension. "<path>.ann" is
            read when it exists; documents without an annotation file are skipped.

    Returns:
        Sorted list of the distinct labels found on text-bound ("T") lines.
    """
    labels = set()
    for file_name in files:
        ann_path = file_name + ".ann"
        if not os.path.exists(ann_path):
            continue
        with open(ann_path, "rb") as file:
            annotations = file.read().decode().splitlines()
        for ann in annotations:
            # startswith() tolerates blank lines, on which ann[0] would raise IndexError.
            if ann.startswith("T"):
                labels.add(ann.split("\t", maxsplit=2)[1].split(" ", maxsplit=1)[0])
    # Sorted so the label order (and every list indexed by it) is the same on every run.
    return sorted(labels)

def divide_text(text, max_seq_length):
    """Split a text into consecutive character ranges cut at sentence boundaries.

    Sentences are accumulated while their summed subword count stays below
    max_seq_length. A sentence that reaches the limit on its own is emitted as
    a range by itself.

    Args:
        text: Text to split.
        max_seq_length: Subword budget of one range.

    Returns:
        List of [start, end] character offsets into text, in document order.
    """
    def count_subwords(sentence):
        # Each word is encoded separately, without special tokens.
        return sum([len(tokenizer._tokenizer.encode_batch([word.text], add_special_tokens=False)[0].tokens) for word in sentence])

    chunks = []
    chunk_start = 0
    used = 0
    document = nlp(text)
    for sentence in document.sents:
        n_subwords = count_subwords(sentence)
        used += n_subwords
        if used < max_seq_length:
            # Still within budget: keep accumulating sentences.
            continue
        if n_subwords < max_seq_length:
            # Close the range before this sentence; the sentence opens the next one.
            used = n_subwords
            chunk_end = sentence.start_char
        else:
            # Oversized sentence: flush what was accumulated before it, if anything...
            if used != n_subwords:
                chunks.append([chunk_start, sentence.start_char])
                chunk_start = sentence.start_char
            # ...then emit the sentence as a range of its own.
            used = 0
            chunk_end = sentence.end_char
        chunks.append([chunk_start, chunk_end])
        chunk_start = chunk_end
    # Whatever remains after the last cut
    if chunk_start != len(text):
        chunks.append([chunk_start, len(text)])
    return chunks

def brat2BIO(file_name, labels, bioes=True, folder=""):
    """Convert one BRAT document into token-level CSV files, one per label and chunk.

    The text is lower-cased, line breaks become spaces, and the result is cut
    into chunks with divide_text(). For every label and chunk a CSV named
    "<document>~<label>~<chunk start offset>.csv" is written to folder with the
    columns Token, Offset start, Offset end and Label (offsets are relative to
    the chunk). Without an ".ann" file every token is tagged "O".

    Args:
        file_name: Document path without extension ("<path>.txt" must exist).
        labels: Entity labels to export; one set of CSV files per label.
        bioes: Use S-/E- tags (BIOES) when True, plain BIO otherwise.
        folder: Output directory for the CSV files.
    """
    # Load text and annotation files
    with open(file_name + ".txt", "rb") as file:
        text = file.read().decode()
    entities = []
    if os.path.exists(file_name + ".ann"):
        with open(file_name + ".ann", "rb") as file:
            annotations = file.read().decode().splitlines()
        # Extract the entities from the annotation file
        for ann in annotations:
            # startswith() tolerates blank lines, on which ann[0] would raise IndexError.
            if ann.startswith("T"):
                idx, entity_label, name = ann.split("\t", maxsplit=2)
                entity_label, offsets = entity_label.split(" ", maxsplit=1)
                offsets = [[int(offset) for offset in pair.split(" ")] for pair in offsets.split(';')]
                # Discontinuous annotations are collapsed into one span covering all fragments.
                off_start, off_end = [min(offset[0] for offset in offsets), max(offset[1] for offset in offsets)]
                entities.append([int(idx[1:]), entity_label, off_start, off_end, text[off_start:off_end]])
        # Deoverlap nested entities for each class: every span is replaced by the
        # longest span among itself and the same-label spans overlapping it.
        ents = []
        for label in labels:
            offsets = [[e[2],e[3]] for e in entities if e[1]==label]
            overlaps = [[o1, [o2 for o2 in offsets if not o1 == o2 and o1[0] < o2[1] and o1[1] > o2[0]]] for o1 in offsets]
            if [o for o in overlaps if o[1]]:
                ranges = [max([overlap[0]]+[o for o in overlap[1]], key=lambda x: x[1]-x[0]) for overlap in overlaps]
                offsets = [[r[0],r[1]] for r in set([(r[0],r[1]) for r in ranges])]
            ents.append(offsets)
        entities = ents
    # Clean the texts
    text = text.lower().replace("\n"," ")
    # Divide text into sentences
    sentences = divide_text(text, max_seq_length)
    # Tokenize every chunk once: the tokens do not depend on the label.
    chunks = []
    for start_sentence, end_sentence in sentences:
        chunk_text = text[start_sentence:end_sentence]
        spans = []
        for word in nlp(chunk_text):
            tokenized = tokenizer._tokenizer.encode_batch([word.text], add_special_tokens=False)[0]
            for token, offset in zip(tokenized.tokens, tokenized.offsets):
                if token.startswith("##"):
                    # Continuation piece: extend the span of the token being built.
                    spans[-1][1] = word.idx+offset[1]
                else:
                    spans.append([word.idx+offset[0], word.idx+offset[1]])
        chunks.append([start_sentence, end_sentence, [[chunk_text[token_start:token_end], token_start, token_end, "O"] for token_start, token_end in spans]])
    # basename() strips the directory with the separator of the running platform
    # (splitting on a backslash only worked on Windows).
    document = os.path.basename(file_name)
    # Annotate each token with BIO/BIOES schema
    for l, label in enumerate(labels):
        for start_sentence, end_sentence, new_tokens in chunks:
            if entities:
                tokens = []
                # Only entities lying entirely inside the chunk are kept, shifted to chunk offsets.
                offsets = [(start_entity-start_sentence, end_entity-start_sentence) for start_entity, end_entity in entities[l] if start_sentence<=start_entity and end_sentence>=end_entity]
                for token_text, token_start, token_end, token_label in new_tokens:
                    # Check if the token is annotated
                    for entity_start, entity_end in offsets:
                        # Match between token and entity offsets
                        if token_start < entity_end and token_end > entity_start:
                            if token_start == entity_start and token_end == entity_end:
                                token_label = ("S-" if bioes else "B-") + label
                            elif token_start == entity_start and token_end < entity_end:
                                token_label = "B-" + label
                            elif token_start > entity_start and token_end == entity_end:
                                token_label = ("E-" if bioes else "I-") + label
                            else: # any other overlap is tagged as inside the entity
                                token_label = "I-" + label
                            break
                    tokens.append([token_text, token_start, token_end, token_label])
            else:
                tokens = new_tokens
            pd.DataFrame(tokens, columns = ["Token","Offset start","Offset end","Label"]).to_csv(os.path.join(folder, document + "~" + label + "~" + str(start_sentence) + ".csv"), index=False, encoding="utf-16")

In [None]:
def _brat_documents(folder):
    """Return the paths, without extension, of the ".txt" documents in folder."""
    return [os.path.join(folder, file)[:-4] for file in os.listdir(folder) if file.endswith(".txt")]

def _convert_corpus(source_folders, target_folder, labels):
    """Convert the BRAT documents of source_folders into token CSV files in target_folder."""
    # List every source first: a missing corpus then fails before the target
    # folder is created, so an empty folder is never mistaken for a finished run.
    corpora = [_brat_documents(source_folder) for source_folder in source_folders]
    os.mkdir(target_folder)
    for documents in corpora:
        for file_name in tqdm(documents):
            brat2BIO(file_name, labels, bioes=bioes, folder=target_folder)

# Every processed folder acts as a cache: it is only built when it does not exist yet.

# Labels Task 1
folder = os.path.join("meddoprof_training_set", "task1")
if os.path.exists(folder):
    files = _brat_documents(folder)
    labels_task1 = get_labels(files)
    print(folder + " (documents = " + str(len(files)) + "):")

# Training set
folderTrain = "processedTrain_task1"
if not os.path.exists(folderTrain):
    _convert_corpus([os.path.join("meddoprof_training_set", "task1")], folderTrain, labels_task1)

# Training and Valid sets PROFNER
folderTrain = "processedTrain_task1_plus"
if not os.path.exists(folderTrain):
    profner = os.path.join("ProfNER","DATA", "subtask-2", "brat")
    _convert_corpus([os.path.join(profner, "train"), os.path.join(profner, "valid")], folderTrain, labels_task1)

# Test set
folderTest = "processedTest_task1"
if not os.path.exists(folderTest):
    _convert_corpus(["meddoprof_test_set"], folderTest, labels_task1)

# Labels Task 2
folder = os.path.join("meddoprof_training_set", "task2")
if os.path.exists(folder):
    files = _brat_documents(folder)
    labels_task2 = get_labels(files)
    print(folder + " (documents = " + str(len(files)) + "):")

# Training set
folderTrain = "processedTrain_task2"
if not os.path.exists(folderTrain):
    _convert_corpus([os.path.join("meddoprof_training_set", "task2")], folderTrain, labels_task2)

# Test set
folderTest = "processedTest_task2"
if not os.path.exists(folderTest):
    _convert_corpus(["meddoprof_test_set"], folderTest, labels_task2)

### BERT implementation

In [None]:
def extract_dataset(csv_files, tokenizer, max_seq_length, model_name, label2id):
    """Turn token CSV files into a tf.data.Dataset of padded tagging examples.

    Every CSV file becomes one example. Only the first subword of a word
    carries the word's label id; the remaining subwords, the special tokens and
    the padding are labelled -100.

    Args:
        csv_files: Paths of CSV files with "Token" and "Label" columns.
        tokenizer: Tokenizer used to split the tokens and supply the special ids.
        max_seq_length: Length every example is padded to.
        model_name: Model identifier; selects the special-token layout
            ("xlnet" puts the CLS token last, "roberta" adds a second SEP).
        label2id: Mapping from label string to integer id.

    Returns:
        Dataset yielding (features dict, label ids), with its cardinality set
        to the number of CSV files.
    """
    ignore_label = -100
    lowered_name = model_name.lower()
    cls_last = "xlnet" in lowered_name
    double_sep = "roberta" in lowered_name
    # Room left for word pieces once the special tokens are accounted for
    content_budget = max_seq_length - tokenizer.num_special_tokens_to_add()
    cls_id = tokenizer.cls_token_id
    sep_id = tokenizer.sep_token_id
    pad_id = tokenizer.pad_token_id
    pad_type_id = tokenizer.pad_token_type_id
    pad_left = tokenizer.padding_side=="left"
    pad_right = tokenizer.padding_side=="right"
    # Optional model inputs, in the order they are added to each feature dict
    optional_inputs = [name for name in ("attention_mask", "token_type_ids") if name in tokenizer.model_input_names]

    def pad(sequence, filler, amount):
        # A non-positive amount yields an empty filler block.
        block = [filler]*amount
        return (block if pad_left else []) + sequence + (block if pad_right else [])

    examples = []
    for csv_file in tqdm(csv_files):
        rows = pd.read_csv(csv_file, keep_default_na=False, encoding = "utf16")[["Token","Label"]].values.tolist()
        piece_ids = []
        piece_labels = []
        for word, label in rows:
            pieces = tokenizer.tokenize(word)
            if len(pieces) == 0:
                continue
            piece_ids += tokenizer.convert_tokens_to_ids(pieces)
            piece_labels += [label2id[label]] + [ignore_label]*(len(pieces)-1)
        # Truncate to the available budget
        piece_ids = piece_ids[:content_budget]
        piece_labels = piece_labels[:content_budget]
        # Wrap with the special tokens
        head = [] if cls_last else [cls_id]
        tail = [sep_id] + ([sep_id] if double_sep else []) + ([cls_id] if cls_last else [])
        input_ids = head + piece_ids + tail
        label_ids = [ignore_label]*len(head) + piece_labels + [ignore_label]*len(tail)
        attention_mask = [1]*len(input_ids)
        token_type_ids = [0]*len(input_ids)
        # Pad on the tokenizer's padding side
        missing = max_seq_length-len(input_ids)
        padded = {
            "input_ids": pad(input_ids, pad_id, missing),
            "attention_mask": pad(attention_mask, 0, missing),
            "token_type_ids": pad(token_type_ids, pad_type_id, missing),
        }
        feature = {"input_ids": padded["input_ids"]}
        for name in optional_inputs:
            feature[name] = padded[name]
        examples.append([feature, pad(label_ids, ignore_label, missing)])

    # Features to tensors
    def gen():
        for feature, label in examples:
            yield (feature, label)

    tf_types = {"input_ids": tf.int32}
    tf_shapes = {"input_ids": tf.TensorShape([None])}
    for name in optional_inputs:
        tf_types[name] = tf.int32
        tf_shapes[name] = tf.TensorShape([None])
    dataset = tf.data.Dataset.from_generator(gen, (tf_types, tf.int64), (tf_shapes, tf.TensorShape([None])))
    return dataset.apply(tf.data.experimental.assert_cardinality(len(examples)))

def filter_predictions(predictions, label_ids):
    """Map gold and predicted ids to label strings, skipping positions labelled -100.

    Uses the module-level id2label mapping of the label currently being processed.

    Args:
        predictions: Scores indexed as [example, position, tag].
        label_ids: Gold label ids indexed as [example][position].

    Returns:
        (y_true, y_pred): two lists with one list of label strings per example.
    """
    preds = tf.argmax(predictions, axis=2).numpy()
    batch_size, seq_len = preds.shape
    y_true = []
    y_pred = []
    for i in range(batch_size):
        # Positions that carry a real label in this example
        kept = [j for j in range(seq_len) if label_ids[i][j] != -100]
        y_true.append([id2label[label_ids[i][j]] for j in kept])
        y_pred.append([id2label[preds[i][j]] for j in kept])
    return y_true, y_pred

def compute_metrics(predictions):
    """Compute the evaluation metrics reported by the trainer.

    Args:
        predictions: Object exposing the model scores as .predictions and the
            gold label ids as .label_ids.

    Returns:
        Dict with the TP/TN/FP/FN counts from performance_measure, the
        micro-averaged precision, recall and F1, and the accuracy.
    """
    y_true, y_pred = filter_predictions(predictions.predictions, predictions.label_ids)
    counts = performance_measure(y_true, y_pred)
    precision, recall, f1, _ = precision_recall_fscore_support(y_true, y_pred, average="micro")
    metrics = {}
    metrics["true_positives"] = counts["TP"]
    metrics["true_negatives"] = counts["TN"]
    metrics["false_positives"] = counts["FP"]
    metrics["false_negatives"] = counts["FN"]
    metrics["precision"] = precision
    metrics["recall"] = recall
    metrics["f1_score"] = f1
    metrics["accuracy"] = accuracy_score(y_true, y_pred)
    return metrics

In [None]:
# Trainer configuration shared by all per-label models. output_dir,
# logging_steps, eval_steps and save_steps are overwritten for each label in
# the training loop below.
training_args = TFTrainingArguments(
    output_dir="MEDDOPROF",
    # Run the three stages: training, development evaluation and test prediction
    do_train=True,
    do_eval=True,
    do_predict=True,
    per_device_train_batch_size=2,
    per_device_eval_batch_size=2,
    # Log, evaluate and checkpoint on a step schedule
    logging_strategy="steps",
    evaluation_strategy="steps",
    save_strategy="steps",
    num_train_epochs=8.0,
)

In [None]:
random.seed(training_args.seed)

labels = labels_task1 if task1 else labels_task2
# One tag set per entity label: "O" plus the prefixed tags of the chosen scheme
tag_prefixes = ["B-", "I-", "E-", "S-"] if bioes else ["B-", "I-"]
labels_names = [["O"] + [prefix+label for prefix in tag_prefixes] for label in labels]
id2labels = [{i: name for i, name in enumerate(names)} for names in labels_names]
labels2id = [{name: i for i, name in enumerate(names)} for names in labels_names]

def _list_csv(folder):
    """Return the CSV paths in folder, sorted.

    os.listdir() returns entries in arbitrary order; sorting makes the seeded
    shuffles below reproducible.
    """
    return sorted(os.path.join(folder, file) for file in os.listdir(folder) if file.endswith(".csv"))

def _label_of(csv_file):
    """Return the label field of a "<document>~<label>~<offset>.csv" name (None if absent)."""
    fields = os.path.split(csv_file)[-1].split("~")
    return fields[-2] if len(fields) >= 3 else None

def _group_by_label(csv_files):
    """Split csv_files into one list per entry of labels, keeping their order."""
    # Compare the label field exactly instead of searching the whole path for a substring.
    return [[csv_file for csv_file in csv_files if _label_of(csv_file) == label] for label in labels]

def _has_entity(csv_file, label):
    """Tell whether the CSV file contains at least one token tagged with label."""
    tags = set(pd.read_csv(csv_file, keep_default_na=False, encoding = "utf16")["Label"].values.tolist())
    return any(tag != "O" and tag[2:] == label for tag in tags)

# Training set
folder = "processedTrain_task1" if task1 else "processedTrain_task2"
csv_files_train = _list_csv(folder)
# Training set plus
folder = "processedTrain_task1_plus"
if task1 and plus:
    csv_files_train += _list_csv(folder)
random.shuffle(csv_files_train)
csv_files_train = _group_by_label(csv_files_train)

# Test set
folder = "processedTest_task1" if task1 else "processedTest_task2"
csv_files_predict = _list_csv(folder)
random.shuffle(csv_files_predict)
csv_files_predict = _group_by_label(csv_files_predict)

# Development set
PER_EVAL = 0.2
# Read every training file once and separate files with and without entities.
csv_files_pos = []
csv_files_neg = []
for l, label in enumerate(labels):
    with_entity = []
    without_entity = []
    for csv_file_train in csv_files_train[l]:
        (with_entity if _has_entity(csv_file_train, label) else without_entity).append(csv_file_train)
    csv_files_pos.append(with_entity)
    csv_files_neg.append(without_entity)
# The first PER_EVAL share of each group is held out for evaluation. With
# positives enabled the remaining files without entities are not used for training.
csv_files_eval = []
csv_files_train = []
for pos, neg in zip(csv_files_pos, csv_files_neg):
    cut_pos = int(len(pos)*PER_EVAL)
    cut_neg = int(len(neg)*PER_EVAL)
    csv_files_eval.append(pos[:cut_pos] + neg[:cut_neg])
    csv_files_train.append(pos[cut_pos:] + ([] if positives else neg[cut_neg:]))
for csv_file_eval in csv_files_eval:
    random.shuffle(csv_file_eval)
for csv_file_train in csv_files_train:
    random.shuffle(csv_file_train)

In [None]:
os.environ["WANDB_DISABLED"] = "true"

hf_logging.set_verbosity_info()
hf_logging.enable_default_handler()
hf_logging.enable_explicit_format()
# Library logger used for the evaluation report below (it was never defined before).
logger = hf_logging.get_logger()

# Train, evaluate and predict with one token-classification model per entity label
for l, label in enumerate(labels):
    print("######## " + os.path.join("MEDDOPROF_" + model_name.replace("/","_"), label) + " ########")
    training_args.output_dir = os.path.join("MEDDOPROF_" + model_name.replace("/","_"), label)
    # id2label is also read as a global by filter_predictions()
    id2label = id2labels[l]
    label2id = labels2id[l]
    # Reset for every label so the test step never restores a checkpoint that
    # was selected for a previous label (or hits an undefined name without do_eval).
    checkpoint = None

    # The classifier size is the number of tags of this label ("O", "B-…", …),
    # not the number of entity labels.
    config = AutoConfig.from_pretrained(model_name, num_labels=len(id2label), id2label=id2label, label2id=label2id, return_dict=True, output_hidden_states=True, output_attentions=True)
    with training_args.strategy.scope():
        model = TFAutoModelForTokenClassification.from_pretrained(model_name, config=config, from_pt=True)

    # Get datasets
    train_dataset = extract_dataset(csv_files_train[l], tokenizer, max_seq_length, model_name, label2id) if training_args.do_train else None
    eval_dataset = extract_dataset(csv_files_eval[l], tokenizer, max_seq_length, model_name, label2id) if training_args.do_eval else None
    test_dataset = extract_dataset(csv_files_predict[l], tokenizer, max_seq_length, model_name, label2id) if training_args.do_predict else None

    # Initialize our Trainer
    trainer = TFTrainer(model=model, train_dataset=train_dataset, eval_dataset=eval_dataset, compute_metrics=compute_metrics, args=training_args)

    # Training set
    if training_args.do_train:
        # Evaluate and checkpoint once per epoch, log four times per epoch
        approx = math.floor if training_args.dataloader_drop_last else math.ceil
        steps_per_epoch = max(approx(tf.data.experimental.cardinality(train_dataset).numpy()/(training_args.train_batch_size*training_args.gradient_accumulation_steps)),1)
        training_args.logging_steps = int(steps_per_epoch/4)
        training_args.eval_steps = steps_per_epoch
        training_args.save_steps = steps_per_epoch
        trainer.train()

    # Development set
    if training_args.do_eval:
        # Results for every saved checkpoint
        results = []
        with training_args.strategy.scope():
            ckpt = tf.train.Checkpoint(optimizer=trainer.optimizer, model=trainer.model)
            for saved_checkpoint in trainer.model.ckpt_manager.checkpoints:
                ckpt.restore(saved_checkpoint)
                results.append(trainer.evaluate())

            # Pick the checkpoint with the lowest development loss and load it
            # back, so that save_model() stores these weights instead of the
            # weights of the last checkpoint evaluated.
            criteria = [result["eval_loss"] for result in results]
            best = criteria.index(min(criteria))
            checkpoint = trainer.model.ckpt_manager.checkpoints[best]
            ckpt.restore(checkpoint)

        # Save best checkpoint
        trainer.save_model()
        tokenizer.save_pretrained(training_args.output_dir)

        # Save Development results
        with open(os.path.join(training_args.output_dir, "eval_results.txt"), "w") as writer:
            logger.info("***** Eval results *****")
            for key, value in results[best].items():
                logger.info("  %s = %s", key, value)
                writer.write("%s = %s\n" % (key, value))

    # Test set
    if training_args.do_predict:
        # Load best checkpoint (or the latest one when no evaluation selected one)
        with training_args.strategy.scope():
            ckpt = tf.train.Checkpoint(optimizer=trainer.optimizer, model=trainer.model)
            ckpt.restore(checkpoint if checkpoint else trainer.model.ckpt_manager.latest_checkpoint)

        # Final results
        predictions, label_ids, metrics = trainer.predict(test_dataset)
        y_true, y_pred = filter_predictions(predictions, label_ids)

        # Save predictions
        folder = os.path.join(training_args.output_dir, "prediction")
        # makedirs(exist_ok=True) also works when the output folder itself is missing
        os.makedirs(folder, exist_ok=True)

        tokens = [tokenizer.convert_ids_to_tokens(test_data[0]["input_ids"]) for test_data in iter(test_dataset)]
        preds = tf.argmax(predictions, axis=2).numpy()

        # The dataset keeps the order of csv_files_predict[l], so example i belongs to file i.
        for file, [token_data, pred_data] in enumerate(zip(tokens, preds)):
            # Glue the word pieces back together; a word takes the tag predicted for its first piece.
            new_tokens = []
            new_y_pred = []
            for token, pred in zip(token_data, pred_data):
                if token=="[SEP]":
                    break
                if token!="[CLS]":
                    if token.startswith("##"):
                        new_tokens[-1] += token[2:]
                    else:
                        new_tokens.append(token)
                        new_y_pred.append(id2label[pred])
            csv_file_predict = csv_files_predict[l][file]
            # Offsets come from the input CSV; zip() stops at the shortest list when an example was truncated.
            offset_start, offset_end = zip(*pd.read_csv(csv_file_predict, keep_default_na=False, encoding="utf-16")[["Offset start","Offset end"]].values.tolist())
            csv_prediction = os.path.join(folder, os.path.split(csv_file_predict)[-1])
            pd.DataFrame(list(zip(new_tokens, offset_start, offset_end, new_y_pred)), columns = ["Token","Offset start","Offset end","Label"]).to_csv(csv_prediction, index=False, encoding="utf-16")

In [None]:
def BIO2brat(csv_files, folder=""):
    """Merge the tagged CSV files of one document into a BRAT annotation file.

    Each CSV is named "<document>~<label>~<offset>.csv"; the trailing offset is
    added to the token offsets of that file. Consecutive tagged tokens are
    joined into one span until a "B" or "S" tag starts a new one. The result is
    written to "<folder>/<document>_prediction.ann" as "T<i>\\t<label> <start> <end>"
    lines without the text column.

    Args:
        csv_files: CSV paths of a single document (any labels and chunks).
            Nothing is written when the list is empty.
        folder: Output directory.
    """
    if not csv_files:
        # The output name is derived from the first CSV, so there is nothing to write.
        return
    annotations = []
    for csv_file in csv_files:
        offset = int(os.path.split(csv_file)[-1].split("~")[-1][:-4])
        previous = "O"
        for _, offset_start, offset_end, label in pd.read_csv(csv_file, keep_default_na=False, encoding="utf-16").values.tolist():
            if label != "O":
                if previous != "O" and label[0] not in ["B","S"]:
                    # Continuation of the running entity: move its end forward
                    annotations[-1][-1] = offset_end+offset
                else:
                    annotations.append([label[2:], offset_start+offset, offset_end+offset])
            previous = label
    # Drop duplicates, then order by position with the label as tie-breaker so
    # the T-ids do not depend on set iteration order.
    unique = set((label, offset_start, offset_end) for label, offset_start, offset_end in annotations)
    ordered = sorted(unique, key=lambda x: (x[1], x[2], x[0]))
    lines = ["T"+str(i+1)+"\t"+label+" "+str(offset_start)+" "+str(offset_end) for i, (label, offset_start, offset_end) in enumerate(ordered)]
    with open(os.path.join(folder, os.path.split(csv_files[0])[-1].split("~")[0] + "_prediction.ann"), "wb") as file:
        file.write("\n".join(lines).encode())

In [None]:
model_folder = "MEDDOPROF_" + model_name.replace("/","_")
folder = os.path.join(model_folder, "prediction")
# makedirs(exist_ok=True) also covers a missing parent folder
os.makedirs(folder, exist_ok=True)

# Group the per-label prediction CSVs ("<document>~<label>~<offset>.csv") by document in one pass
csv_by_document = {}
for label in labels:
    label_folder = os.path.join(model_folder, label, "prediction")
    for file in os.listdir(label_folder):
        if file.endswith(".csv"):
            csv_by_document.setdefault(file.split("~")[0], []).append(os.path.join(label_folder, file))
csv_prediction_set = set(csv_by_document)
csv_prediction_files = list(csv_by_document.values())

for csv_prediction_file in tqdm(csv_prediction_files):
    BIO2brat(csv_prediction_file, folder=folder)
# Only the annotation files written just now: a leftover file from an earlier
# run already carries its text column and would not parse again below.
prediction_by_document = {document: os.path.join(folder, document + "_prediction.ann") for document in csv_by_document}
predictions = list(prediction_by_document.values())

folder = "meddoprof_test_set"
files = [os.path.join(folder, file) for file in os.listdir(folder) if file.endswith(".txt")]

def _with_text(ann, text):
    """Append the annotated text to a "T<i>\\t<label> <start> <end>" line."""
    off_start, off_end = ann.split("\t", maxsplit=2)[-1].split(" ", maxsplit=1)[-1].split(" ")
    # Line breaks inside the span would split the annotation line, so they become spaces.
    return ann+"\t"+text[int(off_start):int(off_end)].replace("\r", " ").replace("\n", " ")

# Aggregate text into annotations
matched = [(file_name, prediction_by_document[document]) for file_name in files for document in [os.path.splitext(os.path.split(file_name)[-1])[0]] if document in prediction_by_document]
for file_name, prediction in tqdm(matched):
    with open(file_name, "rb") as file:
        text = file.read().decode()
    with open(prediction, "rb") as file:
        annotations = file.read().decode().splitlines()
    annotations = [_with_text(ann, text) for ann in annotations]
    with open(prediction, "wb") as file:
        file.write("\n".join(annotations).encode())

### Dictionary matching

In [None]:
codes = pd.read_csv("meddoprof_valid_codes.tsv", sep="\t", keep_default_na=False)
# Map every "|"-separated term to its code. Alternatives are added after the
# labels, so on equal text the alternative's code wins. Empty terms (an empty
# field splits into [""]) are skipped so that "" never becomes a candidate of
# the fuzzy matching.
code_rows = codes.values.tolist()
dictionary = {term: code for code, terms, alternatives in code_rows for term in terms.split("|") if term}
dictionary.update({term: code for code, terms, alternatives in code_rows for term in alternatives.split("|") if term})

# Collect [file name, mention text, "start end"] for every predicted annotation
folder = os.path.join("MEDDOPROF_" + model_name.replace("/","_"), "prediction")
files = [os.path.join(folder, file)[:-4] for file in os.listdir(folder) if file.endswith(".ann")]
files_codes = []
for file_name in files:
    with open(file_name + ".ann", "rb") as file:
        annotations = file.read().decode().splitlines()
    for ann in annotations:
        if ann and ann[0] == 'T':
            idx, entity_label, name = ann.split("\t", maxsplit=2)
            entity_label, offsets = entity_label.split(" ", maxsplit=1)
            offsets = [[int(offset) for offset in pair.split(" ")] for pair in offsets.split(';')]
            # A discontinuous annotation is reported as one span covering all its fragments
            off_start, off_end = [min(offset[0] for offset in offsets), max(offset[1] for offset in offsets)]
            files_codes.append([os.path.split(file_name)[-1],name,str(off_start)+" "+str(off_end)])
final_file = files_codes

In [None]:
def similarity(u, v):
    """Edit distance between u and v with transpositions allowed (lower is closer)."""
    return nltk.distance.edit_distance(u, v, transpositions=True)

# Closest code of each mention already resolved by fuzzy matching
_closest_codes = {}

def _match_code(name):
    """Return the code of name: exact dictionary hit, else the term at the smallest edit distance."""
    if name in dictionary:
        return dictionary[name]
    if name not in _closest_codes:
        # min() keeps the first of several equally distant terms, in dictionary order
        _closest_codes[name] = min(dictionary.items(), key=lambda item: similarity(name, item[0]))[1]
    return _closest_codes[name]

# tqdm wraps the list itself so the progress bar knows its total
string_matchings = [_match_code(name) for filename, name, span in tqdm(files_codes)]
# Every field and header is wrapped in double quotes by hand; quotechar="'" keeps
# pandas from escaping those double quotes.
final_file = pd.DataFrame([["\""+filename+"\"", "\""+name+"\"", "\""+span+"\"", "\""+string_matching+"\""] for [filename, name, span], string_matching in zip(final_file, string_matchings)], columns=["\"filename\"", "\"text\"", "\"span\"", "\"code\""])
final_file.to_csv(os.path.join(os.path.split(folder)[0], "task3.tsv"), index=False, encoding="utf-16", sep='\t',quotechar="'")