Install all the dependencies

In [None]:
!pip install langchain langgraph pinecone-client langchain-pinecone google-cloud-aiplatform pandas pydantic --quiet

Import all the dependencies

import json
import os
from typing import Any, Dict, TypedDict

import pandas as pd
import pinecone
from langchain.embeddings import VertexAIEmbeddings
from langchain.llms import VertexAI
from langchain_pinecone import PineconeVectorStore
from langgraph.graph import StateGraph, END

In [None]:
# Load Dataset

# JSON files are UTF-8; pin the encoding so the read does not depend on the
# platform's locale default (which is not UTF-8 on every system).
with open("self_critique_loop_dataset.json", "r", encoding="utf-8") as f:
    kb_data = json.load(f)

print("Loaded KB entries:", len(kb_data))
# Preview the first rows of the knowledge base as a table.
pd.DataFrame(kb_data).head()


In [None]:
# Embeddings & Pinecone Indexing referred from Assignment3_pinecone_quickstart_guide.ipynb

# Credentials come from the environment; the placeholders only mark what must be set.
PINECONE_API_KEY = os.getenv("PINECONE_API_KEY", "<YOUR_PINECONE_KEY>")
PINECONE_ENV = os.getenv("PINECONE_ENV", "<YOUR_PINECONE_ENV>")
INDEX_NAME = "assignment3-kb"

# Embeddings
embedding_model = VertexAIEmbeddings(model_name="gemini-embedding-001")

# Derive the vector size from the model itself instead of hard-coding it:
# the index dimension must equal the embedding length or every upsert is rejected.
EMBEDDING_DIMENSION = len(embedding_model.embed_query("dimension probe"))

# Init Pinecone.
# The v3+ SDK (the one langchain_pinecone is built on) replaced the module-level
# `pinecone.init(...)` with a client object.
pinecone_client = pinecone.Pinecone(api_key=PINECONE_API_KEY)

# Create index if missing
if INDEX_NAME not in pinecone_client.list_indexes().names():
    pinecone_client.create_index(
        name=INDEX_NAME,
        dimension=EMBEDDING_DIMENSION,
        metric="cosine",
        # Pod-based spec keeps PINECONE_ENV meaningful; for a serverless project use
        # pinecone.ServerlessSpec(cloud=..., region=...) instead.
        spec=pinecone.PodSpec(environment=PINECONE_ENV),
    )
else:
    # An index left over from an earlier run may have been created with another size.
    existing_dimension = pinecone_client.describe_index(INDEX_NAME).dimension
    if existing_dimension != EMBEDDING_DIMENSION:
        raise ValueError(
            f"Pinecone index {INDEX_NAME!r} has dimension {existing_dimension}, "
            f"but the embedding model produces {EMBEDDING_DIMENSION}-dimensional vectors. "
            "Delete the index or choose a different INDEX_NAME."
        )

# Prepare texts & metadata
# Three parallel lists (same length, same order) in the shape add_texts() expects.
texts, metadatas, ids = [], [], []
for position, entry in enumerate(kb_data, start=1):
    # Prefer the answer snippet, then generic text, then the question itself.
    text = entry.get("answer_snippet") or entry.get("text") or entry.get("question") or ""
    if not str(text).strip():
        # Nothing to embed for this entry; skip it rather than send empty text to the embedder.
        continue

    # Vector ids are passed as strings; fall back to a positional id when the entry has none.
    doc_id = str(entry.get("doc_id") or entry.get("id") or f"KB_{position}")
    # `or ""` also replaces explicit nulls from the JSON, keeping metadata values non-null.
    meta = {
        "doc_id": doc_id,
        "question": entry.get("question") or "",
        "source": entry.get("source") or "",
        "last_updated": entry.get("last_updated") or ""
    }
    texts.append(text)
    metadatas.append(meta)
    ids.append(doc_id)

# Vectorstore (LangChain-Pinecone wrapper from quickstart)
# Collect the connection settings first so they can be read at a glance.
vectorstore_settings = {
    "index_name": INDEX_NAME,
    "embedding": embedding_model,
    "namespace": None,
    "pinecone_api_key": PINECONE_API_KEY,
}
vectorstore = PineconeVectorStore(**vectorstore_settings)

# Upsert KB: embeds every text and writes it under the matching id with its metadata.
vectorstore.add_texts(texts, metadatas=metadatas, ids=ids)
print(f"Upserted {len(ids)} docs into Pinecone index {INDEX_NAME}")


In [None]:
# LangGraph Workflow

# `model_name` is the keyword already used for VertexAIEmbeddings above and is believed
# to be the field name on this wrapper, whereas `model=` may not be honoured.
# temperature=0 keeps the answers and critiques as repeatable as the model allows.
llm = VertexAI(model_name="gemini-pro", temperature=0)

class RAGState(TypedDict, total=False):
    """State passed between the nodes of the self-critique RAG graph.

    Declared as a ``TypedDict`` (the schema form LangGraph documents) rather than
    a ``typing.Dict`` subclass. ``total=False`` because the graph is invoked with
    only ``question`` set; the remaining keys are filled in by the nodes.
    """

    # The user's question; the only key supplied when the graph is invoked.
    question: str
    # Text snippets retrieved from the knowledge base, used as prompt context.
    retrieved_docs: list
    # The current (initial or refined) answer produced by the LLM.
    answer: str
    # The LLM's review of the answer: "COMPLETE" or "REFINE: <keywords>".
    critique: str

