In [1]:
import os

# Restrict this process to physical GPU 2. The variable is set here, before
# torch is imported further down, and later cells address the device as "cuda:0".
os.environ.update(CUDA_VISIBLE_DEVICES="2")

In [2]:
ckpt = os.listdir("./output")

# Map each entry whose name ends in "-<step>" to its integer step. Entries
# without a numeric suffix (log folders, stray files, ...) are skipped; calling
# int() on them would raise ValueError and abort the notebook.
ckpt_steps = {
    name: int(name.rsplit("-", 1)[-1])
    for name in ckpt
    if name.rsplit("-", 1)[-1].isdecimal()
}
if not ckpt_steps:
    raise ValueError("no '<name>-<step>' checkpoint entries found in ./output")

# The checkpoint with the highest step number is the most recent one.
last_ckpt = max(ckpt_steps, key=ckpt_steps.get)

In [3]:
import json

# trainer_state.json stored inside the most recent checkpoint directory.
last_state_path = os.path.join("./output", last_ckpt, "trainer_state.json")
with open(last_state_path, 'r') as fp:
    last_state = json.load(fp)

# "best_model_checkpoint" may be absent or null in the state file; passing None
# on to from_pretrained() would fail with an unrelated-looking error, so fall
# back to the most recent checkpoint and say so.
best_state_path = last_state.get("best_model_checkpoint")
if not best_state_path:
    best_state_path = os.path.join("./output", last_ckpt)
    print(f"No best_model_checkpoint recorded in {last_state_path}; using {best_state_path}")

# Path handed to the tokenizer and model loaders in the following cells.
model_name_or_path = best_state_path

In [4]:
# Length cap shared across the notebook: stored in encoding_args and passed as
# max_length to model.generate() inside generate_predictions.
max_length = 256

In [5]:
from transformers import AutoTokenizer

# Tokenizer call options (pad, truncate to max_length, return PyTorch tensors).
# NOTE(review): no other visible cell reads encoding_args — confirm whether it
# is still needed.
encoding_args = dict(
    max_length=max_length,
    padding=True,
    truncation=True,
    return_tensors="pt",
)

# The tokenizer is loaded from the same checkpoint path as the model.
tokenizer = AutoTokenizer.from_pretrained(model_name_or_path)

  from .autonotebook import tqdm as notebook_tqdm


In [6]:
import torch

# Use the first visible GPU when CUDA is available, otherwise run on the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)

cuda:0


In [7]:
from transformers import AutoModelForSeq2SeqLM

# Load the seq2seq model from the selected checkpoint and move it to `device`.
model = AutoModelForSeq2SeqLM.from_pretrained(model_name_or_path).to(device)

In [8]:
import sys
# Make the project's src directory importable (relative to the working directory).
sys.path.append("../../../src")
import data_utils
from evaluation import compute_metrics, summary_score

# Answer parser; later cells call it as catch_answer_fn(output, "oasc", text).
catch_answer_fn = data_utils.AnswerCatcher().lego_absa

# Keep special tokens while decoding; "</s>" and "<pad>" are removed from the
# predictions by plain string replacement in a later cell.
decoding_args = dict(skip_special_tokens=False)

In [9]:
from tqdm import tqdm
from typing import List, Dict, Tuple

