This notebook demonstrates a simple synchronous RAG smoke test:

- It loads a handful of ground-truth questions from sample_gt.csv into gt_dict and slices the first two into test_dict, giving you a short list of prompts to exercise the pipeline.

- `ProcessChunks` is instantiated with default_config, then get_chunks, embed_chunks, and index_chunks run sequentially over a few PDF pages to build the in-memory VectorSearch. The tqdm progress bar tracks that blocking loop, so you can watch embedding progress even in a synchronous run.

- The test loop is a plain for over test_dict wrapped with tqdm. Each iteration calls run_rag(question) synchronously, waits for the OpenAI response, and appends the resulting RAGResult to all_results. Because the work happens serially, the bar advances after every completed call, giving you immediate feedback on latency per query.

- Finally, it converts all_results into a pandas DataFrame to inspect question, answer, retrieved context, and the token/cost metrics populated inside RAGResult. That table is the quick health check confirming the synchronous pipeline works end-to-end before trying any async batching or larger evaluations.
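
To make the token/cost columns concrete, here is a minimal sketch of how a result record like RAGResult could derive its cost fields. `RAGResultSketch` is a hypothetical stand-in, not the class in rag.py, and the per-token prices are assumptions back-computed from the cost columns in this notebook's results table (910 input tokens cost 0.0001365, 282 output tokens cost 0.0001692).

```python
from dataclasses import asdict, dataclass, field

# Assumed prices, inferred from the notebook's cost columns; not read from rag.py.
INPUT_PRICE_PER_TOKEN = 0.15 / 1_000_000
OUTPUT_PRICE_PER_TOKEN = 0.60 / 1_000_000


@dataclass
class RAGResultSketch:
    """Hypothetical record whose field names mirror the DataFrame columns."""
    question: str
    answer: str
    context: list
    input_tokens: int
    output_tokens: int
    # Cost fields are derived, so they are excluded from __init__.
    input_cost: float = field(init=False)
    output_cost: float = field(init=False)
    total_cost: float = field(init=False)

    def __post_init__(self):
        self.input_cost = self.input_tokens * INPUT_PRICE_PER_TOKEN
        self.output_cost = self.output_tokens * OUTPUT_PRICE_PER_TOKEN
        self.total_cost = self.input_cost + self.output_cost


sketch = RAGResultSketch(
    question="importance of reliable benchmarks in AI",
    answer="(placeholder answer)",
    context=[],
    input_tokens=910,
    output_tokens=282,
)
record = asdict(sketch)  # plain dict, one key per column
```

If the real RAGResult is a dataclass like this one, a list of such instances can be passed directly to `pd.DataFrame`, which produces one column per field.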

In [1]:
import pandas as pd

gt_sample = pd.read_csv("sample_gt.csv")
gt_dict = gt_sample.to_dict(orient="records")
gt_dict[2]

{'question': 'performance differences in model APIs',
 'summary_answer': 'The excerpt mentions that the same AI model may perform differently across various APIs due to optimization techniques used, which necessitates thorough testing when switching APIs.',
 'difficulty': 'intermediate',
 'text': 'After developing a model, a developer can choose to open source it, make it\naccessible via an API, or both. Many model developers are also model\nservice providers. Cohere and Mistral open source some models and\nprovide APIs for some. OpenAI is typically known for their commercial\nmodels, but they’ve also open sourced models (GPT-2, CLIP). Typically,\nmodel providers open source weaker models and keep their best models\nbehind paywalls, either via APIs or to power their products.\nModel APIs can be available through model providers (such as OpenAI and\nAnthropic), cloud service providers (such as Azure and GCP [Google Cloud\nPlatform]), or third-party API providers (such as Databricks Mosa

In [3]:
test_dict = gt_dict[:2]
test_dict

[{'question': 'importance of reliable benchmarks in AI',
  'summary_answer': 'It emphasizes that many benchmarks may not accurately measure the intended metrics, stressing the need for careful evaluation.',
  'difficulty': 'beginner',
  'text': 'writing benchmarks. As new benchmarks are constantly introduced and old\nbenchmarks become saturated, you should look for the latest benchmarks.\nMake sure to evaluate how reliable a benchmark is. Because anyone can\ncreate and publish a benchmark, many benchmarks might not be measuring\nwhat you expect them to measure.',
  'id': 'a762b76e'},
 {'question': 'what is evaluation-driven development in ai?',
  'summary_answer': 'Evaluation-driven development refers to the approach of establishing evaluation criteria before building an AI application, similar to how test-driven development focuses on writing tests before code.',
  'difficulty': 'beginner',
  'text': 'Before investing time, money, and resources into building an application,\nit’s impo

In [2]:
from rag import ProcessChunks, default_config

pc = ProcessChunks(config=default_config)
chunks = pc.get_chunks(300, 250, start_page=1, end_page=3)
embeddings = pc.embed_chunks(chunks)
vector_index = pc.index_chunks(embeddings, chunks)

100%|██████████| 11/11 [00:02<00:00,  4.94it/s]


In [4]:
# # test
# test_question = 'importance of reliable benchmarks in AI'

# from rag import search
# import json

# result = search(user_query=test_question)
# print(json.dumps(result, indent=2))

In [5]:
from rag import run_rag
from tqdm import tqdm

all_results = []
for q in tqdm(test_dict, total=len(test_dict)):
    result = run_rag(q['question'])
    all_results.append(result)
all_results

100%|██████████| 2/2 [00:11<00:00,  5.70s/it]


[RAGResult(question='importance of reliable benchmarks in AI', answer="Reliable benchmarks in AI are crucial for several reasons:\n\n1. **Model Selection**: With an overwhelming number of foundation models available, benchmarks help in evaluating and comparing these models based on different criteria relevant to specific applications. This aids teams in making informed choices regarding which models to deploy.\n\n2. **Evaluation of Performance**: Trustworthy benchmarks enable developers to assess how well a model performs on tasks critical to its intended uses, such as factual consistency, domain-specific capabilities (like math and reasoning), and overall effectiveness in real-world applications.\n\n3. **Visibility and Evaluation**: Many AI applications struggle with uncertain returns on investment, often due to a lack of clarity around how these applications are performing. Reliable benchmarks provide a means for continuous evaluation, ensuring that deployed models can be monitored e

In [6]:
pd.DataFrame(all_results)

Unnamed: 0,question,answer,context,input_tokens,output_tokens,input_cost,output_cost,total_cost
0,importance of reliable benchmarks in AI,Reliable benchmarks in AI are crucial for seve...,"[{'start': 750, 'text': 'of foundation models ...",910,282,0.0001365,0.0001692,0.0003057
1,what is evaluation-driven development in ai?,**Evaluation-driven development** in AI is an ...,"[{'start': 2000, 'text': 'st even more. AI app...",911,193,0.00013665,0.0001158,0.00025245


# Asynchrony
1. ThreadPoolExecutor
    - ThreadPoolExecutor spins up worker threads and runs your blocking function (e.g., ask_llm) concurrently. Each thread still blocks while waiting for I/O, but the other threads keep working, so you get concurrency without rewriting your functions: just submit your existing callables with pool.submit.
        - Threads are drop-in concurrency for blocking code; asyncio is cooperative concurrency for non-blocking code, with more control but a higher upfront integration cost.
        - Downsides: threads consume more memory, and rate limits and cancellation are harder to fine-tune.
2. Full async (asyncio)
    - Everything stays in a single thread with an event loop. Functions become async def; whenever they hit I/O they await it, and the event loop lets another task run in the meantime.
    - It is lighter weight (no thread overhead, easier to throttle) and gives precise control over concurrency (semaphores, cancellation, timeouts), but it only works if the underlying libraries offer async APIs (OpenAI async client, async HTTP, etc.), and you have to refactor your code to use await.
3. What's the difference between `results = map_progress(pool, page_docs, process_document)` and `results = [f.result() for f in tqdm(futures)]`?
    1. `map_progress(pool, page_docs, process_document)` is an all-in-one helper: you hand it the executor, the list of work, and the function to run. It submits each item, attaches a tqdm progress bar that updates as futures finish, waits on every .result() call, and returns results in submission order.
        - Use map_progress when you want a reusable, consistent pattern for batch jobs and don’t want to rewrite that glue code every time.

    2. `results = [f.result() for f in tqdm(futures)]` is a manual, one-off way to wait on futures you already created elsewhere. It iterates over them in list order, so the tqdm bar advances only when the next .result() in that order returns (no as‑completed updates), and you’re responsible for building the futures list and tracking order yourself.
        - Use this list comprehension when:
            - Futures are created elsewhere and you just need to block until they’re done.
            - You’re debugging or doing a quick one-off and don’t care about as-completed updates.
            - You need custom ordering or handling that doesn’t fit the helper.
        - Without the tqdm wrapper, the bare [f.result() for f in futures] form gives no progress feedback at all, and the only ordering guarantee is the sequence you iterate.
4. `_client_local` is a threading.local() object; each thread sees its own independent attributes on it.
    - `getattr(_client_local, "client", None)` checks whether the current thread already has a client attribute. If not, it returns None.
    - If client is missing, the function creates a fresh OpenAI() instance and stores it on _client_local.client. Because this storage lives inside threading.local(), each thread sets up its own client the first time it calls get_client.
    - Subsequent calls on the same thread reuse that thread’s client, so there’s no shared httpx connection between threads, which eliminates the “Already borrowed” runtime error.
    - Finally, get_client returns the per-thread client for ask_llm to use.

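        ```
        import threading

        from openai import OpenAI

        # Attributes set on a threading.local() are visible only to the thread that set them.
        _client_local = threading.local()

        def get_client():
            """Return the calling thread's OpenAI client, creating it on first use."""
            client = getattr(_client_local, "client", None)
            if client is None:
                client = OpenAI()
                _client_local.client = client
            return client
        ```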
    - The leading underscore is just a convention: _client_local signals “module-private”. It is meant for internal use inside rag.py, not exported as part of the public API. Apart from being skipped by `from rag import *`, it behaves the same as a variable named client_local; the underscore just hints to readers that they shouldn’t access it from outside the module.
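
The asyncio option from item 2 has no code in this notebook, so here is a minimal sketch of the pattern it describes: one thread, an event loop, and a semaphore to cap concurrency. `fake_llm_call` is a hypothetical stand-in that sleeps instead of calling an API, and `run_all` and `max_concurrency` are illustrative names, not part of rag.py.

```python
import asyncio


async def fake_llm_call(question: str) -> str:
    """Hypothetical async API call; awaits a short sleep instead of the network."""
    await asyncio.sleep(0.05)
    return f"answer to: {question}"


async def run_all(questions, max_concurrency=2):
    # The semaphore lets at most max_concurrency calls be in flight at once.
    semaphore = asyncio.Semaphore(max_concurrency)

    async def limited(question):
        async with semaphore:
            # wait_for adds a per-call timeout, one of the controls threads lack.
            return await asyncio.wait_for(fake_llm_call(question), timeout=5)

    # gather returns results in the order the awaitables were passed in.
    return await asyncio.gather(*(limited(q) for q in questions))


demo_questions = ["q1", "q2", "q3"]
demo_answers = asyncio.run(run_all(demo_questions))
```

Inside Jupyter an event loop is already running, so `asyncio.run` raises a RuntimeError there; use `demo_answers = await run_all(demo_questions)` in a notebook cell instead.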

In [17]:
[q["question"] for q in test_dict][:2]

['importance of reliable benchmarks in AI',
 'what is evaluation-driven development in ai?']

In [8]:
# test asynchrony

from tqdm import tqdm
from rag import run_rag

questions = [q["question"] for q in test_dict]

def map_progress(pool, seq, f):
    """Map function f over seq using the provided executor pool while
    displaying a tqdm progress bar. Returns a list of results in submission order.
    """
    results = []
    
    with tqdm(total=len(seq)) as progress:
        futures = []
    
        for el in seq:
            future = pool.submit(f, el)
            future.add_done_callback(lambda p: progress.update())
            futures.append(future)

        for future in futures:
            result = future.result()
            results.append(result)
        
        return results

from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=6) as pool:
    results = map_progress(pool, questions, run_rag)


100%|██████████| 2/2 [00:09<00:00,  4.74s/it]
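
The pool above calls run_rag from several worker threads, which is the situation the per-thread get_client pattern is meant for. The sketch below shows the behavior in isolation; `FakeClient`, `get_fake_client`, and `task` are hypothetical names, and `FakeClient` stands in for OpenAI() so the example runs without network access.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

_local = threading.local()


class FakeClient:
    """Hypothetical stand-in for OpenAI(); remembers the thread that created it."""

    def __init__(self):
        self.owner = threading.get_ident()


def get_fake_client():
    # Same shape as get_client: create once per thread, then reuse.
    client = getattr(_local, "client", None)
    if client is None:
        client = FakeClient()
        _local.client = client
    return client


barrier = threading.Barrier(2)


def task(_):
    # The barrier makes both tasks run at the same time, on two different threads.
    barrier.wait(timeout=5)
    first = get_fake_client()
    second = get_fake_client()
    return first is second, first.owner


with ThreadPoolExecutor(max_workers=2) as pool:
    outcomes = list(pool.map(task, range(2)))

reused_within_thread = all(same for same, _ in outcomes)
distinct_owners = len({owner for _, owner in outcomes})
```

Each task gets the same object on its second call, while the two threads end up with two different clients.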


In [10]:
pd.DataFrame(results)

Unnamed: 0,question,answer,context,input_tokens,output_tokens,input_cost,output_cost,total_cost
0,importance of reliable benchmarks in AI,Reliable benchmarks in AI are essential for se...,"[{'start': 750, 'text': 'of foundation models ...",910,230,0.0001365,0.000138,0.0002745
1,what is evaluation-driven development in ai?,Evaluation-driven development in AI focuses on...,"[{'start': 2000, 'text': 'st even more. AI app...",911,199,0.00013665,0.0001194,0.00025605


In [None]:
# test asynchrony

from concurrent.futures import ThreadPoolExecutor
from tqdm import tqdm

with ThreadPoolExecutor(max_workers=6) as pool:
    futures = [pool.submit(run_rag, q["question"]) for q in test_dict]
    results = [f.result() for f in tqdm(futures)]
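
The list comprehension above waits on futures in list order. If you want to react to each future as soon as it finishes and still return results in submission order, one option is `as_completed` plus an index map. This is a sketch under stated assumptions: `slow_square` is a hypothetical blocking task standing in for run_rag.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def slow_square(n):
    """Hypothetical blocking task; larger inputs take longer to finish."""
    time.sleep(0.01 * n)
    return n * n


inputs = [3, 1, 2]
with ThreadPoolExecutor(max_workers=3) as pool:
    # Remember each future's position so submission order can be restored.
    future_to_index = {pool.submit(slow_square, n): i for i, n in enumerate(inputs)}
    ordered = [None] * len(inputs)
    # as_completed yields futures as they finish, regardless of submission order.
    for future in as_completed(future_to_index):
        ordered[future_to_index[future]] = future.result()
```

Wrapping the iterator as `tqdm(as_completed(future_to_index), total=len(inputs))` gives a bar that advances on every completion rather than in list order.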

In [3]:
# test function
from rag import run_rag_concurrent

gt_path = "sample_gt.csv"
run_rag_concurrent(gt_path, "test.json")

100%|██████████| 2/2 [00:07<00:00,  3.97s/it]


[RAGResult(question='importance of reliable benchmarks in AI', answer='Reliable benchmarks are crucial in AI for several reasons:\n\n1. **Model Evaluation**: Benchmarks provide a standardized way to evaluate the performance of different AI models. Given the vast landscape of foundation models, benchmarks help in comparing models based on consistent criteria. This enables engineers to select the most suitable model for a specific application by relying on trustworthy performance metrics.\n\n2. **Guided Model Selection**: With an increasing number of models to choose from, benchmarks assist developers in deciding which models to use. They help in filtering down options based on the application’s requirements, thus making the selection process more efficient and informed.\n\n3. **Visibility into Performance**: Reliable benchmarks offer insights into how AI applications are functioning. When applications are deployed, knowing whether they are working effectively is critical. Without this k