# 05. Knowledge Graphs and GraphRAG

So far in this course, we've explored **traditional RAG (Retrieval-Augmented Generation)**, a paradigm where large language models retrieve **unstructured text chunks** from vector stores (like FAISS or LanceDB) and synthesize answers on the fly.

While RAG works well for surface-level question answering, it **struggles with structure, reasoning, and relationships**. It treats text as isolated passages, not as **entities** linked by meaning or causality. That's where **Knowledge Graphs** and **GraphRAG** step in.

## 🧠 Why Knowledge Graphs?

A **Knowledge Graph (KG)** represents knowledge as **nodes (entities)** and **edges (relationships)**, creating a structured and interpretable memory.  
In contrast to flat vector retrieval, KGs allow an agent to:

- **Reason symbolically**: follow explicit paths like *"Pichu → Pikachu → Raichu"*.
- **Disambiguate entities**: distinguish *Thunderbolt (move)* vs *Thunderbolt (item)*.
- **Fuse multi-source facts**: merge structured and unstructured evidence.
- **Explain answers**: show the exact graph edges used in reasoning.

This yields an AI system that is not only more **precise** but also **auditable** and **less prone to hallucination**.

**GraphRAG** blends the strengths of retrieval and structured reasoning:
1. **Retrieve** relevant context and turn it into triples (`(subject, predicate, object)`).
2. **Store / update** these triples in a **graph backend** (persistent memory).
3. **Reason on the graph** to answer complex or multi-hop queries.

In essence, **GraphRAG = RAG + Knowledge Graph Reasoning**. Instead of searching documents, we query the graph, traversing relationships explicitly.
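To make the "reason on the graph" step concrete, here is a minimal sketch of a multi-hop lookup over a handful of triples. The triple list and the `find_path` helper are illustrative assumptions, not part of Graphiti or any GraphRAG library; the point is only that the answer arrives together with the exact edges that were traversed.

```python
from collections import deque

# Hypothetical toy triples in (subject, predicate, object) form.
TRIPLES = [
    ("Pichu", "EVOLVES_TO", "Pikachu"),
    ("Pikachu", "EVOLVES_TO", "Raichu"),
    ("Pikachu", "HAS_TYPE", "Electric"),
]

def find_path(triples, start, goal):
    """Breadth-first search; returns the list of edges from start to goal, or None."""
    outgoing = {}
    for s, p, o in triples:
        outgoing.setdefault(s, []).append((s, p, o))

    queue = deque([(start, [])])
    seen = {start}
    while queue:
        node, path = queue.popleft()
        if node == goal:
            return path
        for edge in outgoing.get(node, []):
            nxt = edge[2]
            if nxt not in seen:
                seen.add(nxt)
                queue.append((nxt, path + [edge]))
    return None

# Two hops: the returned path doubles as the explanation of the answer.
path = find_path(TRIPLES, "Pichu", "Raichu")
for s, p, o in path:
    print(f"({s}) -[{p}]-> ({o})")
```

Because edges are directed, asking for a path from Raichu back to Pichu returns `None` in this sketch.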


## 🧩 Enter Graphiti + FalkorDB

We'll use the [**Graphiti**](https://github.com/getzep/graphiti) library, a lightweight, production-grade framework for building **temporal knowledge graphs** that integrate directly with LLMs.

**FalkorDB** is a **high-performance graph database** built on Redis, which we use as the backend for Graphiti. It combines the **speed of in-memory databases** with **Cypher-style graph queries**, making it well suited to real-time AI agents that need to evolve their graph dynamically.

Graphiti uses structured outputs from LLMs to **extract triples**, **store them as graph edges**, and **enable reasoning** through its built-in query APIs and MCP server. Together, Graphiti + FalkorDB create the ideal playground for **GraphRAG agents**, ones that can remember, reason, and adapt.

However, let's first look at what it takes to create graph data.

In [2]:
import os, json
from typing import List, Optional, Literal, Tuple, Dict
from dotenv import load_dotenv
import numpy as np

from pydantic_ai import Agent, RunContext
from openai import OpenAI

load_dotenv()  # expects OPENROUTER_API_KEY in your environment

OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY")

CHAT_MODEL = "openrouter:google/gemini-2.5-flash"
EMBED_MODEL = "openai/text-embedding-3-large"

openai = OpenAI(
    base_url="https://openrouter.ai/api/v1",
    api_key=OPENROUTER_API_KEY,
)

EPISODES = [
    """Ash meets a timid Pichu that later evolves into Pikachu using friendship.
       Pikachu is an Electric-type and often uses Thunderbolt against Team Rocket.""",
    """During a gym battle, Pikachu faces a Ground-type opponent and struggles due to type disadvantage.
       Raichu appears later as Pikachu's evolution with a Thunder Stone.""",
    """Pikachu practices Quick Attack in the forest. Trainers discuss that Electric resists Flying and Steel."""
]

### 🧩 Define the Schema and Create a Triple-Extraction Agent

Before we can build a knowledge graph, we need to define **what relationships are allowed**. We'll describe our Pokémon world using a small, fixed schema of predicates such as:

- `HAS_TYPE`: connects a Pokémon to its elemental type
- `EVOLVES_TO`: shows evolution paths
- `NEEDS_ITEM`: evolution dependency (e.g., Thunder Stone)
- `LEARNS_MOVE`: captures learnable moves
- `WEAK_AGAINST`, `RESISTS`: for type matchups

Using this schema, we'll create two **Pydantic models**:
1. `Triple`: represents one edge (`subject`, `predicate`, `object`)
2. `BuildKGResult`: wraps the list of extracted entities and triples

Finally, we'll define a **PydanticAI Agent** called `builder` that takes raw episode text and returns structured triples according to our schema. This mimics how an information-extraction LLM in Graphiti works under the hood, but here we do it manually for clarity.


In [3]:
from typing import List, Optional, Literal, Tuple
from pydantic import BaseModel, Field
from pydantic_ai import Agent, RunContext


ValidPredicates = Literal[
    "HAS_TYPE", "EVOLVES_TO", "NEEDS_ITEM", "LEARNS_MOVE", "WEAK_AGAINST", "RESISTS"
]

class Triple(BaseModel):
    subject: str
    predicate: ValidPredicates
    object: str
    fact: Optional[str] = None
    confidence: float = Field(0.9, ge=0.0, le=1.0)

class BuildKGResult(BaseModel):
    entities: List[str]
    triples: List[Triple]

builder = Agent[None, BuildKGResult](
    model=CHAT_MODEL,
    system_prompt=(
        "You are a precise IE system. Extract schema-conformant triples ONLY from the provided episode text.\n"
        "Schema:\n"
        "- Entities: Pokemon/Type/Move/Item are plain strings (e.g., 'Pikachu', 'Electric', 'Thunderbolt', 'Thunder Stone').\n"
        "- Relations: HAS_TYPE(Pokemon→Type), EVOLVES_TO(Pokemon→Pokemon), NEEDS_ITEM(Pokemon→Item), "
        "LEARNS_MOVE(Pokemon→Move), WEAK_AGAINST(Pokemon→Type), RESISTS(Pokemon→Type)\n"
        "Return a JSON with 'entities' and 'triples'."
    ),
    output_type=BuildKGResult,
)
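
The `Literal` type and the `Field(..., ge=0.0, le=1.0)` bound mean that a response with an unknown predicate or an out-of-range confidence fails validation instead of silently entering the graph. The dependency-free sketch below mimics those two checks on raw dictionaries. `check_triple` and the sample payloads are made up for illustration; in the notebook itself Pydantic performs this validation.

```python
ALLOWED_PREDICATES = {
    "HAS_TYPE", "EVOLVES_TO", "NEEDS_ITEM", "LEARNS_MOVE", "WEAK_AGAINST", "RESISTS",
}

def check_triple(raw: dict) -> list:
    """Return a list of human-readable problems; an empty list means the triple passes."""
    problems = []
    for key in ("subject", "predicate", "object"):
        if not isinstance(raw.get(key), str) or not raw[key]:
            problems.append(f"missing or non-string field: {key}")
    if raw.get("predicate") not in ALLOWED_PREDICATES:
        problems.append(f"unknown predicate: {raw.get('predicate')!r}")
    confidence = raw.get("confidence", 0.9)  # same default as the Triple model
    if not isinstance(confidence, (int, float)) or not 0.0 <= confidence <= 1.0:
        problems.append(f"confidence out of range: {confidence!r}")
    return problems

# Hypothetical payloads, shaped like what an LLM might return.
good = {"subject": "Pikachu", "predicate": "HAS_TYPE", "object": "Electric"}
bad = {"subject": "Pikachu", "predicate": "LIKES", "object": "Ketchup", "confidence": 1.5}

print(check_triple(good))
print(check_triple(bad))
```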

### 🕸️ Build a Minimal In-Memory Graph

Now that we can extract structured triples from episode text, we need a simple data structure to **store them as a graph**, with **nodes** and **edges**.

Here we'll implement a lightweight `MiniGraph` class that:
- Keeps track of **nodes** (unique entity names like *Pikachu*, *Electric*, *Thunderbolt*)
- Stores **edges** (`subject → predicate → object`) as `Edge` dataclasses
- Provides helper methods to generate text corpora of nodes and edges for embedding later

We'll also use `logfire` to instrument PydanticAI for observability and apply `nest_asyncio` so that async agents can run smoothly inside notebooks.

Finally, we'll loop through our Pokémon episode texts, extract triples using the `builder` agent, and populate the graph.
This gives us an interpretable **knowledge graph memory** before we move on to embedding and semantic search.

In [4]:
from dataclasses import dataclass

import logfire
import nest_asyncio

nest_asyncio.apply()

logfire.configure()
logfire.instrument_pydantic_ai()

@dataclass
class Edge:
    subject: str
    predicate: str
    object: str
    fact: str = ""
    confidence: float = 1.0

class MiniGraph:
    def __init__(self):
        self.nodes = set()
        self.edges: List[Edge] = []
        # embedding indexes
        self.node_texts: List[str] = []    # e.g., node labels like "Pikachu"
        self.node_vecs: List[List[float]] = []
        self.edge_texts: List[str] = []    # e.g., "(Pikachu)-[HAS_TYPE]->(Electric)"
        self.edge_vecs: List[List[float]] = []

    def add_triple(self, t: Triple):
        self.nodes.add(t.subject); self.nodes.add(t.object)
        self.edges.append(Edge(t.subject, t.predicate, t.object, t.fact or "", t.confidence))

    def node_corpus(self) -> List[str]:
        return sorted(self.nodes)

    def edge_corpus(self) -> List[str]:
        return [f"({e.subject})-[{e.predicate}]->({e.object}) :: {e.fact}" for e in self.edges]

GRAPH = MiniGraph()

def add_episode_to_graph(text: str):
    res = builder.run_sync(f"Episode:\n{text}").output
    for t in res.triples:
        GRAPH.add_triple(t)
    return res

for ep in EPISODES:
    add_episode_to_graph(ep)

print("Nodes:", len(GRAPH.nodes))
print("Edges:", len(GRAPH.edges))

17:22:03.638 builder run
17:22:03.644   chat google/gemini-2.5-flash
17:22:05.713 builder run
17:22:05.714   chat google/gemini-2.5-flash
17:22:07.019 builder run
17:22:07.028   chat google/gemini-2.5-flash
Nodes: 10
Edges: 9


Let's see what graph was generated!

In [5]:
from rich import print as rprint

def pretty_print_graph(graph: MiniGraph):
    rprint(f"[purple]\n🕸️ Knowledge Graph Summary[/]")
    rprint(f"[green]Nodes[/] ({len(graph.nodes)}): {', '.join(sorted(graph.nodes))}\n")
    rprint(f"[red]Edges[/] ({len(graph.edges)}):")
    for e in graph.edges:
        print(f"  ({e.subject}) -[{e.predicate}]-> ({e.object})"
              + (f"  | fact: {e.fact}" if e.fact else "")
              + (f"  [conf={e.confidence:.2f}]" if e.confidence != 1.0 else ""))
    print("-" * 60)

pretty_print_graph(GRAPH)

  (Pichu) -[EVOLVES_TO]-> (Pikachu)  | fact: Pichu that later evolves into Pikachu using friendship  [conf=0.90]
  (Pikachu) -[HAS_TYPE]-> (Electric)  | fact: Pikachu is an Electric-type  [conf=0.90]
  (Pikachu) -[LEARNS_MOVE]-> (Thunderbolt)  | fact: Pikachu is an Electric-type and often uses Thunderbolt against Team Rocket  [conf=0.90]
  (Pikachu) -[WEAK_AGAINST]-> (Ground)  [conf=0.90]
  (Pikachu) -[EVOLVES_TO]-> (Raichu)  [conf=0.90]
  (Pikachu) -[NEEDS_ITEM]-> (Thunder Stone)  [conf=0.90]
  (Pikachu) -[LEARNS_MOVE]-> (Quick Attack)  [conf=0.90]
  (Electric) -[RESISTS]-> (Flying)  [conf=0.90]
  (Electric) -[RESISTS]-> (Steel)  [conf=0.90]
------------------------------------------------------------
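
With the edges in a flat list, simple questions reduce to pattern matching on `(subject, predicate, object)`. The sketch below shows one way to do that. The `match` helper is hypothetical, and the edges are copied by hand from the printed graph above so that the block runs without an LLM call.

```python
from typing import List, Tuple

EdgeTuple = Tuple[str, str, str]

# Hand-copied from the printed graph; a real run may extract different edges.
EDGES: List[EdgeTuple] = [
    ("Pichu", "EVOLVES_TO", "Pikachu"),
    ("Pikachu", "HAS_TYPE", "Electric"),
    ("Pikachu", "LEARNS_MOVE", "Thunderbolt"),
    ("Pikachu", "WEAK_AGAINST", "Ground"),
    ("Pikachu", "EVOLVES_TO", "Raichu"),
    ("Pikachu", "NEEDS_ITEM", "Thunder Stone"),
    ("Pikachu", "LEARNS_MOVE", "Quick Attack"),
    ("Electric", "RESISTS", "Flying"),
    ("Electric", "RESISTS", "Steel"),
]

def match(edges, subject=None, predicate=None, obj=None):
    """Return the edges that fit the pattern; None acts as a wildcard."""
    return [
        (s, p, o) for s, p, o in edges
        if (subject is None or s == subject)
        and (predicate is None or p == predicate)
        and (obj is None or o == obj)
    ]

# "Which moves does Pikachu learn?"
moves = [o for _, _, o in match(EDGES, subject="Pikachu", predicate="LEARNS_MOVE")]
print(moves)

# "What evolves into Pikachu?"
evolves_into_pikachu = [s for s, _, _ in match(EDGES, predicate="EVOLVES_TO", obj="Pikachu")]
print(evolves_into_pikachu)
```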


In [18]:
from copy import deepcopy 

def reflect_and_expand(graph: MiniGraph, max_iters: int = 3):
    prev_edge_count = -1
    iteration = 0

    while iteration < max_iters:
        iteration += 1
        current_count = len(graph.edges)
        if current_count == prev_edge_count:
            print(f"No new edges found after iteration {iteration-1}. Stopping reflection.")
            break

        prev_edge_count = current_count
        graph_state = json.dumps([e.__dict__ for e in graph.edges], indent=2)

        # Ask the same builder agent if new relationships can be added
        prompt = (
            f"Here is the current knowledge graph:\n{graph_state}\n\n"
            "Reflect on it and see if any *implicit* or *missing* relationships "
            "can be derived from this graph. Add only valid new triples, if any. "
            "Return empty if nothing new can be inferred."
        )

        reflection = builder.run_sync(prompt).output

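        # Add new edges (if any), skipping triples that are already in the graph
        added = 0
        existing = {(e.subject, e.predicate, e.object) for e in graph.edges}
        for t in reflection.triples:
            key = (t.subject, t.predicate, t.object)
            # Logged before the duplicate check, so this line also appears
            # for proposed triples that end up being skipped below.
            logfire.info(f"New edge added: {key}")
            if key not in existing:
                graph.add_triple(t)
                existing.add(key)
                added += 1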

        print(f"Iteration {iteration}: added {added} new edges. Total now {len(graph.edges)}.")

    return graph

NEWGRAPH = reflect_and_expand(deepcopy(GRAPH))
pretty_print_graph(NEWGRAPH)

11:31:24.225 builder run
11:31:24.226   chat google/gemini-2.5-flash
11:31:25.875 New edge added: ('Raichu', 'WEAK_AGAINST', 'Ground')
Iteration 1: added 1 new edges. Total now 11.
11:31:25.891 builder run
11:31:25.891   chat google/gemini-2.5-flash
11:31:27.201 New edge added: ('Raichu', 'RESISTS', 'Flying')
11:31:27.201 New edge added: ('Raichu', 'RESISTS', 'Steel')
Iteration 2: added 2 new edges. Total now 13.
11:31:27.205 builder run
11:31:27.209   chat google/gemini-2.5-flash
11:31:30.294 New edge added: ('Pikachu', 'RESISTS', 'Flying')
11:31:30.294 New edge added: ('Pikachu', 'RESISTS', 'Steel')
11:31:30.294 New edge added: ('Pikachu', 'RESISTS', 'Electric')
11:31:30.294 New edge added: ('Pikachu', 'WEAK_AGAINST', 'Ground')
Iteration 3: added 3 new edges. Total now 16.


  (Pichu) -[EVOLVES_TO]-> (Pikachu)  | fact: Pichu that later evolves into Pikachu using friendship  [conf=0.90]
  (Pikachu) -[HAS_TYPE]-> (Electric)  | fact: Pikachu is an Electric-type  [conf=0.90]
  (Pikachu) -[LEARNS_MOVE]-> (Thunderbolt)  | fact: often uses Thunderbolt  [conf=0.90]
  (Pikachu) -[WEAK_AGAINST]-> (Ground)  [conf=0.90]
  (Raichu) -[HAS_TYPE]-> (Electric)  [conf=0.90]
  (Pikachu) -[EVOLVES_TO]-> (Raichu)  [conf=0.90]
  (Pikachu) -[NEEDS_ITEM]-> (Thunder Stone)  [conf=0.90]
  (Pikachu) -[LEARNS_MOVE]-> (Quick Attack)  [conf=0.90]
  (Electric) -[RESISTS]-> (Flying)  [conf=0.90]
  (Electric) -[RESISTS]-> (Steel)  [conf=0.90]
  (Raichu) -[WEAK_AGAINST]-> (Ground)  | fact: Because Pikachu is Electric-type and weak to Ground, and Raichu is Electric-type, it is inferrable that Raichu is also weak to Ground.  [conf=0.90]
  (Raichu) -[RESISTS]-> (Flying)  | fact: Because Pikachu is Electric-type and resists Flying, and Raichu is Electric-type, it is inferrable that Raichu also

In this example, our **builder agent** initially extracted direct facts from the Pokémon episodes, such as *"Pikachu evolves to Raichu"* or *"Pikachu has type Electric."*

When we introduced the **reflection loop**, the agent began to **review its own graph output** and infer **missing or implicit relations**. For instance, it noticed that Pikachu also **resists Flying and Steel**, facts implied by its Electric typing but not explicitly mentioned in the text.

This reflective step acts as a lightweight **self-consistency check**:
- It helps the model **fill small gaps** in knowledge by reasoning over the structure it already built.
- It can correct omissions or low-confidence facts without requiring another dataset.
- It stops automatically: once an iteration adds no new edges (or `max_iters` is reached), the loop ends.

In a larger system, this is the foundation of **agentic knowledge refinement**, the same principle used by **Graphiti** and other **GraphRAG** frameworks to keep the knowledge graph both **complete** and **consistent** over time.
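
The stop-when-nothing-changes idea does not depend on an LLM. As a deterministic analogy, the sketch below applies one hand-written rule (a Pokémon inherits the resistances of its type) until an iteration adds no edges. The rule and the names `infer_once` and `expand_until_stable` are assumptions for illustration; the notebook's reflection loop asks the model for new triples instead of applying fixed rules.

```python
def infer_once(edges: set) -> set:
    """One pass of a single rule: X HAS_TYPE T and T RESISTS U  =>  X RESISTS U."""
    new_edges = set()
    for (x, p1, t) in edges:
        if p1 != "HAS_TYPE":
            continue
        for (t2, p2, u) in edges:
            if p2 == "RESISTS" and t2 == t:
                new_edges.add((x, "RESISTS", u))
    # Only report edges that are not already present.
    return new_edges - edges

def expand_until_stable(edges: set, max_iters: int = 3) -> set:
    """Repeat infer_once until no new edges appear or max_iters is reached."""
    edges = set(edges)
    for iteration in range(1, max_iters + 1):
        added = infer_once(edges)
        print(f"Iteration {iteration}: added {len(added)} new edges.")
        if not added:
            break
        edges |= added
    return edges

seed = {
    ("Pikachu", "HAS_TYPE", "Electric"),
    ("Raichu", "HAS_TYPE", "Electric"),
    ("Electric", "RESISTS", "Flying"),
    ("Electric", "RESISTS", "Steel"),
}
result = expand_until_stable(seed)
```

Unlike the LLM-driven loop, a fixed rule set over a finite graph is guaranteed to reach a stable state; with a model in the loop, `max_iters` is the only hard bound.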

### 🎯 Embedding and Semantic Search

Now that our mini knowledge graph is built and refined, the next step is to make it **searchable**. We'll embed both **nodes** (entity names) and **edges** (relationships) into vector space using the **OpenRouter embedding API** (`text-embedding-3-large` by OpenAI, accessed via OpenRouter).

This lets us perform **semantic search** over the graph, so instead of keyword lookups, we can find conceptually related entities and relationships.

In this section, we:
1. Define helper functions to **embed** text and **compute cosine similarity**.
2. Build vector indexes for all nodes and edges.
3. Implement simple **search functions** that return the top-K most semantically similar nodes or edges for any query.

This is conceptually similar to what happens in **traditional RAG**, except here we are embedding **graph elements** instead of text chunks, a key building block for **GraphRAG** reasoning.

In [19]:
from pprint import pprint

def embed_texts(texts: List[str]) -> List[List[float]]:
    if not texts:
        return []
    resp = openai.embeddings.create(model=EMBED_MODEL, input=texts)
    return [d.embedding for d in resp.data]

def normalize(v: np.ndarray) -> np.ndarray:
    n = np.linalg.norm(v) + 1e-12
    return v / n

def cosine_sim(a: np.ndarray, b: np.ndarray) -> float:
    return float(np.dot(normalize(a), normalize(b)))

def build_vector_indexes():
    GRAPH.node_texts = GRAPH.node_corpus()
    GRAPH.node_vecs = embed_texts(GRAPH.node_texts)
    GRAPH.edge_texts = GRAPH.edge_corpus()
    GRAPH.edge_vecs = embed_texts(GRAPH.edge_texts)

def search_nodes(query: str, k: int = 5) -> List[Tuple[str, float]]:
    if not GRAPH.node_texts:
        return []
    qv = embed_texts([query])[0]
    sims = [cosine_sim(np.array(qv), np.array(v)) for v in GRAPH.node_vecs]
    ranked = sorted(zip(GRAPH.node_texts, sims), key=lambda x: x[1], reverse=True)
    return ranked[:k]

def search_edges(query: str, k: int = 5) -> List[Tuple[str, float]]:
    if not GRAPH.edge_texts:
        return []
    qv = embed_texts([query])[0]
    sims = [cosine_sim(np.array(qv), np.array(v)) for v in GRAPH.edge_vecs]
    ranked = sorted(zip(GRAPH.edge_texts, sims), key=lambda x: x[1], reverse=True)
    return ranked[:k]

build_vector_indexes()

print("Top nodes for 'Pikachu evolution item':")
pprint(search_nodes("Pikachu evolution item"))

print("Top edges for 'type disadvantage against Ground':")
pprint(search_edges("type disadvantage against Ground"))


Top nodes for 'Pikachu evolution item':
[('Pikachu', 0.6038109820985103),
 ('Pichu', 0.5137344454239156),
 ('Raichu', 0.49602330682813606),
 ('Thunder Stone', 0.4325935945057776),
 ('Quick Attack', 0.34751407732015965)]
Top edges for 'type disadvantage against Ground':
[('(Pikachu)-[WEAK_AGAINST]->(Ground) :: ', 0.6187038417597363),
 ('(Electric)-[RESISTS]->(Flying) :: ', 0.4096086571588773),
 ('(Electric)-[RESISTS]->(Steel) :: ', 0.38052656641427385),
 ('(Pikachu)-[LEARNS_MOVE]->(Thunderbolt) :: often uses Thunderbolt',
  0.34112547965992024),
 ('(Pikachu)-[HAS_TYPE]->(Electric) :: Pikachu is an Electric-type',
  0.31840167619153525)]
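
The ranking logic can be checked without an embedding API. The sketch below swaps the real embeddings for made-up 3-dimensional vectors and uses only the standard library. `cosine`, `top_k`, and the toy vectors are illustrative assumptions, and the resulting scores have nothing to do with what `text-embedding-3-large` would return.

```python
import math

def cosine(a, b):
    """Cosine similarity of two equal-length vectors; 0.0 if either is all zeros."""
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(y * y for y in b))
    return dot / (na * nb) if na and nb else 0.0

def top_k(query_vec, index, k=2):
    """Rank (text, vector) pairs by cosine similarity to query_vec, highest first."""
    scored = [(text, cosine(query_vec, vec)) for text, vec in index]
    return sorted(scored, key=lambda pair: pair[1], reverse=True)[:k]

# Toy vectors standing in for real embeddings.
INDEX = [
    ("Pikachu", [1.0, 0.0, 0.0]),
    ("Thunder Stone", [0.0, 1.0, 0.0]),
    ("Ground", [0.0, 0.0, 1.0]),
]

# A query vector that points mostly toward "Pikachu" and partly toward "Thunder Stone".
results = top_k([0.9, 0.4, 0.0], INDEX, k=2)
print([text for text, _ in results])
```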


### 🧭 What is an Ontology (and why it matters for Graph/GraphRAG)?

An **ontology** is a formal, shared specification of the **concepts (classes)** in a domain, their **attributes (properties)**, and the **relationships** among them.  
In graph terms, it defines:
- **Entity types** (e.g., `Pokemon`, `Type`, `Move`, `Item`)
- **Attributes** on entities (e.g., `Pokemon.stage`, `Move.power`)
- **Relation types** (e.g., `HAS_TYPE`, `EVOLVES_TO`, `LEARNS_MOVE`)
- **Domain/Range constraints** (what can connect to what, e.g., `Pokemon HAS_TYPE Type`) and sometimes **cardinalities** (how many such links an entity may have)

A good ontology:
- **Reduces hallucinations** by constraining what can be asserted
- **Improves explainability** because answers refer to explicit entities/relations
- **Enables reusable reasoning** across tasks (querying, validation, analytics)

See more on [FalkorDB's blog](https://www.falkordb.com/blog/understanding-ontologies-knowledge-graph-schemas/).

**Pragmatic recipe to design one**
1. List **core entities** and the questions you must answer.  
2. Define **relations** that connect those entities (domain/range).  
3. Add **attributes** needed for reasoning (and keep the rest out).  
4. Start small; **iterate** with real data; add constraints as you go.  
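
Domain/range constraints are straightforward to enforce once every node carries a type. Below is a minimal sketch, assuming a hand-written `NODE_TYPES` lookup and a `RELATIONS` table that mirrors the schema used earlier in this notebook; the `is_valid` helper is hypothetical.

```python
# Relation name -> (domain class, range class), following the schema above.
RELATIONS = {
    "HAS_TYPE": ("Pokemon", "Type"),
    "EVOLVES_TO": ("Pokemon", "Pokemon"),
    "NEEDS_ITEM": ("Pokemon", "Item"),
    "LEARNS_MOVE": ("Pokemon", "Move"),
    "WEAK_AGAINST": ("Pokemon", "Type"),
    "RESISTS": ("Pokemon", "Type"),
}

# Assumed entity typing; in practice this would come from the extraction step.
NODE_TYPES = {
    "Pikachu": "Pokemon",
    "Raichu": "Pokemon",
    "Electric": "Type",
    "Flying": "Type",
    "Thunder Stone": "Item",
}

def is_valid(subject: str, predicate: str, obj: str) -> bool:
    """True if the predicate is known and both endpoints have the required class."""
    if predicate not in RELATIONS:
        return False
    domain, range_ = RELATIONS[predicate]
    return NODE_TYPES.get(subject) == domain and NODE_TYPES.get(obj) == range_

print(is_valid("Pikachu", "HAS_TYPE", "Electric"))  # Pokemon -> Type
print(is_valid("Electric", "RESISTS", "Flying"))    # Type -> Type, outside the schema
```

The second check fails because the schema declares `RESISTS` as Pokemon → Type, yet the graph extracted earlier contains `(Electric)-[RESISTS]->(Flying)`. A strict validator would either reject that edge or prompt a schema change, for example allowing Type → Type.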

Ontologies can also be drafted by an LLM, as in the cell below:

In [27]:
class OntologyAttribute(BaseModel):
    name: str
    dtype: Literal["string","int","float","bool","datetime","enum","id"] = "string"
    description: Optional[str] = None
    required: bool = False

class OntologyClass(BaseModel):
    name: str
    description: Optional[str] = None
    attributes: List[OntologyAttribute] = Field(default_factory=list)

class OntologyRelation(BaseModel):
    name: str
    description: Optional[str] = None
    domain: str  # class name
    range: str   # class name

class OntologyProposal(BaseModel):
    classes: List[OntologyClass]
    relations: List[OntologyRelation]
    notes: Optional[str] = None

ontology_suggester = Agent(
    model=CHAT_MODEL,
    system_prompt=(
        "You are an ontology engineer. Given example domain text, propose a SMALL, "
        "pragmatic ontology capturing key classes, attributes, and relations. "
        "Keep it minimal but sufficient for QA and reasoning. Prefer concise names. "
        "Return structured JSON matching OntologyProposal."
    ),
    output_type=OntologyProposal,
)

def suggest_ontology_from_examples(texts: List[str]) -> OntologyProposal:
    corpus = "\n\n---\n\n".join(texts)
    prompt = (
        "Domain examples:\n"
        f"{corpus}\n\n"
        "Requirements:\n"
        "- Classes should include Pokemon, Type, Move, and Item if present.\n"
        "- Add minimal attributes that are useful for Q&A (e.g., power for moves, stage for pokemon).\n"
        "- Add relations like HAS_TYPE, EVOLVES_TO, NEEDS_ITEM, LEARNS_MOVE, WEAK_AGAINST, RESISTS.\n"
        "- You may add brief descriptions.\n"
        "- Keep it compact. Avoid unnecessary ontology."
    )
    return ontology_suggester.run_sync(prompt).output

proposal = suggest_ontology_from_examples(EPISODES)

print("=== Ontology Proposal ===")
print(json.dumps(proposal.model_dump(), indent=2))

11:34:01.671 ontology_suggester run
11:34:01.674   chat google/gemini-2.5-flash
=== Ontology Proposal ===
{
  "classes": [
    {
      "name": "Pokemon",
      "description": "A creature with unique abilities and types.",
      "attributes": [
        {
          "name": "name",
          "dtype": "string",
          "description": "The name of the Pokemon",
          "required": false
        },
        {
          "name": "stage",
          "dtype": "int",
          "description": "Evolution stage of the Pokemon",
          "required": false
        }
      ]
    },
    {
      "name": "Type",
      "description": "A category that defines a Pokemon's strengths and weaknesses.",
      "attributes": [
        {
          "name": "name",
          "dtype": "string",
          "description": "The name of the type (e.g., Electric, Ground)",
          "required": false
        }
      ]
    },
    {
      "name": "Move",
      "description": "An action a Pokemon can perform in battle.",
  

## 🧩 Graphiti abstraction

All these embedding and search operations are automatically handled inside **Graphiti**.  
It provides:
- Configurable **embedders** and **cross-encoders** for reranking  
- Persistent **vector indexes** linked to graph nodes  
- Integration with real graph backends (e.g., **FalkorDB**, **Neo4j**)  
- APIs to search, rank, and traverse the graph directly  

So while we're writing these utilities manually here to understand the mechanics, in the next section we'll switch to **Graphiti**, which abstracts away all this boilerplate and provides a much more powerful, production-ready interface.

Let's first start FalkorDB as the backend for Graphiti.

In [None]:
from graphiti_core import Graphiti
from graphiti_core.cross_encoder.openai_reranker_client import OpenAIRerankerClient
from graphiti_core.llm_client.openai_client import OpenAIClient
from graphiti_core.embedder.openai import OpenAIEmbedder, OpenAIEmbedderConfig
from graphiti_core.driver.falkordb_driver import FalkorDriver
from graphiti_core.llm_client.config import LLMConfig

from dotenv import load_dotenv
import os

from src.falkordb_setup import run_falkordb, save_db

load_dotenv()

falkordb_container = run_falkordb()

⏬ Pulling falkordb/falkordb:latest (if needed)...
🚀 Starting FalkorDB with persistence at C:\Users\SHRESHTH\Desktop\build-your-own-super-agents\db\falkordb_data
   - Container: 6ee093a3de57
   - UI: http://localhost:3000  |  Redis: localhost:6379


In **Graphiti**, these three components play the same roles as in a traditional RAG pipeline, but for **graph-based reasoning** instead of plain text retrieval.

1. **LLM Client (`OpenAIClient`)**
   - This wraps the language model endpoint (in the cell below, `x-ai/grok-4-fast` via OpenRouter).
   - It's used for all generative tasks inside Graphiti, such as extracting triples, summarizing nodes, or generating context-aware graph queries.

2. **Embedder (`OpenAIEmbedder`)**  
   - Similar to the vector embedder in RAG, it converts text, entity names, or relationships into dense embeddings for **semantic similarity search** within the graph.  
   - We use `text-embedding-3-large` from OpenAI to create these embeddings, allowing Graphiti to find related nodes or documents efficiently.

3. **Cross-Encoder / Re-ranker (`OpenAIRerankerClient`)**  
   - After retrieval, multiple candidate nodes or subgraphs may be found.  
   - The reranker uses a small LLM to **score and reorder** these candidates based on their semantic relevance to the query, improving precision.  
   - This is analogous to the reranking step in advanced RAG setups.

Together, these components form the **reasoning and retrieval core** of Graphiti. *The embedder finds relevant graph pieces, the reranker prioritizes them, and the LLM client performs reasoning over the final context.*
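
This division of labor can be sketched as a two-stage pipeline: a cheap scorer selects candidates, then a more expensive scorer reorders them before they reach the LLM. Everything in the block below is a stand-in. Word overlap plays the embedder, a hand-written phrase bonus plays the reranker, and none of the function names come from Graphiti.

```python
def cheap_score(query: str, text: str) -> float:
    """Stage 1 stand-in for embedding similarity: fraction of query words found in text."""
    q = set(query.lower().split())
    t = set(text.lower().split())
    return len(q & t) / len(q) if q else 0.0

def careful_score(query: str, text: str) -> float:
    """Stage 2 stand-in for a reranker: overlap plus a bonus for an exact phrase match."""
    bonus = 1.0 if query.lower() in text.lower() else 0.0
    return cheap_score(query, text) + bonus

def retrieve_then_rerank(query, facts, k_retrieve=3, k_final=2):
    """Keep the k_retrieve best facts by the cheap score, then reorder them carefully."""
    candidates = sorted(facts, key=lambda f: cheap_score(query, f), reverse=True)[:k_retrieve]
    return sorted(candidates, key=lambda f: careful_score(query, f), reverse=True)[:k_final]

# Hypothetical facts; the first one shares words with the query but not the phrase.
FACTS = [
    "Stone walls block Thunder attacks",
    "Pikachu evolves into Raichu with a Thunder Stone",
    "Pikachu practices Quick Attack",
    "Electric resists Flying and Steel",
]

context = retrieve_then_rerank("thunder stone", FACTS)
print(context)
```

Both of the first two facts tie in stage 1 because each contains both query words; only the second stage moves the fact with the exact phrase to the top.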

In [9]:
from graphiti_core.utils.maintenance.graph_data_operations import clear_data
from datetime import datetime

llm_config = LLMConfig(api_key=os.getenv("OPENROUTER_API_KEY"), 
                       base_url="https://openrouter.ai/api/v1", 
                       model="x-ai/grok-4-fast",
                       small_model="x-ai/grok-4-fast")
client = OpenAIClient(config=llm_config, reasoning='medium')

embedder_config = OpenAIEmbedderConfig(api_key=os.getenv("OPENROUTER_API_KEY"),
                                       base_url="https://openrouter.ai/api/v1",
                                       embedding_model="openai/text-embedding-3-large")
embedder = OpenAIEmbedder(embedder_config)

reranker = OpenAIRerankerClient(llm_config)

driver = FalkorDriver()

### 🧱 Defining Entity and Relationship Schemas for Graphiti

Now that we understand how knowledge graphs can be built manually, let's formalize our Pokémon world using **Graphiti's structured schema definitions**.

We define:
- **Entity types** like `Pokemon`, `Type`, `Move`, and `Item`, each with optional attributes (e.g., `stage`, `power`, `effect`).
- **Edge types** like `HAS_TYPE`, `EVOLVES_TO`, `LEARNS_MOVE`, etc., describing allowed relationships between entities.

The `edge_type_map` explicitly specifies **which relationships are permitted** between each entity pair (e.g., `Pokemon → Type` can have `HAS_TYPE`, `WEAK_AGAINST`, or `RESISTS`).

Finally, we initialize a **Graphiti instance** connected to the FalkorDB driver,  
and load our Pokémon episode data into it using `add_episode()`.  
This automatically handles:
- LLM-based triple extraction
- Schema validation
- Embedding and storage in the graph backend

Essentially, this is the **Graphiti abstraction** over everything we built manually earlier, offering schema-aware KG construction, persistence, and reasoning in one unified interface.

For our use-case, we define a fixed ontology as follows. We will use [Graphiti's custom entities and edges](https://help.getzep.com/graphiti/core-concepts/custom-entity-and-edge-types) to encode this ontology for our knowledge graph.

In [8]:
from graphiti_core.nodes import EpisodeType
from pathlib import Path
import os 

# Entities
class Pokemon(BaseModel):
    """A Pokemon species or evolutionary form."""
    stage: Optional[int] = Field(None, description="Evolution stage number (e.g., Pichu is 1, Pikachu is 2, Raichu is 3)")

class Type(BaseModel):
    """Elemental typing such as Electric, Ground, Flying."""
    category: Optional[str] = Field(None, description="Damage class or grouping if applicable")

class Move(BaseModel):
    """A move a Pokemon can learn or use."""
    power: Optional[int] = Field(None, description="Base power if applicable")
    move_type: Optional[str] = Field(None, description="Type of the move, e.g., Electric")

class Item(BaseModel):
    """An evolution or battle item."""
    effect: Optional[str] = Field(None, description="Short description of the item effect")

# Edges
class HasType(BaseModel):
    """Pokemon → Type"""
    pass

class EvolvesTo(BaseModel):
    """Pokemon → Pokemon"""
    method: Optional[str] = Field(None, description="Evolution method (friendship, level, etc.)")

class NeedsItem(BaseModel):
    """Pokemon → Item"""
    reason: Optional[str] = Field(None, description="Why the item is required (e.g., evolve)")

class LearnsMove(BaseModel):
    """Pokemon → Move"""
    learn_method: Optional[str] = Field(None, description="TM/TR/Level-up/etc.")
    level: Optional[int] = Field(None, description="Level when learned, if applicable")

class WeakAgainst(BaseModel):
    """Pokemon → Type"""
    note: Optional[str] = Field(None, description="Context note")

class Resists(BaseModel):
    """Pokemon → Type"""
    note: Optional[str] = Field(None, description="Context note")

# Entity and edge registries
entity_types: Dict[str, type] = {
    "Pokemon": Pokemon,
    "Type": Type,
    "Move": Move,
    "Item": Item,
}

edge_types: Dict[str, type] = {
    "HAS_TYPE": HasType,
    "EVOLVES_TO": EvolvesTo,
    "NEEDS_ITEM": NeedsItem,
    "LEARNS_MOVE": LearnsMove,
    "WEAK_AGAINST": WeakAgainst,
    "RESISTS": Resists,
}

# Which edge types are allowed between which entity pairs
edge_type_map: Dict[Tuple[str, str], List[str]] = {
    ("Pokemon", "Type"): ["HAS_TYPE", "WEAK_AGAINST", "RESISTS"],
    ("Pokemon", "Pokemon"): ["EVOLVES_TO"],
    ("Pokemon", "Item"): ["NEEDS_ITEM"],
    ("Pokemon", "Move"): ["LEARNS_MOVE"],
}
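
Since the three registries are written by hand, a quick consistency check can catch typos before any data is ingested. The sketch below works on plain names rather than the Pydantic classes so that it runs on its own; `check_edge_type_map` is a hypothetical helper, not something Graphiti provides.

```python
# Names only; the Pydantic classes from the cell above are not needed for this check.
ENTITY_NAMES = {"Pokemon", "Type", "Move", "Item"}
EDGE_NAMES = {"HAS_TYPE", "EVOLVES_TO", "NEEDS_ITEM", "LEARNS_MOVE", "WEAK_AGAINST", "RESISTS"}

EDGE_TYPE_MAP = {
    ("Pokemon", "Type"): ["HAS_TYPE", "WEAK_AGAINST", "RESISTS"],
    ("Pokemon", "Pokemon"): ["EVOLVES_TO"],
    ("Pokemon", "Item"): ["NEEDS_ITEM"],
    ("Pokemon", "Move"): ["LEARNS_MOVE"],
}

def check_edge_type_map(edge_type_map, entity_names, edge_names):
    """Return problems: undeclared entities, undeclared edges, or edges never mapped."""
    problems = []
    mapped = set()
    for (source, target), names in edge_type_map.items():
        for entity in (source, target):
            if entity not in entity_names:
                problems.append(f"unknown entity type: {entity}")
        for name in names:
            if name not in edge_names:
                problems.append(f"unknown edge type: {name}")
            mapped.add(name)
    for name in sorted(edge_names - mapped):
        problems.append(f"edge type never mapped: {name}")
    return problems

# An empty list means the map and the registries agree.
print(check_edge_type_map(EDGE_TYPE_MAP, ENTITY_NAMES, EDGE_NAMES))
```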

### 🎬 Splitting Text into Self-Contained Episodes

Before adding data into the knowledge graph, we need to break long Pokémon narratives  
(such as full transcripts or story summaries) into smaller, **coherent segments** called *episodes*.

Each `Episode` should represent a complete scene or event, containing enough context  
for the LLM to extract entities and relationships without depending on other segments.

In this step:
- We define a **Pydantic model** `Episode` with fields for `name`, `episode_body`, and `source_description`.  
  A `field_validator` ensures titles are short and clean.
- We create an `EpisodesResult` wrapper to hold multiple episodes.
- We then use a **PydanticAI Agent**, `episode_generator`, which takes a long Pokémon text and  
  splits it into coherent `Episode` objects.

This segmentation step ensures the graph builder later works on **focused, semantically consistent chunks**,  
just like scene segmentation in a movie, enabling better entity extraction and cleaner graph structure.


In [11]:
from pydantic import BaseModel, Field, field_validator

class Episode(BaseModel):
    """A coherent segment suitable for graph extraction."""
    name: str = Field(..., description="Short, unique episode title (e.g., 'Gym Battle in Pewter').")
    episode_body: str = Field(..., min_length=120, description="Self-contained text of the episode.")
    source_description: Optional[str] = Field("episode", description="Provenance label (default: 'episode').")

    @field_validator("name")
    @classmethod
    def strip_name(cls, v: str) -> str:
        v = v.strip()
        if len(v) > 120:
            v = v[:117] + "..."
        return v

class EpisodesResult(BaseModel):
    episodes: List[Episode]

episode_generator = Agent(
    model=CHAT_MODEL,
    system_prompt=(
        f"""
        You are an expert segmenter. Split a long Pokémon-related context into coherent EPISODES.

        Rules:
        - Prioritize coherence over exact length.
        - Each episode must be self-contained: enough detail so downstream IE can extract entities/relations without cross-episode references.
        - Prefer semantic boundaries: scene changes, locations, battles, new characters/pokemon, or topic shifts.
        - Titles should be short, unique, and descriptive.
        - Respect chronology if provided; otherwise, group by topical coherence.
        - Keep `source_description="episode"` unless the input explicitly suggests otherwise (e.g., 'movie recap', 'blog post', etc.).
        - NEVER fabricate content beyond the given text. If info is uncertain, omit it.

        Output strictly as EpisodesResult JSON.
        """
    ),
    output_type=EpisodesResult
)

response = episode_generator.run_sync(EPISODES[0])
rprint(response.output)

17:24:10.483 episode_generator run
17:24:10.483   chat google/gemini-2.5-flash
17:24:12.554   chat google/gemini-2.5-flash
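
For comparison, a rule-based baseline can approximate segmentation without an LLM: split on blank lines, then merge short paragraphs until each chunk reaches the 120-character minimum that `Episode.episode_body` enforces. This is a hypothetical fallback, not the notebook's method, and `split_into_chunks` is an invented name. It knows nothing about scene boundaries.

```python
def split_into_chunks(text: str, min_length: int = 120) -> list:
    """Split on blank lines and merge neighbors until each chunk has at least min_length chars."""
    paragraphs = [p.strip() for p in text.split("\n\n") if p.strip()]
    chunks, buffer = [], ""
    for para in paragraphs:
        buffer = f"{buffer}\n\n{para}" if buffer else para
        if len(buffer) >= min_length:
            chunks.append(buffer)
            buffer = ""
    if buffer:
        # Trailing text that is too short on its own is attached to the previous chunk.
        if chunks:
            chunks[-1] = f"{chunks[-1]}\n\n{buffer}"
        else:
            chunks.append(buffer)
    return chunks

# Hypothetical sample text, loosely based on the first episode.
sample = "\n\n".join([
    "Ash meets a timid Pichu.",
    "It later evolves into Pikachu through friendship, and Pikachu often uses Thunderbolt against Team Rocket in battle.",
    "Raichu appears later.",
])
chunks = split_into_chunks(sample)
for chunk in chunks:
    print(len(chunk), repr(chunk[:40]))
```

The LLM segmenter remains the better choice when boundaries depend on meaning; a baseline like this is mainly useful as a sanity check or as a fallback when no model is available.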


### ‚öôÔ∏è Loading and Processing Pok√©mon Episodes into Graphiti

Now that we‚Äôve defined our **episode segmentation agent** and **graph schema**,  
we can put everything together to build a **complete knowledge graph** using **Graphiti**.

In this step:
1. **Initialize Graphiti** with:
   - `graph_driver` → our FalkorDB backend  
   - `llm_client`, `embedder`, and `cross_encoder` → for triple extraction, embedding, and reranking  
   - `store_raw_episode_content=False` → skips saving large text blobs to keep storage light

2. **Prepare the environment**:
   - `clear_data()` wipes any existing graph data.  
   - `build_indices_and_constraints()` sets up indexes and schema-level constraints in the database.

3. **Process local Pokémon markdown files**:
   - Each file in `data/pokemon_md/` contains a text segment describing Pokémon interactions or battles.  
   - We pass each file through the `episode_generator`, which splits it into coherent episodes.  
   - Then each `Episode` is passed to `graphiti.add_episode()`, which:
     - Extracts entities and relationships,  
     - Embeds and links them,  
     - Inserts them into the graph database with timestamps and group metadata.

🔁 This creates a **structured, queryable knowledge graph** from unstructured Pokémon text, and demonstrates how Graphiti unifies the full workflow (segmentation → extraction → embedding → persistence) that we previously built manually in separate steps.


In [None]:
await graphiti.search('')

In [None]:
from pathlib import Path

from tqdm.notebook import tqdm  # replaces the deprecated `tqdm.tqdm_notebook`

graphiti = Graphiti(graph_driver=driver, llm_client=client, embedder=embedder, cross_encoder=reranker, store_raw_episode_content=False)

# Start from an empty graph, then recreate indexes and constraints.
await clear_data(graphiti.driver)
await graphiti.build_indices_and_constraints(delete_existing=True)

DB_FILES = Path("data/pokemon_md/")

# Read every file in the data directory (sorted for a reproducible order).
episodes = [path.read_text(encoding="utf-8") for path in sorted(DB_FILES.iterdir()) if path.is_file()]

# For the demo, process only the first two files and the first episode of each.
for episode in tqdm(episodes[:2], desc="Processing Files"):
    response = episode_generator.run_sync(episode)
    for gep in tqdm(response.output.episodes[:1], desc="Processing episodes"):
        await graphiti.add_episode(
            name=gep.name,
            episode_body=gep.episode_body,
            source_description=gep.source_description,
            source=EpisodeType.text,
            reference_time=datetime.now(),
            group_id="pokemon_data_tmp",
            entity_types=entity_types,
            edge_types=edge_types,
            edge_type_map=edge_type_map,
        )


Processing Files:   0%|          | 0/2 [00:00<?, ?it/s]

17:24:19.568 episode_generator run
17:24:19.571   chat google/gemini-2.5-flash


Processing episodes:   0%|          | 0/1 [00:00<?, ?it/s]

17:25:45.096 episode_generator run
17:25:45.099   chat google/gemini-2.5-flash


Processing episodes:   0%|          | 0/1 [00:00<?, ?it/s]

In [15]:
save_db(falkordb_container)

✅ Saved dump.rdb in db/falkordb_data/dump.rdb


In [37]:
from src.graphiti_utils import export_graph

export_graph()

✅ Exported 3 nodes with label 'Entity' to db/falkordb_data/nodes_Entity.csv
✅ Exported 2 nodes with label 'Pokemon' to db/falkordb_data/nodes_Pokemon.csv
✅ Exported 1 nodes with label 'Episodic' to db/falkordb_data/nodes_Episodic.csv
✅ Exported 5 edges of type 'RELATES_TO' to db/falkordb_data/edges_RELATES_TO.csv
✅ Exported 3 edges of type 'MENTIONS' to db/falkordb_data/edges_MENTIONS.csv

📊 Summary:
   Node labels exported: 3
   Edge types exported: 2


In [14]:
await graphiti.close()

In [None]:
from graphiti_core.edges import EntityEdge

results = await graphiti.search('Charizard')

def pretty_print(entity_edge: EntityEdge) -> None:
    """Print one search hit, leaving out the long embedding vector."""
    rprint(entity_edge.model_dump(exclude={"fact_embedding"}))

# Show the top two hits.
for result in results[:2]:
    pretty_print(result)

In [None]:
from src.graphiti_utils import pretty_print

print("\nSearching for: 'Pikachu evolution item'")
results = await graphiti.search('Pikachu evolution item')

print('\nSearch Results:')
for result in results[:2]:
    pretty_print(result)

print("\nSearching for: 'type disadvantage against Ground'")
results = await graphiti.search('type disadvantage against Ground')

print('\nSearch Results:')
for result in results[:2]:
    pretty_print(result)


Searching for: 'Pikachu evolution item'

Search Results:



Searching for: 'type disadvantage against Ground'

Search Results:


## 2. Schema: Entities & Relationships

We’ll constrain the LLM to produce **triples** under a small ontology.  
This reduces hallucinations and keeps the graph clean.

**Entities**
- `Pokemon(name)`
- `Type(name)`
- `Move(name)`
- `Item(name)`

**Relationships (directed)**
- `HAS_TYPE(Pokemon → Type)`
- `EVOLVES_TO(Pokemon → Pokemon)`
- `NEEDS_ITEM(Pokemon → Item)` *(for evolutions that need an item)*
- `LEARNS_MOVE(Pokemon → Move)`
- `WEAK_AGAINST(Pokemon → Type)`
- `RESISTS(Pokemon → Type)`

We’ll create **Pydantic models** for the structured output the builder agent must return.

In [None]:
from typing import Literal

ValidPredicates = Literal[
    "HAS_TYPE", "EVOLVES_TO", "NEEDS_ITEM", "LEARNS_MOVE", "WEAK_AGAINST", "RESISTS"
]

class Triple(BaseModel):
    subject: str = Field(description="Entity name (e.g., 'Pikachu')")
    predicate: ValidPredicates
    object: str = Field(description="Entity name (e.g., 'Electric')")
    fact: Optional[str] = Field(default=None, description="Optional natural language gloss for the edge")
    # optional metadata
    confidence: float = Field(ge=0.0, le=1.0, default=0.9)

class BuildKGResult(BaseModel):
    entities: List[str] = Field(description="All entity names referenced in triples")
    triples: List[Triple]
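
The `Literal` above restricts predicate *names*, but it does not check which kinds of entity a predicate connects (for example, that `HAS_TYPE` points from a Pokemon to a Type). A minimal sketch of such a domain/range check follows. The names `PREDICATE_SIGNATURES`, `ENTITY_KINDS`, and `signature_errors` are hypothetical, and the small entity table is illustrative only; in practice it would be derived from the dataset.

```python
from typing import Dict, List, Tuple

# Allowed (subject kind, object kind) per predicate, copied from the schema list.
PREDICATE_SIGNATURES: Dict[str, Tuple[str, str]] = {
    "HAS_TYPE": ("Pokemon", "Type"),
    "EVOLVES_TO": ("Pokemon", "Pokemon"),
    "NEEDS_ITEM": ("Pokemon", "Item"),
    "LEARNS_MOVE": ("Pokemon", "Move"),
    "WEAK_AGAINST": ("Pokemon", "Type"),
    "RESISTS": ("Pokemon", "Type"),
}

# Hypothetical name -> kind lookup (illustrative entries, not the course dataset).
ENTITY_KINDS: Dict[str, str] = {
    "Pichu": "Pokemon",
    "Pikachu": "Pokemon",
    "Raichu": "Pokemon",
    "Electric": "Type",
    "Ground": "Type",
    "Thunderbolt": "Move",
    "Thunder Stone": "Item",
}


def signature_errors(subject: str, predicate: str, obj: str) -> List[str]:
    """Return human-readable problems for one triple; an empty list means valid."""
    if predicate not in PREDICATE_SIGNATURES:
        return [f"unknown predicate {predicate!r}"]
    want_subject, want_object = PREDICATE_SIGNATURES[predicate]
    errors: List[str] = []
    for role, name, want in (
        ("subject", subject, want_subject),
        ("object", obj, want_object),
    ):
        kind = ENTITY_KINDS.get(name)
        if kind is None:
            errors.append(f"{role} {name!r} is not a known entity")
        elif kind != want:
            errors.append(f"{role} {name!r} is a {kind}, expected {want}")
    return errors


# A move used where a type is expected is reported as an error.
print(signature_errors("Pikachu", "HAS_TYPE", "Thunderbolt"))
```

A check like this could run on each `Triple` (passing `tri.subject`, `tri.predicate`, `tri.object`) before anything is persisted.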

## 3. KG Builder Agent (with self-reflection)

We create two agents:
1. **Builder**: extracts **schema-valid triples** for a user query, constrained to our tiny dataset.
2. **Critic**: validates the builder’s output for **schema, consistency, and data grounding**; suggests a corrected set if needed.

We’ll run a simple **reflect-revise loop** up to 2 rounds.

In [None]:
from pydantic_ai.models.openai import OpenAIModel

# The model name is the first positional argument.
model = OpenAIModel(MODEL)

builder = Agent[None, BuildKGResult](
    model=model,
    system_prompt=(
        "You are a strict KG builder. Extract schema-conformant triples ONLY from the provided Pikachu dataset. "
        "Never invent new pokemon, types, or moves. The schema is:\n"
        "- Entities: Pokemon, Type, Move, Item (use names as strings)\n"
        "- Rels: HAS_TYPE(Pokemon→Type), EVOLVES_TO(Pokemon→Pokemon), NEEDS_ITEM(Pokemon→Item), "
        "LEARNS_MOVE(Pokemon→Move), WEAK_AGAINST(Pokemon→Type), RESISTS(Pokemon→Type)\n"
        "Return entities + triples as structured JSON. Keep subjects/objects as plain names."
    ),
    output_type=BuildKGResult,  # same keyword as `episode_generator` above
)

class Critique(BaseModel):
    ok: bool
    reasons: List[str] = []
    corrected: Optional[BuildKGResult] = None

critic = Agent[BuildKGResult, Critique](
    model=model,
    system_prompt=(
        "You are a KG critic. Given a candidate BuildKGResult and the Pikachu dataset, verify:\n"
        "1) predicates are from the allowed set, 2) all entities appear in dataset, 3) no contradictions, "
        "4) triples grounded in data. If any problem, set ok=false and return a corrected BuildKGResult."
    ),
    # deps are available to tools via RunContext; they are not sent to the model
    # automatically, so the candidate must also appear in the prompt.
    deps_type=BuildKGResult,
    output_type=Critique,
)

In [None]:
import json  # used below to serialize the dataset into prompts

def dataset_entities() -> set:
    """Collect every name that appears in the dataset, for grounding checks."""
    names = set()
    for p in PIKACHU_DATA["pokemon"]:
        names.add(p["name"])
        for t in p["types"]:
            names.add(t)
        for a in p["abilities"]:
            names.add(a)  # we won't use Ability as an entity, but keep for checks
        for w in p["weak_against"]:
            names.add(w)
        for r in p["resists"]:
            names.add(r)
        if p["evolves_to"]:
            names.add(p["evolves_to"])
        if p["evolution_item"]:
            names.add(p["evolution_item"])
    for m in PIKACHU_DATA["moves"]:
        names.add(m["name"])
        names.add(m["type"])
    return names

ALL_KNOWN = dataset_entities()

def seed_moves_for(pokemon_name: str) -> List[str]:
    # For demo, give a couple of canonical moves
    base = {
        "Pichu": ["Thunder Wave", "Quick Attack"],
        "Pikachu": ["Thunderbolt", "Quick Attack"],
        "Raichu": ["Thunderbolt"]
    }
    return base.get(pokemon_name, [])

def build_kg_for_query(query: str, max_reflections: int = 2) -> BuildKGResult:
    dataset_json = json.dumps(PIKACHU_DATA)

    # 1) Initial draft
    draft = builder.run_sync(f"Query: {query}\nDataset (JSON): {dataset_json}").output

    # 2) Critique loop: the critic only sees what is in its prompt, so the
    #    candidate and the dataset are both serialized into the message.
    current = draft
    for _ in range(max_reflections):
        critique = critic.run_sync(
            "Check this candidate against the dataset. Return ok and corrected if needed.\n"
            f"Candidate (JSON): {current.model_dump_json()}\n"
            f"Dataset (JSON): {dataset_json}",
            deps=current,
        ).output
        if critique.ok:
            return current
        if critique.corrected is not None:
            current = critique.corrected
    return current

# quick smoke test
res = build_kg_for_query("How does Pikachu evolve and what item is needed? Also list its type and a typical move.")
print("Triples:", len(res.triples))
print(res.triples[:3])
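
The cell above computes `ALL_KNOWN` but never uses it. One way it could be put to work is a deterministic grounding check that runs alongside the LLM critic. The sketch below is self-contained: `ungrounded_triples` is a hypothetical helper, and `known_names` is an illustrative stand-in for `ALL_KNOWN`, not the course dataset.

```python
from typing import Iterable, List, Set, Tuple

TripleTuple = Tuple[str, str, str]  # (subject, predicate, object)


def ungrounded_triples(triples: Iterable[TripleTuple], known: Set[str]) -> List[TripleTuple]:
    """Return triples whose subject or object is missing from the known-name set."""
    return [t for t in triples if t[0] not in known or t[2] not in known]


# Illustrative stand-in for ALL_KNOWN.
known_names = {"Pichu", "Pikachu", "Raichu", "Electric", "Ground", "Thunder Stone"}

candidate = [
    ("Pikachu", "HAS_TYPE", "Electric"),
    ("Pikachu", "EVOLVES_TO", "Raichu"),
    ("Pikachu", "EVOLVES_TO", "Gorochu"),  # name not present in known_names
]

bad = ungrounded_triples(candidate, known_names)
print(bad)
```

With the notebook's objects, the same helper could be called as `ungrounded_triples([(t.subject, t.predicate, t.object) for t in res.triples], ALL_KNOWN)`. Any returned triples can then be dropped or sent back to the critic.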

## 4. Persisting to Graphiti

We’ll keep a single graph instance for the demo and add triples to it.  
If Graphiti is not installed here, we’ll use a small in-memory shim with a compatible `add_triplet()` / `neighborhood()` API.
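
The shim itself is not shown in this section. Below is a minimal sketch of what such a stand-in could look like, assuming only the calls this notebook makes: `add_triplet`, `find_edges`, `neighborhood`, and the `nodes` and `edges` attributes. The class name `InMemoryGraph` and its internals are hypothetical and not part of Graphiti.

```python
from collections import defaultdict
from typing import Dict, List, Optional, Set, Tuple

Edge = Tuple[str, str, str]  # (subject, predicate, object)


class InMemoryGraph:
    """Tiny triple store: a node set, an edge->metadata dict, and an adjacency index."""

    def __init__(self) -> None:
        self.nodes: Set[str] = set()
        self.edges: Dict[Edge, dict] = {}
        self._adjacent: Dict[str, List[Edge]] = defaultdict(list)

    def add_triplet(self, subject: str, predicate: str, obj: str, **metadata) -> None:
        """Insert an edge; re-adding the same edge only replaces its metadata."""
        edge = (subject, predicate, obj)
        if edge not in self.edges:
            self._adjacent[subject].append(edge)
            if obj != subject:
                self._adjacent[obj].append(edge)
        self.edges[edge] = metadata
        self.nodes.update((subject, obj))

    def find_edges(
        self,
        subject: Optional[str] = None,
        predicate: Optional[str] = None,
        object: Optional[str] = None,  # keyword name matches the notebook's call
    ) -> List[Edge]:
        """Return edges matching every filter that is not None, in insertion order."""
        return [
            e
            for e in self.edges
            if (subject is None or e[0] == subject)
            and (predicate is None or e[1] == predicate)
            and (object is None or e[2] == object)
        ]

    def neighborhood(self, name: str) -> List[Edge]:
        """Return all edges that touch `name`, as subject or as object."""
        return list(self._adjacent.get(name, []))


g = InMemoryGraph()
g.add_triplet("Pichu", "EVOLVES_TO", "Pikachu", confidence=0.9)
g.add_triplet("Pikachu", "EVOLVES_TO", "Raichu")
g.add_triplet("Pikachu", "HAS_TYPE", "Electric")
print(g.find_edges(subject="Pikachu", predicate="EVOLVES_TO"))
```

To try it with the cell below, one option is to alias it with `Graph = InMemoryGraph`. A real backend would replace these list scans with indexed graph queries.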

In [None]:
GRAPH = Graph()

def add_build_result_to_graph(result: BuildKGResult):
    for tri in result.triples:
        GRAPH.add_triplet(tri.subject, tri.predicate, tri.object, fact=tri.fact or "", confidence=tri.confidence)

result = build_kg_for_query("Create a small KG for Pikachu: type, evolution chain, weaknesses, and one move.")
add_build_result_to_graph(result)
print("Graph nodes:", getattr(GRAPH, "nodes", "N/A (backend-managed)"))
try:
    print("Edges stored:", len(GRAPH.edges))
except Exception:
    print("Edges stored: backend-managed")

## 5. KG Answering Agent (with **KG builder tool**)

The answering agent can:
1. **Query the existing KG** for facts, and
2. **If missing**, call the **`make_or_update_graph` tool** (a wrapper around `build_kg_for_query`) to generate & persist the relevant subgraph, then answer.

This keeps answers **precise and grounded**.

In [None]:
from typing import List, Optional, Tuple

from pydantic_ai import RunContext

class Answer(BaseModel):
    answer: str
    used_builder: bool = False
    evidence: List[Tuple[str, str, str]] = Field(default_factory=list, description="subset of triples (s,p,o) used")

answer_agent = Agent[None, Answer](
    model=model,
    system_prompt=(
        "You answer questions strictly from the Knowledge Graph.\n"
        "Use the `query_graph_edges` tool to look up facts.\n"
        "If the graph does not have sufficient edges to answer, call `make_or_update_graph` tool first, then answer.\n"
        "Cite a few (s,p,o) triples used in `evidence`. Be concise and precise."
    ),
    output_type=Answer
)

@answer_agent.tool
def make_or_update_graph(ctx: RunContext[None], query: str) -> str:
    """Generate/augment the knowledge graph for the user query and persist it; returns a short status string."""
    result = build_kg_for_query(query)
    add_build_result_to_graph(result)
    return f"Added {len(result.triples)} triples for query: {query}"

# Registered as a tool so the agent can actually read the graph.
# Assumes `GRAPH.find_edges` returns JSON-serializable records.
@answer_agent.tool_plain
def query_graph_edges(subject: Optional[str] = None, predicate: Optional[str] = None, object: Optional[str] = None) -> list:
    """Look up stored edges, optionally filtered by subject, predicate, and/or object."""
    try:
        return GRAPH.find_edges(subject=subject, predicate=predicate, object=object)
    except Exception:
        # If using a real backend, you'd use Graphiti's query API here
        return []

def answer_with_graph(question: str) -> Answer:
    # Optional: pre-check simple availability to guide the LLM
    neigh = getattr(GRAPH, "neighborhood", None)
    hint = ""
    if neigh is not None:
        # naive hint: do we have any neighbors for the main mentioned entity?
        if "Pikachu" in question:
            neighborhood = neigh("Pikachu")
            if not neighborhood:
                hint = "(Graph appears sparse for Pikachu; consider calling the builder tool.)"
    return answer_agent.run_sync(f"{hint}\nQuestion: {question}").output

demo = answer_with_graph("What item evolves Pikachu, and what type is Pikachu?")
print(demo.answer, "\nUsed builder:", demo.used_builder, "\nEvidence:", demo.evidence[:3])

## 6. Demos

Try a few queries. The agent will use the graph if possible and call the builder tool if necessary.

In [None]:
for q in [
    "List Pikachu's evolution chain.",
    "Is Pikachu weak against Ground?",
    "Give Pikachu's type and one move it commonly uses.",
    "What item is needed to evolve Pikachu?"
]:
    out = answer_with_graph(q)
    print("Q:", q)
    print("A:", out.answer)
    print("Used builder tool:", out.used_builder)
    print("Evidence:", out.evidence[:2])
    print("-"*60)
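
The first demo question is a multi-hop query: answering it means following `EVOLVES_TO` edges in both directions. A minimal sketch of how such a chain could be resolved directly from stored triples, without an LLM, is shown below. `evolution_chain` is a hypothetical helper, and it assumes each Pokemon has at most one evolution (branching evolutions would need a tree rather than a list).

```python
from typing import Dict, Iterable, List, Tuple


def evolution_chain(triples: Iterable[Tuple[str, str, str]], start: str) -> List[str]:
    """Walk EVOLVES_TO edges back to the base form, then forward to the final form."""
    next_form: Dict[str, str] = {}
    prev_form: Dict[str, str] = {}
    for subject, predicate, obj in triples:
        if predicate == "EVOLVES_TO":
            next_form[subject] = obj  # assumes a single evolution per Pokemon
            prev_form[obj] = subject

    # Step backwards until there is no earlier form (guarding against cycles).
    current = start
    seen = {current}
    while current in prev_form and prev_form[current] not in seen:
        current = prev_form[current]
        seen.add(current)

    # Step forwards from the base form, collecting each stage once.
    chain = [current]
    while current in next_form and next_form[current] not in chain:
        current = next_form[current]
        chain.append(current)
    return chain


triples = [
    ("Pichu", "EVOLVES_TO", "Pikachu"),
    ("Pikachu", "EVOLVES_TO", "Raichu"),
    ("Pikachu", "HAS_TYPE", "Electric"),
]
chain = evolution_chain(triples, "Pikachu")
print(" -> ".join(chain))
```

Resolving the path in code and handing it to the agent as evidence keeps the traversal explicit and easy to audit.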

## 7. Reliability: lightweight checks

You can add more *guardrails*:
- **Result validator** to clip/normalize fields or reject low-confidence triples.
- **Critic iterations** > 2 for tougher tasks.
- **Schema hardening**: restrict entity strings to known lists, require certain edges per query type, etc.

In [None]:
# Example: clip confidence to [0.0, 1.0] and drop very low-confidence edges before persisting
def sanitize(result: BuildKGResult, min_conf: float = 0.4) -> BuildKGResult:
    keep = []
    for tri in result.triples:
        tri.confidence = max(0.0, min(1.0, tri.confidence))
        if tri.confidence >= min_conf:
            keep.append(tri)
    return BuildKGResult(entities=sorted(set(result.entities)), triples=keep)

# Use it like:
r = build_kg_for_query("Ensure one move and type for Pikachu")
r = sanitize(r, min_conf=0.5)
add_build_result_to_graph(r)
print("Sanitized & persisted", len(r.triples), "triples.")
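
The "require certain edges per query type" guardrail from the list above can be sketched in the same spirit. The query-type names and the `REQUIRED_PREDICATES` mapping below are hypothetical, chosen only to illustrate the idea; they are not defined elsewhere in this notebook.

```python
from typing import Dict, Iterable, Set, Tuple

# Hypothetical mapping from a query type to the predicates an answer needs.
REQUIRED_PREDICATES: Dict[str, Set[str]] = {
    "evolution": {"EVOLVES_TO"},
    "evolution_item": {"EVOLVES_TO", "NEEDS_ITEM"},
    "matchup": {"HAS_TYPE", "WEAK_AGAINST"},
}


def missing_predicates(
    query_type: str,
    subject: str,
    triples: Iterable[Tuple[str, str, str]],
) -> Set[str]:
    """Return required predicates that have no edge starting at `subject`."""
    present = {p for s, p, _ in triples if s == subject}
    return REQUIRED_PREDICATES[query_type] - present


triples = [
    ("Pikachu", "EVOLVES_TO", "Raichu"),
    ("Pikachu", "HAS_TYPE", "Electric"),
]
gaps = missing_predicates("evolution_item", "Pikachu", triples)
print(gaps)
```

A non-empty result is a signal to rerun the builder, or to decline to answer, before the answering agent sees an incomplete subgraph.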

## 8. Backend choices & performance notes

- Graphiti supports **Neo4j, FalkorDB, Kùzu, Amazon Neptune** backends and uses **OpenSearch** for full-text search where relevant (for example, alongside Neptune).  
  See Graphiti README **Requirements** and quickstart.  
- Try **FalkorDB** with Docker for a zero-setup local graph; or use **Neo4j Desktop**.

> If using FalkorDB/Neo4j, replace the shim here with real Graphiti usage and its query utilities; method names like `add_triplet` and neighborhood/edge lookup map onto Graphiti’s graph API.

## 9. References & further reading

- Graphiti overview & install (README), covering *requirements, structured output note, backends, MCP server*:  
  - https://github.com/getzep/graphiti  
  - https://help.getzep.com/graphiti/getting-started/overview  
  - MCP server intro: https://help.getzep.com/graphiti/getting-started/mcp-server

- PydanticAI, *tools & structured outputs*:  
  - Tools: https://ai.pydantic.dev/tools/  
  - Output / structured results: https://ai.pydantic.dev/output/

- Why KG for agents (temporal, dynamic):  
  - Neo4j blog summary of Graphiti: https://neo4j.com/blog/developer/graphiti-knowledge-graph-memory/

## 10. What you learned 💡

- Designing a **schema-first** KG keeps extraction output consistent and stops off-schema triples from cluttering the graph.
- A **builder→critic** loop catches schema violations & hallucinations early.
- An **answering agent with a KG-builder tool** gives **on-demand graphing** + **precise answers**.
- This pattern scales to larger domains and document corpora (→ **GraphRAG**). In the next tutorial, we’ll compare **dynamic Graphiti** with **precomputed GraphRAG** and hybridize them.