In [11]:
import json

In [12]:
def load_data(file_path):
    with open(file_path, 'r') as file:
        return json.load(file)

# def preprocess_data(data):
#     input_texts = [entry['question'] for entry in data]
#     target_texts = ['; '.join(entry['final_answers']) for entry in data]
#     return input_texts, target_texts

def preprocess_data(data):
    input_texts = []
    target_texts = []
    for entry in data:
        context = f"Subject: {entry['subject']}, Type: {entry['type']}"
        if 'aliases' in entry and entry['aliases']:
            aliases = ", ".join(entry['aliases'])
            context += f", Aliases: {aliases}"
        question = entry['question']
        input_texts.append(f"{context}. {question}")
        target_texts.append('; '.join(entry['final_answers']))
    return input_texts, target_texts

In [13]:
train_data = load_data('data/train_TLQA.json')
test_data = load_data('data/test_TLQA.json')

train_input_texts, train_target_texts = preprocess_data(train_data)
test_input_texts, test_target_texts = preprocess_data(test_data)

def prepare_inference_inputs(data):
    input_texts = [entry['question'] for entry in data]
    return input_texts

# Prepare test inputs without context for inference
test_input_texts_without_context = prepare_inference_inputs(test_data)

In [14]:
# pip install sentencepiece
# pip install pytorch
# pip install transformers[torch]

from transformers import T5Tokenizer

tokenizer = T5Tokenizer.from_pretrained('google/flan-t5-base')

max_length = 1024  # Adjust according to your model's capacity
train_encodings = tokenizer(train_input_texts, padding=True, truncation=True, max_length=max_length, return_tensors='pt')

# train_encodings = tokenizer(train_input_texts, padding=True, return_tensors='pt')
train_labels = tokenizer(train_target_texts, padding=True, return_tensors='pt').input_ids

In [15]:
from transformers import T5ForConditionalGeneration, Trainer, TrainingArguments
import torch
from torch.utils.data import Dataset

class TLQADataset(Dataset):
    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = labels

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        item = {key: val[idx] for key, val in self.encodings.items()}
        item['labels'] = self.labels[idx]
        return item



In [16]:

model = T5ForConditionalGeneration.from_pretrained('google/flan-t5-base')

train_dataset = TLQADataset(train_encodings, train_labels)

test_encodings = tokenizer(test_input_texts, padding=True, truncation=True, max_length=max_length, return_tensors='pt')
test_labels = tokenizer(test_target_texts, padding=True, truncation=True, max_length=max_length, return_tensors='pt').input_ids

# Define the evaluation dataset
test_dataset = TLQADataset(test_encodings, test_labels)

