# Load data and data analysis

In [None]:
# TODO
# TODO: tentative approach -- load the dataset from the Hugging Face Hub, see
# https://huggingface.co/docs/datasets/v1.9.0/loading_datasets.html
from datasets import load_dataset
# No `split` is given, so all available splits of 'squad' are requested.
dataset = load_dataset('squad')

# Setup generative model (open/closed model)

In [None]:
from transformers import AutoTokenizer, OPTForCausalLM, pipeline

class GenerativeModel:
    """Question-answering wrapper around an OPT causal language model.

    Two prompting modes are offered:

    * *closed*: the prompt is the bare question.
    * *open*: the prompt is the context followed by the question.

    Every public ``get_*`` method returns only the generated continuation,
    i.e. the prompt is stripped from the front of the decoded text.
    """

    def __init__(self, max_answer_length, model_name="facebook/opt-1.3b") -> None:
        """Load the model, its tokenizer and a text-generation pipeline.

        Args:
            max_answer_length: Maximum number of new tokens generated per answer.
            model_name: Hugging Face identifier of the OPT checkpoint to load.
        """
        self.model = OPTForCausalLM.from_pretrained(model_name)
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        # Pad on the left so that, in a batch, every prompt ends right where
        # generation starts and the text continues as it would without padding.
        self.tokenizer.padding_side = "left"
        # Reuse the model/tokenizer loaded above instead of letting the
        # pipeline load a second copy of the weights.
        self.generator = pipeline('text-generation', model=self.model, tokenizer=self.tokenizer)
        self.max_answer_length = max_answer_length

    @staticmethod
    def _build_open_prompt(question, context):
        """Return the prompt used by the open (context-aware) mode."""
        return f"CONTEXT:\n{context}\nQUESTION:\n{question}"

    def get_open_model_answer(self, question, context, use_pipeline=False):
        """Answer a single question given its context.

        Args:
            question: The question text.
            context: The passage the answer should be based on.
            use_pipeline: Generate via the pipeline instead of calling the
                model directly.

        Returns:
            The generated text with the prompt removed.
        """
        prompt = self._build_open_prompt(question, context)
        return self._generate_answer(prompt, use_pipeline).removeprefix(prompt)

    def get_closed_model_answer(self, question, use_pipeline=False):
        """Answer a single question without any context.

        Args:
            question: The question text, used verbatim as the prompt.
            use_pipeline: Generate via the pipeline instead of calling the
                model directly.

        Returns:
            The generated text with the prompt removed.
        """
        prompt = question
        return self._generate_answer(prompt, use_pipeline).removeprefix(prompt)

    def get_open_batch_answers(self, questions, contexts):
        """Answer several questions, each paired with its own context.

        Args:
            questions: Sequence of question texts.
            contexts: Sequence of contexts, aligned with ``questions``.

        Returns:
            A list with one generated answer (prompt removed) per question.

        Raises:
            AssertionError: If ``questions`` and ``contexts`` differ in length.
        """
        assert len(questions) == len(contexts), "questions and contexts should have the same length"
        prompts = [
            self._build_open_prompt(question, context)
            for question, context in zip(questions, contexts)
        ]
        answers = self._generate_batch_answers(prompts)
        return [answer.removeprefix(prompt) for answer, prompt in zip(answers, prompts)]

    # https://github.com/huggingface/transformers/issues/10704
    def get_closed_batch_answers(self, questions):
        """Answer several questions without any context.

        Args:
            questions: Sequence of question texts, used verbatim as prompts.

        Returns:
            A list with one generated answer (prompt removed) per question.
        """
        prompts = list(questions)
        answers = self._generate_batch_answers(prompts)
        # Strip the prompts so the result matches the other get_* methods.
        return [answer.removeprefix(prompt) for answer, prompt in zip(answers, prompts)]

    def _generate_answer(self, prompt, use_pipeline):
        """Generate text for one prompt; the returned text still contains the prompt."""
        if use_pipeline:
            return self.generator(prompt, max_new_tokens=self.max_answer_length)[0]['generated_text']
        inputs = self.tokenizer(prompt, return_tensors="pt")
        generate_ids = self.model.generate(
            inputs.input_ids,
            attention_mask=inputs.attention_mask,
            max_new_tokens=self.max_answer_length,
        )
        return self.tokenizer.batch_decode(
            generate_ids, skip_special_tokens=True, clean_up_tokenization_spaces=False
        )[0]

    def _generate_batch_answers(self, prompts):
        """Generate text for a batch of prompts; each result still contains its prompt."""
        if not prompts:
            # Nothing to tokenize or generate for an empty batch.
            return []
        # Padding brings all prompts to the same length so they can be
        # processed as one batch.
        inputs = self.tokenizer(prompts, return_tensors="pt", padding=True)
        # The attention mask tells the model to ignore the (left) pad tokens;
        # without it padded prompts produce different output than unpadded ones.
        generate_ids = self.model.generate(
            inputs.input_ids,
            attention_mask=inputs.attention_mask,
            max_new_tokens=self.max_answer_length,
        )
        return self.tokenizer.batch_decode(
            generate_ids, skip_special_tokens=True, clean_up_tokenization_spaces=False
        )

In [None]:
# Shared model instance for the cells below; max_answer_length is forwarded
# as `max_new_tokens`, so every answer is capped at 42 newly generated tokens.
generative_model = GenerativeModel(max_answer_length=42)

## Temporary test of the model

In [170]:
# NOTE: try the model on a few SQuAD examples
from datasets import load_dataset
test_set = load_dataset('squad', split='validation[:10]')  # first 10 entries of the validation split
test_contexts = test_set['context']  # list of strings
test_questions = test_set['question']  # list of strings
test_answers = [d['text'] for d in test_set['answers']]  # per question: the list of reference answer texts

Found cached dataset squad (/Users/stephan/.cache/huggingface/datasets/squad/plain_text/1.0.0/d6ec3ceb99ca480ce37cdd35555d6cb2511d223b9150cce08a837ef62ffea453)


In [None]:
# Single-question check: print the first example with its reference answers,
# then the model's answer without the context (closed) and with it (open).
print(f"Context: {test_contexts[0]}\nQuestion: {test_questions[0]}\nCorrect answer: {test_answers[0]}")
print("Closed generative Model:")
print(generative_model.get_closed_model_answer(test_questions[0]))
print("Open generative Model:")
print(generative_model.get_open_model_answer(test_questions[0], test_contexts[0]))

In [171]:
%%time
# Batch test of the closed model: all test questions at once, no context
closed_answers = generative_model.get_closed_batch_answers(test_questions)

CPU times: user 2 µs, sys: 1e+03 ns, total: 3 µs
Wall time: 6.2 µs


In [None]:
%%time
# Batch test of the open model: all test questions at once, each with its context
open_answers = generative_model.get_open_batch_answers(test_questions, test_contexts)

# Evaluation

In [None]:
# TODO: implement the evaluation