## Reading the data
This section reuses the functions from the previous script to read the data.

In [None]:
# Mount Google Drive inside the Colab runtime at /content/drive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import os, json
import numpy as np
from argparse import ArgumentParser
from tqdm import tqdm
from collections import defaultdict
import pandas as pd
from pprint import pprint
from datetime import datetime
import copy

## Evaluation

In [None]:
%%capture
!pip install transformers # If you are using collab, "!" is required to download
!pip install bert-score
!pip install -U sentence-transformers

import nltk
from nltk.tokenize import word_tokenize
from nltk.stem import WordNetLemmatizer, PorterStemmer

# Download the NLTK resources 'punkt' and 'wordnet'.
# NOTE(review): presumably these back word_tokenize / WordNetLemmatizer; the only
# visible use of those names is in the commented-out lemmatization helper — confirm
# whether the downloads are still needed.
nltk.download('punkt')
nltk.download('wordnet')

In [None]:
import os, json
import numpy as np
from argparse import ArgumentParser
from tqdm import tqdm
from collections import defaultdict
import pandas as pd
from pprint import pprint
import re, string
from google.colab import files

from transformers import BertTokenizer, BertModel
from sentence_transformers import SentenceTransformer, util
import torch
import copy

In [None]:
# def perform_lemmatization(text):

#     # Tokenize the text into words
#     words = word_tokenize(text)
#     lemmatizer = WordNetLemmatizer()
#     stemmer = PorterStemmer()
#     lemmatized_and_stemmed_words = [(lemmatizer.lemmatize(word), stemmer.stem(word)) for word in words]

#     lemmatized_words, stemmed_words = zip(*lemmatized_and_stemmed_words)
#     lemmatized_text = ' '.join(lemmatized_words)
#     stemmed_text = ' '.join(stemmed_words)

#     # print("Original text:", text)
#     # print("Lemmatized text:", lemmatized_text)
#     # print("Stemmed text:", stemmed_text)
#     return stemmed_text

# Loaded SentenceTransformer instances, keyed by model name, so the weights are
# read once per kernel instead of once per scored pair.
_SBERT_MODEL_CACHE = {}


def _get_sbert_model(model_name="all-MiniLM-L6-v2"):
    """Return the SentenceTransformer for `model_name`, loading it on first use."""
    if model_name not in _SBERT_MODEL_CACHE:
        _SBERT_MODEL_CACHE[model_name] = SentenceTransformer(model_name)
    return _SBERT_MODEL_CACHE[model_name]


def clac_sbert_score(text1, text2):
    """Return the Sentence-BERT cosine similarity between two texts.

    Both texts are encoded with the "all-MiniLM-L6-v2" model and compared with
    `util.cos_sim`.

    Args:
        text1: First text to compare.
        text2: Second text to compare.

    Returns:
        The cosine similarity as a Python float.

    Note:
        The result is extracted with `.item()`, so each argument is expected to
        be a single string (one embedding per side) rather than a list of texts.
    """
    # Reuse the cached model: this function is called for every
    # prediction/ground-truth pair, and reloading the weights each time
    # dominated the runtime.
    model = _get_sbert_model()

    embeddings1 = model.encode(text1, convert_to_tensor=True)
    embeddings2 = model.encode(text2, convert_to_tensor=True)

    cosine_score = util.cos_sim(embeddings1, embeddings2)
    return cosine_score.item()

# Loaded (tokenizer, model) pairs, keyed by checkpoint name, so the checkpoint is
# read once per kernel instead of once per scored pair.
_BERT_CACHE = {}


def _get_bert(model_name="bert-base-uncased"):
    """Return the cached (tokenizer, model) pair for `model_name`, loading it on first use."""
    if model_name not in _BERT_CACHE:
        tokenizer = BertTokenizer.from_pretrained(model_name)
        model = BertModel.from_pretrained(model_name)
        _BERT_CACHE[model_name] = (tokenizer, model)
    return _BERT_CACHE[model_name]


