# Gemini + RAG: From Simple Answers to Validated, Structured Output

This notebook is a continuation of `hugging_face_chromadb_demo.ipynb`. Make sure you have run that notebook first so that the ChromaDB collection is persisted to disk.

**What you will learn:**
1. **RAG with [Gemini](https://ai.google.dev/gemini-api/docs/quickstart)** — retrieve context, generate an answer
2. **LLM-as-a-judge** — validate the answer against sources
3. **Structured output** — JSON mode for predictable responses
4. **Multimodal** — send PDF pages directly

In [1]:
import os
import re
import unicodedata
from pathlib import Path
import json
from google import genai
import chromadb

DATA_DIR = Path('data')
PDF_DIR = DATA_DIR / 'pdf'
TXT_DIR = DATA_DIR / 'txt'

MODEL = 'gemini-2.5-flash'
TEMPERATURE = 0

def normalize_text(text):
    """Normalize PDF-extracted text for reliable string matching.

    Handles ligatures (ﬁ→fi), hyphenated line breaks (frag-\nmentation),
    and extra whitespace from PDF layout.
    """
    text = unicodedata.normalize('NFKD', text)
    text = re.sub(r'-\s*\n\s*', '', text)
    text = re.sub(r'\s+', ' ', text)
    return text.strip()

# Initialize the Gemini client through Vertex AI. If you use a Gemini API key instead, pass api_key=... to genai.Client.
from dotenv import load_dotenv
load_dotenv()
client = genai.Client(
    vertexai=True,
    project=os.getenv('GCP_PROJECT'),
    location=os.getenv('GCP_LOCATION')
)

# Load the persisted ChromaDB collection from previous notebook
chroma_client = chromadb.PersistentClient()
collection = chroma_client.get_collection(name='ostep')
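
A quick check of what `normalize_text` does to typical PDF-extraction damage may help here. The sample strings below are made up for illustration, and the function is repeated so the snippet runs on its own. Note one side effect of the hyphen rule: a genuinely hyphenated word that happens to break across lines loses its hyphen too.

```python
import re
import unicodedata


def normalize_text(text):
    # Same logic as the setup cell, repeated so this snippet is standalone.
    text = unicodedata.normalize('NFKD', text)  # compatibility decomposition: the "fi" ligature becomes "f" + "i"
    text = re.sub(r'-\s*\n\s*', '', text)       # join words hyphenated across a line break
    text = re.sub(r'\s+', ' ', text)            # collapse runs of whitespace
    return text.strip()


# Made-up sample: a ligature (U+FB01), two hyphenated breaks, ragged spacing
raw = 'The \ufb01le sys-\ntem   is  frag-\n  mented.'
print(normalize_text(raw))                   # The file system is fragmented.

# Caveat: a real hyphen at a line break is removed as well
print(normalize_text('a well-\nknown bug'))  # a wellknown bug
```

Because both sides of any later comparison go through the same function, the lost hyphen rarely matters for matching, but the normalized text should not be shown to users as a faithful copy of the page.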

In [2]:
# Ask a question and retrieve relevant documents
question = "What is a semaphore and how is it used?"
results = collection.query(query_texts=[question], n_results=5, include=['documents'])

retrieved_docs = []
for doc_id, doc_text in zip(results['ids'][0], results['documents'][0]):
    chapter_id, page_idx_str = doc_id.rsplit('_', 1)
    page_index = int(page_idx_str)
    page_number = page_index + 1
    first_page = (TXT_DIR / chapter_id / '0.txt').read_text()
    chapter_name = first_page.split('\n')[1].strip()
    retrieved_docs.append({
        'doc_id': doc_id,
        'chapter_id': chapter_id,
        'chapter_name': chapter_name,
        'page_index': page_index,
        'page_number': page_number,
        'text': normalize_text(doc_text)
    })

for doc in retrieved_docs:
    print(f"[{doc['doc_id']}] {doc['chapter_name']} — page {doc['page_number']}")

[threads-sema_1] Semaphores — page 2
[threads-sema_2] Semaphores — page 3
[threads-sema_17] Semaphores — page 18
[threads-sema_0] Semaphores — page 1
[threads-sema_4] Semaphores — page 5
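
The page labels above come from splitting each ChromaDB id on its last underscore. A minimal sketch of that parsing, pulled into a helper: `parse_doc_id` is a hypothetical name, and the second id is invented to show why `rsplit` is used rather than `split`.

```python
def parse_doc_id(doc_id):
    """Split '<chapter_id>_<page_index>' into (chapter_id, page_index, page_number)."""
    # rsplit with maxsplit=1 cuts at the LAST underscore only,
    # so underscores inside the chapter id are preserved.
    chapter_id, page_idx_str = doc_id.rsplit('_', 1)
    page_index = int(page_idx_str)   # zero-based index used in the id
    page_number = page_index + 1     # one-based number shown to readers
    return chapter_id, page_index, page_number


print(parse_doc_id('threads-sema_17'))  # ('threads-sema', 17, 18)

# Invented id whose chapter part contains an underscore
print(parse_doc_id('file_systems_3'))   # ('file_systems', 3, 4)
```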


## 1. Ask Gemini a Question

We have 5 relevant pages from ChromaDB. Let's send them to Gemini as context and ask our question. The simplest approach: just get a free-text answer.

### Build the prompt

In [3]:
context = ""
for doc in retrieved_docs:
    context += (
        f"<SOURCE chapter_id=\"{doc['chapter_id']}\" page_number=\"{doc['page_number']}\">\n"
        f"{doc['text']}\n"
        f"</SOURCE>\n\n"
    )

prompt = f"""Answer the question using ONLY the provided source documents.

Question:
{question}

Sources:
{context}"""

print(prompt[:1000])

Answer the question using ONLY the provided source documents.

Question:
What is a semaphore and how is it used?

Sources:
<SOURCE chapter_id="threads-sema" page_number="2">
2 SEMAPHORES the semaphore, we must first initialize it to some value, as the code in Figure 31.1 does. 1 #include <semaphore.h> 2 sem_t s; 3 sem_init(&s, 0, 1); Figure 31.1: Initializing A Semaphore In the figure, we declare a semaphore s and initialize it to the value 1 by passing 1 in as the third argument. The second argument to sem_init() will be set to 0 in all of the examples we’ll see; this indicates that the semaphore is shared between threads in the same process. See the man page for details on other usages of semaphores (namely, how they can be used to synchronize access across different processes), which require a different value for that second argument. After a semaphore is initialized, we can call one of two functions to interact with it, sem_wait() or sem_post(). The behavior of these two functions 

### Now, let's check the AI response

In [4]:
response = client.models.generate_content(
    model=MODEL,
    contents=prompt,
    config={'temperature': TEMPERATURE}
)
print(response.text[:1000])

A semaphore is an object with an integer value that can be manipulated using two routines: `sem_wait()` and `sem_post()`. The initial value of the semaphore determines its behavior.

The routines operate as follows:
*   `sem_wait(sem_t *s)`: Decrements the value of semaphore `s` by one. If the value of semaphore `s` becomes negative, the calling thread will wait (suspend execution). If multiple threads call `sem_wait()`, they can all be queued waiting to be woken.
*   `sem_post(sem_t *s)`: Increments the value of semaphore `s` by one. If there are one or more threads waiting, it wakes one of them up.
The value of the semaphore, when negative, is equal to the number of waiting threads.

Semaphores are used as a synchronization primitive and can function as both locks and condition variables.

Here are two primary ways semaphores are used:

1.  **As a Lock (Binary Semaphore):**
    *   A semaphore can be used to protect a critical section, ensuring that only one thread can execute it at 

That looks like a solid answer. But how do we know it's actually correct and derived from our textbook sources? LLMs are prone to hallucination — they can produce fluent, confident text that's completely made up.

There are several ways to address this. In this demo, we'll look at one approach: **LLM-as-a-judge**.

## 2. LLM-as-a-Judge: Is the Answer Grounded?

The idea is simple: use a second Gemini call whose only job is to check whether each claim in the answer is supported by the source text. The judge task is easier than generation — it's just comparing claims against documents.

### Let us build the AI judge

In [5]:
judge_prompt = f"""You are a fact-checker. Given an AI-generated answer and the source documents it was based on, determine if every claim in the answer is supported by the sources.

Return JSON with this format:
{{
    "is_grounded": true/false,
    "unsupported_claims": [
        {{"claim": "...", "explanation": "..."}}
    ]
}}

If all claims are supported, set is_grounded to true and unsupported_claims to an empty list.

AI-generated answer:
{response.text}

Source documents:
{context}"""

print("Here is what the AI judge prompt looks like:")
print(judge_prompt[:1000])

Here is what the AI judge prompt looks like:
You are a fact-checker. Given an AI-generated answer and the source documents it was based on, determine if every claim in the answer is supported by the sources.

Return JSON with this format:
{
    "is_grounded": true/false,
    "unsupported_claims": [
        {"claim": "...", "explanation": "..."}
    ]
}

If all claims are supported, set is_grounded to true and unsupported_claims to an empty list.

AI-generated answer:
A semaphore is an object with an integer value that can be manipulated using two routines: `sem_wait()` and `sem_post()`. The initial value of the semaphore determines its behavior.

The routines operate as follows:
*   `sem_wait(sem_t *s)`: Decrements the value of semaphore `s` by one. If the value of semaphore `s` becomes negative, the calling thread will wait (suspend execution). If multiple threads call `sem_wait()`, they can all be queued waiting to be woken.
*   `sem_post(sem_t *s)`: Increments the value of semaphore 

### Run the AI judge

In [6]:
judge_response = client.models.generate_content(
    model=MODEL,
    contents=judge_prompt,
    config={
        'temperature': TEMPERATURE,
        'response_mime_type': 'application/json'
    }
)

verdict = json.loads(judge_response.text)
print(f"Grounded: {verdict['is_grounded']}")
if verdict['unsupported_claims']:
    for claim in verdict['unsupported_claims']:
        print(f"  - {claim['claim']}: {claim['explanation']}")
else:
    print("All claims are supported by the sources.")

Grounded: True
All claims are supported by the sources.
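
`json.loads` plus direct key access works when the judge replies exactly as instructed. A more defensive variant might look like the sketch below, which assumes that a missing or malformed reply should count as not grounded. `parse_verdict` is a hypothetical helper, not part of the notebook.

```python
import json


def parse_verdict(raw_text):
    """Parse the judge's reply into (is_grounded, unsupported_claims).

    Fails closed: anything missing or malformed is treated as not grounded.
    """
    try:
        data = json.loads(raw_text)
    except (json.JSONDecodeError, TypeError):
        # TypeError covers a reply that is None rather than a string
        return False, [{'claim': '(unparseable judge reply)',
                        'explanation': str(raw_text)[:200]}]
    if not isinstance(data, dict):
        return False, [{'claim': '(unexpected judge reply)',
                        'explanation': 'top-level JSON value is not an object'}]

    claims = data.get('unsupported_claims') or []
    if not isinstance(claims, list):
        claims = []
    # Keep only well-formed claim objects
    claims = [c for c in claims if isinstance(c, dict) and 'claim' in c]

    # Require a literal true AND an empty claim list
    is_grounded = data.get('is_grounded') is True and not claims
    return is_grounded, claims


print(parse_verdict('{"is_grounded": true, "unsupported_claims": []}'))  # (True, [])
print(parse_verdict('not json')[0])                                      # False
```

Treating a contradictory reply (`is_grounded` true with a non-empty claim list) as not grounded is a design choice of this sketch; it errs toward an extra retry rather than accepting a doubtful answer.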


### Putting It Together: Retrieve → Generate → Judge → Retry

![RAG + LLM judge](rag_judge.png)

Now let's wrap the whole pipeline into a reusable function. The flow:

1. **Retrieve** relevant pages from ChromaDB
2. **Generate** a free-text answer with Gemini
3. **Judge** — ask a second Gemini call if the answer is grounded
4. **Retry** — if not grounded, feed back the unsupported claims and regenerate

First, we'll extract the retrieval + context-building logic into a helper (the same code appeared inline in cells 2 and 3 above).

In [7]:
def retrieve_and_build_context(question, collection, n_results=5):
    """Retrieve relevant docs from ChromaDB and format them as labeled context."""
    results = collection.query(query_texts=[question], n_results=n_results, include=['documents'])

    retrieved_docs = []
    for doc_id, doc_text in zip(results['ids'][0], results['documents'][0]):
        chapter_id, page_idx_str = doc_id.rsplit('_', 1)
        page_index = int(page_idx_str)
        page_number = page_index + 1
        first_page = (TXT_DIR / chapter_id / '0.txt').read_text()
        chapter_name = first_page.split('\n')[1].strip()
        retrieved_docs.append({
            'doc_id': doc_id,
            'chapter_id': chapter_id,
            'chapter_name': chapter_name,
            'page_index': page_index,
            'page_number': page_number,
            'text': normalize_text(doc_text)
        })

    context = ""
    for doc in retrieved_docs:
        context += (
            f"<SOURCE chapter_id=\"{doc['chapter_id']}\" page_number=\"{doc['page_number']}\">\n"
            f"{doc['text']}\n"
            f"</SOURCE>\n\n"
        )

    return retrieved_docs, context

Now the main function. It calls the helper above, then runs a **generate → judge → retry** loop:

- **Generate**: ask Gemini to answer using only the source documents.
- **Judge**: a second Gemini call checks if every claim is supported by the sources.
- **Retry**: if the judge finds unsupported claims, we feed them back as error feedback and regenerate. `max_retries` caps the total number of attempts, including the first one.

In [8]:
def query_with_judge(question, collection, max_retries=3):
    """Full RAG pipeline: retrieve -> generate -> judge -> retry if not grounded."""
    retrieved_docs, context = retrieve_and_build_context(question, collection)

    error_feedback = ""
    for attempt in range(1, max_retries + 1):
        # Generate free-text answer
        prompt = f"""Answer the question using ONLY the provided source documents.
        {error_feedback}
        Question:
        {question}

        Sources:
        {context}"""

        response = client.models.generate_content(
            model=MODEL,
            contents=prompt,
            config={'temperature': TEMPERATURE}
        )
        answer_text = response.text

        # Judge: is the answer grounded?
        judge_prompt = f"""You are a fact-checker. Given an AI-generated answer and the source documents it was based on, determine if every claim in the answer is supported by the sources.

        Return JSON with this format:
        {{
            "is_grounded": true/false,
            "unsupported_claims": [
                {{"claim": "...", "explanation": "..."}}
            ]
        }}

        If all claims are supported, set is_grounded to true and unsupported_claims to an empty list.

        AI-generated answer:
        {answer_text}

        Source documents:
        {context}"""

        judge_response = client.models.generate_content(
            model=MODEL,
            contents=judge_prompt,
            config={
                'temperature': TEMPERATURE,
                'response_mime_type': 'application/json'
            }
        )
        verdict = json.loads(judge_response.text)

        if verdict['is_grounded']:
            print(f'Attempt {attempt}: grounded! All claims supported.')
            return answer_text, retrieved_docs

        # Not grounded — retry with feedback
        print(f'Attempt {attempt}: not grounded, retrying...')
        for claim in verdict['unsupported_claims']:
            print(f"  - {claim['claim']}: {claim['explanation']}")
        error_feedback = (
            "\nA fact-checker found these unsupported claims in your previous answer — fix them:\n"
            + '\n'.join(f"- {c['claim']}: {c['explanation']}" for c in verdict['unsupported_claims'])
            + '\n'
        )

    # Out of attempts: return the last answer even though the judge rejected it.
    print(f'Failed after {max_retries} attempts.')
    return answer_text, retrieved_docs

Let's test the full pipeline.

In [9]:
answer_text, _ = query_with_judge(question, collection)
print()
print('=== Answer ===')
print(answer_text[:1000])

Attempt 1: grounded! All claims supported.

=== Answer ===
A semaphore is an object with an integer value that can be manipulated using two routines: `sem_wait()` and `sem_post()`. Before use, a semaphore must be initialized to some value using `sem_init()`.

**How it is used:**

1.  **Initialization:** A semaphore `s` is initialized using `sem_init(&s, 0, initial_value)`. The second argument `0` indicates it's shared between threads in the same process.

2.  **`sem_wait()`:**
    *   Decrements the value of the semaphore by one.
    *   If the semaphore's value becomes negative, the calling thread suspends execution and waits.
    *   If the value is zero or positive, it returns immediately.
    *   Multiple threads can call `sem_wait()` and be queued waiting to be woken.

3.  **`sem_post()`:**
    *   Increments the value of the semaphore by one.
    *   If there are one or more threads waiting (i.e., the semaphore's value was negative before the increment), it wakes one of them up.
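
Because the first attempt was judged grounded, the run above never exercises the retry branch. One way to see that branch without calling the API is to inject the two model calls as plain functions. In this sketch, `run_judge_loop`, `fake_generate`, and `fake_judge` are hypothetical names, and the stubbed answers are invented.

```python
def run_judge_loop(generate, judge, max_attempts=3):
    """Generate -> judge -> retry, with both model calls passed in as functions.

    generate(feedback) returns answer text.
    judge(answer) returns a dict with 'is_grounded' and 'unsupported_claims'.
    Returns (answer, is_grounded, attempts_used).
    """
    feedback = ''
    answer = None
    for attempt in range(1, max_attempts + 1):
        answer = generate(feedback)
        verdict = judge(answer)
        if verdict['is_grounded']:
            return answer, True, attempt
        # Turn the rejected claims into feedback for the next attempt
        feedback = '\n'.join(
            f"- {c['claim']}: {c['explanation']}"
            for c in verdict['unsupported_claims']
        )
    return answer, False, max_attempts


# Stand-in for the generation call: drops the invented claim once it gets feedback
def fake_generate(feedback):
    if feedback:
        return 'sem_wait() decrements the semaphore.'
    return 'sem_wait() decrements the semaphore. Semaphores were invented in 1999.'


# Stand-in for the judge call: rejects any answer that mentions the invented date
def fake_judge(answer):
    if '1999' in answer:
        return {'is_grounded': False,
                'unsupported_claims': [{'claim': 'Semaphores were invented in 1999.',
                                        'explanation': 'Not stated in the sources.'}]}
    return {'is_grounded': True, 'unsupported_claims': []}


answer, grounded, attempts = run_judge_loop(fake_generate, fake_judge)
print(attempts, grounded, answer)  # 2 True sem_wait() decrements the semaphore.
```

Unlike `query_with_judge`, this sketch returns the grounded flag explicitly, so a caller can tell an accepted answer from one returned after the attempts ran out.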


Why does LLM-as-a-judge work?

- **Simpler task**: The judge only needs to answer "is this claim supported?" — a much easier job than writing the answer in the first place.
- **Source text in context**: The judge has the source documents right in front of it for direct comparison.
- **Catches semantic issues**: Unlike string matching, the judge can detect paraphrased hallucinations — claims that *sound* right but aren't actually in the sources.
- **Tradeoff**: The judge isn't deterministic — it can make mistakes too. But for catching whether an *answer* is grounded (not just whether a *quote* was copied correctly), it's a better fit than string matching.
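
To make the comparison with string matching concrete, here is a minimal sketch of a quote check. `quote_in_sources` and `squash` are hypothetical helpers (`squash` is a simplified stand-in for `normalize_text`), and the paraphrase is invented. The source sentence is taken from page 2 of the retrieved chapter.

```python
def squash(text):
    """Lowercase and collapse whitespace (simplified stand-in for normalize_text)."""
    return ' '.join(text.split()).lower()


def quote_in_sources(quote, source_texts):
    """True if the quote appears verbatim (after squashing) in any source."""
    needle = squash(quote)
    return bool(needle) and any(needle in squash(src) for src in source_texts)


sources = [
    'After a semaphore is initialized, we can call one of two functions '
    'to interact with it, sem_wait() or sem_post().'
]

verbatim = 'we can call one of two   functions to interact with it'
paraphrase = 'two functions are available for interacting with it'

print(quote_in_sources(verbatim, sources))    # True
print(quote_in_sources(paraphrase, sources))  # False
```

The second result shows the limitation: the paraphrase is faithful to the source, yet a substring check rejects it. String matching can confirm that a quote was copied correctly, but it cannot decide whether a reworded claim is supported, which is the question the judge is asked.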

## 3. Structured Output

So far, Gemini has returned free-text answers. That's fine for a chatbot, but real applications need predictable structure — a frontend needs specific fields, a database needs typed columns, a downstream API needs a fixed schema.

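Gemini's JSON mode (`response_mime_type: 'application/json'`) makes the model return JSON rather than free text. The shape itself is not enforced: we describe the shape we want in the prompt, and Gemini follows it. This is the same technique we used for the judge, now with a richer schema that includes citations and key concepts. The API also accepts a `response_schema` in the config when the shape needs to be enforced; this notebook sticks to a prompt-described shape.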

In [10]:
prompt = f"""Answer the question using ONLY the provided source documents.

Return JSON with this format:
{{
    "answer": "your answer here",
    "citations": [
        {{"chapter_id": "...", "page_number": 1}}
    ],
    "key_concepts": ["concept1", "concept2"]
}}

Rules:
- For each citation, set chapter_id and page_number to match the SOURCE tag exactly.
- For key_concepts, list important terms that appear in the source text.

Question:
{question}

Sources:
{context}"""

response = client.models.generate_content(
    model=MODEL,
    contents=prompt,
    config={
        'temperature': TEMPERATURE,
        'response_mime_type': 'application/json'
    }
)

In [11]:
# Print the raw JSON string (truncated)
print(response.text[:1500])

{
    "answer": "A semaphore is an object characterized by an integer value, which can be manipulated using two primary routines: `sem_wait()` and `sem_post()`. Before use, a semaphore must be initialized to a specific value using `sem_init()`. The `sem_wait()` routine decrements the semaphore's value and will cause the calling thread to suspend execution if the value becomes negative (or is not greater than or equal to 0). Conversely, `sem_post()` increments the semaphore's value and, if there are threads waiting, wakes one of them up.\n\nSemaphores are used for various synchronization purposes:\n1.  **As a lock (binary semaphore)**: To protect critical sections, a semaphore is initialized to 1. A thread calls `sem_wait()` before entering the critical section and `sem_post()` upon exiting, ensuring mutual exclusion.\n2.  **As an ordering primitive**: To make one thread wait for another to complete an action. For instance, a parent thread can wait for a child thread to finish by initia

In [12]:
# Print the response in a user-friendly format
answer = json.loads(response.text)
print('=== Answer ===')
print(answer['answer'][:1000])
print()
print('=== Citations ===')
for c in answer['citations']:
    print(f"  [{c['chapter_id']} p.{c['page_number']}]")
print()
print('=== Key Concepts ===')
print(answer['key_concepts'])

=== Answer ===
A semaphore is an object characterized by an integer value, which can be manipulated using two primary routines: `sem_wait()` and `sem_post()`. Before use, a semaphore must be initialized to a specific value using `sem_init()`. The `sem_wait()` routine decrements the semaphore's value and will cause the calling thread to suspend execution if the value becomes negative (or is not greater than or equal to 0). Conversely, `sem_post()` increments the semaphore's value and, if there are threads waiting, wakes one of them up.

Semaphores are used for various synchronization purposes:
1.  **As a lock (binary semaphore)**: To protect critical sections, a semaphore is initialized to 1. A thread calls `sem_wait()` before entering the critical section and `sem_post()` upon exiting, ensuring mutual exclusion.
2.  **As an ordering primitive**: To make one thread wait for another to complete an action. For instance, a parent thread can wait for a child thread to finish by initializing

By setting `response_mime_type: 'application/json'` and describing the desired shape in the prompt, we get JSON back from Gemini and parse it with `json.loads()`. This is the same technique as the judge call, with a richer schema that adds citations and key concepts.
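
The prompt asks for citations that match the SOURCE tags, but nothing enforces that. A small check against the retrieved pages can catch citations that point elsewhere. This is a sketch: `find_invalid_citations` is a hypothetical helper and the sample answer is invented, while the page list mirrors the retrieval output shown earlier.

```python
def find_invalid_citations(answer, retrieved_docs):
    """Return the citations that match no retrieved (chapter_id, page_number) pair."""
    allowed = {(d['chapter_id'], int(d['page_number'])) for d in retrieved_docs}
    invalid = []
    for citation in answer.get('citations') or []:
        try:
            key = (citation['chapter_id'], int(citation['page_number']))
        except (KeyError, TypeError, ValueError):
            # Missing field, non-dict entry, or non-numeric page number
            invalid.append(citation)
            continue
        if key not in allowed:
            invalid.append(citation)
    return invalid


# Pages returned by the retrieval step for the semaphore question
retrieved_docs = [{'chapter_id': 'threads-sema', 'page_number': p}
                  for p in (2, 3, 18, 1, 5)]

# Invented model output: one valid citation and three problematic ones
sample_answer = {
    'answer': 'A semaphore is an object with an integer value.',
    'citations': [
        {'chapter_id': 'threads-sema', 'page_number': 2},  # retrieved page
        {'chapter_id': 'threads-sema', 'page_number': 7},  # page not retrieved
        {'chapter_id': 'threads-cv', 'page_number': 2},    # chapter not retrieved
        {'chapter_id': 'threads-sema'},                    # page_number missing
    ],
}

for bad in find_invalid_citations(sample_answer, retrieved_docs):
    print('invalid citation:', bad)
```

A non-empty result could be fed back to the model in the same way the judge's unsupported claims are, though that wiring is left out here.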

## 4. Multimodal PDF Input (BONUS)

So far, we've been sending extracted text to Gemini. But Gemini is multimodal — it can read PDFs directly. This is useful when the text extraction misses important information: tables, figures, diagrams, formulas, or scanned documents.

Let's send the full PDF (as bytes) instead of plain text, and still get structured JSON back.

In [13]:
# Use the top retrieved document from our original question
top = retrieved_docs[0]
pdf_path = PDF_DIR / f"{top['chapter_id']}.pdf"
pdf_bytes = pdf_path.read_bytes()
print(f"Sending PDF: {pdf_path.name} ({len(pdf_bytes)} bytes)")

Sending PDF: threads-sema.pdf (150277 bytes)


In [14]:
# Send the full PDF directly to Gemini
multimodal_prompt = f"""You are given a PDF from a textbook on operating systems.
Answer the question using ONLY what you see in this document.

Return JSON with this format:
{{
    "answer": "your answer here",
    "citations": [
        {{"chapter_id": "...", "page_number": 1}}
    ],
    "key_concepts": ["concept1", "concept2"]
}}

Rules:
- Set chapter_id to "{top['chapter_id']}".
- Set page_number to the page where you found the supporting information.
- For key_concepts, list important terms that appear in the document.

Question:
{question}"""

response = client.models.generate_content(
    model=MODEL,
    contents=[
        multimodal_prompt,
        genai.types.Part.from_bytes(data=pdf_bytes, mime_type='application/pdf')
    ],
    config={
        'temperature': TEMPERATURE,
        'response_mime_type': 'application/json'
    }
)

multimodal_answer = json.loads(response.text)
print('=== Answer ===')
print(multimodal_answer['answer'][:1000])
print()
print('=== Citations ===')
for c in multimodal_answer['citations']:
    print(f"  [{c['chapter_id']} p.{c['page_number']}]")
print()
print('=== Key Concepts ===')
print(multimodal_answer['key_concepts'])

=== Answer ===
A semaphore is an object with an integer value that can be manipulated using two routines: `sem_wait()` and `sem_post()`. The `sem_wait()` routine decrements the semaphore's value by one and causes the calling thread to suspend execution if the value becomes negative (or indicates no resource is available). The `sem_post()` routine increments the semaphore's value by one and, if there are any threads waiting, wakes one of them up.

Semaphores are versatile synchronization primitives used for various concurrency problems:
1.  **As Locks (Binary Semaphores)**: By initializing a semaphore to 1, it can serve as a mutual exclusion lock. A thread calls `sem_wait()` before entering a critical section to acquire the lock and `sem_post()` after exiting to release it. If the lock is already held, other threads attempting to acquire it will wait.
2.  **For Ordering Events**: By initializing a semaphore to 0, it can enforce a specific order of events between threads. One thread can 

When is multimodal input useful?

- **Tables and figures**: text extraction often mangles table layouts; Gemini can read them directly from the PDF.
- **Scanned documents**: if the PDF is a scan (image-based), text extraction may fail entirely.
- **Formulas and diagrams**: mathematical notation and visual diagrams are better understood from the rendered page.

## Summary

Four patterns you can take away from this notebook:

1. **RAG basics** — retrieve context from ChromaDB, send it to Gemini, get a free-text answer.

2. **LLM-as-a-judge** — a second Gemini call validates whether the answer is grounded in the source text. If not, feed the unsupported claims back as feedback and retry.

3. **Structured output** — use `response_mime_type: 'application/json'` to get valid JSON from Gemini. Describe the desired shape in the prompt and parse with `json.loads()`.

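4. **Multimodal input** — when text extraction isn't enough, send the full PDF directly to Gemini. You still get structured JSON back. The judge pattern from section 2 could be applied to that answer as well, although this notebook does not run it on the multimodal output.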