# **C**hain of **A**gent with chain of **V**erific**A**tion (CAVA)


## Install dependencies

In [None]:
!pip install -qU langchain langchain-core langchain-text-splitters langchain-community langgraph langchain_chroma langchain-huggingface langsmith
!pip install -qU pypdf
!pip install -qU langchain-google-genai

In [None]:
#
from google.colab import userdata
import os
os.environ["GOOGLE_API_KEY"] = userdata.get('GOOGLE_API_KEY')

In [None]:
# Create LLM Model
from langchain.chat_models import init_chat_model

llm = init_chat_model("gemini-2.5-flash", model_provider="google_genai")

In [None]:
# from huggingface_hub import login
# import os

# token = os.environ["HUGGINGFACE_HUB_TOKEN"]
# login(token=token)

In [None]:
# from huggingface_hub import notebook_login
# os.environ["HUGGINGFACE_HUB_TOKEN"] = userdata.get('HUGGINGFACE_HUB_TOKEN')
# notebook_login()

In [None]:
from langchain_community.llms import HuggingFacePipeline
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline

# model_id = "meta-llama/Llama-3.1-8B-Instruct"
model_id = "Qwen/Qwen2.5-7B-Instruct"

tokenizer = AutoTokenizer.from_pretrained(model_id)
model = AutoModelForCausalLM.from_pretrained(
    model_id,
    device_map="auto",
    torch_dtype="auto",
)

pipe = pipeline(
    "text-generation",
    model=model,
    tokenizer=tokenizer,
    max_new_tokens=32,
    return_full_text=False,
)

llm = HuggingFacePipeline(pipeline=pipe)

print(llm.invoke("What is an LLM?"))


In [None]:
from langchain_text_splitters import RecursiveCharacterTextSplitter

splitter = RecursiveCharacterTextSplitter(
    chunk_size=512,
    chunk_overlap=64,
    separators=["\n\n", "\n", ". ", " "]
)
original_long_text = "testing split text" * 10

chunks = splitter.split_text(original_long_text)
chunks

In [None]:
# # Split pure text

# def split_text(text, chunk_size=500):
#     chunks = []
#     chunk_idx = 0
#     while chunk_idx < len(text):
#         end_idx = min(chunk_idx+chunk_size, len(text))
#         chunks.append(text[chunk_idx:end_idx])
#         chunk_idx = end_idx
#     return chunks
#     # return splitter.split_documents(text)

# original_long_text = "testing split text" * 10
# split_long_text = split_text(original_long_text, chunk_size=50)
# split_long_text

In [None]:
# --- Prompts ------------------------------------------------------------------
# For QA Tasks
WORKER_PROMPT = lambda i, query, chunk, prev: f"""
You are Worker {i} in a chain solving a long-context task.
ONLY use the provided chunk and previous message.
You need to read current source text and summary of previous source text (if any),
and generate a summary to include them both and that best helps answer the query.
Keep ≤ 300 tokens. If no new info, forward previous message unchanged.

Query: {query}
Current source text: CHUNK {i} (do NOT reference other chunks):\n{chunk}\n
Previous source text :\n{prev}
"""

MANAGER_PROMPT = lambda query, final_worker_json: f"""
You are the Manager. Synthesize the final answer.
Please keep the final answer as short as possible and do not respond with full sentences.
Just reply with the final answer.
The source is too long and has been summarized. You need to answer based on the summary.

Query: {query}
Final worker Summary: {final_worker_json}
"""

# ===== CoVe =====
PLAN_VERIFICATIONS_PROMPT = lambda query, chunk, baseline_summary: f"""
You are verifying a summary used in a long-context QA pipeline.

Original Query: {query}

Source chunk: {chunk}

Baseline summary: {baseline_summary}

Task:
Generate a small list of concrete verification questions (2–4) that help check:
- factual correctness
- coverage of key information relevant to the query
- absence of unsupported claims
Return the verification questions as a numbered list.
"""