training_args = TrainingArguments(
    output_dir="./results",
    num_train_epochs=2,
    per_device_train_batch_size=4,
    per_device_eval_batch_size=4,
    warmup_steps=500,
    weight_decay=0.01,
    logging_dir="./logs",
    logging_steps=10,
    evaluation_strategy="epoch",
    save_strategy="epoch"
)

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=test_dataset,
    tokenizer=tokenizer
)

  trainer = Trainer(


In [None]:
trainer.train()

<IPython.core.display.Javascript object>

[34m[1mwandb[0m: Logging into wandb.ai. (Learn how to deploy a W&B server locally: https://wandb.me/wandb-server)
[34m[1mwandb[0m: You can find your API key in your browser here: https://wandb.ai/authorize
wandb: Paste an API key from your profile and hit enter, or press ctrl+c to quit:

 ··········


[34m[1mwandb[0m: Appending key for api.wandb.ai to your netrc file: /root/.netrc


Epoch,Training Loss,Validation Loss
1,0.3773,0.32032


In [None]:
def generate_answers(model, tokenizer, inputs):
    inputs = tokenizer(inputs, padding=True, truncation=True, return_tensors="pt")
    inputs = inputs.to(model.device)
    outputs = model.generate(inputs.input_ids)
    answers = [tokenizer.decode(output, skip_special_tokens=True) for output in outputs]
    return answers

def generate_answers_2(model, tokenizer, inputs, batch_size=8):
    model.eval()  # Set the model to evaluation mode
    answers = []

    # Process the inputs in batches
    for i in range(0, len(inputs), batch_size):
        batch_inputs = inputs[i:i + batch_size]
        encoded_inputs = tokenizer(batch_inputs, padding=True, truncation=True, return_tensors="pt")
        encoded_inputs = {key: val.to(model.device) for key, val in encoded_inputs.items()}

        with torch.no_grad():
            outputs = model.generate(**encoded_inputs)

        batch_answers = [tokenizer.decode(output, skip_special_tokens=True) for output in outputs]
        answers.extend(batch_answers)

    return answers

def generate_answers_3(model, tokenizer, inputs, batch_size=8):
    model.eval()  # Set model to evaluation mode
    all_predictions = []

    for i in range(0, len(inputs), batch_size):
        batch_inputs = inputs[i:i + batch_size]
        encoding = tokenizer(batch_inputs, padding=True, truncation=True, return_tensors="pt", max_length=1024)
        encoding = encoding.to(model.device)

        with torch.no_grad():
            output = model.generate(encoding.input_ids)

        predictions = [tokenizer.decode(output, skip_special_tokens=True) for output in output]
        all_predictions.extend(predictions)

    return all_predictions

In [None]:
import re

# Exact Match metric
def compute_exact_match(predictions, references):
    return sum([1 if pred.strip().lower() == ref.strip().lower() else 0 for pred, ref in zip(predictions, references)]) / len(references)

# F1 score metric
def compute_f1(predictions, references):
    def get_tokens(text):
        return re.findall(r'\b\w+\b', text.lower())

    f1_scores = []
    for pred, ref in zip(predictions, references):
      pred_tokens = get_tokens(pred)
      ref_tokens = get_tokens(ref)

      common = set(pred_tokens) & set(ref_tokens)
      if not common:
          f1_scores.append(0)
          continue

      precision = len(common) / len(pred_tokens)
      recall = len(common) / len(ref_tokens)
      f1_scores.append(2 * (precision * recall) / (precision + recall))
    return sum(f1_scores) / len(f1_scores)

# Time metric
def extract_years(text):
  return re.findall(r'\b(19|20)\d{2}\b', text)

def compute_time_metric(predictions, references):
    time_metric_scores = []
    for pred, ref in zip(predictions, references):
        pred_years = set(extract_years(pred))
        ref_years = set(extract_years(ref))

        if not ref_years:
            # If there are no years in the reference, consider it perfect (or adjust based on your criteria)
            time_metric_scores.append(1.0)
            continue

        if not pred_years:
            # If there are no years in the prediction but there are in the reference, it's incorrect
            time_metric_scores.append(0.0)
            continue

        intersection = pred_years & ref_years
        union = pred_years | ref_years
        time_metric_scores.append(len(intersection) / len(union))

    return sum(time_metric_scores) / len(time_metric_scores)

# Completeness metric
def compute_completeness(predictions, references):
    def list_contains_all(sublist, mainlist):
        return all(item in mainlist for item in sublist)

    completeness_scores = []
    for pred, ref in zip(predictions, references):
        pred_items = pred.split('; ')
        ref_items = ref.split('; ')
        completeness_scores.append(list_contains_all(ref_items, pred_items))

    return sum(completeness_scores) / len(completeness_scores)

In [None]:
def evaluate(predictions, references):
    em = compute_exact_match(predictions, references)
    f1 = compute_f1(predictions, references)
    time_metric = compute_time_metric(predictions, references)
    completeness = compute_completeness(predictions, references)

    print(f"Exact Match: {em * 100:.2f}%")
    print(f"F1 Score: {f1 * 100:.2f}%")
    print(f"TimeMetric: {time_metric * 100:.2f}%")
    print(f"Completeness: {completeness * 100:.2f}%")

    return {
        "EM": em,
        "F1": f1,
        "TimeMetric": time_metric,
        "Completeness": completeness
    }

In [None]:
predictions = generate_answers_2(model, tokenizer, test_input_texts)
evaluation_results = evaluate(predictions, test_target_texts)

In [None]:
def display_test_cases(model, tokenizer, test_inputs, test_targets, num_cases=5):
    # Generate answers using the model
    predictions = generate_answers_2(model, tokenizer, test_inputs[:num_cases])

    # Display each test case with the original question, expected answer, and model prediction
    for i in range(num_cases):
        print(f"Test Case {i+1}:")
        print(f"Question: {test_inputs[i]}")
        print(f"Expected Answer: {test_targets[i]}")
        print(f"Model's Answer: {predictions[i]}")
        print("="*80)

# Call the function to display the first 5 test cases
display_test_cases(model, tokenizer, test_input_texts, test_target_texts)

In [None]:
# Extract questions and answers from the training data for KNN
train_questions = [entry['question'] for entry in train_data]
train_answers = ['; '.join(entry['final_answers']) for entry in train_data]

In [None]:
class KnnSearch:
    def __init__(self,data=None, num_trees=None,emb_dim=None):
        self.num_trees=num_trees
        self.emb_dim=emb_dim
    def get_embeddings_for_data(self, data_ls):
        model = SentenceTransformer('sentence-transformers/all-mpnet-base-v2')
        embeddings = model.encode(data_ls)
        return embeddings

    def get_top_n_neighbours(self,sentence,data_emb,transfer_data,k):
        sent_emb = self.get_embeddings_for_data(sentence)
        #data_emb = self.get_embeddings_for_data(transfer_questions)
        top_questions = []

        print("new_emb", sent_emb.shape, data_emb.shape)
        text_sims = cosine_similarity(data_emb, [sent_emb]).tolist()
        results_sims = zip(range(len(text_sims)), text_sims)
        sorted_similarities = sorted(results_sims, key=lambda x: x[1], reverse=True)

        #print("text_sims",sorted_similarities[:2])
        for idx, item in sorted_similarities[:k]:
                #if item[0] > 0.45:
                    top_questions.append(transfer_data[idx])

        # text_sims = cosine_similarity(strategy_emb, [sent_emb]).tolist()
        # results_sims = zip(range(len(text_sims)), text_sims)
        # sorted_similarities = sorted(results_sims, key=lambda x: x[1], reverse=True)
        #print("text_sims",sorted_similarities[:2])
        # for idx, item in sorted_similarities:
        #         top_questions.append(str_qa[idx])
        return top_questions

In [None]:
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity

# Instantiate KnnSearch
knn_search = KnnSearch()

# Get embeddings for training data
train_embeddings = knn_search.get_embeddings_for_data(train_questions)
print(train_embeddings)

In [None]:
def get_few_shot_examples(test_question, k=5):
    combined_data = [{'question': q, 'answer': a} for q, a in zip(train_questions, train_answers)]

    # Get the top-k similar questions
    few_shot_examples = knn_search.get_top_n_neighbours(
        sentence=test_question,
        data_emb=train_embeddings,
        transfer_data=combined_data,
        k=k
    )
    return few_shot_examples

def create_few_shot_prompt(few_shot_examples, test_question):
    prompt = ""
    for example in few_shot_examples:
        prompt += f"Q: {example['question']}\nA: {example['answer']}\n\n"
    prompt += f"Q: {test_question}\nA:"
    return prompt

def generate_few_shot_predictions(model, tokenizer, test_questions, k=5):
    predictions = []
    for test_question in test_questions:
        few_shot_examples = get_few_shot_examples(test_question, k)
        prompt = create_few_shot_prompt(few_shot_examples, test_question)
        input_ids = tokenizer(prompt, return_tensors="pt").input_ids
        # outputs = model.generate(input_ids, max_length=512)
        # prediction = tokenizer.decode(outputs[0], skip_special_tokens=True)
        # predictions.append(prediction)
    # return predictions
    return []

In [None]:
# Generate predictions


test_question = "List all sports teams Andy Carroll played for from 2010 to 2020."

# Pass the test question as a single-element list
few_shot_examples = knn_search.get_top_n_neighbours(
    sentence=test_question,  # Wrap the test question in a list
    data_emb=train_embeddings,  # Ensure this is 2D
    transfer_data=train_questions,  # The list of training questions
    k=5  # Number of nearest neighbors to retrieve
)
print(few_shot_examples)

few_shot_predictions = generate_few_shot_predictions(model, tokenizer, test_input_texts, k=5)
evaluation_results_few_shot = evaluate(few_shot_predictions, test_target_texts)