In [None]:
from pathlib import Path

# Read the Markdown report that is later passed to the graph as its "report" input.
report_path = Path("files/coffee_shops_sf.md")
md_text = report_path.read_text(encoding="utf-8")

# Show only the first 500 characters so the cell output stays short.
print(md_text[:500])

In [20]:
# Imported in this cell so it runs on a fresh kernel from top to bottom; the only
# other import of `init_chat_model` lives in a later cell, which would otherwise
# have to be executed first.
from langchain.chat_models import init_chat_model

# Chat model served by a local LM Studio instance through its OpenAI-compatible API.
# Note: later cells assign `model` again, so whichever assignment ran last wins.
model = init_chat_model(
    model="deephat-v1-7b",  # e.g. "gpt-3.5-turbo" or "lmstudio-llama2"
    model_provider="openai",  # because LM Studio mimics OpenAI's API
    base_url="http://localhost:1234/v1",
    api_key="not-needed",  # LM Studio accepts any string here
)

In [16]:
import uuid
from typing import TypedDict, List
from langgraph.graph import StateGraph, START, END
from pydantic import BaseModel, Field
from langchain.chat_models import init_chat_model

# Initialize Model
# Rebinds the notebook-wide `model` name (the LM Studio cell also assigns it) to the
# model named by an identifier string prefixed with `google_genai:`. Every helper
# derived from `model` afterwards uses whichever assignment was executed last.
# NOTE(review): presumably needs GOOGLE_API_KEY in the environment — confirm.
model = init_chat_model("google_genai:models/gemini-3-flash-preview")

# --- 1. STATE DEFINITIONS ---
class Checkpoint(TypedDict):
    """One learning checkpoint, stored as a plain dict inside the graph state."""
    id: str  # UUID4 string assigned by generate_structure
    name: str  # checkpoint name produced by the structure LLM call
    objective: str  # checkpoint objective produced by the structure LLM call
    study_material: str  # "" until create_content fills it in
    quiz_questions: list[str]  # [] until create_content fills it in
    user_answers: list[str]  # value the quiz node receives when its interrupt is resumed

class State(TypedDict):
    """Graph state schema: the keys nodes read from and return updates for."""
    report: str  # source text the checkpoints are extracted from
    user_request: str  # learner's goal, included in the content prompts
    checkpoints: list[Checkpoint]  # all checkpoints, in the order they are quizzed
    # Position of the checkpoint currently being quizzed. The key must be spelled
    # `current_checkpoint_index`: that is the exact key the quiz node reads and
    # returns, so a misspelled field here leaves that update without a matching
    # state key.
    current_checkpoint_index: int

In [None]:


# --- 2. PYDANTIC MODELS ---

# For Node 1
class CheckpointItem(BaseModel):
    # Shape of a single checkpoint inside the structure-extraction response.
    # Documented with comments rather than a docstring on purpose: a docstring
    # would become part of the model's generated schema.
    name: str = Field(description="Name of the checkpoint")
    objective: str = Field(description="Objective of the checkpoint")

class CheckpointResponse(BaseModel):
    # Top-level structured response for Node 1: every extracted checkpoint,
    # returned together in one list.
    checkpoints: list[CheckpointItem]

# For Node 2
class CheckpointContent(BaseModel):
    # Structured response for Node 2: the lesson generated for one checkpoint.
    # The descriptions state the requested length and question count; nothing in
    # this model enforces either of them.
    study_material: str = Field(
        description="Brief study material(content) (approx 300 words)",
    )
    quiz_questions: list[str] = Field(
        description="Exactly 3 assessment questions",
    )

# Setup LLMs
# One structured-output wrapper of the shared `model` per response schema:
# `structure_gen` yields CheckpointResponse objects, `content_gen` yields
# CheckpointContent objects.
structure_gen, content_gen = (
    model.with_structured_output(schema)
    for schema in (CheckpointResponse, CheckpointContent)
)