def calc_cosine_similarity(text1, text2):
    """Return the cosine similarity of mean-pooled BERT embeddings of two texts.

    Each text is tokenized and passed through "bert-base-uncased"; the last
    hidden state is averaged over dim=1 to obtain one vector per text, and the
    two vectors are compared with dot / (norm * norm).

    Args:
        text1: First text to compare.
        text2: Second text to compare.

    Returns:
        The similarity as a numpy scalar (element [0][0] of the similarity matrix).

    Note:
        `np.linalg.norm` is taken over the whole embedding array, so the value is
        a true cosine similarity only when each argument is a single string.
    """
    # Reuse the cached tokenizer/model: this function is called for every
    # prediction/ground-truth pair, and reloading the checkpoint each time
    # dominated the runtime.
    tokenizer, model = _get_bert()

    inputs1 = tokenizer(text1, return_tensors="pt", padding=True, truncation=True)
    inputs2 = tokenizer(text2, return_tensors="pt", padding=True, truncation=True)

    # Inference only: skip building the autograd graph.
    with torch.no_grad():
        outputs1 = model(**inputs1)
        outputs2 = model(**inputs2)

    embeddings1 = outputs1.last_hidden_state.mean(dim=1).detach().numpy()
    embeddings2 = outputs2.last_hidden_state.mean(dim=1).detach().numpy()

    # Cosine similarity
    similarity = np.dot(embeddings1, embeddings2.T) / (np.linalg.norm(embeddings1) * np.linalg.norm(embeddings2))

    return similarity[0][0]


In [None]:
def is_null_or_empty(string):
    """Tell whether a value is missing.

    A value counts as missing when it is None, when it is empty or
    whitespace-only, or when it is the word 'null' in any letter case
    (surrounding whitespace ignored).
    """
    if string is None:
        return True
    trimmed = string.strip()
    if trimmed.lower() == 'null':
        return True
    return trimmed == ''

def normalize_string(s):
    """Normalize text for comparison.

    Steps, in order: lowercase, drop every character found in
    string.punctuation, replace the standalone words a/an/the with a space,
    and collapse all whitespace runs into single spaces.
    """
    lowered = s.lower()

    punctuation = set(string.punctuation)
    without_punctuation = ''.join(filter(lambda ch: ch not in punctuation, lowered))

    article_pattern = re.compile(r'\b(a|an|the)\b', re.UNICODE)
    without_articles = article_pattern.sub(' ', without_punctuation)

    tokens = without_articles.split()
    return ' '.join(tokens)


def calculate_score(predictions, ground_truth_labels, similarity_threshold):
    """Count how many predictions match a ground-truth label for one argument.

    A prediction matches a ground-truth label when, after `normalize_string`,
    either the BERT cosine similarity or the SBERT score is strictly greater
    than `similarity_threshold`. Matching is one-to-one: each prediction can
    consume at most one ground-truth label and each label can be matched once.

    Args:
        predictions: List of predicted argument strings.
        ground_truth_labels: List of ground-truth argument strings.
        similarity_threshold: Score above which a pair counts as a match.

    Returns:
        A tuple (true_pos, pred_count, gtruth_count).
        - (0, 0, 0) when the only ground-truth label is null/empty, i.e. the
          sample is skipped.
        - pred_count is len(predictions), null predictions included.
        - gtruth_count is len(ground_truth_labels) minus the empty-string labels.
    """
    pred_count = len(predictions)

    # Samples whose single ground truth is null are not evaluated at all.
    if len(ground_truth_labels) == 1 and is_null_or_empty(ground_truth_labels[0]):
        return 0, 0, 0

    # Discount each empty-string label exactly once. (Previously the decrement
    # sat inside the prediction loop and was repeated for every prediction.)
    gtruth_count = sum(
        1 for g in ground_truth_labels
        if not (g is not None and g.strip() == '')
    )

    # Work on a fresh list so the caller's labels are never mutated; null/empty
    # labels can never match, so they are left out of the candidates.
    unmatched = [g for g in ground_truth_labels if not is_null_or_empty(g)]

    true_pos = 0
    for p in predictions:
        if is_null_or_empty(p):
            continue  # a null prediction cannot match anything
        norm_pred = normalize_string(p)
        for idx, g in enumerate(unmatched):
            norm_truth = normalize_string(g)
            sim_value = calc_cosine_similarity(norm_pred, norm_truth)
            sbert_score = clac_sbert_score(norm_pred, norm_truth)
            print(norm_pred, norm_truth, sim_value, sbert_score)
            if sim_value > similarity_threshold or sbert_score > similarity_threshold:
                true_pos += 1
                # Deleting right before `break` is safe: iteration stops here,
                # so no label is skipped and this prediction cannot be counted
                # against a second label.
                del unmatched[idx]
                break

    return true_pos, pred_count, gtruth_count

