In [None]:
# !pip install flash-attn --no-build-isolation

In [1]:
import json
import outlines
import torch
from typing import List, Optional
from pydantic import BaseModel, Field
from dotenv import load_dotenv
from datasets import load_dataset,Dataset
from rich import print
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_core.prompts import ChatPromptTemplate
from transformers import AutoModelForCausalLM, AutoTokenizer
from typing import Dict,Tuple,Any
from outlines import Generator
from outlines.types import JsonSchema
from outlines import from_transformers
import os
import tqdm
# os.environ["TRANSFORMERS_VERBOSITY"] = "info"
load_dotenv()

True

In [2]:
os.environ['HF_HUB_ENABLE_HF_TRANSFER'] = '1'
os.environ['TOKENIZERS_PARALLELISM'] = 'true'

In [3]:
PARA = 0.5
MODEL_NAME = f"Qwen/Qwen2.5-{PARA}B-Instruct"
MODEL_PATH = "./Models"
DATASET_NAME = "Vishva007/RBI-Circular-QA-Dataset"
SEED = 42
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY") 
GOOGLE_MODEL_ID="gemini-2.0-flash"
GOOGLE_TEMPERATURE=0.01
GOOGLE_MAX_TOKENS=256
EVAL_SET_SIZE = 10
MERGED_MODEL_OUTPUT_DIR = f"./Models/Qwen2.5-{PARA}B-Instruct-RBI-QA-Merged"
MODEL_FINETUNED_REPO_ID = f"Vishva007/Qwen2.5-{PARA}B-Instruct-RBI-QA-Finetuned"

In [4]:
print(f"Using Model: {MODEL_NAME} with {PARA}B parameters")

In [5]:
import os
print(f"Current working directory: {os.getcwd()}")
absolute_model_path = os.path.abspath(MODEL_PATH)
print(f"Absolute model path: {absolute_model_path}")


In [6]:
print(f"Loading dataset: {DATASET_NAME}")
print(f"Using seed: {SEED}")

In [7]:
try:
    dataset = load_dataset(DATASET_NAME,split="eval")
    print("Dataset loaded successfully!")
    print(dataset) # Print the dataset structure to see available splits
except Exception as e:
    print(f"Error loading dataset: {e}")
    print("Please ensure the dataset name is correct and you have an active internet connection.")


In [8]:
number_of_examples_for_eval = EVAL_SET_SIZE

# Get the total number of examples in full_dataset
num_examples = len(dataset)

# Calculate the start index for the last EVAL_SET_SIZE examples
start_index = num_examples - number_of_examples_for_eval

# Create a list of indices for the last 1EVAL_SET_SIZE0 examples
indices_to_select = list(range(start_index, num_examples))

# Use the .select() method to create a new Dataset object containing only those indices
eval_dataset = dataset.select(indices_to_select)


In [9]:
print("dataset type:", type(dataset))
print("eval_dataset type:", type(dataset))

In [10]:
class ResponseFormat(BaseModel):
    answer : str

class BatchResponseFormat(BaseModel):
    responses: List[ResponseFormat] = Field(..., description="List of responses for each question in the batch.")   
    
class EvaluationResult(BaseModel):
    score: int = Field(..., description="Score 1 if the answer fully satisfies ALL the specified criteria, 0 otherwise.", ge=0, le=1)


In [11]:
if torch.cuda.is_available():
    print("GPU is Used!")
else:
    print("CPU is Used!")

In [12]:
from huggingface_hub import notebook_login

notebook_login()