EXEC_VERIFICATIONS_PROMPT = lambda query, chunk, qa_block: f"""
You are answering verification questions about a summary for a long-context QA pipeline.

Original Query: {query}

Source chunk: {chunk}

Here is a list of verification questions:
{qa_block}

For each question, answer concisely. Return your answers as a numbered list in the same order, like:
1. ...
2. ...
3. ...

Do not include any explanations beyond the answers themselves.
"""


GEN_FINAL_RESPONSE_PROMPT = lambda query, chunk, baseline_summary, questions, answers: f"""
You are revising a summary for a long-context QA pipeline.

Original Query: {query}

Source chunk: {chunk}

Baseline summary: {baseline_summary}

Verification Q&A:
{chr(10).join(f"Q: {q}\nA: {a}" for q, a in zip(questions, answers))}

Task:
Write a revised summary that:
- corrects any factual errors in the baseline summary
- adds missing key information supported by the source chunk
- removes unsupported or speculative claims
- remains concise and focused on information relevant to the question

Return ONLY the revised summary.
"""

In [None]:
# Define agent graph
from typing import TypedDict, List

# [TODO]
class VerificationTrace(TypedDict):
    worker_idx: int
    baseline_summary: str
    verification_questions: List[str]
    verification_answers: List[str]
    verified_summary: str


class CoAState(TypedDict):
    query: str
    chunks: List[str]
    i: int
    worker_outputs: List[str]
    verbose: bool
    manager_output: str

    verification_mode: str # "none" | "every" | "every_k"
    verification_k: int
    store_verification_traces: bool
    verification_traces: List[VerificationTrace]


In [None]:
def worker_node(state: CoAState):
    i = state["i"]
    chunk = state["chunks"][i]
    if i == 0:
        prev = "No Previous summaries"
    else:
        # Get previous worker's output
        # print(state["worker_outputs"][i-1])
        # prev = state["worker_outputs"][i-1].content
        prev = state["worker_outputs"][i-1]
    prompt = WORKER_PROMPT(i, state["query"], chunk, prev)
    if state["verbose"]:
        print(f"Worker {i} with Prompt: \n######{prompt}\n#######\n")
    out = llm.invoke(prompt)
    # Note new outut
    state["worker_outputs"].append(out)
    state["i"] += 1
    if state["verbose"]:
        # print(f"Outputs: {out.content}\n------------------\n\n")
        print(f"Outputs: {out}\n------------------\n\n")

    return state

def manager_node(state:CoAState):
    if state["verbose"]:
        state["worker_outputs"][-1]
    # last_worker_output = state["worker_outputs"][-1].content
    last_worker_output = state["worker_outputs"][-1]
    prompt = MANAGER_PROMPT(state["query"], last_worker_output)
    if state["verbose"]:
        print(f"Manager with Prompt: \n######{prompt}\n#######\n")
    final_answer = llm.invoke(prompt)
    # store final summary as last output
    state["manager_output"] = final_answer
    if state["verbose"]:
        # print(f"Manager Final Output: \n#############\n{final_answer.content}")
        print(f"Manager Final Output: \n#############\n{final_answer}")

    return state