# --- 3. NODE 1: GENERATE STRUCTURE ---
def generate_structure(state: State):
    """Node 1: ask the LLM to split the report into checkpoints.

    Each checkpoint returned by the model (name + objective) is turned into a
    dict and given a fresh UUID plus empty placeholders for every field a later
    node fills in. ``user_answers`` is initialised too, so all keys declared on
    ``Checkpoint`` exist from the start and can be read without a KeyError.

    Args:
        state: Graph state; only ``state['report']`` is read.

    Returns:
        ``{"checkpoints": [...]}`` with one dict per extracted checkpoint.
    """
    report = state['report']
    response = structure_gen.invoke(f"Extract checkpoints from: {report}")

    clean_checkpoints = [
        {
            **item.model_dump(),          # name, objective
            'id': str(uuid.uuid4()),
            # Placeholders, populated by later nodes
            'study_material': "",
            'quiz_questions': [],
            'user_answers': [],
        }
        for item in response.checkpoints
    ]

    return {"checkpoints": clean_checkpoints}


# --- 4. NODE 2: CREATE CONTENT (UPDATED WITH BATCH) ---
def create_content(state: State):
    """Node 2: generate study material and quiz questions for every checkpoint.

    One prompt is built per checkpoint and all prompts go through a single
    ``content_gen.batch`` call. Results are paired with checkpoints by position.

    The checkpoint dicts held in ``state`` are not modified: updated copies are
    returned instead, so the node's only effect is the update it returns.

    Args:
        state: Graph state; reads ``report``, ``user_request`` and ``checkpoints``.

    Returns:
        ``{"checkpoints": [...]}`` where each checkpoint carries the generated
        ``study_material`` and ``quiz_questions``.
    """
    report = state['report']
    user_req = state['user_request']
    checkpoints = state['checkpoints']

    # A. Prepare the list of prompts (one per checkpoint, in checkpoint order)
    batch_prompts = []

    for cp in checkpoints:
        prompt = f"""
        Context: {report}
        User Goal: {user_req}
        
        Task: Create content for this checkpoint:
        - Topic: {cp['name']}
        - Objective: {cp['objective']}
        """
        batch_prompts.append(prompt)

    # B. Execute Batch: a single call returns one CheckpointContent per prompt
    batch_results = content_gen.batch(batch_prompts)

    # C. Map results back to the checkpoints
    # zip() pairs each checkpoint with the result generated from its prompt;
    # {**cp, ...} builds a new dict so the incoming state is left untouched.
    updated_checkpoints = [
        {
            **cp,
            'study_material': result.study_material,
            'quiz_questions': result.quiz_questions,
        }
        for cp, result in zip(checkpoints, batch_results)
    ]

    return {"checkpoints": updated_checkpoints}


# --- 5. GRAPH SETUP ---
# Linear pipeline: START -> generate_structure -> create_content -> END
builder = StateGraph(State)

for node_name, node_fn in (
    ("generate_structure", generate_structure),
    ("create_content", create_content),
):
    builder.add_node(node_name, node_fn)

for source, target in (
    (START, "generate_structure"),
    ("generate_structure", "create_content"),
    ("create_content", END),
):
    builder.add_edge(source, target)

graph = builder.compile()

# --- 6. EXECUTION ---
# Only the two input keys are supplied; "checkpoints" is produced by the nodes.
inputs = dict(
    report=md_text,
    user_request="I want to learn best coffee shops in San Francisco.",
)

result = graph.invoke(inputs)

# Verify Output: pretty-print the generated checkpoints
import json
print(json.dumps(result["checkpoints"], indent=2))

