In [None]:
# Mount Google Drive when the notebook runs inside Colab; elsewhere the
# google.colab package is not importable and the mount is simply skipped.
try:
    from google.colab import drive
except ImportError:
    IN_COLAB = False
else:
    IN_COLAB = True
    drive.mount('/content/drive')

In [None]:
# uncomment if using colab
!pip install -q -U datasets
!pip install seqeval
!pip install -q -U evaluate
!pip install -q -U git+https://github.com/huggingface/transformers.git
!pip install -q -U bitsandbytes
!pip install -q -U git+https://github.com/huggingface/peft.git
!pip install -q -U git+https://github.com/huggingface/accelerate.git

In [None]:
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig, TextGenerationPipeline
import torch
import os
from utils import *

In [None]:
from huggingface_hub import notebook_login

# Interactive Hugging Face Hub login for this notebook session.
# NOTE(review): presumably required to download the model/dataset used below -
# confirm whether either of them is gated.
notebook_login()

In [None]:
# Entity types handled in this notebook (an unordered set).
simple_ent = {"Condition", "Value", "Drug", "Procedure", "Measurement", "Temporal", "Observation", "Person", "Device"}

# BIO tag name -> integer id. "O" is 0; each entity type then takes two
# consecutive ids, B-<type> first and I-<type> right after it, so the id of an
# I- tag is always the id of its B- tag plus one.
sel_ent = {"O": 0}
for _entity in ("Condition", "Value", "Drug", "Procedure", "Measurement",
                "Temporal", "Observation", "Person", "Device"):
    sel_ent[f"B-{_entity}"] = len(sel_ent)
    sel_ent[f"I-{_entity}"] = len(sel_ent)

# Tag names ordered by id, and the reverse lookup (id -> tag name).
entities_list = list(sel_ent)
sel_ent_inv = dict(zip(sel_ent.values(), sel_ent.keys()))

In [None]:
# Project root: the shared Drive folder when it is reachable from the working
# directory (Colab with Drive mounted), otherwise the notebook's parent directory.
colab_root = './drive/MyDrive/TER-LISN-2024'
root = colab_root if os.path.isdir(colab_root) else '..'
data_path = f'{root}/data'
models_path = f'{root}/models'

In [None]:
# Hugging Face Hub id of the causal language model that is loaded and prompted below.
model_name = "BioMistral/BioMistral-7B"

In [None]:
# Load the base model (BioMistral-7B, see model_name) with 4-bit quantization:
# NF4 quantization type, bfloat16 compute dtype, double quantization disabled.
bnb_config = BitsAndBytesConfig(
    load_in_4bit= True,
    bnb_4bit_quant_type= "nf4",
    bnb_4bit_compute_dtype= torch.bfloat16,
    bnb_4bit_use_double_quant= False,
)
# device_map={"": 0} maps the whole model (root module "") to device index 0.
model = AutoModelForCausalLM.from_pretrained(
   model_name,
    quantization_config=bnb_config,
    device_map={"": 0}
)

In [None]:
# Tokenizer that matches the generation model (same Hub id as the model).
# NOTE(review): trust_remote_code=True lets code shipped in the model repository
# run locally - confirm it is actually required for this checkpoint.
tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
# Pad on the left and reuse the EOS token as the padding token.
tokenizer.padding_side = 'left'
tokenizer.pad_token = tokenizer.eos_token
# NOTE(review): presumably this appends EOS to every encoded prompt, which is a
# fine-tuning convention - verify it is intended for pure generation.
tokenizer.add_eos_token = True
# Cell output: the current BOS/EOS insertion flags.
tokenizer.add_bos_token, tokenizer.add_eos_token

In [None]:
# Text-generation pipeline built from the loaded model and the tokenizer defined above.
pipe  = TextGenerationPipeline(model = model, tokenizer = tokenizer)

In [None]:
# Load the annotated corpus; later cells read its 'test' split ('tokens', 'ner_tags').
# NOTE(review): load_dataset is not imported explicitly in this notebook -
# presumably it arrives through `from utils import *`; confirm.
dataset = load_dataset('JavierLopetegui/chia_v1')