VBox(children=(HTML(value='<center> <img\nsrc=https://huggingface.co/front/assets/huggingface_logo-noborder.svâ€¦

In [13]:
model = AutoModelForCausalLM.from_pretrained(
    MODEL_NAME,
    torch_dtype="auto",
    device_map="auto",
    trust_remote_code=True,
    cache_dir=MODEL_PATH,
)
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, cache_dir=MODEL_PATH)

tokenizer.pad_token = tokenizer.eos_token
tokenizer.padding_side = "left"


In [None]:
import sys


def generate_responses_from_prompts(model, tokenizer, prompts: list[str]) -> list[str]:
    """
    Generates responses for a batch of prompts using a loaded causal language model and tokenizer.

    Args:
        model: The pre-trained causal language model.
        tokenizer: The tokenizer corresponding to the model.
        prompts (list[str]): A list of user prompts for which to generate responses.

    Returns:
        list[str]: A list of generated responses, corresponding to each input prompt.
    """
    if model is None or tokenizer is None:
        print("Model or tokenizer not loaded. Cannot generate responses.")
        return ["Error: Model not loaded." for _ in prompts]

    sys_prompt = """
        You are a highly knowledgeable AI assistant with expertise in Indian banking and financial regulations, 
        particularly those outlined in Reserve Bank of India (RBI) circulars.
        Your task is to answer questions based on the RBI circulars and related financial regulations.
        - Return only the answer to the question.
        - Do not include any additional information or explanations.
    """
    # Prepare messages for each prompt in the batch
    batched_messages = []
    for prompt in prompts:
        batched_messages.append([
            {"role": "system", "content": sys_prompt},
            {"role": "user", "content": prompt}
        ])

    # Apply chat template to each set of messages and collect texts
    batched_texts = [
        tokenizer.apply_chat_template(
            messages,
            tokenize=False,
            add_generation_prompt=True
        ) for messages in batched_messages
    ]

    print(f"Processing {len(prompts)} prompts in batch...")

    try:
        # Tokenize the entire batch
        # padding=True pads sequences to the longest sequence in the batch
        # return_tensors="pt" returns PyTorch tensors
        model_inputs = tokenizer(batched_texts, return_tensors="pt", padding=True).to(model.device)

        # Generate responses for the entire batch
        # max_new_tokens controls the maximum number of tokens to generate per response
        # **model_inputs unpacks the dictionary into keyword arguments
        generated_ids = model.generate(
            **model_inputs,
            max_new_tokens=512,
            temperature=0.01,
            top_p=0.95,
            top_k=50
        )

        # Calculate the length of the input IDs for each original prompt
        # This is necessary to slice out only the newly generated tokens
        input_lengths = model_inputs.input_ids.shape[1]

        # Slice generated_ids to get only the new tokens for each item in the batch
        # We iterate through each generated sequence and remove the input part
        batch_generated_ids_only = [
            output_ids[input_lengths:] for output_ids in generated_ids
        ]

        # Decode the generated tokens back to text for the entire batch
        responses = tokenizer.batch_decode(batch_generated_ids_only, skip_special_tokens=True)
        print("Batch processing complete.")
        return responses

    except Exception as e:
        print(f"Error during generation: {e}")
        return ["Error generating response." for _ in prompts] # Return error for each prompt



In [15]:
prompt_to_process = eval_dataset[:5]["question"]
print(f"Processing prompt: {prompt_to_process}")

In [16]:
batch_responses = generate_responses_from_prompts(model, tokenizer, prompt_to_process)

# Print the generated responses
print("\n--- Generated Responses ---")
for i, response in enumerate(batch_responses):
    print(f"Prompt {i+1}: \"{prompt_to_process[i]}\"")
    print(f"Response {i+1}:\n{response}\n{'-'*30}\n")

In [17]:
# Initialize Gemini LLM
llm = ChatGoogleGenerativeAI(
        model=GOOGLE_MODEL_ID, 
        temperature=GOOGLE_TEMPERATURE,
        max_tokens=GOOGLE_MAX_TOKENS,
        timeout=None,
        api_key=GOOGLE_API_KEY,
    )

In [21]:


def generate_answers_with_transformers(
    model,
    eval_dataset: Dataset,
    batch_size: int = 5,
) -> List[Dict[str, str]]:
    """
    Generates answers for a dataset using an transformers LLM.

    Args:
        model: The loaded HuggingFace model for transformers.
        eval_dataset (datasets.Dataset): The dataset containing 'question' fields.
        batch_size (int): The number of samples to process in each batch.
        device (str): The device to run the model on (e.g., "cuda" or "cpu").

    Returns:
        List[Dict[str, str]]: A list of dictionaries, each containing the original question
                              and the 'generated_answer' from Outlines.
    """

    generated_data = []

    print(f"Generating answers in batches of {batch_size}...")
    for i in range(0, len(eval_dataset), batch_size):
        batch_slice = eval_dataset.select(list(range(i, min(i + batch_size, len(eval_dataset)))))
        
        inputs = [item['question'] for item in batch_slice.to_list()]

        try:
            
            batch_response_instance = generate_responses_from_prompts(model, tokenizer, inputs)
            
            for original_item, transformers_output in zip(batch_slice.to_list(), batch_response_instance):
                generated_data.append({
                    "question": original_item['question'],
                    "generated_answer": transformers_output
                })
        except Exception as e:
            print(f"Error during transformers generation for batch starting at index {i}: {e}")
            for original_item in batch_slice.to_list():
                 generated_data.append({
                    "question": original_item['question'],
                    "generated_answer": "\\ (Generation Failed)"
                })

    print("transformers answer generation complete.")
    return generated_data

In [22]:
generated_answers = generate_answers_with_transformers(
    model,
    eval_dataset,
    batch_size=5,
)

In [23]:
print(generated_answers)

In [24]:
def evaluate_answers_with_gemini(
    eval_dataset: Dataset,
    generated_answers_data: List[Dict[str, str]],
    llm: ChatGoogleGenerativeAI,
) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]:
    """
    Evaluates generated answers using a Gemini LLM, returning only the score in the structured output.

    Args:
        eval_dataset (datasets.Dataset): The original dataset containing 'question',
                                          'answer' (ground truth), and 'evaluation_criteria' fields.
        generated_answers_data (List[Dict[str, str]]): A list of dictionaries, where each
                                                       dictionary contains the original question
                                                       and the 'generated_answer'.
        llm (ChatGoogleGenerativeAI): The LangChain Gemini LLM instance.

    Returns:
        tuple: A tuple containing:
               - list: A list of dictionaries, where each dictionary contains the original question,
                       ground truth answer, generated answer, and evaluation score.
               - dict: A summary dictionary of the evaluation.
    """

    SYSTEM_MESSAGE = """
    You are an expert evaluator tasked with determining if an answer satisfies specified evaluation criteria.

    You will receive:
    1. A question
    2. The evaluation criteria
    3. The model's answer to evaluate

    Provide your score based on the criteria:
    - Score 1 if the answer fully satisfies ALL the specified criteria
    - Score 0 if it fails to meet ANY of the criteria

    Format your response as a JSON object with a single key 'score' containing just the number 1 or 0.
    For example: {{"score": 1}}
    """.strip()

    PROMPT_TEMPLATE = """
    Question: {question}
    Evaluation Criteria: {evaluation_criteria}
    Model Answer: {model_answer}

    Provide the evaluation score in the specified JSON format.
    """.strip()

    gemini_eval_prompt = ChatPromptTemplate.from_messages(
        [
            ("system", SYSTEM_MESSAGE),
            ("human", PROMPT_TEMPLATE),
        ]
    )

    gemini_eval_chain = gemini_eval_prompt | llm.with_structured_output(EvaluationResult)

    results = []
    total_evaluated = 0
    total_score = 0
    failed_evaluations_due_to_error = 0

    print(f"Evaluating answers with Gemini...")
    combined_data = list(zip(eval_dataset.to_list(), generated_answers_data))

    for original_item, generated_item in combined_data:
        total_evaluated += 1

        question = original_item['question']
        ground_truth_answer = original_item['answer']
        evaluation_criteria = original_item['evaluation_criteria']
        generated_answer = generated_item['generated_answer']

        gemini_input = {
            "question": question,
            "evaluation_criteria": evaluation_criteria,
            "model_answer": generated_answer,
        }

        score_val = 0

        try:
            structured_result = gemini_eval_chain.invoke(gemini_input)
            score_val = structured_result.score
            total_score += score_val

        except Exception as e:
            print(f"Error during Gemini evaluation for question: '{question}'. Error: {e}")
            score_val = 0
            failed_evaluations_due_to_error += 1

        results.append({
            "question": question,
            "ground_truth_answer": ground_truth_answer,
            "generated_answer": generated_answer,
            "evaluation_score": score_val,
        })

    # Calculate summary statistics
    successful_eval_count = total_evaluated - failed_evaluations_due_to_error
    percentage_passed_criteria = (total_score / successful_eval_count) * 100 if successful_eval_count > 0 else 0

    summary = {
        "total_evaluations_attempted": total_evaluated,
        "successful_evaluations": successful_eval_count,
        "failed_evaluations_due_to_error": failed_evaluations_due_to_error,
        "total_passed_criteria": total_score,
        "percentage_passed_criteria": percentage_passed_criteria,
        "overall_evaluation_summary": (
            f"Out of {total_evaluated} attempted evaluations, "
            f"{successful_eval_count} were successfully processed by Gemini. "
            f"{failed_evaluations_due_to_error} evaluations encountered an error. "
            f"Among the successfully evaluated answers, {total_score} met all criteria, "
            f"resulting in a {percentage_passed_criteria:.2f}% pass rate."
        )
    }
    print("Gemini evaluation complete.")
    return results, summary


In [25]:
evaluation_results, evaluation_summary = evaluate_answers_with_gemini(
    eval_dataset=eval_dataset, 
    generated_answers_data=generated_answers, 
    llm=llm,
)

In [26]:
print(evaluation_results[:10])

In [27]:

print("\n--- Evaluation Summary ---")
summary_stats = evaluation_summary
print(f"Total Evaluations Attempted: {summary_stats['total_evaluations_attempted']}")
print(f"Evaluations Failed Due to Error: {summary_stats['failed_evaluations_due_to_error']}")
print(f"Total Answers Passed Criteria (Score 1): {summary_stats['total_passed_criteria']}")
print(f"Percentage of Successfully Evaluated Answers Passing Criteria: {summary_stats['percentage_passed_criteria']:.2f}%")
print(f"Overall Summary: {summary_stats['overall_evaluation_summary']}")

In [28]:
import os
import json
def save_evaluation_to_json(evaluation_results: list[dict], evaluation_summary: dict, output_dir: str, filename: str = "gemini_evaluation_report.json"):
    """
    Saves evaluation results and summary to a JSON file in the specified directory.

    Args:
        evaluation_results (list[dict]): A list of dictionaries, where each dictionary
                                         represents the evaluation of a single question-answer pair.
        evaluation_summary (dict): A dictionary containing the overall summary of the evaluation.
        output_dir (str): The directory where the JSON file should be saved.
                          This directory will be created if it does not exist.
        filename (str, optional): The name of the JSON file. Defaults to "gemini_evaluation_report.json".
    """
    # Combine evaluation results and summary into a single dictionary
    output_data = {
        "evaluation_results": evaluation_results,
        "evaluation_summary": evaluation_summary
    }

    # Ensure the output directory exists
    try:
        os.makedirs(output_dir, exist_ok=True)
    except OSError as e:
        print(f"Error creating directory {output_dir}: {e}")
        return

    # Construct the full path for the JSON file
    file_path = os.path.join(output_dir, filename)

    # Write the data to a JSON file
    try:
        with open(file_path, 'w', encoding='utf-8') as f:
            json.dump(output_data, f, indent=4)
        print(f"Successfully wrote evaluation results and summary to {file_path}")
    except IOError as e:
        print(f"Error writing to file {file_path}: {e}")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")

In [29]:
save_evaluation_to_json(
    evaluation_results=evaluation_results,
    evaluation_summary=evaluation_summary,
    output_dir="./Evaluation_Results",
    filename="qwen2.5-0.5B.json"
)