In [None]:
import os

# Folder that merge_txt_files walks for .txt files; replace the "..." placeholder with the real location.
ROOT_DIR = "...\\LiHua-World" # set the path
# Name of the merged corpus file; a relative path, so it is created in the current working directory.
OUTPUT_FILE = 'LiHua-World-merged.txt'

# Merging the files and adding separators to make it easier for the model to distinguish between files
def merge_txt_files(root_dir, output_file):
    """Concatenate every ``.txt`` file under ``root_dir`` into ``output_file``.

    Each document is written between a ``=== DOCUMENT START ===`` line and a
    ``=== DOCUMENT END ===`` line (followed by a blank line) so that document
    boundaries stay visible in the merged text.

    Directories and files are visited in sorted order, which makes the merged
    file identical across runs and machines. The output file itself is skipped
    if it happens to live under ``root_dir``, so a re-run never ingests a
    previous merge result. Files that cannot be read or decoded as UTF-8 are
    reported on stdout and skipped; failures while writing the output file are
    not swallowed.

    Args:
        root_dir: Directory that is walked recursively.
        output_file: Path of the merged file; overwritten if it exists.
    """
    output_key = os.path.normcase(os.path.abspath(output_file))
    with open(output_file, 'w', encoding='utf-8') as outfile:
        for dirpath, dirnames, filenames in os.walk(root_dir):
            # Sorting in place tells os.walk to descend in a stable order.
            dirnames.sort()
            for filename in sorted(filenames):
                if not filename.lower().endswith('.txt'):
                    continue
                file_path = os.path.join(dirpath, filename)
                # Do not feed the file currently being written back into itself.
                if os.path.normcase(os.path.abspath(file_path)) == output_key:
                    continue
                try:
                    with open(file_path, 'r', encoding='utf-8') as infile:
                        content = infile.read()
                except (OSError, UnicodeDecodeError) as e:
                    # Best effort: report the unreadable file and keep merging the rest.
                    print(f"Error reading {file_path}: {e}")
                    continue
                outfile.write('=== DOCUMENT START ===\n')
                outfile.write(content)
                # Keep the end marker on its own line even without a trailing newline.
                if not content.endswith('\n'):
                    outfile.write('\n')
                outfile.write('=== DOCUMENT END ===\n\n')

# Build the merged corpus file from the .txt files found under ROOT_DIR.
merge_txt_files(ROOT_DIR, OUTPUT_FILE)

In [None]:
# Install the Gemini SDK into the kernel's environment (-U upgrades to the latest release).
# NOTE(review): the version is unpinned, so a later re-run may pull a different SDK release - consider pinning.
%pip install -q -U google-generativeai
# NOTE(review): requests is not imported in the cells visible here - confirm it is still needed.
%pip install -q requests

In [None]:
import datetime
import json
import os
import random
import time

import google.generativeai as genai
from google.generativeai import caching
from google.generativeai import types

In [None]:
# Read the API key from the environment instead of pasting it into the notebook,
# so the key is never saved or shared together with the file. A missing variable
# fails immediately with KeyError: 'GOOGLE_API_KEY'.
genai.configure(api_key=os.environ["GOOGLE_API_KEY"])
# The same generation settings (temperature 0.0) are passed with every query.
config = types.GenerationConfig(temperature=0.0)

# Read the whole merged corpus into memory; it is the text that gets cached as context.
context_file_path = 'LiHua-World-merged.txt'
with open(context_file_path, encoding="utf-8") as context_file:
    context_text = context_file.read()


# Adjacent string literals are joined verbatim, so every sentence except the
# last ends with an explicit space; without it the sentences run together
# ("speculate.Format ...") in the prompt sent to the model.
system_instruction = (
    "You are a domain expert assistant using ONLY the provided context to answer questions. "
    "Always stick to the facts in the context and do not speculate. "
    "Format your response as: Answer: <answer>. Evidence: <evidence>. "
    "Give a concise answer to the user's question, and then quote the exact TIME as supporting evidence. "
    "If the context doesn't provide necessary information, Answer 'Insufficient information' and Evidence: N/A"
)

# Upload the corpus once as cached content, together with the system instruction.
cache_ttl = datetime.timedelta(hours=10)  # time to live, i.e. 600 minutes
cached_content = caching.CachedContent.create(
    model="gemini-1.5-flash-001",
    display_name="haystack-context-cache",
    system_instruction=system_instruction,
    contents=context_text,
    ttl=cache_ttl,
)

In [None]:
# Model handle built from the cached content; the query loop calls generate_content on it.
model = genai.GenerativeModel.from_cached_content(cached_content=cached_content)

In [None]:
# Read the question/answer records used for the evaluation
dataset_path = "query_set.json"
with open(dataset_path, encoding="utf-8") as dataset_file:
    qa_entries = json.load(dataset_file)

# A dict-shaped dataset is flattened to the list of its values; anything else is kept as loaded
qa_entries = list(qa_entries.values()) if isinstance(qa_entries, dict) else qa_entries

print(f"Total entries loaded: {len(qa_entries)}")

# Evaluate on a fixed-seed random subset so repeated runs pick the same questions.
SAMPLE_SIZE = 100
random.seed(24)
# random.sample raises ValueError when asked for more items than exist, so the
# request is capped at the dataset size (a smaller dataset is used in full).
sample_entries = random.sample(qa_entries, min(SAMPLE_SIZE, len(qa_entries)))
print(f"Sampled {len(sample_entries)} entries for evaluation.")

In [None]:
responses = []  # tuples of (entry, model_answer)

# Pause between consecutive API calls (added to prevent the "exceeded rate limit" error).
REQUEST_DELAY_SECONDS = 3

