# RAG Answer Generation

Set the dataset folder and filenames below to use with any RAG dataset (single-hop, multi-hop, lang-RAG).


In [1]:
import os, json, requests, time
from dotenv import load_dotenv
load_dotenv()
# Set your dataset folder here (e.g. 'single-hop-RAG-dataset', 'multi-hop-RAG-dataset', 'lang-RAG-dataset')
dataset_folder = 'single-hop-RAG-dataset'  # Change as needed
input_json = ('questions.json')  # Change as needed
output_json = ('doc-1_300_bge_small_jinja-reranker-tiny_qwen4B-standard.json')  # data_type, generator, embedding, reranker
log_filename = output_json.replace('.json', '_log.csv')
input_json_path = os.path.join(dataset_folder, input_json)
output_json_path = os.path.join(dataset_folder, output_json)
answer_file_path = os.path.abspath('agent_files/answer.json')
answer_archive_path = os.path.join(dataset_folder, 'answer_generated_all.json')
logging_path = os.path.join(dataset_folder, 'logging.json')
NUM_SAMPLES = 500  # Change as needed
RAG_API_URL = os.environ.get('RAG_API_URL', 'http://127.0.0.1:10000/v1/chat/completions')
RAG_API_METHOD = os.environ.get('RAG_API_METHOD', 'GET').upper()

In [2]:
def call_rest_api(question):

    # Archive and remove any existing local agent answer file
    if os.path.exists(answer_file_path):
        with open(answer_file_path, 'r', encoding='utf-8') as f:
            existing_data = json.load(f)
        with open(answer_archive_path, 'a', encoding='utf-8') as f:
            json.dump(existing_data, f, indent=2, ensure_ascii=False)
            f.write('\n')
        os.remove(answer_file_path)
    headers = {'Content-Type': 'application/json'}
    payload = {'messages': [{'role': 'user', 'content': question}]}
    start_time = time.time()
    try:
        response = requests.request(
            method=RAG_API_METHOD,
            url=RAG_API_URL,
            headers=headers,
            data=json.dumps(payload),
            timeout=60
        )
        response.raise_for_status()
        response_json = response.json()
    except Exception as e:
        print(f'Request failed: {e}')
        return {'response': '', 'retrieved_contexts': [], 'response_time': None}
    end_time = time.time()
    response_time = end_time - start_time
    try:
        with open(logging_path, 'a', encoding='utf-8') as log_file:
            json.dump(response_json, log_file, indent=2, ensure_ascii=False)
            log_file.write('\n')
    except Exception:
        pass
    # Updated parsing for nested 'Result' key
    message = response_json.get('content', {}).get('Result', {})
    answer = message.get('content', '') if isinstance(message, dict) else ''
    rag_entries = message.get('rag_entries', []) if isinstance(message, dict) else []
    retrieved_contexts = [entry.get('text', '') for entry in rag_entries if isinstance(entry, dict)]
    print(f'Retrieved {len(retrieved_contexts)} chunks for question\n')
    return {'response': answer, 'retrieved_contexts': retrieved_contexts, 'response_time': response_time}

In [3]:
def update_answers_via_rest(input_path, output_path, num_examples=NUM_SAMPLES):
    """Generate answers via REST API and save results locally only."""
    import os, json
    try:
        with open(input_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
        updated_data = []
        for i, entry in enumerate(data[:num_examples]):
            question = entry.get('user_input', '')
            print(f'ğŸ”„ Generating answer for entry {i + 1}/{num_examples}')
            generated = call_rest_api(question)
            updated_entry = entry.copy()
            updated_entry['response'] = generated.get('response', '')
            updated_entry['retrieved_contexts'] = generated.get('retrieved_contexts', [])
            updated_entry['response_time'] = generated.get('response_time', None)
            updated_data.append(updated_entry)
        # Save results
        os.makedirs('results', exist_ok=True)
        results_path = os.path.join('results_final', os.path.basename(output_path))
        with open(results_path, 'w', encoding='utf-8') as f:
            json.dump(updated_data, f, indent=2, ensure_ascii=False)
    except Exception as e:
        print(f'Error during answer generation: 100{e}')

In [4]:

# save logs and generated answers to results_final folder
os.makedirs('results_final', exist_ok=True)
update_answers_via_rest(input_json_path, output_json_path, NUM_SAMPLES)

ğŸ”„ Generating answer for entry 1/500
Retrieved 5 chunks for question

ğŸ”„ Generating answer for entry 2/500
Retrieved 5 chunks for question

ğŸ”„ Generating answer for entry 3/500
Retrieved 5 chunks for question

ğŸ”„ Generating answer for entry 4/500
Retrieved 5 chunks for question

ğŸ”„ Generating answer for entry 5/500
Retrieved 5 chunks for question

ğŸ”„ Generating answer for entry 6/500
Retrieved 5 chunks for question

ğŸ”„ Generating answer for entry 7/500
Retrieved 5 chunks for question

ğŸ”„ Generating answer for entry 8/500
Retrieved 5 chunks for question

ğŸ”„ Generating answer for entry 9/500
Retrieved 5 chunks for question

ğŸ”„ Generating answer for entry 10/500
Retrieved 5 chunks for question

ğŸ”„ Generating answer for entry 11/500
Retrieved 5 chunks for question

ğŸ”„ Generating answer for entry 12/500
Retrieved 5 chunks for question

ğŸ”„ Generating answer for entry 13/500
Retrieved 5 chunks for question

ğŸ”„ Generating answer for entry 14/500
Retrieved 5 chunks f