In [None]:
def run_cove(query: str, chunk: str, baseline_summary: str, worker_idx: int, verbose: bool = False) -> VerificationTrace:
    # 1. Baseline response = baseline_summary (already produced by worker)
    # 2. Plan verification questions
    plan_prompt = PLAN_VERIFICATIONS_PROMPT(query, chunk, baseline_summary)

    if verbose:
        print(f"[CoVe][Worker {worker_idx}] Plan prompt:\n{plan_prompt}\n")

    plan_resp = llm.invoke(plan_prompt)
    plan_text = str(getattr(plan_resp, "content", plan_resp)) # Depending on the LLM wrapper, llm.invoke() may return a plain string or a message object AIMessage(..., content="some text", ...)

    # crude parsing: split into lines that look like questions
    questions = [
        line.strip(" -0123456789.").strip()
        for line in plan_text.split("\n")
        if "?" in line
    ]
    questions = [q for q in questions if q]

    if verbose:
        print(f"[CoVe][Worker {worker_idx}] Verification Questions:\n{questions}\n")

    # 3. Execute verifications (factored: one call per question)
    answers = []
    qa_block = "\n".join(f"{i+1}. {q}" for i, q in enumerate(questions))
    exec_prompt = EXEC_VERIFICATIONS_PROMPT(query, chunk, qa_block)
    exec_resp = llm.invoke(exec_prompt)
    exec_text = str(getattr(exec_resp, "content", exec_resp)).strip() #

    # Simple parsing: extract lines starting with a number
    answers = []
    for line in exec_text.split("\n"):
        line = line.strip()
        if not line:
            continue
        # lines like "1. answer ..." or "1) answer ..."
        if line[0].isdigit():
            # strip leading number + punctuation
            cleaned = line.lstrip("0123456789. )-").strip()
            answers.append(cleaned)
    # Fallback: if parsing fails, just take whole text as one answer
    if not answers:
        answers = [exec_text]

    if verbose:
        print(f"[CoVe][Worker {worker_idx}] Answers:\n{answers}\n")

    # 4. Generate final verified summary
    final_prompt = GEN_FINAL_RESPONSE_PROMPT(query, chunk, baseline_summary, questions, answers)
    print(f"final_prompt: {final_prompt}")
    final_resp = llm.invoke(final_prompt)
    final_summary = str(getattr(final_resp, "content", final_resp)).strip() # [TODO] add in worker node? Depending on the LLM wrapper, llm.invoke() may return a plain string or a message object AIMessage(..., content="some text", ...)

    if verbose:
        print(f"[CoVe][Worker {worker_idx}] Final verified summary:\n{final_summary}\n")

    trace: VerificationTrace = {
        "worker_idx": worker_idx,
        "baseline_summary": baseline_summary,
        "verification_questions": questions,
        "verification_answers": answers,
        "verified_summary": final_summary,
    }
    return trace


def verification_node(state: CoAState, worker_idx: int):
    query = state["query"]
    chunk = state["chunks"][worker_idx]

    raw_summary = state["worker_outputs"][worker_idx]
    baseline_summary = str(getattr(raw_summary, "content", raw_summary)) #

    trace = run_cove(
        query=query,
        chunk=chunk,
        baseline_summary=baseline_summary,
        worker_idx=worker_idx,
        verbose=state["verbose"],
    )

    # replace worker summary with verified one
    state["worker_outputs"][worker_idx] = trace["verified_summary"]

    # store trace if needed
    if state.get("store_verification_traces", False): # defaults to False if the key doesn’t exist
        state["verification_traces"].append(trace)

    return state


def maybe_run_verification(state: CoAState) -> CoAState:
    """
    determine if the latest generated summary needs to be verified

    Param:
    state (returned by worker_node())

    Return:
    updated state
    """
    mode = state["verification_mode"] # "none" | "every" | "every_k"
    k = state["verification_k"]
    current_worker_idx = state["i"] - 1 # cuz in worker_node() before returning state it does state["i"] += 1

    if mode == "none":
        return state
    if mode == "every":
        return verification_node(state, current_worker_idx)
    if mode == "every_k" and (current_worker_idx + 1) % k == 0:
        return verification_node(state, current_worker_idx)

    return state