for entry in sample_entries:
    question_text = entry["question"]
    # Call Gemini model with the question and cached context
    try:
        model_response = model.generate_content(question_text, generation_config=config)
        answer_text = model_response.text.strip()
    except Exception as e:
        # Best effort: report the failure and record an empty answer so the
        # remaining questions are still processed.
        print(f"Error querying model for question: {question_text[:50]}... - {e}")
        answer_text = ""  # empty answer on failure

    # Store the entry and model answer for later evaluation
    responses.append((entry, answer_text))

    time.sleep(REQUEST_DELAY_SECONDS)
    if len(responses) % 10 == 0:
        print(f"{len(responses)}/{len(sample_entries)} queries processed")

# Check a sample output (skipped when nothing was sampled, to avoid an IndexError)
if responses:
    print("Example model response:")
    print(f"Q: {responses[0][0]['question']}\nA: {responses[0][1]}")

In [None]:
# Pair every question and its reference fields with the model's reply, then save as JSON
output_data = [
    {
        "question": qa_entry["question"],
        "expected_answer": qa_entry["answer"],
        "expected_evidence": qa_entry["evidence"],
        "question_type": qa_entry["type"],
        "model_answer": model_reply,
    }
    for qa_entry, model_reply in responses
]

output_file = "model_responses.json"
with open(output_file, "w", encoding="utf-8") as responses_file:
    json.dump(output_data, responses_file, indent=2, ensure_ascii=False)
print(f"All queries and responses have been written to {output_file}")


# Preview the first 10 question/answer pairs
print("\nFirst 10 sample responses:")
preview = responses[:10]
for position in range(len(preview)):
    qa_entry, reply_text = preview[position]
    print(f"\nSample {position + 1}:")
    print(f"Question: {qa_entry['question']}")
    print(f"Model Answer: {reply_text}")

In [None]:
import re

# Helper function to normalize text for answer matching (ignore case and punctuation)
def normalize_text(s: str) -> str:
    """Return ``s`` lowercased, with punctuation replaced by spaces and whitespace runs collapsed."""
    kept = []
    for ch in s.lower():
        # Anything that is neither alphanumeric nor whitespace acts as a separator.
        kept.append(ch if (ch.isalnum() or ch.isspace()) else ' ')
    # split() without arguments trims both ends and merges consecutive whitespace.
    words = ''.join(kept).split()
    return ' '.join(words)

# Function to check if the model's answer contains the expected answer
def check_answer(entry, model_output: str) -> bool:
    """Report whether ``model_output`` contains the expected answer of ``entry``.

    Matching is a substring test on text normalized with ``normalize_text``,
    and depends on ``entry["type"]`` (compared case-insensitively):

    * ``"single"``: the normalized answer must occur in the output.
    * ``"multi"``: at least one component must occur. A list answer
      contributes one component per item; a string answer is split on
      ``,``, ``;``, ``/`` and ``" and "``.
    * ``"null"``: the output must contain "Insufficient information".
    * anything else: ``False``.

    Components that normalize to the empty string (for example a
    punctuation-only answer) are ignored. The empty string is a substring of
    every output, so keeping them would count every response as correct.

    Args:
        entry: Mapping with at least the keys ``"answer"`` and ``"type"``.
        model_output: Raw text returned by the model.

    Returns:
        ``True`` when a usable expected component is found in the output.
    """
    target = entry["answer"]
    output_norm = normalize_text(model_output)
    entry_type = entry["type"].lower()

    if entry_type == "single":
        raw_components = [str(target)]
    elif entry_type == "multi":
        if isinstance(target, list):
            raw_components = [str(t) for t in target]
        elif isinstance(target, str):
            # Split on common delimiters (comma, semicolon, slash, or ' and ')
            raw_components = re.split(r',|;|/| and ', target)
        else:
            raw_components = [str(target)]
    elif entry_type == "null":
        # Null type: the expected correct answer is "Insufficient information".
        raw_components = ["Insufficient information"]
    else:
        # Fallback: if type is unrecognized, return False.
        return False

    # Drop empty components so that '"" in output' can never produce a vacuous match.
    target_components = [comp for comp in map(normalize_text, raw_components) if comp]
    return any(comp in output_norm for comp in target_components)

# Function to check if the model cited the exact evidence
def check_evidence(entry, model_output: str) -> bool:
    """Report whether the expected evidence appears verbatim in ``model_output``.

    The expected snippet is ``entry["evidence"]`` with surrounding whitespace
    removed; the comparison is an exact, case-sensitive substring search.
    """
    expected_snippet = entry["evidence"].strip()
    position = model_output.find(expected_snippet)
    return position != -1

# Grade every response: a point is awarded only when BOTH the answer check and the evidence check pass.
# (Dropping the evidence requirement may be more practical: it is hard for the model to recite
# evidence exactly, even when the answer itself is spot-on.)
score = 0
incorrect_log = []  # details of every response that failed at least one check
for entry, model_ans in responses:
    # Both checks are always evaluated for each response.
    checks = (check_answer(entry, model_ans), check_evidence(entry, model_ans))
    if all(checks):
        score += 1
        continue
    # Keep what is needed to inspect the miss later
    incorrect_log.append({
        "question": entry["question"],
        "expected_answer": entry["answer"],
        "model_answer": model_ans,
        "expected_evidence": entry["evidence"],
        "type": entry["type"],
    })

print(f"Correctly answered: {score} out of {len(responses)}")

# Save the failed cases to their own JSON file for manual review
incorrect_output_file = "incorrect_responses.json"
with open(incorrect_output_file, "w", encoding="utf-8") as incorrect_file:
    json.dump(incorrect_log, incorrect_file, indent=2, ensure_ascii=False)
print(f"Incorrect responses saved to {incorrect_output_file}")