# Node 1: Retrieve
def retrieve_kb(state: RAGState):
    """Fetch the five KB snippets most similar to the question.

    Each snippet is stored as ``"[<doc_id>] <text>"``. The doc_id comes from the
    metadata written at indexing time; without it the generation prompt asks the
    model to cite ids that never appear in its context.

    Args:
        state: Graph state; must contain ``"question"``.

    Returns:
        The same state with ``"retrieved_docs"`` set to a list of strings.
    """
    docs = vectorstore.similarity_search(state["question"], k=5)
    state["retrieved_docs"] = [
        f"[{doc.metadata.get('doc_id', 'unknown')}] {doc.page_content}"
        for doc in docs
    ]
    return state

# Node 2: Generate
def generate_answer(state: RAGState):
    """Ask the LLM for an answer grounded in the retrieved KB snippets.

    Args:
        state: Graph state; must contain ``"question"`` and normally
            ``"retrieved_docs"`` (a missing or empty list yields an empty context).

    Returns:
        The same state with ``"answer"`` set to the LLM's response.
    """
    context = "\n".join(state.get("retrieved_docs") or [])
    prompt = f"""
    Question: {state['question']}
    Context (KB): {context}
    Provide an answer citing doc_ids as [KBxxx].
    """
    # Calling the LLM object directly (`llm(prompt)`) is deprecated in LangChain;
    # `invoke` is the supported entry point.
    state["answer"] = llm.invoke(prompt)
    return state

# Node 3: Critique
def critique_answer(state: RAGState):
    """Have the LLM judge whether the current answer is complete.

    The question is included in the prompt because completeness cannot be judged
    from the answer alone. The verdict is stripped of surrounding whitespace so
    the ``startswith("REFINE")`` checks used for routing see the keyword first.

    Args:
        state: Graph state; must contain ``"question"`` and ``"answer"``.

    Returns:
        The same state with ``"critique"`` set to ``COMPLETE`` or
        ``REFINE: <keywords>`` (as produced by the LLM).
    """
    prompt = f"""
    Question: {state['question']}
    Review this answer:
    {state['answer']}
    Decide if it's COMPLETE or needs refinement.
    Respond with either:
    COMPLETE
    REFINE: <keywords>
    """
    # `invoke` replaces the deprecated direct call `llm(prompt)`.
    state["critique"] = llm.invoke(prompt).strip()
    return state

# Node 4: Refine
def refine_answer(state: RAGState):
    """Retrieve one extra snippet for the critique's keywords and regenerate the answer.

    Does nothing unless the critique starts with ``REFINE`` (ignoring case and
    surrounding whitespace).

    Args:
        state: Graph state; must contain ``"question"`` and ``"critique"``.

    Returns:
        The same state, with ``"retrieved_docs"`` possibly extended by one
        snippet and ``"answer"`` replaced by the refined answer.
    """
    critique = (state.get("critique") or "").strip()
    if not critique.upper().startswith("REFINE"):
        return state

    # Remove only the leading marker; `replace("REFINE:", "")` would also delete
    # any later occurrence inside the keywords themselves.
    keywords = critique[len("REFINE"):].lstrip(" :").strip()
    # An empty query cannot be embedded, so fall back to the original question.
    search_query = keywords or state["question"]

    retrieved = state.setdefault("retrieved_docs", [])
    docs = vectorstore.similarity_search(search_query, k=1)
    if docs:
        extra = docs[0]
        # Skip snippets that are already in the context (with or without an id prefix).
        already_present = any(extra.page_content in snippet for snippet in retrieved)
        if not already_present:
            # Prefix the doc_id so the model has a real id to cite.
            retrieved.append(
                f"[{extra.metadata.get('doc_id', 'unknown')}] {extra.page_content}"
            )

    context = "\n".join(retrieved)
    prompt = f"""
        Question: {state['question']}
        Context (KB): {context}
        Provide a refined answer citing doc_ids as [KBxxx].
        """
    # `invoke` replaces the deprecated direct call `llm(prompt)`.
    state["answer"] = llm.invoke(prompt)
    return state


In [None]:
# Create LangGraph

graph = StateGraph(RAGState)

# Register every processing step under the name used to reference it in edges.
for node_name, node_fn in (
    ("retrieve_kb", retrieve_kb),
    ("generate_answer", generate_answer),
    ("critique_answer", critique_answer),
    ("refine_answer", refine_answer),
):
    graph.add_node(node_name, node_fn)

# Fixed part of the pipeline: retrieve -> generate -> critique.
graph.set_entry_point("retrieve_kb")
for upstream, downstream in (
    ("retrieve_kb", "generate_answer"),
    ("generate_answer", "critique_answer"),
):
    graph.add_edge(upstream, downstream)

def critique_condition(state: RAGState):
    """Choose the step that follows the critique.

    Returns ``"refine_answer"`` when the critique starts with ``REFINE``
    (ignoring case and surrounding whitespace, which LLM output often carries),
    otherwise ``END``. A missing critique is treated as complete.
    """
    verdict = (state.get("critique") or "").strip().upper()
    return "refine_answer" if verdict.startswith("REFINE") else END

# The explicit mapping lists the only two destinations the router can return.
graph.add_conditional_edges(
    "critique_answer",
    critique_condition,
    {"refine_answer": "refine_answer", END: END},
)
# Refinement runs at most once, then the graph finishes.
graph.add_edge("refine_answer", END)

compiled_graph = graph.compile()


In [None]:
# Test Queries

test_queries = [
    "What are best practices for caching?",
    "How should I set up CI/CD pipelines?",
    "What are performance tuning tips?",
    "How do I version my APIs?",
    "What should I consider for error handling?"
]

# Run every question through the compiled graph and keep one summary row per question.
results = []
for query in test_queries:
    outcome = compiled_graph.invoke({"question": query})
    row = {"question": query}
    # Copy only the fields worth showing in the summary table.
    for field in ("answer", "critique"):
        row[field] = outcome[field]
    results.append(row)

# Display the answers and critiques side by side.
pd.DataFrame(results)