In [None]:
# for each sentence save the text
def generate_sentences_from_tokens(sentences):
    """
    Add a 'text' field holding each sentence as a single space-joined string.

    inputs:
        sentences: dict-like batch whose 'tokens' entry is a list of token lists
    outputs:
        the same batch object, with 'text' set to one string per token list
    """
    sentences['text'] = [" ".join(token_list) for token_list in sentences['tokens']]
    return sentences

In [None]:
def build_prompts(sentences, prompt_type=2):
    """
    Add a 'prompt' field with one prompt per sentence text.

    inputs:
        sentences: dict-like batch whose 'text' entry is a list of strings
        prompt_type: forwarded unchanged to build_prompt for every sentence
    outputs:
        the same batch object, with 'prompt' set to the list of built prompts
    """
    sentences['prompt'] = [build_prompt(text, prompt_type) for text in sentences['text']]
    return sentences

In [None]:
# Add the 'text' column (tokens joined by spaces), then derive a second dataset
# that also carries the 'prompt' column built with prompt type 2.
dataset = dataset.map(generate_sentences_from_tokens, batched=True)
dataset_prompt2 = dataset.map(build_prompts, batched=True, fn_kwargs={"prompt_type": 2})

In [None]:
# Only the 'test' split is prompted and evaluated in this notebook.
test_dataset_p2 = dataset_prompt2['test']

In [None]:
# keep just the prompt column
# Derived from the split itself rather than from test_dataset_p2, so re-running
# this cell does not fail on columns that were already removed.
test_dataset_p2 = dataset_prompt2['test'].remove_columns(['tokens', 'text', 'ner_tags', 'file'])

In [None]:
# data_loader_p2 = DataLoader(test_dataset_p2, batch_size=4, shuffle=False)

In [None]:
# generated_sentences_p2 = pipe(batch['prompt'], max_new_tokens = 500, return_full_text = False, handle_long_generation = "hole")

In [None]:
# Prompt the model once per test sentence and keep only the text generated
# before the first blank line ('\n\n').
generated_sentences_p2 = []
for prompt in test_dataset_p2['prompt']:
    result = pipe(prompt, max_new_tokens=500, return_full_text=False, handle_long_generation="hole")
    first_paragraph = result[0]['generated_text'].partition('\n\n')[0]
    generated_sentences_p2.append(first_paragraph)

In [None]:
# Rebinds the notebook-level name `tokenizer` to the XLM-RoBERTa tokenizer;
# tokenize_and_align_labels reads this global. The generation pipeline was
# constructed earlier with the previous tokenizer object.
tokenizer = AutoTokenizer.from_pretrained('xlm-roberta-base')

In [None]:
# tokenize and align the labels in the dataset
def tokenize_and_align_labels(sentence, flag = 'I'):
    """
    Tokenize a batch of pre-split sentences and align the word-level labels
    with the resulting sub-word tokens.

    Reads the notebook-level `tokenizer` and `entities_list`.

    inputs:
        sentence: dict, a batch from the dataset with 'tokens' (one list of
            words per sentence) and 'ner_tags' (one list of integer label ids
            per sentence, one id per word)
        flag: str, how to label the non-initial sub-words of a word
            - 'I': continuation label: a B-ENT word yields I-ENT sub-words,
                   I-ENT and O words keep their own label
            - 'B': repeat the label of the word unchanged
            - None: use -100 for subwords
    outputs:
        tokenized_sentence: dict, the tokenized batch now with a 'labels' field
    raises:
        ValueError: if flag is not 'I', 'B' or None
    """
    if flag not in ('I', 'B', None):
        # An unknown flag used to add no label at all for continuation
        # sub-words, silently shifting every following label.
        raise ValueError(f"flag must be 'I', 'B' or None, got {flag!r}")

    tokenized_sentence = tokenizer(sentence['tokens'], is_split_into_words=True, truncation=True)

    labels = []
    for i, labels_s in enumerate(sentence['ner_tags']):
        word_ids = tokenized_sentence.word_ids(batch_index=i)
        previous_word_idx = None
        label_ids = []
        for word_idx in word_ids:
            if word_idx is None:
                # token that belongs to no word (special token)
                label_ids.append(-100)
            elif word_idx != previous_word_idx:
                # first sub-word of a word keeps the word's label
                label_ids.append(labels_s[word_idx])
            elif flag is None:
                label_ids.append(-100)
            else:
                word_label = labels_s[word_idx]
                # Only B- labels are shifted to their I- counterpart (the next
                # id in entities_list). "O" must stay "O": adding 1 to it
                # would turn the sub-word into the first B- label.
                if flag == 'I' and entities_list[word_label].startswith('B'):
                    word_label += 1
                label_ids.append(word_label)
            previous_word_idx = word_idx
        labels.append(label_ids)
    tokenized_sentence['labels'] = labels
    return tokenized_sentence