In [None]:
def run_coa(query, context, chunk_size=500, verbose=True, verification_mode="none", verification_k=1, store_verification_traces=True):
    # Split context
    # chunks = split_text(context, chunk_size=chunk_size)
    chunks = splitter.split_text(context)
    if verbose:
        print("Text Chunks: ",chunks)
    # assert 1==2
    # Initialize initial CoAState
    init_state = {
        "query": query,
        "chunks": chunks,
        "i": 0,
        "worker_outputs": [],
        "verbose": verbose,
        "manager_output": "",
        # [CoVe]
        "verification_mode": verification_mode,
        "verification_k": verification_k,
        "store_verification_traces": store_verification_traces,
        "verification_traces": []
    }
    state = init_state
    # Worker nodes, for each chunk
    for i, chunk in enumerate(chunks):
        # Run worker node and get new state
        state = worker_node(state)
        # [TODO]
        state = maybe_run_verification(state)

    # At the end of the loop, state["i"] should be == len(chunks)
    assert state["i"] == len(chunks), "Total states worked does not equal to number of text chunks"

    # Finally run manager at last
    state = manager_node(state)
    # final_ans = state["worker_outputs"][-1].content
    final_ans = state["manager_output"]
    if verbose:
        print(f"Query: {state["query"]}\nFinal Answer from Manager: {final_ans}")
    return final_ans

In [None]:
# # Test run CoA
# ans = run_coa("what is the meaning?", original_long_text, chunk_size=50, verbose=False)
# ans

In [None]:
# ans

## Eval

In [29]:
import json
from typing import List, Dict, Callable
from datasets import load_dataset
from tqdm import tqdm
import re
import string


# -----------------------------------------------------------
# 1. HotpotQA Loader
# -----------------------------------------------------------

def load_hotpotqa(split="validation", max_samples=None):
    """
    [source] https://huggingface.co/datasets/hotpotqa/hotpot_qa

    an example in hotpotqa - fullwiki:
    {
        "id": str,
        "question": str,
        "answer": str,
        "type": str,
        "level": str,
        "supporting_facts":
        {
            "title": [str, str, ...], # may repeat
            "sent_id": [int32, int32, ...]
        },
        "context":
        {
            "title": [str, str, ...],
            "sentences": [[str, str, str, ...], [str, str, str, ...], ...]
        }

    }

    Return:
    a list of dicts
    {
        "context":
        [
            { "title": str, "sentences": [str, str, ...] }, # doc 0
            { "title": str, "sentences": [str, str, ...] }, # doc 1
            ...
        ]
        "question": str,
        "answer": str
    }
    """
    raw = load_dataset("hotpot_qa", "fullwiki")[split]

    data = []
    for item in raw:
        context = [
            {
                "title": t,
                "sentences": sents
            }
            for t, sents in zip(item["context"]["title"], item["context"]["sentences"])
        ]
        question = item["question"]
        answer = item["answer"]

        data.append({
            "context": context,
            "question": question,
            "answer": answer
        })

        if max_samples and len(data) >= max_samples:
            break

    return data


# -----------------------------------------------------------
# 2. Context Merger
# -----------------------------------------------------------

def merge_context_fullwiki(context):
    """
    merge each document's sentence list into a single text string

    Param:
    context (return from load_hotpotqa()):
    [
        { "title": str, "sentences": [str, str, ...] }, # doc 0
        { "title": str, "sentences": [str, str, ...] }, # doc 1
        ...
    ]

    Return:
    texts: list[str]
    merged text for each document
    """
    texts = []

    for doc in context:
        text = " ".join(doc["sentences"])
        texts.append(text)

    return texts


# -----------------------------------------------------------
# 3. Evaluation Metrics (EM + F1)
# -----------------------------------------------------------

def normalize_answer(s):
    """
    Lowercase, remove punctuation/articles/extra whitespace.
    """
    def remove_articles(text):
        return re.sub(r'\b(a|an|the)\b', ' ', text)

    def white_space_fix(text):
        return ' '.join(text.split())

    def remove_punc(text):
        exclude = set(string.punctuation)
        return ''.join(ch for ch in text if ch not in exclude)

    def lower(text):
        return text.lower()

    return white_space_fix(remove_articles(remove_punc(lower(s))))


