# Practical Session: LLM Tools – Agentic Behaviour with LangGraph

Welcome to this practical session at RANLP 2025. Over the next ninety minutes we will work together to build a small agentic system using **LangGraph**, a library that orchestrates long‑running workflows across language models and tools. To begin with we will see what a single model can do when asked to summarise a meeting transcript. Then we will gradually introduce structure: breaking the problem into multiple steps, adding loops and branching, and persisting state. LangGraph’s low‑level design makes it possible to checkpoint and resume long‑running agents and to weave in human feedback when needed. By the end of the session you will have a working prototype that reads a meeting transcript and produces a concise report with tasks and decisions.

In [1]:
#%pip uninstall -q -y torch torchvision torchaudio transformers vllm datasets evaluate sacrebleu sentencepiece langgraph accelerate peft bitsandbytes pandas compressed-tensors spacy langcodes xgrammar

In [2]:
#%pip install -q torch==2.7.1 torchaudio torchvision --index-url https://download.pytorch.org/whl/cu126
#%pip install -q transformers datasets evaluate sacrebleu sentencepiece langgraph langchain langchain-community grandalf sentence-transformers langchain_huggingface langchain_openai "accelerate>=0.21.0" peft bitsandbytes pandas


In [3]:
# Environment setup
import os
import sys
import random
import numpy as np
import torch

# Set a fixed seed for reproducibility
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

# Detect device
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'
print(f'Device: {DEVICE}')

# Create output directory
OUT_DIR = './out'
os.makedirs(OUT_DIR, exist_ok=True)

Device: cuda



## Problem setting: meeting minutes

Think back to your last research meeting or project stand‑up. Several people talk at once, ideas overlap and the conversation drifts before circling back. Important decisions and follow‑up tasks are easy to miss. In this practical we will design a helper that listens to these meetings and produces a clear summary and a list of tasks with assignees and deadlines. Given a raw transcript, our system will extract the salient points and update a shared task list so everyone knows what to do next.


In the previous practical you saw that a strong foundation model can perform surprisingly well on a range of tasks without fine‑tuning. We'll start by using **Qwen2.5-14B-Instruct**, a 14‑billion‑parameter model that we will host locally, to see how far we can get with a simple prompt. We'll ask it to summarise a meeting transcript and extract action items. Examining the results will show us the strengths and weaknesses of a single LLM call and motivate a more structured approach.

### Experiment: direct prompting

Take a look at the transcript provided in the next cell. It captures a short meeting among three colleagues and includes small talk, interruptions and overlapping topics to mimic the messy reality of meetings. We'll use this example to test our prompt and see how the base model handles summarisation and task extraction.

In [4]:
transcript = """
A: Good morning, everyone. Thanks for joining on short notice. I wanted us to sync about the product launch timeline because we`ve been getting questions from marketing.
B: Morning! Yeah, I saw the emails yesterday. They`re pushing for a concrete date, but I don`t think engineering is fully comfortable committing yet.
C: Exactly. We`re still ironing out some of the backend issues. The integration with the payment system isn`t as smooth as it should be, and if we push too quickly, we`ll have failures during checkout.
A: Right, and we don`t want customers to be the ones finding those bugs. B, from your side, do you think another two weeks would make a difference?
B: Two weeks sounds reasonable, but it depends on how quickly C`s team can finalize the fixes. If QA doesn`t get enough time, we`ll just be pushing the risk down the road.
C: True, but we`ve made progress. Yesterday the team managed to cut down the error rate by almost 40%. If we keep that pace, by the end of next week we should be ready for a full round of regression tests.
A: That`s encouraging. So maybe we can give marketing a tentative date, but with the condition that it`s dependent on QA sign-off.
B: I think they`ll accept that as long as we word it clearly. They mainly need something to build their campaigns around.
C: Should we aim for the 15th, then? That gives us a week to stabilize and a week for QA.
A: Yes, let`s go with that. And if anything unexpected comes up, we`ll update. B, could you draft a short note to marketing after this call?
B: Sure, I`ll take care of it. Do you want me to include the “tentative” language or phrase it as a “target date”?
A: Better to say “target date,” but add that it`s subject to final QA approval. That way we`re transparent without sounding uncertain.
C: Works for me. I`ll also send a daily progress update to both of you so we can react quickly if we see delays.
B: Perfect. One last thing—A, are we still on for the client demo next Thursday?
A: Yes, but let`s keep it internal features only. Nothing related to payments until we`re sure it`s solid.
C: Good call. I`ll make sure the demo build is clean by Wednesday afternoon.
A: Great. Thanks, both of you. I think we`ve got a clear plan now.
B: Sounds good. Talk soon.
C: Bye, everyone.
"""


Let's run our baseline prompt on the short transcript and see what the model produces. Pay attention to whether the summary captures the key points and whether the extracted tasks have clear owners and deadlines.

For these experiments we'll use the **LangChain** framework. **LangChain** provides a unified interface to different LLM providers, utilities for prompt templating and output parsing, and a simple way to chain components together. In the code you'll see we create a prompt template, call the locally hosted Qwen model and inspect the results.

> A note on **LangChain**: **LangChain** gives us small building blocks—prompt templates, model wrappers, and a common “runnable” interface—so we can wire prompts to models without hard-coding strings or vendor-specific code. 

In [5]:
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline, BitsAndBytesConfig
from langchain_huggingface import HuggingFacePipeline, ChatHuggingFace, HuggingFaceEmbeddings

model_id = "Qwen/Qwen2.5-14B-Instruct"

tok = AutoTokenizer.from_pretrained(model_id, use_fast=True)
bnb = BitsAndBytesConfig(load_in_4bit=True, bnb_4bit_compute_dtype="bfloat16")  # 4-bit
model = AutoModelForCausalLM.from_pretrained(
    model_id,
    quantization_config=bnb,
    dtype="bfloat16",
    low_cpu_mem_usage=True,
    device_map="auto",
    do_sample=False,
    temperature=0
)

# IMPORTANT: set return_full_text=False so the prompt isn’t echoed back in the output
gen = pipeline(
    "text-generation",
    model=model,
    tokenizer=tok,
    max_new_tokens=8192,
    do_sample=True,
    pad_token_id=tok.eos_token_id,
    return_full_text=False,  # without this, the full prompt is prepended
)

llm_raw = HuggingFacePipeline(pipeline=gen)
llm = ChatHuggingFace(llm=llm_raw)

# now invoke like a normal chat model
resp = llm.invoke([
    ("system", "You are a helpful planning assistant."),
    ("human", "Summarise this transcript …")
])
print(resp.content)


embed_id = "Qwen/Qwen3-Embedding-0.6B"
embed = HuggingFaceEmbeddings(
    model_name=embed_id,
    model_kwargs={"device": "cuda"},
    encode_kwargs={"normalize_embeddings": True},
    multi_process=False
)


a = embed.embed_query("get me icecream")
b = embed.embed_query("bring me vanilla icecream")

cos = sum(x*y for x,y in zip(a,b))
print("cosine(similar tasks) ≈", round(cos, 3))

The following generation flags are not valid and may be ignored: ['temperature']. Set `TRANSFORMERS_VERBOSITY=info` for more details.


Device set to use cuda:0


I'd be happy to help summarize the transcript, but I need the content of the transcript first. Please provide the text you want summarized.
cosine(similar tasks) ≈ 0.864
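
The plain dot product in the cell above equals the cosine similarity only because `normalize_embeddings=True` returns unit-length vectors. A minimal sketch of that relationship, using made-up three-dimensional vectors (illustrative values, not real embeddings) and hypothetical helper names:

```python
import math

def cosine(u, v):
    """Cosine similarity for arbitrary non-zero vectors."""
    dot = sum(x * y for x, y in zip(u, v))
    norm_u = math.sqrt(sum(x * x for x in u))
    norm_v = math.sqrt(sum(y * y for y in v))
    return dot / (norm_u * norm_v)

def normalize(u):
    """Scale a vector to unit length."""
    norm = math.sqrt(sum(x * x for x in u))
    return [x / norm for x in u]

# Illustrative vectors only; real embeddings have hundreds of dimensions.
u = [1.0, 2.0, 2.0]
v = [2.0, 1.0, 2.0]

# Once both vectors are unit length, the dot product alone is the cosine.
u_hat, v_hat = normalize(u), normalize(v)
dot_of_units = sum(x * y for x, y in zip(u_hat, v_hat))

print(round(cosine(u, v), 3), round(dot_of_units, 3))
```

If you switch off normalisation in the embedding wrapper, the shortcut no longer holds and the full formula is needed.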


In [6]:
system_prompt = "You are a planning assistant in charge of analysing meeting transcripts to extract key information and tasks accurately."
user_prompt = """

Analyze the meeting transcript:

[Transcription]
{transcript}
[End Transcription]

Extract key information and tasks. Your answer should be:

1 - Meeting notes summary (a summary of work related content discussed in the meeting).
2 - Tasks (specific tasks assigned to individuals or teams).
3 - Decisions made (any important decisions or agreements reached during the meeting).
4 - Updates on past tasks (status updates on previously assigned tasks).
"""

`ChatPromptTemplate` lets us declare a sequence of chat messages—usually a `"system"` instruction plus a `"human"` message with placeholders like `{transcript}`—and render them into exactly what the model expects. It’s safer (no brittle string concatenation), easier to read, and it stays portable if we switch models later. You can think of it as a **form** where the transcript is the field we fill in; LangChain handles the formatting.
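
To make the "form" analogy concrete, here is a minimal standard-library sketch of what filling in a chat template amounts to. It is a stand-in for illustration, not LangChain's implementation; `render_messages` and the message dictionary layout are assumptions made for this example.

```python
from typing import Dict, List, Tuple

def render_messages(template: List[Tuple[str, str]],
                    values: Dict[str, str]) -> List[Dict[str, str]]:
    """Fill the {placeholders} of every (role, text) pair with the given values."""
    return [{"role": role, "content": text.format(**values)}
            for role, text in template]

# The template is declared once, with a named field for the transcript.
demo_template = [
    ("system", "You are a planning assistant."),
    ("human", "Analyze the meeting transcript:\n{transcript}"),
]

# Filling the form: only the field value changes between calls.
demo_messages = render_messages(
    demo_template, {"transcript": "A: Let's aim for the 15th."}
)
for message in demo_messages:
    print(message["role"], "->", message["content"])
```

The real `ChatPromptTemplate` additionally validates variable names and produces typed message objects, which this sketch leaves out.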


In [7]:
from langchain_core.prompts import ChatPromptTemplate

prompt = ChatPromptTemplate.from_messages([
    ("system", system_prompt),
    ("human", user_prompt),
])

LangChain introduces a concise syntax called the LangChain Expression Language (LCEL) that lets you compose *runnables* using a pipe operator (`|`). For example:

```python
(prompt | llm).invoke({"transcript": transcript})
```

first formats the input with the prompt and then passes the result to the model. This is analogous to Unix pipes: the output of one stage becomes the input to the next. Using `|` keeps our code clean and supports streaming and easy debugging.
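
If the pipe syntax looks like magic, the toy class below shows how Python's `__or__` hook can produce this kind of composition. It is a sketch of the idea only; `Step`, `format_prompt` and `fake_model` are hypothetical names, and LangChain's real runnables do considerably more (streaming, batching, tracing).

```python
class Step:
    """Toy runnable: wraps a function and supports `a | b` composition."""

    def __init__(self, fn):
        self.fn = fn

    def invoke(self, value):
        return self.fn(value)

    def __or__(self, other):
        # `self | other` builds a new step that runs self first, then other.
        return Step(lambda value: other.invoke(self.invoke(value)))

# Stage 1: turn the input dict into a prompt string.
format_prompt = Step(lambda d: f"Summarise: {d['transcript']}")
# Stage 2: a fake "model" that simply upper-cases its prompt.
fake_model = Step(lambda prompt: prompt.upper())

toy_chain = format_prompt | fake_model
toy_output = toy_chain.invoke({"transcript": "ship on the 15th"})
print(toy_output)
```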

In [8]:
response = (prompt | llm).invoke({"transcript": transcript})
print("Response:\n", response.content)

Response:
 ### 1. Meeting Notes Summary
The meeting focused on aligning the product launch timeline with marketing's requirements while ensuring technical readiness. Key points included:
- Engineering is not fully prepared due to ongoing backend issues, particularly with the payment system integration.
- There was a discussion on whether an additional two weeks would suffice for resolving these issues.
- Significant progress has been made, reducing error rates by almost 40%.
- A tentative launch date of the 15th was proposed, contingent upon successful QA sign-off.
- The team agreed to maintain transparency while being realistic about timelines.
- Preparations for a client demo were also discussed, limiting it to internal features until payment-related issues are resolved.

### 2. Tasks
- **B**: Draft a short note to marketing indicating the target launch date as the 15th, but subject to final QA approval.
- **C**: Send daily progress updates to A and B regarding the backend fixes and 

Take a moment to read the model's output. Does the summary reflect the main themes of the meeting? Are the tasks clearly identified with owners and due dates? Make note of any omissions or hallucinations—you'll use these observations to improve the pipeline in the next sections.

## Building a basic meeting‑minutes agent

In the previous section you saw how a single prompt can summarise a meeting and extract tasks. However, relying on one prompt is brittle: important details are missed and tasks may be misassigned. In this stage we’ll construct a basic agent with LangGraph that decomposes the job into three nodes – one to summarise the transcript, one to extract tasks, and one to list explicit decisions. By structuring the workflow we’ll make each step simpler and easier to debug.

Before we can build our agent we need to define the state that flows through it. 

LangGraph uses *schemas* to describe the structure of the state. The main kinds of schemas you can provide to `StateGraph` are:

- **state\_schema** – describes the complete set of keys your graph may read or write. It can be a `TypedDict`, a `dataclass` or a Pydantic `BaseModel`.
- **input\_schema** – a subset of the state schema specifying which keys must be provided when you call `invoke`. When omitted, the state schema serves as the input schema.
- **output\_schema** – a subset of the state schema defining which keys are returned from the graph. Use this to hide internal channels or intermediate values.
- **context\_schema** – an optional read‑only structure passed to every node, used for immutable configuration or dependencies such as database connections.

You can also define private state classes for channels that are only used for internal communication between nodes.
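
The roles of the state, input and output schemas can be illustrated without LangGraph at all. The sketch below is a plain-Python approximation under stated assumptions: `ToyState`, `ToyInput`, `ToyOutput` and `run_toy_graph` are hypothetical, and raising on missing or unknown keys is a choice made for this example, so LangGraph's own validation behaviour may differ.

```python
from typing import TypedDict, get_type_hints

class ToyState(TypedDict):
    transcript: str
    summary: str
    scratch: str      # internal value we do not want to expose

class ToyInput(TypedDict):
    transcript: str

class ToyOutput(TypedDict):
    summary: str

def keys_of(schema) -> set:
    """Return the field names declared on a TypedDict class."""
    return set(get_type_hints(schema))

def toy_node(state: dict) -> dict:
    first_line = state["transcript"].strip().splitlines()[0]
    return {"scratch": first_line, "summary": f"Meeting opened with: {first_line}"}

def run_toy_graph(inputs: dict) -> dict:
    missing = keys_of(ToyInput) - set(inputs)
    if missing:
        raise KeyError(f"missing input keys: {sorted(missing)}")
    # Input schema: only the declared input keys enter the state.
    state = {k: inputs[k] for k in keys_of(ToyInput)}
    update = toy_node(state)
    # State schema: nodes may only write keys the state declares.
    unknown = set(update) - keys_of(ToyState)
    if unknown:
        raise KeyError(f"unknown state keys: {sorted(unknown)}")
    state.update(update)
    # Output schema: hide everything except the declared output keys.
    return {k: v for k, v in state.items() if k in keys_of(ToyOutput)}

toy_result = run_toy_graph({"transcript": "A: Good morning, everyone.\nB: Morning!"})
print(toy_result)
```

Note how `scratch` is written by the node but never reaches the caller, which is the purpose of an output schema.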

#### Step 1 - Summarisation Node

For the first step the state only needs two fields: the original transcript and a summary that we’ll populate.

In [9]:
# Minimal LangGraph imports
from langgraph.graph import StateGraph, END, START
from typing_extensions import TypedDict

# Our minimal state holds the meeting transcript and a summary.
class OverallState(TypedDict):
    transcript: str
    summary: str

# Define which keys are required as input when invoking the graph.
class InputState(TypedDict):
    transcript: str

# Define which keys we want to expose at the end of the graph.
class OutputState(TypedDict):
    summary: str


We then write a `summarize_node` function that uses our chat model to produce a concise summary of the meeting. This node will read the transcript from the state and return a new dictionary with the summary.

In [10]:
# Node: generate a concise summary of the meeting transcript using our LLM helper.
def summarize_node(state: InputState) -> dict:
    system_prompt = (
        "You are a planning assistant who writes very concise summaries of meetings. "
        "Capture only the most important objectives and outcomes."
    )
    user_prompt = "Summarize the following meeting transcript in a few sentences:{transcript}"
    
    prompt = ChatPromptTemplate.from_messages([
        ("system", system_prompt),
        ("human", user_prompt),
    ])

    response = (prompt | llm).invoke({"transcript": state["transcript"]})
    return {"summary": response.content}


Now, using LangGraph's simple abstractions, we register the node and connect it to the graph's entry and exit points.

In [11]:
# Build a minimal graph that just summarizes the transcript
builder = StateGraph(OverallState, input_schema=InputState, output_schema=OutputState)
builder.add_node('summarize', summarize_node)
builder.set_entry_point('summarize')
builder.add_edge('summarize', END)

# Compile the graph
summary_graph = builder.compile()


With the graph compiled, we invoke our agent, passing the transcript as the **InputState**.

In [12]:
# Invoke the graph on our short transcript
result = summary_graph.invoke({"transcript": transcript})
print('Summary:', result['summary'])

Summary: The meeting focused on aligning the product launch timeline due to marketing's requests for a specific date. Engineering highlighted unresolved backend issues, particularly with the payment system, necessitating more time for stabilization. After assessing recent improvements, a tentative launch target was set for the 15th of the month, subject to final QA approval. A will receive a detailed progress update daily. Additionally, the team agreed to proceed with an upcoming client demo focusing solely on internal features unrelated to payments until further validation is complete.


#### Step 2 - Extracting Tasks

A summary gives us the high‑level picture, but we also need to know what work needs to be done. We extend our state to include a `tasks` field and write a `tasks_node` function. This node asks the model to list each action item along with the person responsible and any deadlines.

In [13]:
# Extended state with tasks
class OverallState(TypedDict):
    transcript: str
    summary: str
    tasks: str

# Node: extract tasks
def tasks_node(state: OverallState) -> dict:
    system_prompt = (
        "You are a planning assistant who extracts tasks from meetings. "
        "List each task with the responsible person and any deadlines. Use bullet points."
    )
    user_prompt = "Extract the tasks from this meeting transcript:{transcript}"
    
    prompt = ChatPromptTemplate.from_messages([
        ("system", system_prompt),
        ("human", user_prompt),
    ])

    response = (prompt | llm).invoke({"transcript": state["transcript"]})
    return {"tasks": response.content}


In [14]:
# Build a graph with two sequential nodes: summarization followed by task extraction
builder = StateGraph(OverallState)
builder.add_node('summarize', summarize_node)
builder.add_node('tasks', tasks_node)
builder.set_entry_point('summarize')
builder.add_edge('summarize', 'tasks')
builder.add_edge('tasks', END)

meeting_graph = builder.compile()

result = meeting_graph.invoke({"transcript": transcript})
print('Summary:', result['summary'])
print('Tasks:', result['tasks'])


Summary: The meeting focused on aligning the product launch timeline due to marketing's inquiries. Engineering highlighted unresolved backend issues, particularly with the payment system, suggesting a two-week delay for stability. The team agreed on aiming for a target launch date of the 15th, contingent upon final QA approval. Marketing will receive a formal notification shortly. Daily progress updates will be shared to address any unforeseen delays promptly. Additionally, the client demo will proceed with internal features only, excluding payment-related functionalities until further validation.
Tasks: - **Draft a note to marketing regarding the product launch target date (by end of today)**  
  Responsible: B

- **Send daily progress updates on backend fixes**  
  Responsible: C

- **Prepare for the client demo (internal features only, no payments)**  
  Responsible: C  
  Deadline: Wednesday afternoon

- **Ensure QA sign-off before finalizing the launch date**  
  Responsible: C (i

#### Step 3 - Capturing Decisions

Meetings often end with explicit decisions or agreements. To capture these, we add a `decisions` field to our state and write a `decisions_node`. This node lists the important decisions made during the meeting in bullet form. We then combine our three nodes into a single graph to produce a structured report.

In [15]:
# Extend the state again to include decisions
class OverallState(TypedDict):
    transcript: str
    summary: str
    tasks: str
    decisions: str

def decisions_node(state: OverallState) -> dict:
    system_prompt = (
        "You are a planning assistant who lists important decisions made during a meeting. "
        "Return bullet points summarising each decision."
    )
    user_prompt = "List any explicit decisions from this meeting transcript:{transcript}"
    
    prompt = ChatPromptTemplate.from_messages([
        ("system", system_prompt),
        ("human", user_prompt),
    ])

    response = (prompt | llm).invoke({"transcript": state["transcript"]})
    
    return {"decisions": response.content}


In [16]:
# Build a graph with three nodes: summarize -> tasks -> decisions
builder = StateGraph(OverallState)
builder.add_node('summarize', summarize_node)
builder.add_node('tasks', tasks_node)
builder.add_node('decisions', decisions_node)
builder.set_entry_point('summarize')
builder.add_edge('summarize', 'tasks')
builder.add_edge('tasks', 'decisions')
builder.add_edge('decisions', END)

full_graph = builder.compile()

result = full_graph.invoke({"transcript": transcript})
print('Summary:', result['summary'], end='\n\n')
print('Tasks:', result['tasks'], end='\n\n')
print('Decisions:', result['decisions'])


Summary: The meeting focused on aligning the product launch timeline due to pressure from marketing for a concrete date. Engineering highlighted ongoing backend issues, particularly with payment system integration, suggesting another two weeks might be necessary before committing to a firm date. After discussing, they agreed on targeting the 15th for the launch, contingent upon final QA approval. Marketing will receive a note stating this as a "target date" with a clear mention of its dependency on QA sign-off. Additionally, daily progress updates will be shared to ensure quick responses to any potential delays. A reminder was set for an upcoming client demo, limiting it to internal features until payment-related issues are resolved.

Tasks: - B to draft a short note to marketing stating the "target date" of the 15th, subject to final QA approval. Deadline: End of today.
- C to send daily progress updates on backend fixes to A and B. Deadline: Daily until further notice.
- C to ensure 

Our basic agent successfully decomposes the meeting summarisation problem into three steps: summarising the transcript, extracting tasks, and listing decisions. This simple graph demonstrates how LangGraph passes state between nodes and makes each step independent. However, the tasks are free‑form strings, meaning that some items might be missing, others could be ambiguous, and there is no way to validate them automatically. To build a more robust assistant we need to validate and refine the model’s outputs. In the next stage we will add a self‑reflection loop.
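
As a recap of how state moves through a sequential graph, the sketch below mimics the pattern with plain dictionaries: each node reads the shared state and returns only the keys it wants to write. The runner and the stub nodes are hypothetical stand-ins (no LLM is called), and LangGraph's real execution engine is more elaborate than this loop.

```python
from typing import Callable, Dict, List, Tuple

State = Dict[str, str]
Node = Callable[[State], State]

def run_sequence(nodes: List[Tuple[str, Node]], initial: State) -> State:
    """Run nodes in order, merging each partial update into the state."""
    state = dict(initial)              # copy so the caller's dict is untouched
    for name, node in nodes:
        update = node(state)           # a node returns only the keys it wrote
        state.update(update)
    return state

# Stub nodes standing in for the LLM-backed ones.
def stub_summarize(state: State) -> State:
    lines = len(state["transcript"].splitlines())
    return {"summary": f"{lines} lines of discussion"}

def stub_tasks(state: State) -> State:
    return {"tasks": "- B: draft note to marketing"}

def stub_decisions(state: State) -> State:
    return {"decisions": "- Target date: the 15th"}

final_state = run_sequence(
    [("summarize", stub_summarize), ("tasks", stub_tasks), ("decisions", stub_decisions)],
    {"transcript": "A: hello\nB: hi"},
)
print(final_state)
```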

## 🧪 Exercise 1: Add a "highlights" node after the summary

**What:** From the existing `summary` in state, create 2–3 concise bullet highlights.  
**Why:** Practice adding a node that *derives* data from previous state, no LLM needed.

**Rules:**
- Use simple heuristics (split by sentences, pick the 2–3 most informative).
- Return `{"highlights": List[str]}`.
- Wire this node **after** the `summarize` node.

**Success check:** After running the graph, `state["highlights"]` exists and is a short list.
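
As a hint rather than a reference solution, here is one possible heuristic for choosing highlights. The function name `pick_highlights` and the scoring rule (word count plus a bonus for sentences containing digits) are assumptions made for this sketch; wiring the node into the graph is left to you.

```python
import re
from typing import List

def pick_highlights(summary: str, k: int = 3) -> List[str]:
    """Pick up to k sentences, favouring longer ones and those with numbers."""
    sentences = [s.strip() for s in re.split(r"(?<=[.!?])\s+", summary) if s.strip()]

    def score(sentence: str) -> int:
        bonus = 5 if re.search(r"\d", sentence) else 0   # dates and figures matter
        return len(sentence.split()) + bonus

    # Rank by score, keep the top k, then restore the original order.
    ranked = sorted(range(len(sentences)), key=lambda i: score(sentences[i]), reverse=True)[:k]
    return [sentences[i] for i in sorted(ranked)]

def highlights_node(state: dict) -> dict:
    """Node body: derive highlights from the existing summary, no LLM call."""
    return {"highlights": pick_highlights(state["summary"])}

demo_summary = (
    "Launch targets the 15th. QA must sign off. Thanks all. "
    "Demo is next Thursday with internal features only."
)
demo_update = highlights_node({"summary": demo_summary})
print(demo_update["highlights"])
```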


In [17]:
# Your code for the graph here

## Self‑reflection to improve task extraction

In the basic agent we asked the model to list “tasks” and hoped for the best. Sometimes it works; other times it could miss items or invent details. We’re going to teach our agent to check its own work. The idea is simple: generate a first pass of tasks, run a small critic against a rubric (“don’t invent names or dates; keep titles concise; owners must appear in the transcript”), and if it fails, run a targeted repair pass before we accept the result. This follows the “reflection / Reflexion” pattern popular in agent design and is a natural fit for LangGraph because we can route on a condition and even loop a couple of times.
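
Before wiring this into a graph, it can help to see the generate, critique, repair loop as plain Python. The sketch below is a simplified illustration under stated assumptions: the generator and repair steps are stubs standing in for LLM calls, the rubric checks only two of the rules, and all names (`critic`, `reflect`, `stub_generate`, `stub_repair`) are hypothetical.

```python
from typing import Callable, Dict, List, Tuple

ToyTask = Dict[str, str]
Problem = Tuple[int, str]          # (index of the offending task, message)

def speakers_in(transcript: str) -> set:
    """Speaker labels are whatever precedes the first colon on each line."""
    return {line.split(":", 1)[0].strip()
            for line in transcript.splitlines() if ":" in line}

def critic(tasks: List[ToyTask], transcript: str) -> List[Problem]:
    """Check tasks against a small rubric and report every violation."""
    known = speakers_in(transcript)
    problems: List[Problem] = []
    for i, task in enumerate(tasks):
        if task.get("assignee") not in known:
            problems.append((i, f"assignee {task.get('assignee')!r} never speaks"))
        if len(task.get("title", "").split()) > 12:
            problems.append((i, "title is not concise"))
    return problems

def reflect(generate: Callable, repair: Callable, transcript: str,
            max_rounds: int = 2) -> List[ToyTask]:
    """First pass, then at most max_rounds critique-and-repair cycles."""
    tasks = generate(transcript)
    for _ in range(max_rounds):
        problems = critic(tasks, transcript)
        if not problems:
            break                   # rubric satisfied, accept the result
        tasks = repair(tasks, problems)
    return tasks

def stub_generate(transcript: str) -> List[ToyTask]:
    # Stand-in for the LLM: the second task invents an owner "D".
    return [{"title": "Draft note to marketing", "assignee": "B"},
            {"title": "Book venue", "assignee": "D"}]

def stub_repair(tasks: List[ToyTask], problems: List[Problem]) -> List[ToyTask]:
    # Stand-in for a targeted repair prompt: drop the flagged tasks.
    bad = {i for i, _ in problems}
    return [t for i, t in enumerate(tasks) if i not in bad]

demo_transcript = "A: B, could you draft a short note to marketing?\nB: Sure, I'll take care of it."
final_tasks = reflect(stub_generate, stub_repair, demo_transcript)
print(final_tasks)
```

Capping the number of rounds keeps the loop from running indefinitely when the repair step cannot satisfy the rubric.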

#### Step 1 - Standardise the task format

We’ll switch from free-form task text to **typed objects**. That makes the critic’s job clear and the repair step easier to target. We use Pydantic models for `Task`, `TaskList`, and a tiny `Reflection` result. LangChain supports structured output via schema-aware prompts and parsers; we’ll use a Pydantic parser so this works with any chat model.

We first define what a *Task* is. Binding this schema to the LLM makes the output predictable.

Here we ask the model to return tasks as objects, then normalise and de-duplicate them with Pydantic v2. We accept natural phrasing for `due_date` (“next Friday”, “Q4”) and drop unknown fields so downstream code stays stable. The model may call fields by different names, so we use `AliasChoices` to map common variants (e.g., `owner`, `assigned_to`, `who` → `assignee`). Finally, a light `model_validator` filters out items that lack a title, assignee or description, and a de-duplication pass removes tasks that are identical up to letter case and surrounding whitespace.

In [18]:
from typing import Optional, List, Literal, Dict, Any
from pydantic import BaseModel, Field, ConfigDict, field_validator, AliasChoices, model_validator
import json

Status = Literal["ToDo", "InProgress", "Done", "Closed"]

class Task(BaseModel):
    model_config = ConfigDict(extra="ignore")  # drop unexpected fields
    title: str = Field(min_length=3, validation_alias=AliasChoices("title", "task", "name"))
    assignee: str = Field(min_length=1, validation_alias=AliasChoices("assignee", "person", "owner", "assigned_to", "who"))
    description: str = Field(min_length=1, validation_alias=AliasChoices("description", "details", "info", "notes"))
    # due_date intentionally free text; NO ISO enforcement (can be "next week", "Q4", etc.)
    due_date: Optional[str] = Field(default=None, validation_alias=AliasChoices("due_date", "deadline", "when", "by", "due"))
    status: Status = "ToDo"

    @field_validator("title", "assignee", "description", "due_date", mode="before")
    @classmethod
    def _strip(cls, v):
        return v.strip() if isinstance(v, str) else v

class TaskList(BaseModel):
    tasks: List[Task]
    
    @model_validator(mode="before")
    @classmethod
    def _normalize_and_filter(cls, v):
        """Accept various wrapper shapes and drop blatantly invalid task dicts early."""
        items = v if isinstance(v, list) else v.get("tasks") if isinstance(v, dict) else None
        if items is None and isinstance(v, dict) and isinstance(v.get("properties"), dict):
            maybe = v["properties"].get("tasks")
            if isinstance(maybe, list):
                items = maybe
        if items is None:
            return v  # let Pydantic raise later if shape invalid

        filtered = []
        for it in items:
            if not isinstance(it, dict):
                continue
            title = (it.get("title") or it.get("task") or it.get("name") or "").strip()
            assignee = (it.get("assignee") or it.get("person") or it.get("owner") or it.get("assigned_to") or it.get("who") or "").strip()
            desc = (it.get("description") or it.get("details") or it.get("info") or it.get("notes") or "").strip()
            if title and assignee and desc:
                filtered.append(it)
        return {"tasks": filtered}

    @model_validator(mode="after")
    def _deduplicate(self):
        """Deduplicate tasks (case-insensitive on key fields) preserving first occurrence."""
        seen = set()
        dedup = []
        for t in self.tasks:
            key = (
                t.title.strip().lower(),
                t.assignee.strip().lower(),
                t.description.strip().lower(),
                (t.due_date or "").strip().lower(),
                t.status,
            )
            if key not in seen:
                seen.add(key)
                dedup.append(t)
        self.tasks = dedup
        return self

def to_tasklist(x) -> TaskList:
    if isinstance(x, TaskList):
        return x
    if isinstance(x, dict):
        if "tasks" in x:                      # already TaskList-shaped
            return TaskList.model_validate(x)
        if x.get("parsed") is not None:       # LCEL StructuredOutput shape
            return TaskList.model_validate(x["parsed"])
        raw = x.get("raw")
        if raw is not None and hasattr(raw, "content"):  # fallback: parse LLM JSON text
            data = json.loads(raw.content)
            return TaskList.model_validate(data)
    raise ValueError(f"Unsupported tasks_struct shape: {type(x)}; keys={list(x.keys()) if isinstance(x, dict) else None}")

Our new state must now hold the `TaskList` that the model is going to output.

In [19]:

class OverallState(TypedDict):
    transcript: str
    summary: str
    decisions: str
    tasks_struct: TaskList  # validated TaskList of Task objects for downstream automation

LangChain’s `with_structured_output()` binds our Pydantic model to the chat model so the return value is already parsed and validated. We still give the model a short rubric to reduce guesswork. The prompt is assembled with ChatPromptTemplate, which keeps variables and messaging tidy.

> Note: Check that your model and serving stack support this. Structured output relies on capabilities of the model and its provider, so it is not available everywhere.

In [20]:
tasklist_llm = llm.with_structured_output(
    TaskList,
    method="json_schema"
)

The `tasks_structured_node` node reads the meeting `transcript` and returns a validated `TaskList`.

In [21]:
SCHEMA_CONSTRAINTS = """
    Schema & constraints:
    - Only keys: title, description, assignee, due_date?, status.
    - Title and description are non-empty strings and must be representative of the task at hand.
    - Status ∈ ["ToDo", "InProgress", "Done", "Closed"] (exact casing).
    - due_date is optional (null or free text); keep whatever natural phrasing appears (e.g. "next Friday", "in two weeks", "Q4"). Do NOT normalize to ISO.
    - Assignee must be a non-empty string and must be a speaker in the transcript.
"""

def tasks_structured_node(state: OverallState) -> Dict:
    system_prompt = f"""
        You are a planning assistant who extracts tasks from meeting transcripts.
        Your goal is to identify and extract actionable tasks from a transcript.
        Return ONLY a TaskList JSON *instance* (the data), not a JSON Schema.
        The JSON must be valid and correct.

        {SCHEMA_CONSTRAINTS}
    """
    
    user_prompt = "Transcript:\n{transcript}\nYou must always return a valid JSON object."
    
    prompt = ChatPromptTemplate.from_messages([
        ("system", system_prompt),
        ("human", user_prompt),
    ])
    
    tasklist = (prompt | tasklist_llm).invoke({"transcript": state["transcript"]})
    return {"tasks_struct": tasklist}

We insert our new `tasks_structured` node between `summarize` and `decisions`. This keeps the flow readable and lets each node focus on one job. The code below reuses `summarize_node` and `decisions_node` from Stage 1; run those cells first if you skipped ahead.

In [22]:
import pprint

# Build a graph with three nodes: summarize -> tasks_structured -> decisions
builder = StateGraph(OverallState)
builder.add_node('summarize', summarize_node)
builder.add_node('tasks_structured', tasks_structured_node)
builder.add_node('decisions', decisions_node)
builder.set_entry_point('summarize')
builder.add_edge('summarize', 'tasks_structured')
builder.add_edge('tasks_structured', 'decisions')
builder.add_edge('decisions', END)

full_graph = builder.compile()


In [23]:

try:
    result = full_graph.invoke({"transcript": transcript})
    print('Summary:', result['summary'], end='\n\n')
    print('Tasks:')
    
    for t in to_tasklist(result['tasks_struct']).tasks:
        pprint.pprint(t)
        print()
    print()
    print('Decisions:', result['decisions'])
except Exception as e:
    print(e)


Invalid json output: {
  "tasks": [
    {
      "title": "Draft note to marketing about the product launch target date",
      "description": "B should draft a note mentioning the target launch date as the 15th, while clearly stating it's subject to final QA approval.",
      "assignee": "B",
      "due_date": "Today",
      "status": "ToDo"
    },
    {
      "title": "Send daily progress updates",
      "description": "C should send daily updates on backend fixes to A and B.",
      "assignee": "C",
      "due_date": "Every day until the launch date",
      "status": "ToDo"
    },
    "title": "Prepare client demo for next Thursday",
    "description": "Ensure the demo build excludes payment features until QA is complete.",
    "assignee": "C",
    "due_date": "Wednesday afternoon",
    "status": "ToDo"
  ]
}
For troubleshooting, visit: https://python.langchain.com/docs/troubleshooting/errors/OUTPUT_PARSING_FAILURE 
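
The failure above comes from the third task, whose fields appear directly inside the `tasks` array without enclosing braces. A shortened, hand-written reproduction (not the model's exact output) shows how the standard `json` module reports this kind of damage:

```python
import json

# Shortened reproduction: the second task has lost its enclosing braces.
broken = """
{
  "tasks": [
    {"title": "Draft note to marketing", "assignee": "B"},
    "title": "Prepare client demo",
    "assignee": "C"
  ]
}
"""

try:
    json.loads(broken)
    error = None
except json.JSONDecodeError as exc:
    error = exc
    # The decoder reads "title" as a bare string element, then trips on the colon.
    print(f"line {exc.lineno}, column {exc.colno}: {exc.msg}")
```

Because the text is not valid JSON at all, schema validation never gets a chance to run, which is why a repair step has to work on the raw string.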


In [24]:
tasklist_llm = llm.with_structured_output(
    TaskList,
    method="json_schema", 
    include_raw=True
)

In [25]:
# Fallback: try to auto-fix bad JSON, then parse it into a TaskList
from langchain_core.output_parsers import JsonOutputParser
from langchain.output_parsers import PydanticOutputParser, OutputFixingParser, RetryOutputParser

In [26]:

def tasks_structured_node_reparse(state: OverallState) -> Dict:
    system_prompt = f"""
        You are a planning assistant who extracts tasks from meeting transcripts.
        Your goal is to identify and extract actionable tasks from a transcript.
        Return ONLY a TaskList JSON *instance* (the data), not a JSON Schema.
        You must always return a valid JSON object.

        {SCHEMA_CONSTRAINTS}
    """
    
    user_prompt = "Transcript:\n{transcript}\nYou must always return valid JSON object."
    
    prompt = ChatPromptTemplate.from_messages([
        ("system", system_prompt),
        ("human", user_prompt),
    ])

    out = (prompt | tasklist_llm).invoke({"transcript": state["transcript"]})
    if out.get("parsed") is not None:
        tasklist = out["parsed"]
    else:
        base = PydanticOutputParser(pydantic_object=TaskList)
        fixing = OutputFixingParser.from_llm(parser=base, llm=llm, max_retries=5)
        print("trying to parse:", out["raw"].content)
        tasklist = fixing.parse(out["raw"].content)  # <-- no prompt needed
        print(tasklist)

    return {"tasks_struct": to_tasklist(tasklist) }
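
The node above follows a "strict parse first, repair only on failure" pattern. The stdlib sketch below shows the same control flow in isolation. It assumes a hypothetical `toy_fix` function in place of the LLM-backed `OutputFixingParser`; the toy rule only removes trailing commas and is not what the real parser does.

```python
import json
import re
from typing import Callable, Optional

def parse_with_fallback(raw: str, fix: Callable[[str], str], max_retries: int = 2) -> Optional[dict]:
    """Try json.loads first; on failure ask `fix` for a corrected string and retry."""
    candidate = raw
    for attempt in range(max_retries + 1):
        try:
            return json.loads(candidate)
        except json.JSONDecodeError:
            if attempt == max_retries:
                return None  # give up after the last retry
            candidate = fix(candidate)
    return None

def toy_fix(text: str) -> str:
    # Hypothetical fixer: drop a trailing comma before a closing bracket or brace.
    return re.sub(r",\s*([\]}])", r"\1", text)

bad = '{"tasks": [{"title": "Send daily progress updates", "assignee": "C"},]}'
print(parse_with_fallback(bad, toy_fix))
print(parse_with_fallback("not json", toy_fix))
```

Keeping the repair step behind the strict parse means the extra model call is only paid when the first output is malformed.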

In [27]:

# Build a graph with three nodes: summarize -> tasks -> decisions
builder = StateGraph(OverallState)
builder.add_node('summarize', summarize_node)
builder.add_node('tasks_structured', tasks_structured_node_reparse)
builder.add_node('decisions', decisions_node)
builder.set_entry_point('summarize')
builder.add_edge('summarize', 'tasks_structured')
builder.add_edge('tasks_structured', 'decisions')
builder.add_edge('decisions', END)

full_graph = builder.compile()

In [28]:

result = full_graph.invoke({"transcript": transcript})
print('Summary:', result['summary'], end='\n\n')
print('Tasks:')

for t in to_tasklist(result['tasks_struct']).tasks:
    pprint.pprint(t)
    print()
print()
print('Decisions:', result['decisions'])


You seem to be using the pipelines sequentially on GPU. In order to maximize efficiency please use a dataset


Summary: The meeting focused on setting a product launch timeline, addressing concerns raised by marketing while ensuring engineering readiness. It was decided to set a target launch date of the 15th, contingent upon final QA approval. Engineering aims to resolve backend issues and complete regression testing by the end of next week. Marketing will receive a formal update post-meeting, and daily progress updates will be shared. Additionally, the team agreed to limit the upcoming client demo to internal features only, excluding payment-related functionalities until further validation.

Tasks:
Task(title='Draft a note to marketing regarding the product launch', assignee='B', description='Draft a note to marketing indicating the target date for the product launch is the 15th, but specify that this is subject to final QA approval.', due_date=None, status='ToDo')

Task(title='Send daily progress updates', assignee='C', description='Send daily progress updates to A and B about the backend fi

Great! Now we have proper task extraction. Run the cell above a few times: once in a while the task detection is inconsistent, finding three tasks on one run and four on another.

This is how our graph looks right now.

In [29]:
# from IPython.display import Image, display
# display(Image(full_graph.get_graph().draw_mermaid_png()))

print(full_graph.get_graph().draw_ascii())

    +-----------+    
    | __start__ |    
    +-----------+    
          *          
          *          
          *          
    +-----------+    
    | summarize |    
    +-----------+    
          *          
          *          
          *          
+------------------+ 
| tasks_structured | 
+------------------+ 
          *          
          *          
          *          
    +-----------+    
    | decisions |    
    +-----------+    
          *          
          *          
          *          
    +---------+      
    | __end__ |      
    +---------+      


#### Step 2 - Critique and repair the task list

Even with a schema, the model sometimes omits tasks or includes spurious ones. We can improve reliability by adding a critic. We define a `Reflection` Pydantic model with three fields: `approve` (a boolean indicating whether the current task list meets our rubric), `issues` (a list of short descriptions of problems) and `instructions` (concise guidance on how to fix those problems). A critic node uses this model to decide whether the candidate task list is acceptable. If it isn’t, a repair node generates a revised list following the critic’s instructions. We loop between critic and repair until the tasks are approved or a maximum number of iterations is reached. LangGraph’s conditional edges and immutable state make it easy to express this loop.
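
Before wiring this into LangGraph, it can help to see the loop as plain Python. The sketch below is only an illustration under simplifying assumptions: tasks are bare strings, and `toy_critic` and `toy_repair` are hypothetical stand-ins for the LLM-backed critic and repair steps.

```python
from typing import Callable, List, Tuple

MAX_LOOPS = 3  # small retry cap, in the spirit of the loop described above

def refine(
    tasks: List[str],
    critic: Callable[[List[str]], Tuple[bool, List[str]]],
    repair: Callable[[List[str], List[str]], List[str]],
    max_loops: int = MAX_LOOPS,
) -> Tuple[List[str], bool, int]:
    """Alternate critic and repair until approval or the cap.

    Returns (tasks, approved, attempts)."""
    attempts = 0
    while True:
        approved, issues = critic(tasks)
        if approved or attempts >= max_loops:
            return tasks, approved, attempts
        tasks = repair(tasks, issues)
        attempts += 1

def toy_critic(tasks: List[str]) -> Tuple[bool, List[str]]:
    # Hypothetical rubric: reject any task with an empty title.
    issues = [f"task {i} has an empty title" for i, t in enumerate(tasks) if not t.strip()]
    return (not issues, issues)

def toy_repair(tasks: List[str], issues: List[str]) -> List[str]:
    # Hypothetical repair: drop the empty entries.
    return [t for t in tasks if t.strip()]

final, approved, attempts = refine(["Send daily updates", "  "], toy_critic, toy_repair)
print(final, approved, attempts)
```

The critic always runs once more after the last repair, so the returned `approved` flag describes the final task list rather than the one before it.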

In [30]:
MAX_REFLECTION_LOOPS = 3 # keep tiny for speed & determinism

class Reflection(BaseModel):
    approve: bool = Field(..., description="True if the candidate TaskList passes the rubric.")
    issues: List[str] = Field(default_factory=list, description="Concise bullet points of problems found.")
    instructions: str = Field(..., description="Short, actionable instructions to revise the TaskList.")

    @field_validator("instructions", mode="before")
    @classmethod
    def _norm_instructions(cls, v):
        # Accept list[str] or str; normalize to a single newline-separated string
        if isinstance(v, list):
            return "\n".join(s.strip() for s in v if str(s).strip())
        return str(v).strip()

    
def to_reflection(x) -> Reflection:
    if isinstance(x, Reflection):
        return x
    if hasattr(Reflection, "model_validate"):
        return Reflection.model_validate(x)
    return Reflection.parse_obj(x)

Our `OverallState` now has to store the critic's verdict, its issues and instructions, and the number of repair attempts:

In [31]:
class OverallState(TypedDict):
    transcript: str
    summary: str
    decisions: str
    tasks_struct: TaskList
    approved: bool
    attempts: int
    issues: List[str]
    instructions: str
    history: List[Dict[str, Any]]

We encode the acceptance rubric in the critic’s system prompt, and bind both critic and repair to structured outputs so they return `Reflection` and `TaskList` objects directly.

In [32]:
INSTRUCTIONS_STYLE = """
    - '+ title=... assignee=...': add a grounded task; ensure fields are supported by cited lines.
    - '- title=... assignee=...': remove EXACT matching task if present.
    - '~ title=... assignee=... field=... -> ...': modify the specified field value only.
"""

REFLECTION_SYSTEM = f"""
    You are a TASK QUALITY AUDITOR. Review a candidate TaskList against this rubric:

    {SCHEMA_CONSTRAINTS}

    GROUNDING REQUIREMENT:
    For every issue you raise, cite supporting transcript line numbers like L12 or ranges L34-L37. You will receive the transcript as a single block; treat each newline-delimited utterance as one line (first line = L1). If evidence spans non-contiguous lines, list them comma-separated.

    Return a Reflection JSON: approve (bool), issues (list of strings), instructions (string).

    All Instructions MUST be a plan consisting of one directive per line. For example:

    {INSTRUCTIONS_STYLE}

    Reject on any violation but accept clear interpretations that might not be explicit.
    You must always return valid JSON object.
"""

REPAIR_SYSTEM = f"""
    You are a TASK REPAIR EXPERT. You will receive a transcript, a candidate list of TASKS and some Instructions to fix the TASKS.

    You must follow ONLY the Instructions and FIX the TASKS accordingly. Treat them atomically:
    {INSTRUCTIONS_STYLE}

    Preserve unchanged tasks.
    If a directive cannot be executed due to missing grounding, SKIP it (do not guess).
    You must always return valid JSON object.
"""


In [33]:
critic_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", REFLECTION_SYSTEM),
        ("human", 
            (
                "Transcript (for grounding; each line is numbered implicitly by order):\n{transcript}\n\n"\
                "Candidate TaskList JSON:\n{candidate_json}\n\n"\
                "You must always return valid JSON object."
            ),
        )
    ]
)

repair_prompt = ChatPromptTemplate.from_messages(
    [
    ("system", REPAIR_SYSTEM),
    ("human", 
        (
            "Transcript:\n{transcript}\n\n"\
            "Candidate TaskList JSON (to revise):\n{candidate_json}\n\n"\
            "Diff-style critic instructions:\n{instructions}\n\n"\
            "FOLLOW THE INSTRUCTIONS and Fix the TaskList. You must always return valid JSON object."
        ),
    )
    ]
)

reflection_llm = llm.with_structured_output(
    Reflection, 
    method="json_schema",
    include_raw=True
)

critic_chain = critic_prompt | reflection_llm
repair_chain = repair_prompt | tasklist_llm
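
The diff-style directives are regular enough that they could also be applied without a model. The sketch below is one possible deterministic applier, not what the notebook does (the repair step here is an LLM call). The concrete syntax in `DIRECTIVE_RE`, the helper names, and the default `status` for added tasks are all assumptions made for illustration.

```python
import re
from typing import Dict, List, Optional

# Assumed concrete syntax; the prompt only gives the loose shapes
# '+ title=... assignee=...', '- ...' and '~ ... field=... -> ...'.
DIRECTIVE_RE = re.compile(
    r"^(?P<op>[+\-~])\s+title=(?P<title>.+?)\s+assignee=(?P<assignee>\S+)"
    r"(?:\s+field=(?P<field>\w+)\s*->\s*(?P<value>.+))?$"
)

def parse_directive(line: str) -> Optional[Dict[str, Optional[str]]]:
    m = DIRECTIVE_RE.match(line.strip())
    if m is None:
        return None
    d = m.groupdict()
    # '~' must carry field/value; '+' and '-' must not.
    if (d["op"] == "~") != (d["field"] is not None):
        return None
    return d

def apply_directives(tasks: List[dict], instructions: str) -> List[dict]:
    tasks = [dict(t) for t in tasks]  # work on copies, leave the input untouched
    for line in instructions.splitlines():
        d = parse_directive(line)
        if d is None:
            continue  # skip lines that do not parse (do not guess)

        def same(t: dict) -> bool:
            return t["title"] == d["title"] and t["assignee"] == d["assignee"]

        if d["op"] == "+":
            if not any(same(t) for t in tasks):
                tasks.append({"title": d["title"], "assignee": d["assignee"], "status": "ToDo"})
        elif d["op"] == "-":
            tasks = [t for t in tasks if not same(t)]
        else:  # "~": modify one field of the matching task only
            for t in tasks:
                if same(t):
                    t[d["field"]] = d["value"]
    return tasks

before = [
    {"title": "Prepare clean demo build", "assignee": "C", "status": "InProgress"},
    {"title": "Send daily progress updates", "assignee": "C", "status": "ToDo"},
]
plan = "\n".join([
    "~ title=Prepare clean demo build assignee=C field=status -> Done",
    "- title=Send daily progress updates assignee=C",
    "+ title=Book stakeholder risk review assignee=A",
    "this line is ignored",
])
after = apply_directives(before, plan)
print(after)
```

A model may not follow the directive format exactly, which is one reason to keep the LLM repair step as the default and treat a parser like this as an optional check.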

Now, our `reflect_node` runs the critic. If the critic approves or we hit the loop cap, we stop; otherwise we store instructions for the repair step.
`repair_node` regenerates a corrected `TaskList` using the critic’s instructions. We also append a small history trail for debugging.
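
Both nodes return only the keys they change. Assuming the default behaviour for a state schema without reducers, where a returned key replaces the previous value, the merge can be pictured as a dictionary update. That is why the nodes build `history + [...]` themselves instead of returning just the new entry. The `merge` helper below is a hypothetical illustration, not a LangGraph function.

```python
from typing import Any, Dict

def merge(state: Dict[str, Any], update: Dict[str, Any]) -> Dict[str, Any]:
    """Keys in `update` overwrite; everything else carries over unchanged."""
    return {**state, **update}

state = {"attempts": 0, "history": [{"stage": "reflect", "approve": False}]}

# What a repair-like node would return: a new counter and an extended trail.
update = {
    "attempts": state["attempts"] + 1,
    "history": state["history"] + [{"stage": "repair"}],
}

new_state = merge(state, update)
print(new_state["attempts"], [h["stage"] for h in new_state["history"]])
```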

In [34]:
def reflect_node(state: OverallState) -> Dict[str, Any]:
    candidate_json = to_tasklist(state["tasks_struct"]).model_dump_json()
    attempts = state.get("attempts", 0)
    history = state.get("history", [])

    # Pass raw transcript; critic infers line numbers by order.
    response = critic_chain.invoke( {
            "transcript": state["transcript"],
            "candidate_json": candidate_json
    })
    
    if response.get("parsed") is not None:
        reflection = response["parsed"]
    else:
        base = PydanticOutputParser(pydantic_object=Reflection)
        fixing = OutputFixingParser.from_llm(parser=base, llm=llm, max_retries=5)
        print("trying to parse (reflection):", response["raw"].content)
        reflection = fixing.parse(response["raw"].content)  # <-- no prompt needed
        
    reflection = to_reflection(reflection)
    reflection_approved = reflection.approve and not reflection.issues
    
    out = {
        "approved": reflection_approved,
        "issues": reflection.issues,
        "history": history + [{"stage": "reflect", **reflection.model_dump()}],
        "attempts": attempts,  # increment on repair
    }

    if reflection_approved or attempts >= MAX_REFLECTION_LOOPS:
        return out
    
    print(f"Reflection found issues (attempt {attempts + 1}):")
    for issue in reflection.issues:
        print(f" - {issue}")
    print()

    out["instructions"] = reflection.instructions
    return out
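
The critic is asked to cite lines such as L12, yet `reflect_node` passes the transcript unnumbered, so the model has to count utterances on its own. One possible hardening, sketched here as an assumption rather than part of the original notebook, is to number the utterances explicitly and to check that every cited line exists. `number_lines` and `invalid_citations` are hypothetical helper names.

```python
import re
from typing import List

def number_lines(transcript: str) -> str:
    """Prefix each non-empty utterance with L1, L2, ... in order."""
    utterances = [ln.strip() for ln in transcript.splitlines() if ln.strip()]
    return "\n".join(f"L{i}: {ln}" for i, ln in enumerate(utterances, start=1))

def invalid_citations(issue: str, n_lines: int) -> List[int]:
    """Return cited line numbers that fall outside 1..n_lines."""
    cited = [int(n) for n in re.findall(r"\bL(\d+)\b", issue)]
    return [n for n in cited if not 1 <= n <= n_lines]

sample = """
A: Quick sync.
B: I sent the email.
C: Noted.
"""
numbered = number_lines(sample)
print(numbered)
print(invalid_citations("Description lacks context (L36).", n_lines=3))
print(invalid_citations("Due date is grounded in L2-L3.", n_lines=3))
```

With a check like this, a citation such as L36 against a transcript of fewer than twenty utterances would be flagged instead of passing silently.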

In [35]:
def repair_node(state: OverallState) -> Dict[str, Any]:    
    candidate_json = to_tasklist(state["tasks_struct"]).model_dump_json()
    attempts = state.get("attempts", 0)
    history = state.get("history", [])

    response = repair_chain.invoke(
        {
            "transcript": state["transcript"],
            "candidate_json": candidate_json,
            "instructions": state.get("instructions", ""),
        }
    )
    
    if response.get("parsed") is not None:
        fixed = response["parsed"]
    else:
        # The repair chain returns a TaskList, so the fallback parser must target TaskList too
        base = PydanticOutputParser(pydantic_object=TaskList)
        fixing = OutputFixingParser.from_llm(parser=base, llm=llm, max_retries=5)
        print("trying to parse (repair):", response["raw"].content)
        fixed = fixing.parse(response["raw"].content)  # <-- no prompt needed

    fixed_tl = to_tasklist(fixed)
    print(f"Repair attempt {attempts + 1} completed. New Tasks (n={len(fixed_tl.tasks)}):")
    for t in fixed_tl.tasks:
        print(f"  - {t.title} [{t.assignee}]  due={t.due_date}  status={t.status}")

    # Return only the fields this node changes; the history entry stores a plain dict
    return {
        "tasks_struct": fixed_tl,
        "attempts": attempts + 1,
        "history": history + [{"stage": "repair", "tasks": fixed_tl.model_dump()}],
    }

Now we wire the loop with a gate.

Control flows `tasks_structured → reflect → (approved? decisions : repair → reflect …)`.

In [36]:
from langgraph.graph import StateGraph, END

builder = StateGraph(OverallState)
builder.add_node("summarize", summarize_node) # existing
builder.add_node("tasks_structured", tasks_structured_node_reparse) # existing
builder.add_node("reflect", reflect_node) # new
builder.add_node("repair", repair_node) # new
builder.add_node("decisions", decisions_node) # existing

builder.set_entry_point("summarize")
builder.add_edge("summarize", "tasks_structured")
builder.add_edge("tasks_structured", "reflect")

def _gate(state: OverallState) -> str:
    # stop if approved or we've reached the small retry cap
    if state.get("approved") or state.get("attempts", 0) >= MAX_REFLECTION_LOOPS:
        return "decisions"
    return "repair"

builder.add_conditional_edges("reflect", _gate, {"repair": "repair", "decisions": "decisions"})
builder.add_edge("repair", "reflect")
builder.add_edge("decisions", END)

full_graph_reflect_2 = builder.compile()

In [37]:
refl_state_2 = full_graph_reflect_2.invoke({"transcript": transcript})

print("\nRefined tasks after reflection:")
pprint.pprint(to_tasklist(refl_state_2["tasks_struct"]).tasks)

print("\nApproved?", refl_state_2.get("approved"), " | Attempts:", refl_state_2.get("attempts", 0))
if refl_state_2.get("issues"):
    print("Critic issues:", *refl_state_2["issues"], sep="\n - ")

trying to parse: {
    "tasks": [
        {
            "title": "Draft note to marketing about product launch target date",
            "description": "Write a note mentioning the tentative launch date of 15th, but explicitly state that it's subject to final QA approval.",
            "assignee": "B",
            "due_date": "before the end of this week",
            "status": "ToDo"
        },
        {
            "title": "Send daily progress updates",
            "description": "Provide regular updates on the backend work to A and B.",
            "assignee": "C",
            "due_date": "daily until further notice",
            "status": "ToDo"
        },
        "title": "Prepare clean demo build",
        "description": "Ensure the demo build does not include any unstable payment features and is ready by Wednesday afternoon.",
            "assignee": "C",
            "due_date": "Wednesday afternoon",
            "status": "ToDo"
        }
    ]
}
tasks=[Task(title='Draft note 

This is what our new graph looks like. It contains a small cycle that follows the Reflexion pattern: an evaluation step, separate from the generation itself, produces a verdict and instructions for improving the generated tasks. The repair node then takes those instructions and tries to fix the candidate task list.

In [38]:
# display(Image(full_graph_reflect_2.get_graph().draw_mermaid_png()))
print(full_graph_reflect_2.get_graph().draw_ascii())

         +-----------+             
         | __start__ |             
         +-----------+             
                *                  
                *                  
                *                  
         +-----------+             
         | summarize |             
         +-----------+             
                *                  
                *                  
                *                  
      +------------------+         
      | tasks_structured |         
      +------------------+         
                *                  
                *                  
                *                  
          +---------+              
          | reflect |              
          +---------+              
          **         ..            
        **             ..          
       *                 .         
+--------+          +-----------+  
| repair |          | decisions |  
+--------+          +-----------+  
                          * 

Let's take a peek at the history to see why the gate decided to stop.

In [39]:
refl_state_2

{'transcript': '\nA: Good morning, everyone. Thanks for joining on short notice. I wanted us to sync about the product launch timeline because we`ve been getting questions from marketing.\nB: Morning! Yeah, I saw the emails yesterday. They`re pushing for a concrete date, but I don`t think engineering is fully comfortable committing yet.\nC: Exactly. We`re still ironing out some of the backend issues. The integration with the payment system isn`t as smooth as it should be, and if we push too quickly, we`ll have failures during checkout.\nA: Right, and we don`t want customers to be the ones finding those bugs. B, from your side, do you think another two weeks would make a difference?\nB: Two weeks sounds reasonable, but it depends on how quickly C`s team can finalize the fixes. If QA doesn`t get enough time, we`ll just be pushing the risk down the road.\nC: True, but we`ve made progress. Yesterday the team managed to cut down the error rate by almost 40%. If we keep that pace, by the end
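
The raw state dump is hard to scan. A small helper can condense the `history` trail into one line per step. This is a sketch that assumes the entry shapes written by `reflect_node` and `repair_node` above (`stage`, `approve`, `issues`, and a `tasks` dict holding a `tasks` list); `summarize_history` is a hypothetical name and the toy history is made up for the demo.

```python
from typing import Any, Dict, List

def summarize_history(history: List[Dict[str, Any]]) -> List[str]:
    """Turn the reflect/repair trail into short, readable lines."""
    lines = []
    for step, entry in enumerate(history, start=1):
        stage = entry.get("stage", "unknown")
        if stage == "reflect":
            # Same approval rule as reflect_node: approve flag set and no issues listed
            ok = bool(entry.get("approve")) and not entry.get("issues")
            verdict = "approved" if ok else "rejected"
            lines.append(f"{step}. reflect: {verdict}, {len(entry.get('issues') or [])} issue(s)")
        elif stage == "repair":
            n = len((entry.get("tasks") or {}).get("tasks", []))
            lines.append(f"{step}. repair: {n} task(s)")
        else:
            lines.append(f"{step}. {stage}")
    return lines

toy_history = [
    {"stage": "reflect", "approve": False, "issues": ["a", "b"], "instructions": "..."},
    {"stage": "repair", "tasks": {"tasks": [{"title": "x"}]}},
    {"stage": "reflect", "approve": True, "issues": [], "instructions": ""},
]
for line in summarize_history(toy_history):
    print(line)
```

In the notebook this would be called as `summarize_history(refl_state_2["history"])`.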

By asking the model to evaluate its own output we improve task quality without changing the base model. The reflection loop demonstrates how LangGraph can route execution based on state and support iterative refinement.

### Persisting tasks with LangGraph Memory

Our agent now produces a clean, approved TaskList. If we stop here, those tasks vanish when the run ends. In this stage we give the agent a simple long-term memory so tasks survive across runs and across transcripts. LangGraph was built for stateful, long-running agents and supports durability and memory out-of-the-box. We’ll keep it simple: a tiny key–value store for tasks, plus a dash of semantic matching so we update existing tasks instead of creating duplicates.

In [40]:

import re
from langgraph.store.base import BaseStore           # Type for injection

There are two kinds of memory to keep straight:

- Checkpointing: lets the graph pause/resume during execution (e.g., across the reflection loop). That’s short-term robustness within a run.
- Task storage: a small, persistent store where approved tasks are written so they’re still there on the next run.

LangGraph’s design helps with both: durable execution and memory primitives for agent state. In our class we’ll use a lightweight local store so everyone can run this without extra services. 

To keep the focus on agent logic, we use an in-memory key–value store. Each task is saved under a stable key (we’ll derive it from the `title`) and we keep the original fields (`title`, `description`, `assignee`, `due_date`, `status`).
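
Conceptually, such a store behaves like a dictionary addressed by a namespace tuple plus a key. The class below is a deliberately simplified stand-in that illustrates the put/get semantics relied on in this section. `TinyStore` is a hypothetical name, and its return types differ from the real store, which wraps values in item objects.

```python
from typing import Dict, List, Optional, Tuple

Namespace = Tuple[str, ...]

class TinyStore:
    """Dict-backed illustration of namespaced put/get/list semantics."""

    def __init__(self) -> None:
        self._data: Dict[Namespace, Dict[str, dict]] = {}

    def put(self, ns: Namespace, key: str, value: dict) -> None:
        # Writing to an existing key overwrites it: an upsert.
        self._data.setdefault(ns, {})[key] = dict(value)

    def get(self, ns: Namespace, key: str) -> Optional[dict]:
        return self._data.get(ns, {}).get(key)

    def items(self, ns: Namespace) -> List[Tuple[str, dict]]:
        return list(self._data.get(ns, {}).items())

demo = TinyStore()
ns = ("users", "u-demo", "tasks")
demo.put(ns, "prepare-clean-demo-build", {"title": "Prepare clean demo build", "status": "ToDo"})
demo.put(ns, "prepare-clean-demo-build", {"title": "Prepare clean demo build", "status": "Done"})

print(demo.get(ns, "prepare-clean-demo-build"))
print(demo.get(("users", "someone-else", "tasks"), "prepare-clean-demo-build"))
print(len(demo.items(ns)))
```

The second `put` replaces the first record, and a different namespace sees nothing, which is the behaviour the per-user task list depends on.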

In [41]:
from langgraph.store.memory import InMemoryStore
store = InMemoryStore()

And a simple function that lists tasks, so we can see what is in the store:

In [42]:
def _ns(user_id: str) -> tuple[str, ...]:
    return ("users", user_id, "tasks")

def list_tasks_store(store: InMemoryStore, user_id: str, limit: int = 1000) -> List[dict]:
    ns = _ns(user_id)
    # No query ⇒ list; increase limit for classroom demos.
    items = store.search(ns, limit=limit)
    out = []
    for it in items:
        v = it.value
        out.append({
            "key": it.key,
            "title": v.get("title"),
            "description": v.get("description"),
            "assignee": v.get("assignee"),
            "due_date": v.get("due_date"),
            "status": v.get("status"),
            "score": it.score,  # may be None if no query
        })
    return out

Our new state adds a `persist_results` field:

In [43]:

from typing_extensions import TypedDict
from langgraph.graph import StateGraph, END

class OverallState(TypedDict, total=False):
    transcript: str
    summary: str
    decisions: str
    tasks_struct: TaskList
    approved: bool
    attempts: int
    issues: List[str]
    instructions: str
    persist_results: List[dict]
    history: List[Dict[str, Any]]

We define our persist node with key slug normalization

In [44]:
def _slug(s: str) -> str:
    s = "" if s is None else str(s)
    s = re.sub(r"[^a-z0-9\s\-_/]", "", s.strip().lower())
    s = re.sub(r"[\s/]+", "-", s)
    return s[:120] or "untitled"

# Persist ALL tasks deterministically (no tool-calling needed here)
def persist(state: OverallState, *, store: BaseStore):
    user_id = "u-demo"
    task_list = state["tasks_struct"]
    ns = _ns(user_id)

    results = []
    print("Persisting tasks...")
    for task in task_list.tasks:
        key = _slug(task.title)

        key_exists = store.get(ns, key) is not None
        action = "updated" if key_exists else "created"

        store.put(ns, key, task.model_dump())

        results.append({"action": action, "key": key})
        print(f"  ├─ {action.upper():7} → {key} [status={task.status}]")

    return {"persist_results": results}
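
Because the key is derived from the title, only titles that normalise to the same slug are treated as the same task. The quick check below repeats the `_slug` logic under the name `slug` so that it runs on its own; the example titles are illustrative.

```python
import re

def slug(s: str) -> str:
    # Same normalisation as _slug above, repeated so the snippet is self-contained
    s = "" if s is None else str(s)
    s = re.sub(r"[^a-z0-9\s\-_/]", "", s.strip().lower())
    s = re.sub(r"[\s/]+", "-", s)
    return s[:120] or "untitled"

a = slug("Send daily progress updates")
b = slug("  SEND daily progress updates!  ")   # case, spacing and punctuation differ
c = slug("Daily progress updates at 17:00 CET")  # reworded title for the same duty

print(a)
print(a == b)
print(a == c)
print(slug("???"))
```

Case, spacing and punctuation are absorbed by the slug, but a reworded title yields a different key and would therefore be stored as a separate task.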

And we are ready to build our graph again.

After the reflection loop approves the TaskList, the graph runs a persist node that:

- iterates through `tasks_struct.tasks`,
- looks up each task's slug key in the store (an exact match on the normalised title), and
- upserts (update-or-insert) a single normalized record per task.

Nothing here is model-dependent; it’s just deterministic logic that makes the agent feel like a real application instead of a demo.

In [45]:
# Build your graph as before
builder = StateGraph(OverallState)
builder.add_node("summarize", summarize_node)
builder.add_node("tasks_structured", tasks_structured_node_reparse)
builder.add_node("reflect", reflect_node)
builder.add_node("repair", repair_node)
builder.add_node("decisions", decisions_node)
builder.add_node("persist", persist)

builder.set_entry_point("summarize")
builder.set_finish_point("decisions")

def _gate(state: OverallState) -> str:
    if state.get("approved") or state.get("attempts", 0) >= MAX_REFLECTION_LOOPS:
        return "persist"
    return "repair"

builder.add_edge("summarize", "tasks_structured")
builder.add_edge("tasks_structured", "reflect")
builder.add_conditional_edges("reflect", _gate, {"repair": "repair", "persist": "persist"})
builder.add_edge("repair", "reflect")
builder.add_edge("persist", "decisions")
builder.add_edge("decisions", END)

# 3) Compile with the store injected so nodes can receive it
full_graph = builder.compile(store=store)

# display(Image(full_graph.get_graph().draw_mermaid_png()))
print(full_graph.get_graph().draw_ascii(), end="\n\n")


         +-----------+            
         | __start__ |            
         +-----------+            
               *                  
               *                  
               *                  
         +-----------+            
         | summarize |            
         +-----------+            
               *                  
               *                  
               *                  
     +------------------+         
     | tasks_structured |         
     +------------------+         
               *                  
               *                  
               *                  
          +---------+             
          | reflect |             
          +---------+             
          ..        ..            
        ..            ..          
       .                .         
+--------+          +---------+   
| repair |          | persist |   
+--------+          +---------+   
                          *       
                    

In [46]:

# 4) Run as usual
final_state = full_graph.invoke({"transcript": transcript})
print("Persist results:", final_state.get("persist_results"))

trying to parse: {
    "tasks": [
        {
            "title": "Draft note to marketing regarding product launch",
            "description": "B will draft a short note to marketing indicating the target date for the product launch is the 15th, but subject to final QA approval.",
            "assignee": "B",
            "status": "ToDo"
        },
        {
            "title": "Send daily progress updates",
            "description": "C will send daily progress updates to A and B so they can react quickly if there are any delays.",
            "assignee": "C",
            "status": "ToDo"
        },
        "title": "Prepare clean demo build",
        "description": "C will ensure the demo build for the internal client demo is clean by Wednesday afternoon, excluding payment-related features.",
        "assignee": "C",
        "status": "ToDo"
    ]
}
tasks=[Task(title='Draft note to marketing regarding product launch', assignee='B', description='B will draft a short note to marketin

In [47]:

# Quick peek: list tasks persisted in Store
for row in list_tasks_store(store=store, user_id="u-demo"):
    print(row)

{'key': 'send-daily-progress-updates', 'title': 'Send daily progress updates', 'description': 'C will send daily progress updates to A and B so they can react quickly if there are any delays.', 'assignee': 'C', 'due_date': None, 'status': 'ToDo', 'score': None}
{'key': 'conduct-full-round-of-regression-tests-by-the-end-of-next-week', 'title': 'Conduct full round of regression tests by the end of next week', 'description': 'C will coordinate the team to conduct a full round of regression tests by the end of next week, ensuring all identified issues are resolved.', 'assignee': 'C', 'due_date': 'by the end of next week', 'status': 'ToDo', 'score': None}
{'key': 'prepare-clean-demo-build', 'title': 'Prepare clean demo build', 'description': 'C will ensure the demo build for the internal client demo is clean by Wednesday afternoon, focusing on internal features only.', 'assignee': 'C', 'due_date': None, 'status': 'InProgress', 'score': None}


This short follow-up transcript simulates a quick sync after the original meeting. We feed it to the same compiled graph, which still holds the same store. The goal is to update existing tasks (e.g., “notify marketing about the 15th target date”) rather than create look-alike duplicates with slightly different wording. The flow is the one we built: summarise → extract → reflect/repair → persist → decisions.

In [48]:
follow_up_transcript = """
A: Quick sync. First—B, the marketing email about the 15th target date?
B: I have done that, I sent it right after our last call and posted in #marketing. If QA slips, I'll post an update.
C: Noted.
A: Daily progress updates—can we standardize them?
C: Yes. I'll post a short update every day at 17:00 CET in the #launch-updates thread with error rate and blockers. We will keep it in progress.
A: Perfect.
A: What about the Demo build status?
C: Finished Tuesday EOD; it's clean and payments stay disabled. So that task's done.
A: Next Thursday's client demo—let's keep it internal features only. C, can you lead the walkthrough? I'll handle intros and Q&A. Let's set it for 11:00.
C: Works for me.
D (QA): We can kick off full regression Monday and aim to share the report by Wednesday morning.
A: Perfect.
C: To speed triage we need richer checkout error logs; I can take that.
A: Please aim for Friday EOD.
B: After QA sign-off, design wants a one-pager to brief campaigns. I'll draft it Tuesday.
A: And let's have a stakeholder risk review Monday at 09:00—I'll book it.
"""


final_state = full_graph.invoke({"transcript": follow_up_transcript})
print("Persist results:", final_state.get("persist_results"))

Reflection found issues (attempt 1):
 - The 'Richer checkout error logs' task has an incomplete description ('Create richer checkout error logs.') which should include more context (L36).
 - The 'Stakeholder risk review' task's description is missing key details ('Book and conduct a stakeholder risk review.' should specify who conducts it and what exactly needs to be done, L58).

Repair attempt 1 completed. New Tasks (n=6):
  - Standardize daily progress updates [C]  due=None  status=ToDo
  - Internal demo walkthrough [C]  due=Next Thursday at 11:00  status=ToDo
  - Richer checkout error logs [C]  due=Friday EOD  status=ToDo
  - Full regression testing [D]  due=None  status=ToDo
  - One-pager for campaigns [B]  due=Tuesday  status=ToDo
  - Stakeholder risk review [A]  due=Monday at 09:00  status=ToDo
Reflection found issues (attempt 2):
 - The task 'Richer checkout error logs' has an ambiguous due_date. The original transcript specifies Friday EOD, but it lacks context indicating wheth

In [49]:

# Quick peek: list tasks persisted in Store
for row in list_tasks_store(store=store, user_id="u-demo"):
    print(row)

{'key': 'send-daily-progress-updates', 'title': 'Send daily progress updates', 'description': 'C will send daily progress updates to A and B so they can react quickly if there are any delays.', 'assignee': 'C', 'due_date': None, 'status': 'ToDo', 'score': None}
{'key': 'conduct-full-round-of-regression-tests-by-the-end-of-next-week', 'title': 'Conduct full round of regression tests by the end of next week', 'description': 'C will coordinate the team to conduct a full round of regression tests by the end of next week, ensuring all identified issues are resolved.', 'assignee': 'C', 'due_date': 'by the end of next week', 'status': 'ToDo', 'score': None}
{'key': 'prepare-clean-demo-build', 'title': 'Prepare clean demo build', 'description': 'C will ensure the demo build for the internal client demo is clean by Wednesday afternoon, focusing on internal features only.', 'assignee': 'C', 'due_date': None, 'status': 'InProgress', 'score': None}
{'key': 'daily-progress-updates-at-1700-cet', 'ti

But some tasks were stored twice even though they refer to the same thing! Let's fix that with semantic search.


In [50]:
store = InMemoryStore(
    index={"dims": 768, "embed": embed, "fields": ["title", "description"]}  # optional
)

USER_ID = "u-demo"

Find a matching key (vector search)

In [51]:
# Use the store's vector index (built over title and description) to find the nearest neighbor.
SIM_THRESHOLD = 0.7

def _nearest_title(store: BaseStore, ns: tuple, title: str):
    """Return (key, score) for the nearest existing task title, or (None, None)."""
    hits = store.search(ns, query=title, limit=1) or []
    if not hits:
        return None, None
    h = hits[0]
    
    return h.key, getattr(h, "score", None)


Semantic upsert persist node

In [52]:
def persist_semantic(state: OverallState, *, store: BaseStore):
    user_id = USER_ID
    ns = ("users", user_id, "tasks")
    task_list = state.get("tasks_struct")
    results = []
    
    print("Persisting (semantic upsert)…")
    for task in task_list.tasks:
        match_key, score = _nearest_title(store, ns, task.title)
        use_match = (score is not None) and (score >= SIM_THRESHOLD)

        key = match_key if use_match else _slug(task.title)
        action = "updated" if use_match else "created"

        store.put(ns, key, task.model_dump())  # uses the index fields you configured
        results.append({"action": action, "key": key, "score": score})

        print(f"  ├─ {action.upper():7} → {key} [status={task.status}] (score={None if score is None else round(score, 3)})")

    return {"persist_results": results}
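
The update-or-create decision hinges on one comparison: does the best similarity score clear `SIM_THRESHOLD`? The sketch below reproduces that decision with a crude bag-of-words cosine in place of a real embedding model, so the scores are only illustrative. `toy_embed`, `cosine` and `choose_key` are hypothetical helpers, and new keys are built with a plain join rather than `_slug`.

```python
import math
from collections import Counter
from typing import Dict, Optional, Tuple

SIM_THRESHOLD = 0.7  # same value as in the notebook

def toy_embed(text: str) -> Counter:
    """Bag-of-words counts; a crude stand-in for a sentence embedding."""
    return Counter(text.lower().split())

def cosine(a: Counter, b: Counter) -> float:
    dot = sum(a[t] * b[t] for t in a)
    na = math.sqrt(sum(v * v for v in a.values()))
    nb = math.sqrt(sum(v * v for v in b.values()))
    return dot / (na * nb) if na and nb else 0.0

def choose_key(
    title: str, existing: Dict[str, str], threshold: float = SIM_THRESHOLD
) -> Tuple[str, str, Optional[float]]:
    """Return (action, key, score); reuse the best match if it clears the threshold."""
    best_key, best_score = None, None
    query = toy_embed(title)
    for key, stored_title in existing.items():
        score = cosine(query, toy_embed(stored_title))
        if best_score is None or score > best_score:
            best_key, best_score = key, score
    if best_key is not None and best_score >= threshold:
        return "updated", best_key, best_score
    # Simplified key for new tasks (the notebook uses _slug here)
    return "created", "-".join(title.lower().split()), best_score

existing = {"send-daily-progress-updates": "Send daily progress updates"}
hit = choose_key("Send daily progress updates to the team", existing)
miss = choose_key("Book stakeholder risk review", existing)
empty = choose_key("Book stakeholder risk review", {})
print(hit)
print(miss)
print(empty)
```

The threshold trades duplicates against wrongful merges: a lower value merges more reworded tasks but risks overwriting an unrelated one, so it is worth tuning against real titles.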

In [53]:
# Build your graph as before
builder = StateGraph(OverallState)
builder.add_node("summarize", summarize_node)
builder.add_node("tasks_structured", tasks_structured_node_reparse)
builder.add_node("reflect", reflect_node)
builder.add_node("repair", repair_node)
builder.add_node("decisions", decisions_node)
builder.add_node("persist_semantic", persist_semantic)

builder.set_entry_point("summarize")
builder.set_finish_point("decisions")

def _gate(state: OverallState) -> str:
    if state.get("approved") or state.get("attempts", 0) >= MAX_REFLECTION_LOOPS:
        return "persist_semantic"
    return "repair"

builder.add_edge("summarize", "tasks_structured")
builder.add_edge("tasks_structured", "reflect")
builder.add_conditional_edges("reflect", _gate)
builder.add_edge("repair", "reflect")
builder.add_edge("persist_semantic", "decisions")
builder.add_edge("decisions", END)

# 3) Compile with the store injected so nodes can receive it
full_graph = builder.compile(store=store)

# display(Image(full_graph.get_graph().draw_mermaid_png()))
print(full_graph.get_graph().draw_ascii(), end="\n\n")


    +-----------+    
    | __start__ |    
    +-----------+    
          *          
          *          
          *          
    +-----------+    
    | summarize |    
    +-----------+    
          *          
          *          
          *          
+------------------+ 
| tasks_structured | 
+------------------+ 
          *          
          *          
          *          
    +---------+      
    | reflect |      
    +---------+      
          *          
          *          
          *          
    +---------+      
    | __end__ |      
    +---------+      



In [54]:

final_state = full_graph.invoke({"transcript": transcript})

trying to parse: {
    "tasks": [
        {
            "title": "Draft a note to marketing regarding the product launch",
            "description": "Draft a note mentioning the target date of the 15th for the product launch, but emphasize that it's subject to final QA approval.",
            "assignee": "B",
            "due_date": "Today",
            "status": "ToDo"
        },
        {
            "title": "Send daily progress updates",
            "description": "Provide daily updates on the progress of fixing backend issues to A and B.",
            "assignee": "C",
            "due_date": "Until further notice",
            "status": "ToDo"
        },
        "title": "Prepare a demo for internal features",
        "description": "Ensure that the demo build does not include any payment-related features and is clean by Wednesday afternoon.",
        "assignee": "C",
        "due_date": "Wednesday",
        "status": "ToDo"
    ]
}
tasks=[Task(title='Draft a note to marketing re

In [55]:
final_state = full_graph.invoke({"transcript": follow_up_transcript})

Reflection found issues (attempt 1):
 - The 'Marketing Email' task is missing.
 - The 'Demo build status' should be marked as 'Done'.
 - Task 'Next Thursday's client demo' should include A's role of handling intros and Q&A.

Repair attempt 1 completed. New Tasks (n=6):
  - Standardize daily progress updates [C]  due=None  status=ToDo
  - Next Thursday's client demo [C]  due=None  status=ToDo
  - Full regression testing [D]  due=None  status=ToDo
  - Richer checkout error logs [C]  due=Friday EOD  status=ToDo
  - One-pager for campaign briefing [B]  due=Tuesday  status=ToDo
  - Stakeholder risk review [A]  due=None  status=ToDo
Reflection found issues (attempt 2):
 - Task 'Next Thursday's client demo' should have its due_date field set to 'Thursday'.
 - Task 'Richer checkout error logs' has a typo in the description field, it should read'speed triage' rather than'speed up triage'.
 - Task 'One-pager for campaign briefing' has an incorrect due_date. It should be 'Tuesday EOD', not just '

## Wrap‑up and next steps

We began with a simple prompt and iteratively built a small, working agentic system that parses a messy meeting transcript and outputs a clean summary, a structured task list, and recorded decisions. Along the way you learned how to:
- Decompose a complex task into modular nodes (summary → tasks → decisions) and wire them together with LangGraph.
- Add a critic/repair loop to improve quality before persisting results.
- Persist results across runs using a lightweight memory store and a “semantic upsert” keyed on task titles.

This pattern—**decompose, validate, reflect, and persist**—is a reusable template for many agentic workflows.
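
To make the reflect and persist gate concrete outside of LangGraph, here is a minimal plain-Python sketch of a bounded critic/repair loop. It is an illustration, not the notebook's implementation: `run_reflect_repair`, `toy_critic`, and `toy_repair` are hypothetical names, the value of `MAX_REFLECTION_LOOPS` is assumed, and the sketch assumes the attempt counter is incremented in the critic step. The exit rule mirrors `_gate`: stop when the critic approves or the attempt budget is spent.

```python
from typing import Callable

MAX_REFLECTION_LOOPS = 2  # assumed value; the notebook defines its own constant


def run_reflect_repair(
    tasks: list,
    critic: Callable[[list], list],
    repair: Callable[[list, list], list],
    max_loops: int = MAX_REFLECTION_LOOPS,
) -> tuple:
    """Critique the tasks and repair them until approved or the budget runs out."""
    attempts = 0
    while True:
        issues = critic(tasks)   # list of human-readable problems, empty if fine
        attempts += 1
        approved = not issues
        # Same exit rule as _gate: approval or an exhausted attempt budget.
        if approved or attempts >= max_loops:
            return tasks, approved, attempts
        tasks = repair(tasks, issues)


def toy_critic(tasks: list) -> list:
    """Hypothetical critic: flag every task that has no due date."""
    return [f"Task '{t['title']}' has no due date." for t in tasks if not t.get("due_date")]


def toy_repair(tasks: list, issues: list) -> list:
    """Hypothetical repair: fill missing due dates with a placeholder."""
    return [{**t, "due_date": t.get("due_date") or "TBD"} for t in tasks]


tasks = [{"title": "Full regression testing", "due_date": None}]
final_tasks, approved, attempts = run_reflect_repair(tasks, toy_critic, toy_repair)
print(approved, attempts, final_tasks)
```

Because the loop returns whatever it has when the budget runs out, downstream steps should check `approved` if they need to distinguish a clean result from a best-effort one.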

### Stretch exercise: refine the semantic matcher

Our current semantic upsert matches tasks on their titles alone. As you saw earlier, this can still produce duplicates when two tasks share the same intent but are worded differently. As a challenge, think about how you might bring the task descriptions into the similarity search. Where in the code would you need to make changes? How would you decide when to fall back on the description? Try modifying your graph accordingly, re‑run it on the follow‑up transcript, and check whether fewer duplicates remain. Let me know how it goes, or treat it as a take‑home challenge!
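
One possible starting point, sketched under loud assumptions: compare titles first and, only when the title score falls below the threshold, blend in a description score. Everything here is hypothetical. `toy_sim` is a word-overlap (Jaccard) stand-in for the embedding similarity the store provides, `nearest_task` and `TITLE_WEIGHT` are illustrative names, the `SIM_THRESHOLD` value is assumed, and the example descriptions are made-up toy data. In the notebook you would swap `toy_sim` for a real store search.

```python
from typing import Optional

SIM_THRESHOLD = 0.75  # assumed value; the notebook sets its own SIM_THRESHOLD
TITLE_WEIGHT = 0.5    # hypothetical weight of the title in the blended score


def toy_sim(a: str, b: str) -> float:
    """Jaccard overlap of lowercase word sets; a stand-in for embedding similarity."""
    words_a, words_b = set(a.lower().split()), set(b.lower().split())
    if not words_a or not words_b:
        return 0.0
    return len(words_a & words_b) / len(words_a | words_b)


def nearest_task(stored: dict, task: dict) -> tuple:
    """Return (key, score) of the best stored match, or (None, None) if empty."""
    best_key: Optional[str] = None
    best_score: Optional[float] = None
    for key, old in stored.items():
        score = toy_sim(task["title"], old["title"])
        if score < SIM_THRESHOLD:
            # Fall back: blend title and description so that differently
            # worded titles with the same intent can still match.
            desc = toy_sim(task.get("description", ""), old.get("description", ""))
            score = TITLE_WEIGHT * score + (1 - TITLE_WEIGHT) * desc
        if best_score is None or score > best_score:
            best_key, best_score = key, score
    return best_key, best_score


# Toy data: titles echo the notebook output, descriptions are invented.
shared_desc = "Provide daily updates on the progress of fixing backend issues to A and B."
stored = {
    "send-daily-progress-updates": {
        "title": "Send daily progress updates",
        "description": shared_desc,
    },
    "prepare-a-demo": {
        "title": "Prepare a demo for internal features",
        "description": "Ensure that the demo build does not include any payment-related features.",
    },
}
new_task = {"title": "Standardize daily progress updates", "description": shared_desc}

key, score = nearest_task(stored, new_task)
action = "updated" if score is not None and score >= SIM_THRESHOLD else "created"
print(action, key, round(score, 3))
```

In this toy case the titles alone overlap too little to clear the threshold, while the blended score does, so the task is updated rather than duplicated. The blend weight and the threshold interact, so both would need tuning against real transcripts.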
