In [None]:
import json
import os

import numpy as np
from evaluate import load
from sentence_transformers import SentenceTransformer, util  ## https://github.com/snunlp/KR-SBERT
from tqdm import tqdm

In [1]:
class BookathonEvaluation:
    """Compute automatic quality metrics for generated text stored in a JSON result file.

    :meth:`evaluate` reports n-gram repetition (``rep-n``), diversity, keyword recall
    (keyword mode only), prompt/generation coherence and perplexity.
    """

    def __init__(self, model_id="skt/kogpt2-base-v2", keyword: bool = False, device: str = "cpu") -> None:
        """Store the evaluation configuration.

        Args:
            model_id: Model identifier forwarded to the perplexity metric.
            keyword: When True, each sample's ``prompt`` is joined with spaces before use
                and keyword recall is added to the report.
            device: Device string forwarded to the coherence and perplexity computations.
        """
        self.model_id = model_id
        self.keyword = keyword
        self.device = device

    def evaluate(self, path: str, batch=32):
        """Run every metric over the samples stored in the JSON file at ``path``.

        The file must contain a non-empty list of objects that each provide a ``prompt``
        and a ``generated`` entry.

        Args:
            path: Path to the UTF-8 encoded JSON result file.
            batch: Batch size used when encoding sentences for the coherence score.

        Returns:
            dict with the keys ``rep-2``, ``rep-3``, ``rep-4``, ``diversity``,
            ``recall_keyword`` (keyword mode only), ``coherence`` and ``ppl``.

        Raises:
            IndexError: If the file contains an empty list.
        """
        # Use a context manager so the file handle is closed deterministically.
        with open(path, encoding="UTF8") as fp:
            test_result = json.load(fp)

        prompt_list = [" ".join(t['prompt']) if self.keyword else t['prompt'] for t in test_result]

        # Whether generations echo their prompt is decided once, from the first sample only.
        strip_prompt = prompt_list[0] in test_result[0]["generated"]

        # Cut the echoed prompt by the length of the prompt *string*. In keyword mode the raw
        # ``prompt`` is a sequence of keywords, so len() on it would be the keyword count
        # rather than the number of characters to remove.
        generated_list = [
            t['generated'][len(p):] if strip_prompt else t['generated']
            for p, t in zip(prompt_list, test_result)
        ]

        evaluate_result = {}

        ## repeat_ngram
        print("repeat_ngram..")
        n_list = [2, 3, 4]
        repeat_ngram_list = [self.repeat_ngram(g, n_list=n_list) for g in generated_list]
        for n in n_list:
            evaluate_result[f'rep-{n}'] = np.mean([d[n] for d in repeat_ngram_list])
        ## diversity
        print("diversity..")
        diversity_list = [self.diversity(generation_repetition=r) for r in repeat_ngram_list]
        evaluate_result['diversity'] = np.mean(diversity_list)
        ## keyword recall
        if self.keyword:
            print("recall_keyword..")
            evaluate_result['recall_keyword'] = np.mean([
                self.recall_keyword(prompt=p, generated=g) for p, g in zip(prompt_list, generated_list)])
        ## coherence
        print("coherence..")
        evaluate_result['coherence'] = self.coherence(prompt_list=prompt_list, generated_list=generated_list,
                                                      device=self.device, batch_size=batch)
        ## ppl
        print("ppl..")
        evaluate_result['ppl'] = self.ppl(generated_list=generated_list, model_id=self.model_id, device=self.device)

        return evaluate_result

    def repeat_ngram(self, generated: str, n_list=[2]):
        """Return the percentage of repeated n-grams in ``generated`` for each ``n``.

        Tokens are obtained by splitting on single spaces. For each ``n`` the score is
        ``100 * (1 - unique_ngrams / total_ngrams)``.

        Args:
            generated: Text to analyse.
            n_list: n-gram sizes to score. The default list is only read, never mutated.

        Returns:
            dict mapping each ``n`` to its repetition percentage. A text too short to
            contain any n-gram of size ``n`` scores ``0.0`` (nothing can repeat).
        """
        generation_repetition = dict()
        tokens = generated.split(" ")

        for n in n_list:
            len_total_ngram = len(tokens) - n + 1
            if len_total_ngram <= 0:
                # Fewer than n tokens: avoid dividing by zero (or by a negative count,
                # which would otherwise report a bogus 100% repetition).
                generation_repetition[n] = 0.0
                continue
            unique_ngram = {" ".join(tokens[i:i + n]) for i in range(len_total_ngram)}
            generation_repetition[n] = 100 * (1 - (len(unique_ngram) / len_total_ngram))

        return generation_repetition

    def diversity(self, generation_repetition: dict[int, float]):
        """Return the product of ``1 - rep_n / 100`` over every entry of ``generation_repetition``.

        Args:
            generation_repetition: Mapping of n-gram size to repetition percentage, as
                produced by :meth:`repeat_ngram`.

        Returns:
            The diversity score; ``1`` when the mapping is empty.
        """
        diversity = 1
        for rep in generation_repetition.values():
            diversity *= 1 - (rep / 100)
        return diversity

    def recall_keyword(self, prompt: str, generated: str):
        """Return the fraction of prompt keywords that occur in ``generated``.

        Keywords are the space-separated tokens of ``prompt`` that precede the first
        ``</s>`` marker. A keyword counts as hit when it is a substring of ``generated``.

        Args:
            prompt: Keyword string, optionally followed by ``</s>`` and further text.
            generated: Generated text to search.

        Returns:
            Hit ratio in ``[0, 1]``; ``0.0`` when the prompt contains no keywords.
        """
        # Drop empty tokens (e.g. from a space before "</s>"): "" is a substring of every
        # string and would otherwise always be counted as a hit.
        keyword = [k for k in prompt.split("</s>")[0].split(" ") if k]
        if not keyword:
            return 0.0
        keyword_hit = sum(1 for k in keyword if k in generated)
        return keyword_hit / len(keyword)

    def coherence(self, prompt_list: list[str], generated_list: list[str], device="cpu", batch_size=32):
        """Return the mean cosine similarity between each prompt and its generation.

        Both lists are embedded with the ``snunlp/KR-SBERT-V40K-klueNLI-augSTS`` sentence
        encoder, which is loaded anew on every call.

        Args:
            prompt_list: Prompt strings.
            generated_list: Generated strings, aligned with ``prompt_list``.
            device: Device string passed to ``SentenceTransformer``.
            batch_size: Batch size passed to ``model.encode``.

        Returns:
            Mean of the pairwise cosine similarities.

        Raises:
            AssertionError: If the two lists differ in length.
        """

        assert len(prompt_list) == len(generated_list), "length mismatch"
        model = SentenceTransformer('snunlp/KR-SBERT-V40K-klueNLI-augSTS', device=device)
        prompt_encoded = model.encode(prompt_list, show_progress_bar=True, batch_size=batch_size)
        generated_encoded = model.encode(generated_list, show_progress_bar=True, batch_size=batch_size)

        # Similarity is computed pair by pair (prompt i vs. generation i), not all-vs-all.
        coherence = [
            util.cos_sim(p, g).squeeze()
            for p, g in
            tqdm(zip(prompt_encoded, generated_encoded), total=len(prompt_encoded), desc="getting coherence score...")
        ]
        return np.mean(coherence)

    def ppl(self, generated_list: list[str], model_id: str, device: str = 'cpu'):
        """Return the mean perplexity of ``generated_list`` under the model ``model_id``.

        Uses the ``perplexity`` metric loaded through ``evaluate.load`` with
        ``add_start_token=False``.

        Args:
            generated_list: Texts to score.
            model_id: Model identifier passed to the metric.
            device: Device string passed to the metric.

        Returns:
            The ``mean_perplexity`` entry of the metric result.
        """
        perplexity = load("perplexity", module_type="metric")
        results = perplexity.compute(
                model_id=model_id,
                predictions=generated_list,
                add_start_token=False,
                device=device
        )

        return results['mean_perplexity']


In [None]:
# Read every setting up front so that a missing environment variable fails immediately,
# before the expensive evaluation runs, rather than after it when the result is written.
MODEL_NAME = os.environ["MODEL_NAME"]
# Keyword mode is enabled only when KEYWORD is exactly "junyoung".
USE_KEYWORD = os.environ["KEYWORD"] == "junyoung"
TEST_PATH = os.environ["TEST_PATH"]
OUTPUT_NAME = os.environ["OUTPUT_NAME"]

Evaluate = BookathonEvaluation(model_id=MODEL_NAME, keyword=USE_KEYWORD,
                               device="cuda")
output = Evaluate.evaluate(path=TEST_PATH)

# Write the report as UTF-8 (the same encoding the test file is read with) so non-ASCII
# model names or paths do not depend on the platform's default encoding.
with open(f'./{OUTPUT_NAME}.txt', 'w', encoding="utf-8") as fp:
    fp.write(MODEL_NAME + ", " + TEST_PATH + "\n")
    fp.write(str(output))