# Regulatory Information Retrieval and Answer Generation (RIRAG)

This notebook addresses the answer-generation subtask of the Regulatory Information Retrieval and Answer Generation competition:

_Using the question and the passages retrieved in Subtask 1 (See ObliQA.ipynb notebook), participants must generate a comprehensive, accurate, and coherent answer. This subtask emphasizes the ability to synthesize information from multiple sources and present it in a clear and logical manner, ensuring that the answer fully addresses the compliance and obligation requirements of the query._

The notebook demonstrates how _Retrieval Augmented Generation_ and _Large Language Models_ can synthesize the passages returned by the hybrid (lexical and semantic) search into an accurate and precise answer that helps professionals navigate the regulatory content.

In [1]:
# Import the Azure OpenAI library to access API-based LLMs.
from openai import AzureOpenAI

# Import data structures used for throttling implementation for standard deployments in Azure.
from collections import (
    defaultdict,
    deque
)
from threading import Lock
import asyncio

# Import util libraries
from tqdm import tqdm
from dotenv import load_dotenv
import os
import json
import time

In [2]:
# Copy RePASs repo to validate our results - ONLY RUN ONCE
#!git clone https://github.com/RegNLP/RePASs.git && cd RePASs

In [3]:
# Load environment variables to handle access to the openAI API using secrets
# Variables to be defined:
# QNA_ENDPOINT_URL: Deployment endpoint for inference/chat completion
# QNA_OPENAI_API_KEY: Key to access openAI API
#
load_dotenv()

False

In [4]:
# Load passages from disk
ndocs = 40  # Number of regulatory documents to process
passages = defaultdict(str) # Maps each passage ID to the passage text extracted from the regulatory documents

# Extract the passages in each document
for i in range(1, ndocs + 1):
    with open(os.path.join("ObliQADataset/StructuredRegulatoryDocuments", f"{i}.json")) as f:
        doc = json.load(f)  # Loads the contents of the JSON file
        for psg in doc:  # Map each passageId to the actual content
            passages[psg["ID"]] = psg["Passage"]

In [5]:
rankings_dict = defaultdict(list) # Maps a question to the relevant passage and its corresponding ranking score

# Load the rankings file in memory
with open('data/rankings_hybrid.trec', 'r') as f:
    # File format: QuestionID Q0 DocumentID Rank Score Method
    for line in f:
        parts = line.strip().split()
        question_id = parts[0]
        document_id = parts[2]
        rank = int(parts[3])
        score = float(parts[4])
        rankings_dict[question_id].append({
            'doc': document_id,
            'score': score
        })
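
The TREC parsing step above can be sketched on in-memory lines. This is a minimal illustration, assuming the six-column layout noted in the cell (`QuestionID Q0 DocumentID Rank Score Method`); the helper name `parse_trec_lines` and the sample ids are hypothetical, and unlike the cell above it also skips malformed lines and sorts by rank.

```python
from collections import defaultdict

def parse_trec_lines(lines):
    """Group ranked documents by question id from TREC-formatted lines (hypothetical helper)."""
    rankings = defaultdict(list)
    for line in lines:
        parts = line.split()
        if len(parts) < 6:
            continue  # skip blank or malformed lines
        question_id, _, doc_id, rank, score, _ = parts[:6]
        rankings[question_id].append(
            {"doc": doc_id, "rank": int(rank), "score": float(score)}
        )
    # Sort by rank so downstream code does not depend on the file order
    for entries in rankings.values():
        entries.sort(key=lambda entry: entry["rank"])
    return rankings

# Made-up sample lines, deliberately out of rank order
sample = [
    "q1 Q0 docA 2 0.81 hybrid",
    "q1 Q0 docB 1 0.93 hybrid",
    "",
    "q2 Q0 docC 1 0.77 hybrid",
]
rankings = parse_trec_lines(sample)
print([entry["doc"] for entry in rankings["q1"]])  # ['docB', 'docA']
```

Sorting explicitly is a defensive choice: the selection logic that follows assumes scores arrive in descending order of relevance.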

In [7]:
def extract_passages(question_id: str) -> list[str]:
    """
    Extracts the content of the passages that are relevant for answering the given question.
    Given a valid question id, it returns at least one and at most 10 passages.
    Selection stops at the first passage scoring below 0.72, or right after a passage
    whose score exceeds the next one by more than 0.1.
    
    Args:
        question_id: The question id for which we want to extract the relevant passages
        
    Returns:
        List[str]: A list of passages that are relevant for answering the given question
    """
    retrieved_passages = []
    should_stop = False
    
    for i in range(len(rankings_dict[question_id])):
        # Stop if there was a significant drop in relevance between two passages
        # Stop if 10 passages have already been extracted
        if should_stop or len(retrieved_passages) == 10:
            break
            
        # If no passage has been extracted yet, extract at least one
        if len(retrieved_passages) == 0:
            retrieved_passages.append(rankings_dict[question_id][i]["doc"])
            continue
                
        # Check whether the relevance score drops by more than 0.1 between this passage and the next one
        if i < len(rankings_dict[question_id]) - 1 and rankings_dict[question_id][i]["score"] - rankings_dict[question_id][i+1]["score"] > 0.1:
            should_stop = True

        # Do not include passages with low relevance
        if rankings_dict[question_id][i]["score"] < 0.72:
            break

        retrieved_passages.append(rankings_dict[question_id][i]["doc"])
        
    # Extract the plain text
    retrieved_passages = [passages[doc] for doc in retrieved_passages]
    
    return retrieved_passages
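
The cutoff rules in `extract_passages` can be checked in isolation on a plain list of scores. The sketch below mirrors the same rules and assumes the scores are already sorted in descending order; the function name `select_top` and its parameter names are hypothetical, and the scores are made up.

```python
def select_top(scores, max_passages=10, min_score=0.72, max_gap=0.1):
    """Return the indices kept by the cutoff rules (hypothetical helper)."""
    kept = []
    stop_after_this = False
    for i, score in enumerate(scores):
        if stop_after_this or len(kept) == max_passages:
            break
        if not kept:
            kept.append(i)  # always keep the top-ranked passage
            continue
        # A large drop to the next score marks this as the last passage to keep
        if i < len(scores) - 1 and score - scores[i + 1] > max_gap:
            stop_after_this = True
        if score < min_score:
            break
        kept.append(i)
    return kept

print(select_top([0.95, 0.90, 0.88, 0.70, 0.69]))  # [0, 1, 2]
print(select_top([0.50, 0.40]))                    # [0]
print(len(select_top([0.90] * 15)))                # 10
```

As in the function above, the top-ranked passage is kept even when it scores below the threshold, and the gap between the first and second scores is not checked.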

## Azure OpenAI - Standard deployment

First, we use an Azure OpenAI standard deployment to synthesize the retrieved passages for each question using `gpt-35-turbo` (GPT-3.5 Turbo).
Since the API enforces rate limits on both tokens and requests per minute, we use a decorator to throttle the client function so that we do not send requests that would be rejected.
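
To illustrate the idea behind request throttling, here is a minimal sliding-window limiter. It is a simplified sketch, not the class used in this notebook: the name `SlidingWindowLimiter` and the injectable `clock` and `sleep` parameters are assumptions, added so that the demo can run against a fake clock without waiting.

```python
import asyncio
import time
from collections import deque

class SlidingWindowLimiter:
    """Allows at most `rate_limit` acquisitions per `time_window` seconds (hypothetical helper)."""

    def __init__(self, rate_limit, time_window, clock=time.monotonic, sleep=asyncio.sleep):
        self.rate_limit = rate_limit
        self.time_window = time_window
        self.clock = clock
        self.sleep = sleep
        self.calls = deque()  # timestamps of the calls still inside the window

    async def acquire(self):
        # No lock is needed on a single event loop: there is no await
        # between checking the window and recording the call.
        while True:
            now = self.clock()
            # Drop timestamps that have left the window
            while self.calls and self.calls[0] <= now - self.time_window:
                self.calls.popleft()
            if len(self.calls) < self.rate_limit:
                self.calls.append(now)
                return
            # Wait until the oldest recorded call expires, then re-check
            await self.sleep(self.calls[0] + self.time_window - now)

async def demo():
    fake_now = [0.0]

    async def fake_sleep(seconds):
        fake_now[0] += seconds  # advance the fake clock instead of waiting

    limiter = SlidingWindowLimiter(2, 10.0, clock=lambda: fake_now[0], sleep=fake_sleep)
    stamps = []
    for _ in range(5):
        await limiter.acquire()
        stamps.append(fake_now[0])
    return stamps

# In a notebook cell, use `stamps = await demo()` instead of asyncio.run
stamps = asyncio.run(demo())
print(stamps)  # [0.0, 0.0, 10.0, 10.0, 20.0]
```

With a limit of 2 calls per 10 seconds, the third and fifth calls are each delayed until the oldest recorded call leaves the window.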

In [None]:
endpoint = os.getenv('QNA_ENDPOINT_URL')
openAIKey = os.getenv('QNA_OPENAI_API_KEY')
llm_model = 'gpt-35-turbo'

if not endpoint:
    raise ValueError("The environment variable QNA_ENDPOINT_URL is not defined.")

if not openAIKey:
    raise ValueError("The environment variable QNA_OPENAI_API_KEY is not defined.")

openAI_client = AzureOpenAI(
    azure_endpoint=endpoint,
    api_key=openAIKey,
    api_version="2024-05-01-preview"
)

# Limits the number of times a function can be called in a given time interval.
# Guarantees that every call eventually happens, but not necessarily in the order
# in which the calls were made.
class Throttle:
    def __init__(self, rate_limit, time_window):
        self.rate_limit = rate_limit # Max number of calls allowed in the given interval
        self.time_window = time_window # The time interval
        self.calls = deque() # Stores the function calls so we can track when to remove calls as they become stale
        self.lock = Lock() # Locks to prevent concurrent access to the function
        self.queue = asyncio.Queue() # Stores function calls that need to be awaited before being executed

    def __call__(self, func):
        async def wrapped_func(*args, **kwargs):
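            # func is rebound below when dequeuing, so declare it nonlocal
            # (it comes from the enclosing __call__ scope, not the global scope)
            nonlocal func
            # Lock concurrent access. Note: this threading.Lock is held across awaits,
            # so the decorated function must be awaited one call at a time, as this notebook does
            with self.lock: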
                current_time = time.time()
                
                # Remove function calls that are outside of the time window
                while self.calls and self.calls[0] < current_time - self.time_window:
                    self.calls.popleft()

                # If we can make a call without exceeding the rate limit, then we just call it   
                if len(self.calls) < self.rate_limit:
                    self.calls.append(current_time)
                    return await func(*args, **kwargs)
                else:
                    # Otherwise, queue the function call for later
                    await self.queue.put((func, args, kwargs))
                    
                    # Process function calls in the queue
                    while not self.queue.empty():
                        # Dequeue the function call
                        func, args, kwargs = await self.queue.get()
                        current_time = time.time()

                        # Remove function calls that are outside of the time window
                        while self.calls and self.calls[0] < current_time - self.time_window:
                            self.calls.popleft()

                        # If we can make a call without exceeding the rate limit, then we just call it
                        if len(self.calls) < self.rate_limit:
                            self.calls.append(current_time)
                            result = await func(*args, **kwargs)
                            self.queue.task_done()
                            return result
                        else:
                            # Otherwise, wait a few seconds before retrying 
                            await self.queue.put((func, args, kwargs)) # Enqueue the function call again
                            await asyncio.sleep(min(10, self.time_window))
                    
        return wrapped_func
    
def build_prompt(question: str, relevant_passages: list[str]) -> tuple[str, str]:
    """
    Builds the prompt that will be used to synthesize the passages
    
    Args:
        question: A well formed regulatory question
        relevant_passages: A list of relevant passages that should help answer the question
    
    Returns:
        A tuple with both the system prompt that contains instructions on how to answer and
        the user prompt that contains the actual question and passages
    
    """

    system_prompt = """You are a regulatory compliance assistant. Provide a **complete**, **coherent**, and **correct** response to the given question by synthesizing the information from the provided passages. Your answer should **fully integrate all relevant obligations, best practices, and insights**, and directly address the question. The passages are presented in order of relevance, so **prioritize the information accordingly** and ensure consistency in your response, avoiding any contradictions. Additionally, reference **specific regulations and key compliance requirements** outlined in the regulatory content to support your answer. **Do not use any extraneous or external knowledge** outside of the provided passages when crafting your response.
    """
    
    ## Add examples directly from sample.json

    user_prompt = f"Question: {question}\n\nPassages:\n\n"
    for passage in relevant_passages:
        user_prompt += f"{passage}\n\n"
        
    return (system_prompt, user_prompt)

# Allow maximum 60 calls every 70 seconds
@Throttle(rate_limit=60, time_window=70)
async def summarize_answer(question: str, relevant_passages: list[str]) -> str:
    """
    Summarizes the answer based on the provided passages
    
    Args:
        question: A well formed regulatory question
        relevant_passages: A list of relevant passages that should help answer the question
    """

    (system_prompt, user_prompt) = build_prompt(question, relevant_passages)

    # Executes the LLM API call
    completion = openAI_client.chat.completions.create(
        model=llm_model, # we are using gpt-3.5-turbo
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt}
        ],
        temperature=0.25, # Lower values make the output more deterministic and easier to reproduce; higher values increase variety
        frequency_penalty=0.0,
        presence_penalty=0.0,
        stop=None,
        stream=False,
        max_tokens=800,
    )

    return completion.choices[0].message.content
    

In [10]:
answers = []

# load the test dataset
with open("ObliQADataset/ObliQA_test.json") as f:
    data = json.load(f)  # Load the JSON file
    
    # For each question:
    for e in tqdm(data):  # tqdm adds a progress bar
        query = e['Question']  # Extract the actual question
        question_id = e["QuestionID"] # Extract the question id
        
        retrieved_passages = extract_passages(question_id)

        answer = await summarize_answer(query, retrieved_passages)

        answers.append({
            "QuestionID": question_id,
            "RetrievedPassages": retrieved_passages,
            "Answer": answer
        })

# Store the results in a json File
with open("data/answers.json", "w") as f:
    json.dump(answers, f, indent=2)        

100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 2786/2786 [1:56:08<00:00,  2.50s/it]


## Azure OpenAI - Batch deployment

Second, we use an Azure OpenAI batch deployment to synthesize the retrieved passages for each question using `gpt-4o-mini`.
We leverage the batch API to send all queries at once. The response is retrieved offline from the Azure OpenAI portal.
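
Batch input is a JSON Lines file with one request per line. The sketch below shows that layout, assuming the request fields used in this notebook (`custom_id`, `method`, `url`, `body`); the helper names `make_batch_request` and `to_jsonl`, the question id, and the prompt text are hypothetical.

```python
import json

def make_batch_request(custom_id, system_prompt, user_prompt, model="gpt-4o-mini"):
    """Build one batch request entry (hypothetical helper)."""
    return {
        "custom_id": custom_id,
        "method": "POST",
        "url": "/chat/completions",
        "body": {
            "model": model,
            "messages": [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt},
            ],
            "temperature": 0.25,
            "max_tokens": 800,
        },
    }

def to_jsonl(requests):
    """Serialize requests as JSON Lines: one JSON object per line."""
    return "".join(json.dumps(request) + "\n" for request in requests)

requests = [
    make_batch_request(
        "q-001",  # made-up question id
        "You are a regulatory compliance assistant.",
        "Question: ...\n\nPassages:\n\n...",
    )
]
payload = to_jsonl(requests)

# json.dumps escapes the newlines inside the prompt, so each request stays on one line
assert payload.count("\n") == len(requests)
decoded = [json.loads(line) for line in payload.splitlines()]
print(decoded[0]["custom_id"])  # q-001
```

Using the question id as `custom_id` lets each result be matched back to its question once the batch output is downloaded.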

In [25]:
def queue_batch_summarization_job(jobs):
    """
    Creates a batch job in Azure OpenAI to generate the answers for all questions with a single request
    """
    endpoint = os.getenv('QNA_ENDPOINT_URL')
    openAIKey = os.getenv('QNA_OPENAI_API_KEY')

    if not endpoint:
        raise ValueError("The environment variable QNA_ENDPOINT_URL is not defined.")

    if not openAIKey:
        raise ValueError("The environment variable QNA_OPENAI_API_KEY is not defined.")

    openAI_client = AzureOpenAI(
        azure_endpoint=endpoint,
        api_key=openAIKey,
        api_version="2024-08-01-preview"
    )

    # File that can be uploaded either manually or programmatically
    file_name = "data/batch_questions.jsonl"

    # Save file contents using json lines format
    with open(file_name, 'w') as file:
        for job in jobs:
            file.write(json.dumps(job) + '\n')

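    # Upload the file programmatically, closing the handle once the upload finishes
    with open(file_name, "rb") as batch_input:
        batch_file = openAI_client.files.create(
            file=batch_input,
            purpose="batch"
        )
    
    # Wait until the file upload is done
    while True:
        file = openAI_client.files.retrieve(batch_file.id)
        if file.status == "processed" or file.status == "error":
            break
        time.sleep(10)

    # Do not submit a batch job for a file that failed to process
    if file.status == "error":
        raise RuntimeError(f"Processing of uploaded file {batch_file.id} failed.")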
    
    # Trigger the batch job using the uploaded file
    # The job is expected to complete within 24 hours
    batch_job = openAI_client.batches.create(
      input_file_id=batch_file.id,
      endpoint="/v1/chat/completions",
      completion_window="24h"
    )
    
    return batch_job

In [26]:
jobs = []

# Load the test dataset
with open("ObliQADataset/ObliQA_test.json") as f:
    data = json.load(f)  # Load the JSON file
    
    # For each question:
    for e in tqdm(data):  # tqdm adds a progress bar
        query = e['Question']  # Extract the actual question
        question_id = e["QuestionID"] # Extract the question id

        retrieved_passages = extract_passages(question_id)

        (system_prompt, user_prompt) = build_prompt(query, retrieved_passages)
        
        jobs.append({
            "custom_id": question_id,
            "method": "POST",
            "url": "/chat/completions",
            "body": {
                "model": "gpt-4o-mini",
                "messages": [
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": user_prompt}
                ],
                "temperature": 0.25,
                "frequency_penalty": 0.0,
                "presence_penalty": 0.0,
                "max_tokens": 800,
            }
        })
        
batch_job = queue_batch_summarization_job(jobs)

100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 2786/2786 [00:00<00:00, 94557.58it/s]


In [29]:
# At this point the result has been downloaded offline and uploaded to the data folder
answers = []

with open("data/batch_result.jsonl") as f:
    # Parse each line of the file as a JSON
    results = [json.loads(line) for line in f]
    
    for result in results:
        # For each result, create an entry in answers array to later create the output file
        question_id = result["custom_id"]
        # Since this function is deterministic, we can just call it again
        # Clearly, there's an optimization we can do to avoid calling this twice
        retrieved_passages = extract_passages(question_id) 
        answer = result["response"]["body"]["choices"][0]["message"]["content"]
        
        answers.append({
            "QuestionID": question_id,
            "RetrievedPassages": retrieved_passages,
            "Answer": answer
        })
        
# Save the results as a JSON file
with open("data/answers-4o.json", "w") as f:
    json.dump(answers, f, indent=2)  
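
The parsing cell above assumes every request in the batch succeeded. A more defensive sketch is shown below. It assumes each output line carries a `response` object with `status_code` and `body` fields, and that `response` may be missing or null for failed requests; verify this against the actual batch output before relying on it. The helper name `parse_batch_lines` and the sample lines are hypothetical.

```python
import json

def parse_batch_lines(lines):
    """Split batch output lines into answers and failed question ids (hypothetical helper)."""
    answers, failed = {}, []
    for line in lines:
        if not line.strip():
            continue  # ignore blank lines
        result = json.loads(line)
        response = result.get("response") or {}
        choices = (response.get("body") or {}).get("choices") or []
        if response.get("status_code") == 200 and choices:
            answers[result["custom_id"]] = choices[0]["message"]["content"]
        else:
            failed.append(result["custom_id"])
    return answers, failed

# Made-up output lines: one success and one failure
sample_lines = [
    json.dumps({
        "custom_id": "q-001",
        "response": {
            "status_code": 200,
            "body": {"choices": [{"message": {"content": "An answer."}}]},
        },
    }),
    json.dumps({"custom_id": "q-002", "response": None, "error": {"message": "failed"}}),
]
answers_by_id, failed_ids = parse_batch_lines(sample_lines)
print(answers_by_id)  # {'q-001': 'An answer.'}
print(failed_ids)     # ['q-002']
```

Collecting the failed ids separately makes it possible to resubmit only those questions instead of the whole batch.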

In [None]:
## Script to evaluate the results. Results are placed in /RePASs/data/hybrid or /RePASs/data/hybrid-4o
## These scripts must be run using the virtual env in RePASs

#python scripts/evaluate_model.py --input_file ./../data/answers.json --group_method_name hybrid
#python scripts/evaluate_model.py --input_file ./../data/answers-4o.json --group_method_name hybrid-4o