**Standardizing true annotations**

In [None]:
# Re-tokenize the gold test annotations so that every punctuation sign becomes
# its own token, producing one list of (token, BIO tag name) pairs per sentence.
new_true_annotations = []
ps = r'(\.|\,|\:|\;|\!|\?|\-|\(|\)|\[|\]|\{|\}|\")'
for sent in dataset['test']:
    # The splitting logic below works on tag NAMES (it calls tag.split("-") and
    # compares against "O"), so integer tag ids are converted with sel_ent_inv;
    # tags that are already strings are kept as they are.
    annotation = [
        (word, tag if isinstance(tag, str) else sel_ent_inv[tag])
        for word, tag in zip(sent['tokens'], sent['ner_tags'])
    ]
    new_annotation = []
    for i, (word, tag) in enumerate(annotation):
        # (start, end) span of every punctuation sign inside the word
        indexes = [match.span() for match in re.finditer(ps, word)]
        if not indexes:
            new_annotation.append((word, tag))
            continue
        # label given to the pieces that continue the word's entity
        label = f'I-{tag.split("-")[1]}' if tag != "O" else "O"
        # does the next word carry on with the same entity?
        next_continues = i < len(annotation) - 1 and annotation[i + 1][1] == label
        last = 0
        for beg, end in indexes:
            if beg > last:
                # text before the punctuation sign keeps the word's tag
                new_annotation.append((word[last:beg], tag))
            if end < len(word) or next_continues:
                new_annotation.append((word[beg:end], label))
            else:
                # a sign that closes both the word and the entity is left outside
                new_annotation.append((word[beg:end], 'O'))
            last = end
        if last < len(word):
            new_annotation.append((word[last:], label))
    new_true_annotations.append(new_annotation)
len(new_true_annotations)

In [None]:
# Turn each list of (token, tag name) pairs into a record with parallel
# 'tokens' and 'ner_tags' lists, mapping tag names to ids through sel_ent.
true_annotations = [
    {
        "tokens": [token for token, _ in pairs],
        "ner_tags": [sel_ent[tag_name] for _, tag_name in pairs],
    }
    for pairs in new_true_annotations
]
len(true_annotations)

In [None]:
# Wrap the standardized gold annotations in a Dataset (through a DataFrame with
# 'tokens' and 'ner_tags' columns) so they can be mapped through the tokenizer.
# NOTE(review): pd and Dataset are not imported explicitly in this notebook -
# presumably they arrive through a star import; confirm.
true_df = pd.DataFrame(true_annotations)
true_ann_dataset = Dataset.from_pandas(true_df)

In [None]:
# Tokenize the gold sentences in batches and attach the sub-word aligned
# 'labels' column (default flag 'I' of tokenize_and_align_labels).
true_ann_dataset = true_ann_dataset.map(tokenize_and_align_labels, batched=True)

