In [None]:
!pip install -U datasets evaluate transformers langchain langchain-openai faiss-cpu langchain-community llamaapi


from dotenv import load_dotenv
from openai import OpenAI
from google.colab import drive
from transformers import (
    T5ForConditionalGeneration, T5Tokenizer,
    BartForConditionalGeneration, BartTokenizer,
)
from transformers import (
    MT5ForConditionalGeneration, MT5Tokenizer,
    LEDForConditionalGeneration, LEDTokenizer,
    PegasusForConditionalGeneration, PegasusTokenizer
)

from transformers import pipeline
from datasets import load_dataset, Dataset
from langchain_community.vectorstores import FAISS
from langchain_openai import OpenAIEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.schema import Document
from collections import defaultdict
import re
import os
import torch
import json
import csv


In [4]:
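# HF_TOKEN = "..."  # Hugging Face endpoint token used as the api_key by the evaluation loop; keep the real value out of the saved notebook.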
# One entry per fine-tuned masking model. Each entry records the display name,
# the model/tokenizer classes used to load it, the base checkpoint id, and the
# Drive directory the evaluation loop loads the fine-tuned weights from.
MASKED_MODEL_CONFIG = [
    {
        "name": name,
        "model_cls": model_cls,
        "tokenizer_cls": tokenizer_cls,
        "pretrained": pretrained,
        "out_dir": out_dir,
    }
    for name, model_cls, tokenizer_cls, pretrained, out_dir in (
        (
            "LED-Base",
            LEDForConditionalGeneration,
            LEDTokenizer,
            "allenai/led-base-16384",
            "/content/drive/MyDrive/models/role-aware-rag/led-base",
        ),
        (
            "BART-Base",
            BartForConditionalGeneration,
            BartTokenizer,
            "facebook/bart-base",
            "/content/drive/MyDrive/models/role-aware-rag/bart-base",
        ),
        (
            "T5-Base",
            T5ForConditionalGeneration,
            T5Tokenizer,
            "t5-base",
            "/content/drive/MyDrive/models/role-aware-rag/t5-base",
        ),
        (
            "DistilBART",
            BartForConditionalGeneration,
            BartTokenizer,
            "sshleifer/distilbart-cnn-12-6",
            "/content/drive/MyDrive/models/role-aware-rag/distilbart",
        ),
    )
]


In [5]:
# Mount Google Drive; the dataset, fine-tuned models and results CSV used below
# all live under /content/drive.
drive.mount("/content/drive")

Mounted at /content/drive


In [6]:
# Load the role-aware SQuAD JSON file from Drive, requested as the "train" split.
dataset = load_dataset("json", split="train", data_files="/content/drive/MyDrive/role_aware_squad.json")

Generating train split: 0 examples [00:00, ? examples/s]

In [7]:
#balance dataset into answerable and unanswerable questions
def balance_role_dataset(dataset, per_role_count=500, skip_first=1000):
    """Flatten role-annotated examples into a per-role evaluation pool.

    Every example at position ``skip_first`` or later is expanded into one
    entry per role (EMPLOYER, EMPLOYEE, CUSTOMER). Entries are bucketed by
    role and by whether ``example["role_answerable"][role]`` is truthy, and
    for each role the first ``per_role_count`` answerable entries are followed
    by the first ``per_role_count`` unanswerable entries.

    Args:
        dataset: Iterable of dicts with the keys ``question``,
            ``original_context``, ``original_answer``, ``role_contexts`` and
            ``role_answerable`` (the last two keyed by role name).
        per_role_count: Maximum number of entries taken from each
            (role, category) bucket.
        skip_first: Number of leading examples to ignore. Defaults to 1000,
            the value that used to be hard-coded.
            NOTE(review): presumably those examples were used to fine-tune
            the maskers - confirm.

    Returns:
        A list of entry dicts grouped by role (in the order roles were first
        seen), answerable entries before unanswerable ones within each role.
        A bucket holding fewer than ``per_role_count`` entries contributes
        everything it has, so the result is capped rather than strictly
        balanced. An empty list is returned when no example survives the skip.
    """
    roles = ["EMPLOYER", "EMPLOYEE", "CUSTOMER"]
    role_data = defaultdict(lambda: {"answerable": [], "unanswerable": []})

    for position, example in enumerate(dataset):
        if position < skip_first:
            continue
        for role in roles:
            answerable = example["role_answerable"][role]
            entry = {
                "question": example["question"],
                "original_context": example["original_context"],
                "original_answer": example["original_answer"],
                "role_context": example["role_contexts"][role],
                "is_answerable": answerable,
                "role": role,
            }
            category = "answerable" if answerable else "unanswerable"
            role_data[role][category].append(entry)

    # Take up to per_role_count entries from each category, role by role.
    balanced_examples = []
    for buckets in role_data.values():
        balanced_examples.extend(buckets["answerable"][:per_role_count])
        balanced_examples.extend(buckets["unanswerable"][:per_role_count])

    return balanced_examples

In [8]:
# Build the evaluation pool: up to 500 answerable and 500 unanswerable entries per role
# (500 is the function's default, spelled out here for the reader).
balanced_dataset = balance_role_dataset(dataset, per_role_count=500)

In [18]:
def mask_context(context, role, tokenizer, model):
    """Ask a fine-tuned seq2seq masker to mask entities in ``context`` for ``role``.

    Args:
        context: Text whose PERSON/ORG/LOCATION/DATE entities should be masked.
        role: Role name interpolated into the instruction prompt.
        tokenizer: Callable tokenizer returning a batch with a ``.to(device)``
            method, and offering ``decode``.
        model: Generation model exposing ``generation_config``, ``config``,
            ``device`` and ``generate``.

    Returns:
        The first generated sequence decoded to a single string, with special
        tokens removed.
    """
    prompt = f"""Please mask all PERSON, ORG, LOCATION, and DATE entities from the following text, considering the role {role}:\n{context}"""

    gen_cfg = model.generation_config

    # A generation config with early_stopping unset is replaced by one rebuilt
    # from the model config; the model's own generation_config is not modified.
    if gen_cfg.early_stopping is None:
        # Imported here because the notebook's import cell does not import
        # GenerationConfig; without this the branch raises NameError.
        from transformers import GenerationConfig

        gen_cfg = GenerationConfig.from_model_config(model.config)
        gen_cfg.early_stopping = True          # or False, or "never"
        # The attributes exist even when unset (value None), so a getattr
        # default would never apply; fill in the fallbacks explicitly.
        if getattr(gen_cfg, "num_beams", None) is None:
            gen_cfg.num_beams = 4
        if getattr(gen_cfg, "max_new_tokens", None) is None:
            gen_cfg.max_new_tokens = 512

    inputs = tokenizer(prompt, return_tensors="pt", padding=True, truncation=True).to(model.device)
    output = model.generate(**inputs, generation_config=gen_cfg)

    # Return only a single decoded string
    return tokenizer.decode(output[0], skip_special_tokens=True)

In [10]:
def final_prompt(query, context):
    """Build the source-only prompt sent to the answering LLM.

    The prompt tells the model to answer strictly from ``context`` (placed
    between the BEGIN/END SOURCE TEXT markers) and to reply with a fixed
    "not provided" sentence when a detail is missing, then appends ``query``.

    Args:
        query: The question to ask.
        context: The (masked) source text the answer must come from.

    Returns:
        The full prompt string, including the leading newline and the
        four-space indentation of the template.
    """
    template = """
    You are not allowed to use any outside knowledge, training data, or prior information. You must only answer based on the text provided below. If a detail (such as a name) is not explicitly written in the text, respond by saying "The information is not provided in the source."

    [BEGIN SOURCE TEXT]
    {context}
    [END SOURCE TEXT]

    Please answer the question
    Question: {query}
    """
    return template.format(context=context, query=query)

In [11]:
def is_actual_answer(response):
    """Tell whether an LLM response looks like a real answer.

    The response is trimmed and lower-cased, then searched for any of a fixed
    set of "no answer" phrases.

    Args:
        response: The raw response text from the LLM.

    Returns:
        False if any denial phrase occurs in the response, True otherwise
        (including for an empty response).
    """
    # Substrings that signal the model declined to answer from the source.
    refusal_markers = (
        "is not provided",
        "not provided",
        "is not available",
        "not provided in the source",
        "not mentioned in the source",
        "the source does not provide",
        "cannot be found in the source",
    )

    normalized = response.strip().lower()
    return not any(marker in normalized for marker in refusal_markers)

In [15]:
# Prepare the CSV that accumulates evaluation rows across runs: create it if
# needed and make sure it starts with a header row.
output_path = '/content/drive/MyDrive/llm_rag_eval_heuristic_results_llama.csv'

fieldnames = [
    "index", "role", "question", "masked_context",
    "llm_answer", "llm_says_answerable",
    "original_answer", "is_heuristically_answerable", "model"
]

# A file that exists but is empty still needs a header; otherwise a later
# csv.DictReader would take the first data row as the column names.
file_exists = os.path.isfile(output_path)
needs_header = not file_exists or os.path.getsize(output_path) == 0

# Append mode keeps rows written by earlier runs.
with open(output_path, mode='a', newline='', encoding='utf-8') as file:
    writer = csv.DictWriter(file, fieldnames=fieldnames)

    if needs_header:
        writer.writeheader()

In [None]:
# Run the whole pipeline: for every masking model, mask each example's context,
# ask the hosted LLM to answer from the masked text only, and append one CSV
# row per example. Rows already present in the CSV are skipped, so an
# interrupted run can be resumed by re-running this cell.

# Rows evaluated per masking model. It is also the stride of the CSV "index"
# column (model k owns indices k*2500+1 .. (k+1)*2500), so changing it
# invalidates resuming from an existing CSV.
EXAMPLES_PER_MODEL = 2500

device = "cuda" if torch.cuda.is_available() else "cpu"
results = []
existing_indices = set()

if os.path.isfile(output_path):
    # newline='' is what the csv module expects, so quoted fields containing
    # line breaks (contexts, LLM answers) are read back intact.
    with open(output_path, mode='r', newline='', encoding='utf-8') as f:
        for row in csv.DictReader(f):
            existing_indices.add(int(row['index']))

# Use HF_TOKEN if an earlier cell defined it, otherwise fall back to the
# environment, so the token never has to be saved in the notebook.
try:
    hf_token = HF_TOKEN
except NameError:
    hf_token = os.environ.get("HF_TOKEN")
if not hf_token:
    raise RuntimeError(
        "HF_TOKEN is not set: define HF_TOKEN in an earlier cell or export it "
        "as an environment variable before running the evaluation loop."
    )

# One client is enough for the whole run.
client = OpenAI(
    base_url="https://ki32sz6gk1njugbq.us-east-1.aws.endpoints.huggingface.cloud/v1/",
    api_key=hf_token
)

# Never index past the end of the pool if it is smaller than expected.
n_examples = min(EXAMPLES_PER_MODEL, len(balanced_dataset))
if n_examples < EXAMPLES_PER_MODEL:
    print(f"balanced_dataset has only {n_examples} examples; evaluating {n_examples} per model")

index_counter = 0

for model_number, model_config in enumerate(MASKED_MODEL_CONFIG):
    first_index = model_number * EXAMPLES_PER_MODEL + 1
    pending = [i for i in range(n_examples) if first_index + i not in existing_indices]
    if not pending:
        # Everything for this model is already in the CSV; skip loading it.
        continue

    tokenizer = model_config['tokenizer_cls'].from_pretrained(model_config['out_dir'])
    model = model_config['model_cls'].from_pretrained(model_config['out_dir']).to(device)

    for i in pending:
        index_counter = first_index + i
        example = balanced_dataset[i]
        role = example['role']
        question = example['question']
        masked_contexts = mask_context(example['original_context'], role, tokenizer, model)

        # Prompt + LLM
        prompt = final_prompt(question, masked_contexts)
        r = client.chat.completions.create(
            model="tgi",
            messages=[
                {
                    "role": "user",
                    "content": prompt
                }
            ],
            max_tokens=512
        )
        response = r.choices[0].message.content
        bool_answer = is_actual_answer(response)

        print(i)

        # Re-open and append per row so completed work survives an interruption.
        with open(output_path, mode='a', newline='', encoding='utf-8') as file:
            writer = csv.DictWriter(file, fieldnames=fieldnames)
            writer.writerow({
                "index": index_counter,
                "role": role,
                "question": question,
                "masked_context": masked_contexts,
                "llm_answer": response,
                "llm_says_answerable": bool_answer,
                "original_answer": example['original_answer'],
                "is_heuristically_answerable": example['is_answerable'],
                "model": model_config['name']
            })