def generate_predictions(model, tokenizer, data, device=torch.device("cuda:0"), decoding_args: Dict = None, max_gen_length: int = None) -> List[str]:
    """Generate one decoded output string per input text.

    Each text is tokenized and passed to ``model.generate`` on its own (no
    batching, padding or truncation is applied here).

    Args:
        model: Object exposing ``generate(input_ids=..., pad_token_id=...,
            eos_token_id=..., max_length=...)`` and returning a 2-D tensor.
        tokenizer: Callable tokenizer exposing ``pad_token_id``,
            ``eos_token_id`` and ``batch_decode``.
        data: Iterable of input texts.
        device: Device the input ids are moved to before generation.
        decoding_args: Extra keyword arguments forwarded to
            ``tokenizer.batch_decode``. ``None`` means no extra arguments.
        max_gen_length: Value passed to ``generate`` as ``max_length``. When
            ``None`` the notebook-level ``max_length`` is used, which keeps
            existing calls unchanged.

    Returns:
        The decoded predictions, in the same order as ``data``.
    """
    # A None default avoids sharing one dict object between calls.
    if decoding_args is None:
        decoding_args = {}
    if max_gen_length is None:
        max_gen_length = max_length

    token_rows = []
    with torch.no_grad():
        for text in tqdm(data):
            input_ids = tokenizer(text, return_tensors="pt").input_ids.to(device)
            generated = model.generate(
                input_ids=input_ids,
                pad_token_id=tokenizer.pad_token_id,
                eos_token_id=tokenizer.eos_token_id,
                max_length=max_gen_length,
            )
            # Store plain Python ints so filtering and decoding work on lists
            # rather than on one 0-d tensor per token.
            token_rows.extend(generated.cpu().tolist())

    # Drop any -100 placeholder ids before decoding.
    token_rows = [[tok for tok in row if tok != -100] for row in token_rows]
    return tokenizer.batch_decode(token_rows, **decoding_args)

In [10]:
from datasets import Dataset

test_path = "../../../data/absa/en/zhang/interim/interim_2/rest1516/test.txt"

# The single task configuration used to build the evaluation examples.
test_tasks = [
    dict(
        paradigm="extraction",
        se_order="oasc",
        prompt="lego_absa",
        answer="lego_absa",
    )
]

# Read the raw test file, expand it into records with the project helper
# (shuffling disabled), and wrap the resulting list in a Dataset. Later cells
# read its "input" and "output" columns.
test = data_utils.read_data(test_path)
test_ds = Dataset.from_list(
    data_utils.data_gen(
        data=test,
        nt_se_order="acso",
        tasks=test_tasks,
        n_fold=1,
        algo="round_robin",
        shuffle=False,
    )
)

100%|██████████| 1081/1081 [00:00<00:00, 22113.18it/s]


In [11]:
# Generate one raw prediction string for every test input.
inputs = test_ds["input"]
str_preds = generate_predictions(
    model=model,
    tokenizer=tokenizer,
    data=inputs,
    device=device,
    decoding_args=decoding_args,
)

100%|██████████| 1080/1080 [06:47<00:00,  2.65it/s]


In [12]:
from copy import deepcopy
import json

# Pair a copy of every test record with its prediction under the "preds" key.
# Predictions are stored as generated here; the special-token markers are only
# stripped in a later cell.
test_and_preds = [
    dict(deepcopy(test_ds[i]), preds=str_preds[i])
    for i in range(len(test_ds))
]

with open("test_and_preds.json", 'w') as fp:
    fp.write(json.dumps(test_and_preds))

In [13]:
# Strip the end-of-sequence and padding markers left in the decoded text.
str_preds = [
    pred.replace("</s>", '').replace("<pad>", '')
    for pred in str_preds
]

In [14]:
# Parse each cleaned prediction, passing the "oasc" order and its source input.
oasc_ext_preds = [
    catch_answer_fn(pred, "oasc", source)
    for pred, source in zip(str_preds, test_ds["input"])
]

In [15]:
# Parse the reference outputs with the same parser and order as the predictions
# so both sides are compared in the same form.
targets = [
    catch_answer_fn(gold, "oasc", source)
    for gold, source in zip(test_ds["output"], test_ds["input"])
]

In [16]:
# Score the parsed predictions against the parsed targets. The displayed result
# is a dict with "recall", "precision" and "f1_score" entries.
summary = summary_score(oasc_ext_preds,targets)
summary

{'recall': 0.48085373509102325,
 'precision': 0.4916330063473745,
 'f1_score': 0.48618363073310605}

In [17]:
# Save the aggregate metrics next to the notebook.
with open("summary_score.json", 'w') as fp:
    fp.write(json.dumps(summary))