In [None]:
def get_labels(p, label_names=None):
    """
    Convert parallel sequences of predicted and true label ids into label
    names, dropping every position whose true label is -100.

    inputs:
        p: tuple (predictions, labels); both are lists of per-sentence lists
            of integer label ids, aligned position by position
        label_names: optional list mapping a label id to its name; defaults to
            the notebook-level entities_list
    outputs:
        (predictions, labels): two lists of per-sentence lists of label names,
        with the same length for every sentence
    """
    if label_names is None:
        label_names = entities_list
    predictions, labels = p

    named_predictions = []
    named_labels = []
    for prediction, label in zip(predictions, labels):
        # Filter both sequences in one pass over the ORIGINAL ids. Rebinding
        # `predictions` to the filtered names before filtering the labels made
        # zip() stop early and drop trailing true labels.
        kept = [(pred_id, true_id) for pred_id, true_id in zip(prediction, label) if true_id != -100]
        named_predictions.append([label_names[pred_id] for pred_id, _ in kept])
        named_labels.append([label_names[true_id] for _, true_id in kept])

    return named_predictions, named_labels

In [None]:
from eval_file import *

**Evaluating prompt 2**

In [None]:
def _split_punctuation(annotation):
    """Split punctuation signs out of (word, tag) pairs into tokens of their own."""
    ps = r'(\.|\,|\:|\;|\!|\?|\-|\(|\)|\[|\]|\{|\}|\")'
    new_annotation = []
    for i, (word, tag) in enumerate(annotation):
        # (start, end) span of every punctuation sign inside the word
        spans = [match.span() for match in re.finditer(ps, word)]
        if not spans:
            new_annotation.append((word, tag))
            continue
        # label given to the pieces that continue the word's entity
        label = f'I-{tag.split("-")[1]}' if tag != "O" else "O"
        # does the next word carry on with the same entity?
        next_continues = i < len(annotation) - 1 and annotation[i + 1][1] == label
        last = 0
        for beg, end in spans:
            if beg > last:
                # text before the punctuation sign keeps the word's tag
                new_annotation.append((word[last:beg], tag))
            if end < len(word) or next_continues:
                new_annotation.append((word[beg:end], label))
            else:
                # a sign that closes both the word and the entity is left outside
                new_annotation.append((word[beg:end], 'O'))
            last = end
        if last < len(word):
            new_annotation.append((word[last:], label))
    return new_annotation


def parse_ann2bio(sentence, pattern, pattern1, pattern2):
    """
    Convert a sentence annotated with inline <Entity>...</Entity> tags into a
    list of (token, BIO tag) pairs, with punctuation signs as separate tokens.

    inputs:
        sentence: str, the generated annotated sentence; its final character
            (and a trailing newline, if present) is always dropped
        pattern: regex matching a whole tagged span (opening tag to closing tag)
        pattern1: regex matching an opening tag; group 1 is the entity name
        pattern2: regex matching a closing tag
    outputs:
        list of (token, tag) tuples; an empty list for an empty sentence
    """
    if not sentence:
        # nothing was generated: indexing sentence[-1] would raise IndexError
        return []
    if sentence[-1] == "\n":
        sentence = sentence[:-2] # remove the \n and a final point wrongly added
    else:
        sentence = sentence[:-1] # remove the final point wrongly added

    annotation = []
    cursor = 0
    for match in re.finditer(pattern, sentence):
        beg, end = match.span()
        if beg > cursor:
            # plain text between two tagged spans
            annotation.extend((word, "O") for word in sentence[cursor:beg].split())
        entity = sentence[beg:end]
        entity_name = re.search(pattern1, entity).group(1)
        # Strip every opening/closing tag in the span, not only the pair named
        # after entity_name: the closing tag may name a different entity, and
        # another opening tag may be nested inside the span.
        words = re.sub(pattern2, "", re.sub(pattern1, "", entity)).split()
        if words:  # an empty span such as <Drug></Drug> contributes no token
            annotation.append((words[0], "B-" + entity_name))
            annotation.extend((word, "I-" + entity_name) for word in words[1:])
        cursor = end
    annotation.extend((word, "O") for word in sentence[cursor:].split())

    # check punctuation sign in tokens and put them as individual tokens
    return _split_punctuation(annotation)