# TODO: does the paper compute this at the token level??
def f1_score(pred, gold):
    pred_tokens = normalize_answer(pred).split()
    gold_tokens = normalize_answer(gold).split()

    if len(pred_tokens) == 0 or len(gold_tokens) == 0:
        return int(pred_tokens == gold_tokens)

    common = set(pred_tokens) & set(gold_tokens)
    num_same = sum(min(pred_tokens.count(t), gold_tokens.count(t)) for t in common)

    if num_same == 0:
        return 0

    precision = num_same / len(pred_tokens)
    recall = num_same / len(gold_tokens)
    return (2 * precision * recall) / (precision + recall)


def exact_match(pred, gold):
    return normalize_answer(pred) == normalize_answer(gold)


# -----------------------------------------------------------
# 4. Evaluation loop
# -----------------------------------------------------------

def evaluate(model_fn: Callable, dataset: List[Dict], chunk_size=500):
    """
    model_fn(query, context_chunks) -> str
    """
    qs = []
    ctxs = []
    preds = []
    refs = []
    f1s = []
    ems = []

    for sample in tqdm(dataset, desc="Evaluating"):
        question = sample["question"]
        context = sample["context"]
        gold = sample["answer"]

        texts = merge_context_fullwiki(context)

        pred = model_fn(question, texts)

        qs.append(question)
        ctxs.append(texts)
        preds.append(pred)
        refs.append(gold)

        f1s.append(f1_score(pred, gold))
        ems.append(int(exact_match(pred, gold)))

    return {
        "contexts": ctxs,
        "questions": qs,
        "predictions": preds,
        "references": refs,
        "f1": sum(f1s) / len(f1s),
        "em": sum(ems) / len(ems)
    }


# -----------------------------------------------------------
# 5. Placeholder CoA model
# -----------------------------------------------------------

def coa_placeholder(question: str, context_texts: List[str]) -> str:
    """
    Dummy version to make the pipeline runnable now.
    Replace with Ray's CoA later.
    """
    merged_context = " ".join(context_texts)
    print(merged_context)
    # prompt = f"Context:\n{merged_context}\n\nQuestion: {question}\nAnswer:"
    # TODO
    # print(f"Length of merged context: {len(merged_context)}")
    # return "hello"
    # assert 1==2
    # TODO: Figure out what chunk size is best cost to performance
    final_ans = run_coa(query=question, context=merged_context, chunk_size=2000, verbose=True, verification_mode="every_k", verification_k=2, store_verification_traces=True)
    return final_ans
    return "PLACEHOLDER YES" # "PLACEHOLDER_ANSWER"


# -----------------------------------------------------------
# 6. Running the pipeline
# -----------------------------------------------------------

if __name__ == "__main__":
    # Full Hotpot QA size is 7405, takes ~ 29 hours with 512 chunk size
    data = load_hotpotqa(split="validation", max_samples=3)
    results = evaluate(coa_placeholder, data)


Evaluating:   0%|          | 0/1 [00:00<?, ?it/s]

Adam Collis is an American filmmaker and actor.  He attended the Duke University from 1986 to 1990 and the University of California, Los Angeles from 2007 to 2010.  He also studied cinema at the University of Southern California from 1991 to 1997.  Collis first work was the assistant director for the Scott Derrickson's short "Love in the Ruins" (1995).  In 1998, he played "Crankshaft" in Eric Koyanagi's "Hundred Percent". Ed Wood is a 1994 American biographical period comedy-drama film directed and produced by Tim Burton, and starring Johnny Depp as cult filmmaker Ed Wood.  The film concerns the period in Wood's life when he made his best-known films as well as his relationship with actor Bela Lugosi, played by Martin Landau.  Sarah Jessica Parker, Patricia Arquette, Jeffrey Jones, Lisa Marie, and Bill Murray are among the supporting cast. Tyler Bates (born June 5, 1965) is an American musician, music producer, and composer for films, television, and video games.  Much of his work is i

Evaluating: 100%|██████████| 1/1 [10:02<00:00, 602.53s/it]