[
  {
    "name": "Blue Bottle Coffee",
    "objective": "Research single-origin coffee, pour-over methods, and minimalist aesthetic across multiple San Francisco locations.",
    "id": "2538c326-ae0f-485e-a818-3c890e83b1f3",
    "study_material": "Blue Bottle Coffee, founded in 2002 in Oakland by James Freeman, stands as a cornerstone of the Third Wave coffee movement in San Francisco. The company is renowned for its commitment to freshness, famously roasting beans in small batches and ensuring they are delivered to cafes quickly to maintain peak flavor. Central to the Blue Bottle experience is the pour-over method. Unlike automated drip machines, the manual pour-over allows baristas to control variables like water temperature, bloom time, and flow rate. This precision is essential for highlighting the delicate and complex profiles of single-origin coffees, which are beans sourced from a specific farm or region rather than being blended. By focusing on single-origin offerings, Blue Bo

In [18]:
def administer_quiz(state: State):
    """Pause the graph to show the active checkpoint and collect the user's answers.

    The node sends the checkpoint's title, study material and questions to the
    caller via ``interrupt`` and stores whatever value the graph is resumed with
    as that checkpoint's ``user_answers``.

    Args:
        state: Graph state; reads ``checkpoints`` and, if present,
            ``current_checkpoint_index`` (defaults to 0).

    Returns:
        ``{}`` when the index is past the last checkpoint; otherwise an update
        with a new ``checkpoints`` list and the (unchanged) index.
    """
    # Imported locally because no earlier cell imports `interrupt`; this keeps the
    # function usable regardless of which cells have been run.
    from langgraph.types import interrupt

    # 1. Identify which checkpoint is active
    # We default to 0 (the first one) if the index hasn't been set yet
    current_index = state.get("current_checkpoint_index", 0)

    # Safety check: If we are out of checkpoints, do nothing (or handle end)
    checkpoints = state["checkpoints"]
    if current_index >= len(checkpoints):
        return {}

    current_cp = checkpoints[current_index]

    # 2. Prepare the data to show the user
    # This dictionary is what is handed to the frontend/client when the graph pauses
    display_content = {
        "title": current_cp["name"],
        "content": current_cp["study_material"],
        "questions": current_cp["quiz_questions"]
    }

    # 3. INTERRUPT AND WAIT
    # The graph pauses here and resumes only when the user sends back answers.
    user_responses = interrupt(display_content)

    # 4. Save the answers
    # Build a new list containing an updated copy of the active checkpoint rather
    # than mutating the dicts held in the incoming state.
    updated_checkpoints = list(checkpoints)
    updated_checkpoints[current_index] = {**current_cp, "user_answers": user_responses}

    # Note: the index is NOT incremented here; advancing is left to the step
    # that evaluates the answers.
    return {
        "checkpoints": updated_checkpoints,
        # Ensure the index is set in state if it wasn't before
        "current_checkpoint_index": current_index
    }

In [23]:
import uuid
import operator
from typing import TypedDict, List, Annotated, Literal
from langgraph.graph import StateGraph, START, END
from langgraph.types import interrupt
from pydantic import BaseModel, Field
from langchain.chat_models import init_chat_model

# --- 1. SETUP MODEL ---
# Ensure you have your API key set in env: GOOGLE_API_KEY
# Rebinds `model` once more; the structured-output helpers created further down in
# this cell are built from this assignment.
model = init_chat_model("google_genai:models/gemini-flash-latest")

# --- 2. DEFINE STATE SCHEMAS ---

class Checkpoint(TypedDict):
    """One learning checkpoint; each group of fields below is filled in by a different node."""
    id: str  # UUID4 string assigned by generate_structure
    name: str
    objective: str
    # Content fields (populated by Node 2)
    study_material: str 
    quiz_questions: list[str]
    # User Interaction fields (populated by Node 3)
    user_answers: list[str]
    # Evaluation fields (populated by Node 4)
    score: int  # initialised to 0 by generate_structure
    passed: bool  # initialised to False by generate_structure, i.e. before any attempt
    feedback: str  # initialised to "" by generate_structure

class State(TypedDict):
    """Graph state schema passed to StateGraph; nodes return partial updates keyed by these names."""
    report: str  # source text included in the LLM prompts
    user_request: str  # learner's goal, included in the content prompts
    checkpoints: list[Checkpoint]  # all checkpoints, in the order they are quizzed
    # Set to 0 by generate_structure; evaluate_submission advances it by one on a pass.
    current_checkpoint_index: int 

# --- 3. DEFINE PYDANTIC MODELS (LLM INTERFACE) ---

# Schema for Node 1 (Structure)
class CheckpointItem(BaseModel):
    # A single checkpoint as returned by the structure-extraction call.
    # Comments are used instead of a docstring on purpose: a docstring would
    # become part of the model's generated schema.
    name: str = Field(description="Name of the checkpoint")
    objective: str = Field(description="Objective of the checkpoint")

class CheckpointResponse(BaseModel):
    # Top-level structured response for Node 1: every extracted checkpoint,
    # returned together in one list.
    checkpoints: list[CheckpointItem]

# Schema for Node 2 (Content)
class CheckpointContent(BaseModel):
    # Structured response for Node 2: the lesson generated for one checkpoint.
    # The descriptions state the requested length and question count; nothing in
    # this model enforces either of them.
    study_material: str = Field(
        description="Brief study material (approx 100 words)",
    )
    quiz_questions: list[str] = Field(
        description="Exactly 3 assessment questions",
    )

# Schema for Node 4 (Evaluation)
class EvaluationResult(BaseModel):
    # Structured grading response used by evaluate_submission.
    # NOTE(review): `passed` is reported by the model independently of `score`;
    # nothing here checks that the two agree with the 70-point threshold.
    score: int = Field(description="Score out of 100")
    feedback: str = Field(description="Constructive feedback for the student")
    passed: bool = Field(description="True if score >= 70, False if failed")

# Create Structured LLMs
# One structured-output wrapper of the shared `model` per response schema, in the
# order: structure extraction, content generation, answer evaluation.
structure_gen, content_gen, evaluator_gen = (
    model.with_structured_output(schema)
    for schema in (CheckpointResponse, CheckpointContent, EvaluationResult)
)


# --- 4. DEFINE NODES ---

def generate_structure(state: State):
    """Node 1: extract the list of checkpoints (name + objective) from the report.

    Every extracted checkpoint gets a fresh UUID and default values for all the
    fields that later nodes fill in; the active index is reset to 0.
    """
    print("--- Generating Structure ---")
    extraction_prompt = f"Extract learning checkpoints from this report: {state['report']}"
    response = structure_gen.invoke(extraction_prompt)

    fresh_checkpoints = [
        {
            **item.model_dump(),        # name, objective
            # Defaults for everything not produced by this node
            'id': str(uuid.uuid4()),
            'study_material': "",
            'quiz_questions': [],
            'user_answers': [],
            'score': 0,
            'passed': False,
            'feedback': "",
        }
        for item in response.checkpoints
    ]

    return {"checkpoints": fresh_checkpoints, "current_checkpoint_index": 0}


def create_content(state: State):
    """Node 2: generate study material and questions for all checkpoints in one batch.

    One prompt is built per checkpoint and all prompts go through a single
    ``content_gen.batch`` call. Results are paired with checkpoints by position.

    The checkpoint dicts held in ``state`` are not modified: updated copies are
    returned instead, so the node's only effect is the update it returns.

    Args:
        state: Graph state; reads ``report``, ``user_request`` and ``checkpoints``.

    Returns:
        ``{"checkpoints": [...]}`` where each checkpoint carries the generated
        ``study_material`` and ``quiz_questions``.
    """
    print("--- Creating Content (Batch) ---")
    report = state['report']
    user_req = state['user_request']
    checkpoints = state['checkpoints']

    # Prepare Batch Prompts (one per checkpoint, in checkpoint order)
    prompts = []
    for cp in checkpoints:
        prompt = f"""
        Context: {report}
        User Goal: {user_req}
        Task: Create content for checkpoint: '{cp['name']}'
        Objective: {cp['objective']}
        """
        prompts.append(prompt)

    # Run Batch: a single call returns one CheckpointContent per prompt
    results = content_gen.batch(prompts)

    # Map back to state: build new dicts instead of mutating the incoming ones
    updated_checkpoints = [
        {
            **cp,
            'study_material': res.study_material,
            'quiz_questions': res.quiz_questions,
        }
        for cp, res in zip(checkpoints, results)
    ]

    return {"checkpoints": updated_checkpoints}


def administer_quiz(state: State):
    """Node 3: pause the graph to show the active checkpoint and wait for answers.

    The checkpoint's title, study material and questions are passed to
    ``interrupt``; the value the graph is resumed with is stored as that
    checkpoint's ``user_answers``.

    The recorded run shows this node's banner printed again after the answers
    are submitted, so code placed before ``interrupt`` should be safe to repeat.

    Args:
        state: Graph state; reads ``checkpoints`` and ``current_checkpoint_index``
            (defaults to 0 when absent).

    Returns:
        ``{}`` when the index is past the last checkpoint; otherwise
        ``{"checkpoints": [...]}`` with the answers recorded on a copy of the
        active checkpoint (the incoming state is not mutated).
    """
    idx = state.get("current_checkpoint_index", 0)
    checkpoints = state["checkpoints"]

    if idx >= len(checkpoints):
        return {} # Safety catch

    current_cp = checkpoints[idx]

    print(f"--- Administering Quiz: {current_cp['name']} ---")

    # Prepare Payload for UI/User
    user_view = {
        "title": current_cp["name"],
        "material": current_cp["study_material"],
        "questions": current_cp["quiz_questions"]
    }

    # *** INTERRUPT ***
    # The graph STOPS here. Resume with:
    #   graph.invoke(Command(resume=answers_list), thread_config)
    user_answers = interrupt(user_view)

    # Resume Logic: record the answers on a copy so the state passed in stays untouched
    updated_checkpoints = list(checkpoints)
    updated_checkpoints[idx] = {**current_cp, "user_answers": user_answers}

    return {"checkpoints": updated_checkpoints}


def evaluate_submission(state: State):
    """Node 4: grade the submitted answers and decide whether to advance.

    The active checkpoint's questions and answers are sent to ``evaluator_gen``;
    the returned score, pass flag and feedback are stored on a copy of that
    checkpoint. On a pass the index moves to the next checkpoint, otherwise it
    stays where it is so the same topic is retried.

    Args:
        state: Graph state; reads ``current_checkpoint_index`` and ``checkpoints``.

    Returns:
        ``{}`` when there is no checkpoint at the current index; otherwise an
        update with the new ``checkpoints`` list and ``current_checkpoint_index``.
    """
    print("--- Evaluating Submission ---")
    idx = state["current_checkpoint_index"]
    checkpoints = state["checkpoints"]

    # The quiz node returns {} without asking anything when the index is out of
    # range (e.g. no checkpoints were extracted). There is nothing to grade in
    # that case, and indexing the list would raise IndexError.
    if idx >= len(checkpoints):
        return {}

    current_cp = checkpoints[idx]

    # Evaluate
    prompt = f"""
    Topic: {current_cp['name']}
    Questions: {current_cp['quiz_questions']}
    Answers: {current_cp['user_answers']}
    Rubric: Pass mark is 70.
    """
    result = evaluator_gen.invoke(prompt)

    # Save Result on a copy; the dicts held in the incoming state are not mutated
    updated_checkpoints = list(checkpoints)
    updated_checkpoints[idx] = {
        **current_cp,
        "score": result.score,
        "passed": result.passed,
        "feedback": result.feedback,
    }

    # Decide Index Movement
    if result.passed:
        print(f"PASSED ({result.score}). Moving to next topic.")
        next_idx = idx + 1
    else:
        print(f"FAILED ({result.score}). Retrying same topic.")
        next_idx = idx  # stay on the same checkpoint

    return {"checkpoints": updated_checkpoints, "current_checkpoint_index": next_idx}


def simplified_teaching(state: State):
    """Node 5 (placeholder): announce remediation mode without changing any state."""
    banner = "--- Simplified Teaching Mode (Remediation) ---"
    print(banner)
    # Future work: generate simpler study material here.
    no_updates = {}  # empty update: the graph simply continues back to the quiz
    return no_updates


# --- 5. ROUTING LOGIC ---

def decide_next_step(state: State) -> Literal["administer_quiz", "simplified_teaching", END]:
    """Route after grading: finish, remediate a failed attempt, or quiz the next topic.

    Checkpoints start out with ``passed=False``, so that flag alone cannot tell a
    failed attempt from a topic that has not been tried yet. After a pass the
    index already points at the next (untouched) checkpoint, which must go
    straight to its quiz rather than to remediation. Remediation is therefore
    chosen only when the checkpoint at the current index has recorded answers
    and is marked as not passed.

    Args:
        state: Graph state; reads ``current_checkpoint_index`` and ``checkpoints``.

    Returns:
        ``END`` when the index is past the last checkpoint,
        ``"simplified_teaching"`` for an attempted-and-failed checkpoint,
        ``"administer_quiz"`` otherwise.
    """
    idx = state["current_checkpoint_index"]
    checkpoints = state["checkpoints"]

    # 1. Done?
    if idx >= len(checkpoints):
        print("--- All Checkpoints Completed ---")
        return END

    # 2. Failed? Only a checkpoint that has actually been attempted can have failed.
    current_cp = checkpoints[idx]
    attempted = bool(current_cp.get("user_answers"))
    # .get() yields None for a missing key, and `None is False` is False, so a
    # checkpoint without a "passed" key is never treated as failed.
    if attempted and current_cp.get("passed") is False:
        return "simplified_teaching"

    # 3. Ready for Quiz (new topic, or a retry with no recorded answers)
    return "administer_quiz"


# --- 6. BUILD GRAPH ---

builder = StateGraph(State)

# Add Nodes
for node_name, node_fn in (
    ("generate_structure", generate_structure),
    ("create_content", create_content),
    ("administer_quiz", administer_quiz),
    ("evaluate_submission", evaluate_submission),
    ("simplified_teaching", simplified_teaching),
):
    builder.add_node(node_name, node_fn)

# Add Edges (fixed transitions)
for source, target in (
    (START, "generate_structure"),
    ("generate_structure", "create_content"),
    ("create_content", "administer_quiz"),        # Start 1st quiz
    ("administer_quiz", "evaluate_submission"),
    ("simplified_teaching", "administer_quiz"),   # Retry loop
):
    builder.add_edge(source, target)

# Add Conditional Edge: after grading, decide_next_step picks the next node.
# Each value the router can return maps to the node of the same name.
route_targets = ("administer_quiz", "simplified_teaching", END)
builder.add_conditional_edges(
    "evaluate_submission",
    decide_next_step,
    {target: target for target in route_targets},
)

# Compile
# Note: checkpointer is usually required for interrupts in production
from langgraph.checkpoint.memory import MemorySaver
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)

# --- 7. USAGE EXAMPLE (Mocking the Interaction) ---

# A. Start the Graph
# The same thread_config is passed to every call below so they all address the
# same saved run.
thread_config = {"configurable": {"thread_id": "user_session_1"}}
initial_input = {
    "report": "Python is great. It has lists, dicts, and loops.",
    "user_request": "I want to learn basics."
}

print("\n--- STARTING GRAPH ---")
# This will run until the first interrupt (The first Quiz)
# The streamed events are deliberately discarded; only the side effects matter here.
for event in graph.stream(initial_input, thread_config):
    pass 

# B. Check Status (We are paused now)
state_snapshot = graph.get_state(thread_config)
print(f"\nCurrent Node: {state_snapshot.next}")
# The 'interrupt' value contains the Study Material & Questions
# NOTE(review): assumes at least one pending task with an interrupt; indexing
# [0] would fail if the graph finished without pausing — confirm.
print(f"User sees: {state_snapshot.tasks[0].interrupts[0].value['title']}")

# C. User Responds (Resume Graph)
from langgraph.types import Command
print("\n--- USER SUBMITS ANSWERS ---")
# We send the answers to resume execution
# As the last expression of the cell, the value returned by invoke() is what the
# notebook displays as the cell output.
graph.invoke(
    Command(resume=["Answer 1", "Answer 2", "Answer 3"]), 
    thread_config
)


--- STARTING GRAPH ---
--- Generating Structure ---
--- Creating Content (Batch) ---
--- Administering Quiz: Python Lists ---

Current Node: ('administer_quiz',)
User sees: Python Lists

--- USER SUBMITS ANSWERS ---
--- Administering Quiz: Python Lists ---
--- Evaluating Submission ---
FAILED (65). Retrying same topic.
--- Simplified Teaching Mode (Remediation) ---
--- Administering Quiz: Python Lists ---


{'report': 'Python is great. It has lists, dicts, and loops.',
 'user_request': 'I want to learn basics.',
 'checkpoints': [{'name': 'Python Lists',
   'objective': 'Understand the structure and basic operations of Python lists.',
   'id': '51d65d66-8723-476e-8e0a-de546022684c',
   'study_material': 'Python lists are ordered, mutable sequences of items used to store collections of data. They are defined using square brackets []. Lists are zero-indexed, meaning the first element is accessed using index 0. Because lists are mutable, you can change, add, or remove elements after the list has been created. Common operations include accessing elements by index (e.g., my_list[0]), adding new elements using the .append() method, and checking the number of elements using the len() function. Lists can hold elements of different data types simultaneously.',
   'quiz_questions': ["What symbol is used to define a Python list, and what does it mean for a list to be 'mutable'?",
    'If a Python lis