def calc_precision_recall_f1(true_pos, pred_count, gtruth_count):
    """Turn raw counts into (precision, recall, f1).

    The three counts are printed first. A small epsilon (1e-10) is added to
    every denominator so that zero counts yield 0.0 instead of raising.
    """
    eps = 1e-10
    print(true_pos, pred_count, gtruth_count)

    precision = true_pos / (pred_count + eps)
    recall = true_pos / (gtruth_count + eps)
    f1 = 2 * precision * recall / (precision + recall + eps)

    return precision, recall, f1

In [None]:
def select_event_specific_data(event, data):
    """Return, in their original order, the records of `data` whose 'label' equals `event`."""
    return [record for record in data if event == record['label']]

def calc_results(all_data, event_types, arg_types, events_roles, pred_name, gtruth_name, similarity_threshold):
    """Compute precision/recall/F1 per event type, argument type and argument.

    Args:
        all_data: List of record dicts. Each record is read via 'label',
            `pred_name`, `gtruth_name` and (for error reporting) 'doc_id'.
        event_types: Event labels to evaluate.
        arg_types: Argument-type keys to evaluate for every event.
        events_roles: Mapping event -> arg_type -> {argument name: ...}; only
            the argument names (the keys) are used.
        pred_name: Key of the predictions in each record.
        gtruth_name: Key of the ground-truth arguments in each record.
        similarity_threshold: Threshold forwarded to `calculate_score`.

    Returns:
        Nested dict results[event][arg_type][arg] =
        [[precision, recall, f1], [true_pos, pred_count, gtruth_count]].
        Arguments whose accumulated ground-truth count is 0 are omitted.
    """
    results = {}
    for event in event_types:
        results[event] = {}
        # The records of an event do not depend on the argument, so filter once
        # per event rather than once per argument.
        event_specific_data = select_event_specific_data(event, all_data)
        for arg_type in arg_types:
            results[event][arg_type] = {}
            arguments = list(events_roles[event][arg_type].keys())
            for arg in arguments:
                # Accumulated true positives, prediction count and ground-truth
                # count for this argument.
                tp, pc, gc = 0, 0, 0
                for dt in event_specific_data:
                    pred_list = dt[pred_name][arg_type][arg]  # predictions
                    gtruth_list = dt[gtruth_name][arg_type][arg]  # ground-truth arguments
                    # A failing sample contributes nothing to the totals.
                    true_pos, pred_count, gtruth_count = 0, 0, 0
                    try:
                        true_pos, pred_count, gtruth_count = calculate_score(
                            pred_list, gtruth_list, similarity_threshold)
                    except Exception as exc:
                        # Catch Exception, not everything: Ctrl-C / kernel
                        # interrupt must still stop a long run. Report the error
                        # instead of hiding it.
                        print(event, arg_type, arg)
                        print(pred_list, gtruth_list)
                        print("Error occured at", dt.get('doc_id'))
                        print(repr(exc))

                    tp += true_pos
                    pc += pred_count
                    gc += gtruth_count
                if gc == 0:
                    continue  # no ground truth for this argument: nothing to score

                precision, recall, f1 = calc_precision_recall_f1(tp, pc, gc)
                print(event, arg_type, arg, precision, recall, f1)
                # First item: (precision, recall, f1); second item: raw counts.
                results[event][arg_type][arg] = [[precision, recall, f1], [tp, pc, gc]]

    return results


In [None]:
def read_json_file(name):
    """Load and return the JSON content of the file at path `name`.

    The file is decoded as UTF-8 explicitly, so non-ASCII text is read the same
    way on every platform instead of depending on the locale's default encoding.
    """
    with open(name, 'r', encoding='utf-8') as f:
        return json.load(f)