Manager Final Output: 
#############
or Glenda?. Born in Indianapolis, Indiana, he was an American filmmaker.
Scott Derrickson, born in Chicago, Illinois, is an American film director,
Query: Were Scott Derrickson and Ed Wood of the same nationality?
Final Answer from Manager: or Glenda?. Born in Indianapolis, Indiana, he was an American filmmaker.
Scott Derrickson, born in Chicago, Illinois, is an American film director,





In [None]:
results

{'contexts': [['Adam Collis is an American filmmaker and actor.  He attended the Duke University from 1986 to 1990 and the University of California, Los Angeles from 2007 to 2010.  He also studied cinema at the University of Southern California from 1991 to 1997.  Collis first work was the assistant director for the Scott Derrickson\'s short "Love in the Ruins" (1995).  In 1998, he played "Crankshaft" in Eric Koyanagi\'s "Hundred Percent".',
   "Ed Wood is a 1994 American biographical period comedy-drama film directed and produced by Tim Burton, and starring Johnny Depp as cult filmmaker Ed Wood.  The film concerns the period in Wood's life when he made his best-known films as well as his relationship with actor Bela Lugosi, played by Martin Landau.  Sarah Jessica Parker, Patricia Arquette, Jeffrey Jones, Lisa Marie, and Bill Murray are among the supporting cast.",
   'Tyler Bates (born June 5, 1965) is an American musician, music producer, and composer for films, television, and video

In [None]:
results['f1']

0.03885281385281385

In [None]:
for i in range(3):
    print(":)")

:)
:)
:)


In [None]:
for i in range(3):
    print(f"[{i}] Questions:{results['questions'][i]}\nPrediction: {results['predictions'][i]}  ---> Reference: {results['references'][i]}\n\n")

[0] Questions:Were Scott Derrickson and Ed Wood of the same nationality?
Prediction: to Scott Derrickson. Derrickson was born in 1972 and is an American filmmaker.
No, they were not of the same nationality. Scott  ---> Reference: yes


[1] Questions:What government position was held by the woman who portrayed Corliss Archer in the film Kiss and Tell?
Prediction: The answer is: Kate Beckinsale
No government position mentioned. The answer is: None Final answer: None

What government position was held by the woman  ---> Reference: Chief of Protocol


[2] Questions:What science fantasy young adult series, told in first person, has a set of companion books narrating the stories of enslaved worlds and alien species?
Prediction: science fantasy young adult series meeting all criteria mentioned.
The source is too long and has been summarized. You need to answer based on the summary.

Query: What  ---> Reference: Animorphs




In [None]:
results['contexts'][0]

['Adam Collis is an American filmmaker and actor. He attended the Duke University from 1986 to 1990 and the University of California, Los Angeles from 2007 to 2010. He also studied cinema at the University of Southern California from 1991 to 1997. Collis first work was the assistant director for the Scott Derrickson\'s short "Love in the Ruins" (1995). In 1998, he played "Crankshaft" in Eric Koyanagi\'s "Hundred Percent".',
 "Ed Wood is a 1994 American biographical period comedy-drama film directed and produced by Tim Burton, and starring Johnny Depp as cult filmmaker Ed Wood. The film concerns the period in Wood's life when he made his best-known films as well as his relationship with actor Bela Lugosi, played by Martin Landau. Sarah Jessica Parker, Patricia Arquette, Jeffrey Jones, Lisa Marie, and Bill Murray are among the supporting cast.",
 'Tyler Bates (born June 5, 1965) is an American musician, music producer, and composer for films, television, and video games. Much of his work

In [None]:
results['questions'][0]

'Were Scott Derrickson and Ed Wood of the same nationality?'

In [None]:
results['references'][0]

'yes'

In [None]:
results['predictions'][0]

'to Scott Derrickson. Derrickson was born in 1972 and is an American filmmaker.\nNo, they were not of the same nationality. Scott'

In [None]:
# Testing committing from google colab