In [None]:
# Entity names that may appear as inline tags in the generated text.
_tag_names = "|".join([
    "Person", "Condition", "Value", "Drug", "Procedure", "Measurement",
    "Temporal", "Observation", "Mood", "Pregnancy_considerations", "Device",
])
# pattern1: opening tag, pattern2: closing tag (group 1 is the entity name);
# pattern: the shortest span running from an opening tag to a closing tag.
pattern1 = '<(' + _tag_names + ')>'
pattern2 = '</(' + _tag_names + ')>'
pattern = pattern1 + '.*?' + pattern2

In [None]:
# Parse every generated sentence into a list of (token, BIO tag) pairs.
# The results are collected in new_p2_annotations, the list the next cell
# reads; appending to p2_annotations left this list empty.
new_p2_annotations = [
    parse_ann2bio(sent, pattern, pattern1, pattern2)
    for sent in generated_sentences_p2
]
len(new_p2_annotations)

In [None]:
# Turn each parsed sentence into a record with parallel 'tokens' and
# 'ner_tags' lists. The tag patterns accept entity names that sel_ent does not
# define (Mood, Pregnancy_considerations); such tags are mapped to "O" instead
# of raising KeyError.
p2_annotations = []
for sent in new_p2_annotations:
    dicc_sent = {"tokens": [], "ner_tags": []}
    for word, tag in sent:
        dicc_sent["tokens"].append(word)
        dicc_sent["ner_tags"].append(sel_ent.get(tag, sel_ent["O"]))
    p2_annotations.append(dicc_sent)
len(p2_annotations)

In [None]:
# Wrap the parsed model annotations in a Dataset (through a DataFrame with
# 'tokens' and 'ner_tags' columns), mirroring what is done for the gold data.
p2_df = pd.DataFrame(p2_annotations)
p2_dataset = Dataset.from_pandas(p2_df)

In [None]:
# Tokenize the predicted sentences in batches and attach the sub-word aligned
# 'labels' column (default flag 'I' of tokenize_and_align_labels).
p2_dataset = p2_dataset.map(tokenize_and_align_labels, batched=True)

In [None]:
# keep just sentences with the same length
# Read each 'labels' column once instead of re-reading it on every iteration.
p2_label_column = p2_dataset['labels']
true_label_column = true_ann_dataset['labels']
assert len(p2_label_column) == len(true_label_column), "prediction and gold datasets differ in size"

sentences_to_evaluate_p2 = []
sentences_to_evaluate_true = []
for p2_labels, gold_labels in zip(p2_label_column, true_label_column):
    # a different number of sub-word labels means the generated sentence no
    # longer lines up with the gold one, so the pair is skipped
    if len(p2_labels) == len(gold_labels):
        sentences_to_evaluate_p2.append(p2_labels)
        sentences_to_evaluate_true.append(gold_labels)

# fraction of test sentences that are kept for evaluation
print(len(sentences_to_evaluate_p2)/len(p2_dataset))

In [None]:
# NOTE(review): BioEval is not defined in this notebook - presumably provided by
# `from eval_file import *`; confirm.
evaluator = BioEval()

In [None]:
# Convert the kept label-id sequences to label names (predictions first, gold second).
pred_labels, true_labels = get_labels((sentences_to_evaluate_p2, sentences_to_evaluate_true))

In [None]:
# Score the predictions against the gold labels; note the argument order (gold first).
evaluator.evaluate_annotations(true_labels, pred_labels)

In [None]:
# Cell output: the evaluator's `performance` attribute.
evaluator.performance

In [None]:
# NOTE(review): presumably writes the evaluation to 'eval_p2.json' relative to
# the working directory - confirm against BioEval.save_evaluation.
evaluator.save_evaluation('eval_p2.json')

In [1]:
# Completion marker printed once the notebook has run to the end.
print("done!")

done!