def save_json(json_data, file_name):
    """Serialize `json_data` to JSON and write it to `file_name`.

    The target path is printed before writing. Missing parent directories are
    created, so saving into a results folder that does not exist yet no longer
    fails.

    Args:
        json_data: Any object accepted by `json.dumps`.
        file_name: Destination path; an existing file is overwritten.
    """
    # Serialize first: if the data is not JSON-serializable, this raises before
    # an existing file gets truncated.
    serialized = json.dumps(json_data)
    print(file_name)

    parent_dir = os.path.dirname(file_name)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    with open(file_name, "w", encoding="utf-8") as json_file:
        json_file.write(serialized)

In [None]:
# Role definitions: indexed as events_roles[event][arg_type]; the keys found there
# are the argument names that get evaluated.
events_roles = read_json_file(os.path.join('role_definitions.json'))


# Event labels and argument groups to evaluate.
event_types = ['taking-moud', 'tapering', 'relapse']
arg_types = ['main-arguments', 'event-specific-arguments', 'subject-effect-arguments']
# List of thresholds (despite the singular name); the evaluation is run once per value.
similarity_threshold = [0.749, 0.99]

# Pieces of the predictions file path: <folder_path>/<number>-<name>-predictions.json
number = '2'
name = 'generative-qa-flan-t5-base'
folder_path = 'Argument-Extraction-Predictions'
file_name = f'{number}-{name}-predictions.json'
all_data = read_json_file(os.path.join(folder_path, file_name))

# Sanity check: the keys of the first record and the number of records loaded.
print(all_data[0].keys())

print(len(all_data))

dict_keys(['doc_id', 'id', 'text1', 'text2', 'label', 'thread-length', 'ground-truth-arguments', 'predictions'])
100


In [None]:
# Inspect the ground-truth arguments and the predictions of the first record.
print(all_data[0]['ground-truth-arguments'])
print(all_data[0]['predictions'])

{'main-arguments': {'subject/patient': ['Individual altered treatment due to withdrawals'], 'effects': ['Withdrawals in the mornings'], 'treatment': ['Timing dose later', 'splitting dose 12 hrs apart']}, 'event-specific-arguments': {'medications': ['Suboxone'], 'dosage': ['null'], 'treatment-duration': ['null'], 'manner': ['orally'], 'frequency': ['20-24hr', 'sometimes 12hr'], 'timing': ['null'], 'purpose': ['Avoiding withdrawals']}, 'subject-effect-arguments': {'age': ['null'], 'gender': ['null'], 'conditions': ['null'], 'side-effects': ['withdrawals'], 'severity': ['worst'], 'start-time': ['in the morning'], 'side-effect-duration': ['null'], 'intervention': ['splitting dosage and changing frequency']}}
{'main-arguments': {'subject/patient': ['Individual who wakes up every morning'], 'treatment': ['Taking a dose 12 hours apart'], 'effects': ['null']}, 'event-specific-arguments': {'medications': ['null'], 'dosage': ['null'], 'treatment-duration': ['null'], 'manner': ['orally'], 'freque

In [None]:
%%time
# Keys under which each record stores its ground truth and its predictions.
g_name = 'ground-truth-arguments'
p_name = 'predictions'
# Run identifiers; each one selects the file <number>-<name>-predictions.json.
numbers = ['0', '1', '2']
name = 'generative-qa-flan-t5-base'
folder_path = 'Argument-Extraction-Predictions'

for number in numbers:
    # Results of this run, keyed by '<number>-<name>-<threshold>'.
    results_dict = {}
    file_name = f'{number}-{name}-predictions.json'
    all_data = read_json_file(os.path.join(folder_path, file_name))
    # Evaluate the same predictions once per similarity threshold.
    for s_t in similarity_threshold:
        results = calc_results(all_data, event_types, arg_types, events_roles,
                  p_name, g_name, s_t)
        # Per-threshold file: the name has no folder component, so it is written
        # relative to the current working directory.
        save_json(results, f'{number}-{name}-{s_t}.json')
        results_dict[f'{number}-{name}-{s_t}']=results

    # Combined file with all thresholds of this run, written into the results folder.
    result_folder_path = 'Argument-Extraction-Results'
    save_json(results_dict, os.path.join(result_folder_path, f'{number}-new-results-{name}.json'))

In [None]:
# Display results_dict. It is re-created for every run number in the evaluation
# loop, so at this point it holds only the last run processed.
results_dict

{'taking-moud': {'main-arguments': {'subject/patient': [[0.0, 0.0, 0.0],
    [0, 40, 43]],
   'treatment': [[0.0, 0.0, 0.0], [0, 39, 41]],
   'effects': [[0.04347826086937619, 0.03333333333322223, 0.03773584900733357],
    [1, 23, 30]]},
  'event-specific-arguments': {'medications': [[0.057142857142693876,
     0.04651162790686857,
     0.05128205123244576],
    [2, 35, 43]],
   'dosage': [[0.13043478260812855, 0.11538461538417161, 0.12244897954152437],
    [3, 23, 26]],
   'treatment-duration': [[0.24999999999791667,
     0.24999999999791667,
     0.24999999994791666],
    [3, 12, 12]],
   'manner': [[0.0, 0.0, 0.0], [0, 20, 21]],
   'frequency': [[0.04761904761882087,
     0.04545454545433884,
     0.04651162785678746],
    [1, 21, 22]],
   'timing': [[0.0, 0.0, 0.0], [0, 3, 3]],
   'purpose': [[0.0, 0.0, 0.0], [0, 23, 23]]},
  'subject-effect-arguments': {'gender': [[0.0, 0.0, 0.0], [0, 1, 1]],
   'conditions': [[0.0, 0.0, 0.0], [0, 9, 11]],
   'side-effects': [[0.07692307692278107,

In [None]:
def print_check_score(data, gt_count_threshold=5):
    """Print and return averaged F1 scores for one results dictionary.

    An argument is included only when its stored ground-truth count is strictly
    greater than `gt_count_threshold`. F1 scores are averaged per
    (event, argument type), those averages are averaged per event, and the event
    averages are averaged into an overall score, which is printed only.

    Args:
        data: Results indexed as data[event][arg_type][arg] =
            [[precision, recall, f1], [true_pos, pred_count, gtruth_count]].
            It must contain every event and argument type listed below.
        gt_count_threshold: Minimum (exclusive) ground-truth count for an
            argument to be included. Defaults to 5.

    Returns:
        Dict with '<event>_<arg_type>_average' and '<event>_average' entries for
        every group that had at least one qualifying argument.
    """
    event_types = ['taking-moud', 'relapse', 'tapering']
    arg_types = ['main-arguments', 'event-specific-arguments', 'subject-effect-arguments']

    scores = {}
    event_averages = []
    for event in event_types:
        arg_type_averages = []
        for arg_type in arg_types:
            f1_scores = []
            for arg, entry in data[event][arg_type].items():
                f1_score = entry[0][2]  # stored f1-score
                gt_count = entry[1][2]  # stored ground-truth count
                if gt_count > gt_count_threshold:
                    print(event, arg_type, arg, f1_score)
                    f1_scores.append(f1_score)
            if f1_scores:
                arg_type_average = sum(f1_scores) / len(f1_scores)
                print(event, arg_type, arg_type_average)
                scores[f'{event}_{arg_type}_average'] = arg_type_average
                arg_type_averages.append(arg_type_average)
        if arg_type_averages:
            event_average = sum(arg_type_averages) / len(arg_type_averages)
            # The newline used to be passed to str.format as an extra argument,
            # where it was silently ignored; it belongs in the printed text.
            print("{:.3f}\n".format(event_average))
            scores[f'{event}_average'] = event_average
            event_averages.append(event_average)

    # Guard the overall average: with no qualifying argument the list is empty
    # and the division used to raise ZeroDivisionError.
    if event_averages:
        print("\nOverall", sum(event_averages) / len(event_averages))
    else:
        print("\nOverall", "n/a (no argument exceeded the ground-truth count threshold)")
    pprint(scores)
    return scores
# NOTE(review): `results` is whatever the evaluation loop assigned last (final run
# number, final similarity threshold) — confirm that this is the intended input.
print_check_score(results)