# Lesson 5 - Knowledge Graph Construction

With all the plans in place, it's time to construct the knowledge graph.

## Agent

- An agent that proposes a schema for the knowledge graph, based on the established user goal.
- Input: `approved_user_goal`, `approved_files`, `approved_construction_plan`
- Output: `approved_construction_plan`, a dictionary containing the construction plan for the knowledge graph.
- Tools: `get_approved_user_goal`, `get_approved_files`, `sample_file`, 
        `propose_entity_extraction`, `propose_relationship_extraction`, `approve_proposed_construction_plan`

## Workflow

1. The context is initialized with an `approved_user_goal` and `approved_files`
2. For each file, determine whether it represents a node or a relationship.
3. For each node file, propose an entity extraction (file --> label, properties).
4. For each relationship file, propose a relationship extraction (file --> source and target nodes, relationship type and properties).
5. Present the construction proposal and ask for approval.
6. The user approves the construction proposal.
7. The construction proposal is saved in the context state as `approved_construction_plan`.


## Setup

The usual import of needed libraries, loading of environment variables, and connection to Neo4j.

In [None]:
# Import necessary libraries
import os
from pathlib import Path

from itertools import islice

from google.adk.agents import Agent
from google.adk.models.lite_llm import LiteLlm # For OpenAI support
from google.adk.sessions import InMemorySessionService
from google.adk.runners import Runner
from google.adk.tools import ToolContext
from google.genai import types # For creating message Content/Parts

# For type hints
from typing import Dict, Any, List

# Convenience libraries for working with Neo4j inside of Google ADK
from neo4j_for_adk import graphdb, tool_success, tool_error

import warnings
# Ignore all warnings
warnings.filterwarnings("ignore")

import logging
logging.basicConfig(level=logging.CRITICAL)

print("Libraries imported.")

In [None]:
# --- Define Model Constants for easier use ---
MODEL_GPT_5 = "openai/gpt-5"

llm = LiteLlm(model=MODEL_GPT_5)

# Test LLM with a direct call
print(llm.llm_client.completion(model=llm.model, messages=[{"role": "user", "content": "Are you ready?"}], tools=[]))

print("\nOpenAI ready.")

In [None]:
# Check connection to Neo4j by sending a query

neo4j_is_ready = graphdb.send_query("RETURN 'Neo4j is Ready!' as message")

print(neo4j_is_ready)

--- 

## Structured Data Agent

The structured data agent is responsible for constructing the "domain graph" from structured CSV files,
according to the approved construction plan.



First, you'll define some helper tools to explicitly work with a list of entity types.

- `proposed_entities` will be the result of agent analysis, offered to the user for approval
- `approved_entities` will be the final list of entity types to extract from unstructured text
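
As a rough illustration of what such helper tools might look like, the sketch below keeps both lists in a plain state dictionary. `StubToolContext`, `approve_proposed_entities`, the two state-key values, and the shape of the success result are assumptions made for this example. In the notebook, the tools would receive ADK's `ToolContext` and report through `tool_success` / `tool_error` instead.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List

# Assumed state keys, named after the two lists described above.
PROPOSED_ENTITIES = "proposed_entities"
APPROVED_ENTITIES = "approved_entities"


@dataclass
class StubToolContext:
    """Hypothetical stand-in for ADK's ToolContext: only a mutable state dict."""
    state: Dict[str, Any] = field(default_factory=dict)


def set_proposed_entities(proposed: List[str], tool_context: StubToolContext) -> Dict[str, Any]:
    """Record the entity types the agent wants to offer for approval."""
    tool_context.state[PROPOSED_ENTITIES] = list(proposed)
    return {"status": "success", PROPOSED_ENTITIES: tool_context.state[PROPOSED_ENTITIES]}


def approve_proposed_entities(tool_context: StubToolContext) -> Dict[str, Any]:
    """Copy the proposal into the approved list, refusing if nothing was proposed."""
    if PROPOSED_ENTITIES not in tool_context.state:
        return {"status": "error", "error_message": "No proposed entities to approve."}
    tool_context.state[APPROVED_ENTITIES] = list(tool_context.state[PROPOSED_ENTITIES])
    return {"status": "success", APPROVED_ENTITIES: tool_context.state[APPROVED_ENTITIES]}


ctx = StubToolContext()
set_proposed_entities(["Product", "Issue"], ctx)
approval = approve_proposed_entities(ctx)
print(approval)
```

Keeping the proposal and the approval under separate keys means a later phase can tell "proposed but not yet approved" apart from "approved".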

In [None]:
# the approved construction plan should look something like this...
approved_construction_plan = {
    "Assembly": {
        "construction_type": "node", 
        "source_file": "assemblies.csv", 
        "label": "Assembly", 
        "unique_column_name": "assembly_id", 
        "properties": ["assembly_name", "quantity", "product_id"]
    }, 
    "Part": {
        "construction_type": "node", 
        "source_file": "parts.csv", 
        "label": "Part", 
        "unique_column_name": "part_id", 
        "properties": ["part_name", "quantity", "assembly_id"]
    }, 
    "Product": {
        "construction_type": "node", 
        "source_file": "products.csv", 
        "label": "Product", 
        "unique_column_name": "product_id", 
        "properties": ["product_name", "price", "description"]
    }, 
    "Supplier": {
        "construction_type": "node", 
        "source_file": "suppliers.csv", 
        "label": "Supplier", 
        "unique_column_name": "supplier_id", 
        "properties": ["name", "specialty", "city", "country", "website", "contact_email"]
    }, 
    "Contains": {
        "construction_type": "relationship", 
        "source_file": "assemblies.csv", 
        "relationship_type": "Contains", 
        "from_node_label": "Product", 
        "from_node_column": "product_id", 
        "to_node_label": "Assembly", 
        "to_node_column": "assembly_id", 
        "properties": ["quantity"]
    }, 
    "Is_Part_Of": {
        "construction_type": "relationship", 
        "source_file": "parts.csv", 
        "relationship_type": "Is_Part_Of", 
        "from_node_label": "Part", 
        "from_node_column": "part_id", 
        "to_node_label": "Assembly", 
        "to_node_column": "assembly_id", 
        "properties": ["quantity"]
    }, 
    "Supplied_By": {
        "construction_type": "relationship", 
        "source_file": "part_supplier_mapping.csv", 
        "relationship_type": "Supplied_By", 
        "from_node_label": "Part", 
        "from_node_column": "part_id", 
        "to_node_label": "Supplier", 
        "to_node_column": "supplier_id", 
        "properties": ["supplier_name", "lead_time_days", "unit_cost", "minimum_order_quantity", "preferred_supplier"]
    }
}
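
Before importing, it can help to sanity-check a plan of this shape. The following is a minimal sketch, not part of the course helpers: `validate_construction_plan` and the two key sets are hypothetical names, and the required keys are inferred from the example plan above.

```python
from typing import Any, Dict, List

# Keys each kind of entry is assumed to need, inferred from the example plan.
NODE_KEYS = {"source_file", "label", "unique_column_name", "properties"}
RELATIONSHIP_KEYS = {
    "source_file", "relationship_type", "from_node_label", "from_node_column",
    "to_node_label", "to_node_column", "properties",
}


def validate_construction_plan(plan: Dict[str, Dict[str, Any]]) -> List[str]:
    """Return a list of readable problems; an empty list means no problems were found."""
    problems: List[str] = []
    node_labels = {
        entry.get("label")
        for entry in plan.values()
        if entry.get("construction_type") == "node"
    }
    for name, entry in plan.items():
        kind = entry.get("construction_type")
        if kind == "node":
            required = NODE_KEYS
        elif kind == "relationship":
            required = RELATIONSHIP_KEYS
        else:
            problems.append(f"{name}: unknown construction_type {kind!r}")
            continue
        missing = sorted(required - set(entry))
        if missing:
            problems.append(f"{name}: missing keys {missing}")
        if kind == "relationship":
            # Both endpoints should be node labels that the plan itself constructs.
            for side in ("from_node_label", "to_node_label"):
                label = entry.get(side)
                if label is not None and label not in node_labels:
                    problems.append(f"{name}: {side} {label!r} is not a node in the plan")
    return problems


sample_plan = {
    "Part": {
        "construction_type": "node", "source_file": "parts.csv", "label": "Part",
        "unique_column_name": "part_id", "properties": ["part_name"],
    },
    "Is_Part_Of": {
        "construction_type": "relationship", "source_file": "parts.csv",
        "relationship_type": "Is_Part_Of",
        "from_node_label": "Part", "from_node_column": "part_id",
        "to_node_label": "Assembly", "to_node_column": "assembly_id",
        "properties": ["quantity"],
    },
}
plan_problems = validate_construction_plan(sample_plan)
print(plan_problems)
```

In this sample the `Assembly` node entry is deliberately left out, so the relationship's target label is reported as a problem.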


In [None]:
from helper import get_neo4j_import_dir


In [None]:
def create_uniqueness_constraint(
    label: str,
    unique_property_key: str,
) -> Dict[str, Any]:
    """Creates a uniqueness constraint for a node label and property key.
    A uniqueness constraint ensures that no two nodes with the same label and property key have the same value.
    This improves the performance and integrity of data import and later queries.

    Args:
        label: The label of the node to create a constraint for.
        unique_property_key: The property key that should have a unique value.

    Returns:
        A dictionary with a status key ('success' or 'error').
        On error, includes an 'error_message' key.
    """    
    
    # Use string formatting since Neo4j doesn't support parameterization of labels and property keys when creating a constraint
    constraint_name = f"{label}_{unique_property_key}_constraint"
    query = f"""CREATE CONSTRAINT `{constraint_name}` IF NOT EXISTS
    FOR (n:`{label}`)
    REQUIRE n.`{unique_property_key}` IS UNIQUE"""
    results = graphdb.send_query(query)
    return results
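
Because the label and property key are formatted directly into the query text, it is worth seeing exactly what string is produced. The sketch below builds the same statement as a pure function, without sending it to Neo4j. `quote_identifier` and `build_constraint_query` are hypothetical helpers for this example. Doubling an embedded backtick is how Cypher escapes it inside a quoted identifier.

```python
def quote_identifier(name: str) -> str:
    """Backtick-quote a Cypher identifier, doubling any embedded backticks."""
    if not name:
        raise ValueError("identifier must not be empty")
    return "`" + name.replace("`", "``") + "`"


def build_constraint_query(label: str, unique_property_key: str) -> str:
    """Build the uniqueness-constraint statement for a label and property key."""
    constraint_name = quote_identifier(f"{label}_{unique_property_key}_constraint")
    return (
        f"CREATE CONSTRAINT {constraint_name} IF NOT EXISTS\n"
        f"FOR (n:{quote_identifier(label)})\n"
        f"REQUIRE n.{quote_identifier(unique_property_key)} IS UNIQUE"
    )


constraint_query = build_constraint_query("Assembly", "assembly_id")
print(constraint_query)
```

Separating query construction from execution also makes the generated Cypher easy to inspect or unit-test.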


In [None]:
def load_nodes_from_csv(
    source_file: str,
    label: str,
    unique_column_name: str,
    properties: list[str],
) -> Dict[str, Any]:
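    """Loads nodes from a CSV file, merging on the unique_column_name value.

    Rows that share the same unique value are merged into a single node.
    """
    # unique_column_name is interpolated into the query text as the property key
    # (backtick-quoted); everything else is passed as a query parameter
    query = f"""LOAD CSV WITH HEADERS FROM "file:///" + $source_file AS row
    CALL (row) {{
        MERGE (n:$($label) {{ `{unique_column_name}` : row[$unique_column_name] }})
        FOREACH (k IN $properties | SET n[k] = row[k])
    }} IN TRANSACTIONS OF 1000 ROWS
    """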

    results = graphdb.send_query(query, {
        "source_file": source_file,
        "label": label,
        "unique_column_name": unique_column_name,
        "properties": properties
    })
    return results
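
To build intuition for what merging on the unique column means, here is a small in-memory imitation using only the standard library. It is a sketch of the semantics, not of how Neo4j executes the query, and `merge_rows` is a hypothetical name. Like `LOAD CSV`, `csv.DictReader` yields every value as a string.

```python
import csv
import io
from typing import Dict, List


def merge_rows(csv_text: str, unique_column_name: str, properties: List[str]) -> Dict[str, Dict[str, str]]:
    """Imitate MERGE on a key: one record per unique value, later rows overwrite properties."""
    nodes: Dict[str, Dict[str, str]] = {}
    for row in csv.DictReader(io.StringIO(csv_text)):
        key = row[unique_column_name]
        # Reuse the existing record for this key, or create it on first sight.
        node = nodes.setdefault(key, {unique_column_name: key})
        for prop in properties:
            node[prop] = row[prop]
    return nodes


sample_csv = "part_id,part_name,quantity\nP1,Leg,4\nP2,Top,1\nP1,Leg (oak),4\n"
merged_nodes = merge_rows(sample_csv, "part_id", ["part_name", "quantity"])
print(len(merged_nodes), merged_nodes["P1"]["part_name"])
```

Three rows produce two records, because the two `P1` rows collapse into one and the later row's property values win.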


In [None]:

def import_nodes(node_construction: dict) -> dict:
    # {
    #     "construction_type": "node", 
    #     "source_file": "assemblies.csv", 
    #     "label": "Assembly", 
    #     "unique_column_name": "assembly_id", 
    #     "properties": ["assembly_name", "quantity", "product_id"]
    # }

    # create a uniqueness constraint for the unique_column
    uniqueness_result = create_uniqueness_constraint(
        node_construction["label"],
        node_construction["unique_column_name"]
    )

    if (uniqueness_result["status"] == "error"):
        return uniqueness_result

    # import nodes from csv
    load_nodes_result = load_nodes_from_csv(
        node_construction["source_file"],
        node_construction["label"],
        node_construction["unique_column_name"],
        node_construction["properties"]
    )

    return load_nodes_result




In [None]:
def import_relationships(relationship_construction: dict) -> Dict[str, Any]:
    # {
    #     "construction_type": "relationship", 
    #     "source_file": "parts.csv", 
    #     "relationship_type": "Is_Part_Of", 
    #     "from_node_label": "Part", 
    #     "from_node_column": "part_id", 
    #     "to_node_label": "Assembly", 
    #     "to_node_column": "assembly_id", 
    #     "properties": ["quantity"]
    # }, 

    # load relationships from the CSV file: match the existing from/to nodes, then merge the relationship between them
    from_node_column = relationship_construction["from_node_column"]
    to_node_column = relationship_construction["to_node_column"]
    query = f"""LOAD CSV WITH HEADERS FROM "file:///" + $source_file AS row
    CALL (row) {{
        MATCH (from_node:$($from_node_label) {{ `{from_node_column}` : row[$from_node_column] }}),
              (to_node:$($to_node_label) {{ `{to_node_column}` : row[$to_node_column] }} )
        MERGE (from_node)-[r:$($relationship_type)]->(to_node)
        FOREACH (k IN $properties | SET r[k] = row[k])
    }} IN TRANSACTIONS OF 1000 ROWS
    """
    
    results = graphdb.send_query(query, {
        "source_file": relationship_construction["source_file"],
        "from_node_label": relationship_construction["from_node_label"],
        "from_node_column": relationship_construction["from_node_column"],
        "to_node_label": relationship_construction["to_node_label"],
        "to_node_column": relationship_construction["to_node_column"],
        "relationship_type": relationship_construction["relationship_type"],
        "properties": relationship_construction["properties"]
    })
    return results
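
Since the query matches both endpoints before merging, a row whose source or target node does not exist creates no relationship and raises no error. The sketch below imitates that behavior in memory so such rows can be spotted ahead of time. `split_matchable_rows` is a hypothetical helper, and the sets of known ids are assumed to come from the node files.

```python
import csv
import io
from typing import List, Set, Tuple

Pair = Tuple[str, str]


def split_matchable_rows(
    csv_text: str,
    from_node_column: str,
    to_node_column: str,
    from_ids: Set[str],
    to_ids: Set[str],
) -> Tuple[List[Pair], List[Pair]]:
    """Separate rows whose endpoints both exist from rows that would be skipped."""
    matched: List[Pair] = []
    skipped: List[Pair] = []
    for row in csv.DictReader(io.StringIO(csv_text)):
        pair = (row[from_node_column], row[to_node_column])
        if pair[0] in from_ids and pair[1] in to_ids:
            matched.append(pair)
        else:
            skipped.append(pair)
    return matched, skipped


sample_rows = "part_id,assembly_id,quantity\nP1,A1,4\nP2,A9,1\n"
matched, skipped = split_matchable_rows(
    sample_rows, "part_id", "assembly_id", from_ids={"P1", "P2"}, to_ids={"A1"}
)
print("matched:", matched, "skipped:", skipped)
```

A non-empty `skipped` list usually points to a node file that was not imported or an id column that does not line up between files.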


In [None]:
def construct_domain_graph(construction_plan: dict) -> dict:
    # collect each import result, keyed by plan entry, so failures are visible to the caller
    results = {}

    # first, import nodes
    node_constructions = {key: value for key, value in construction_plan.items() if value['construction_type'] == 'node'}
    for key, node_construction in node_constructions.items():
        results[key] = import_nodes(node_construction)

    # second, import relationships (the nodes they connect must already exist)
    relationship_constructions = {key: value for key, value in construction_plan.items() if value['construction_type'] == 'relationship'}
    for key, relationship_construction in relationship_constructions.items():
        results[key] = import_relationships(relationship_construction)

    return results


In [None]:
construct_domain_graph(approved_construction_plan)
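
The ordering matters: nodes are imported before relationships, regardless of the order in which the plan lists them. One possible variation, sketched below with injected importer functions so it runs without a database, also stops at the first reported error. `run_construction_plan` and the fake importers are hypothetical and exist only for this illustration.

```python
from typing import Any, Callable, Dict, List

Importer = Callable[[Dict[str, Any]], Dict[str, Any]]


def run_construction_plan(
    plan: Dict[str, Dict[str, Any]],
    import_node: Importer,
    import_relationship: Importer,
) -> Dict[str, Any]:
    """Import all nodes, then all relationships, stopping at the first error."""
    ordered = [
        (name, entry, import_node)
        for name, entry in plan.items()
        if entry["construction_type"] == "node"
    ] + [
        (name, entry, import_relationship)
        for name, entry in plan.items()
        if entry["construction_type"] == "relationship"
    ]
    completed: List[str] = []
    for name, entry, importer in ordered:
        result = importer(entry)
        if result.get("status") == "error":
            return {
                "status": "error",
                "failed": name,
                "completed": completed,
                "error_message": result.get("error_message"),
            }
        completed.append(name)
    return {"status": "success", "completed": completed}


def fake_import(entry: Dict[str, Any]) -> Dict[str, Any]:
    """Stand-in importer that always succeeds."""
    return {"status": "success"}


# The relationship is listed first on purpose; it should still run last.
demo_plan = {
    "Is_Part_Of": {"construction_type": "relationship"},
    "Part": {"construction_type": "node"},
}
outcome = run_construction_plan(demo_plan, fake_import, fake_import)
print(outcome)
```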

In [None]:
ner_agent_instruction = """
    You are a top-tier algorithm designed for analyzing text files and proposing
    the kind of named entities that could be extracted which would be relevant 
    for a user's goal. 
    
    Entities are people, places, things and qualities, but not quantities. 
    Your goal is to propose a list of the kind of entities, not the actual instances
    of entities.

    There are two general approaches to identifying kinds of entities:
    - well-known entities: these closely correlate with approved node labels
    - discovered entities: these may not exist in the graph schema, but appear consistently in the source text

    Design rules for well-known entities:
    - always use approved node labels as the kind of entity. For example, if there is an approved label "Person", and people appear in the text, then propose "Person" as the kind of entity.
    - prefer reusing existing node labels rather than creating new ones
    
    Design rules for discovered entities:
    - discovered entities are consistently mentioned in the text and are highly relevant to the user's goal
    - always look for entities that would provide more depth or breadth to the existing graph
    - for example, if the user goal is to represent social communities and the graph has "Person" nodes, look through the text to discover entities that are relevant like "Location" or "Event"
    - avoid quantitative entities that may be better represented as a predicate with a property or an additional property on an existing entity.
    - for example, do not propose "Age" as a kind of entity. That is better represented as an additional property "age" on a "Person".

    Prepare for the task:
    - use the 'get_approved_user_goal' tool to get the user goal
    - use the 'get_approved_files' tool to get the list of approved files
    - use the 'get_approved_labels' tool to get the approved node labels

    Think step by step:
    1. Sample some of the files using the 'sample_file' tool to understand the content
    2. Consider what well-known entities are mentioned in the text
    3. Discover entities that are frequently mentioned in the text that support the user's goal
    4. Use the 'set_proposed_entities' tool to save the list of well-known and discovered entities
    5. Present the recorded kinds of entities along with justification to the user
"""

Creating the agent is straightforward. Give it a name and description,
then use the instructions and tools you just defined.

In [None]:
NER_AGENT_NAME = "ner_schema_agent_v1"
ner_schema_agent = Agent(
    name=NER_AGENT_NAME,
    description="Proposes the kind of named entities that could be extracted from text files.",
    model=llm,
    instruction=ner_agent_instruction,
    tools=ner_agent_tools, 
)


The initial state is important in this phase, as the agent is designed to act
within a particular phase of an overall workflow.

The NER agent will need:

- the user goal, extended to mention product reviews and what to look for there
- a list of markdown files that have been pre-approved
- the approved construction plan from the structured data design phase

In [None]:
ner_agent_initial_state = {
    "approved_user_goal": {
        "kind_of_graph": "supply chain analysis",
        "description": """A multi-level bill of materials for manufactured products, useful for root cause analysis. 
        Add product reviews to start analysis from reported issues like quality, difficulty, or durability."""
    },
    "approved_files": [
        "product_reviews/gothenburg_table_reviews.md",
        "product_reviews/helsingborg_dresser_reviews.md",
        "product_reviews/jonkoping_coffee_table_reviews.md",
        "product_reviews/linkoping_bed_reviews.md",
        "product_reviews/malmo_desk_reviews.md",
        "product_reviews/norrkoping_nightstand_reviews.md",
        "product_reviews/orebro_lamp_reviews.md",
        "product_reviews/stockholm_chair_reviews.md",
        "product_reviews/uppsala_sofa_reviews.md",
        "product_reviews/vasteras_bookshelf_reviews.md"
    ],
    "approved_construction_plan": {
        "Product": {
            "construction_type": "node",
            "label": "Product",
        },
        "Assembly": {
            "construction_type": "node",
            "label": "Assembly",
        },
        "Component": {
            "construction_type": "node",
            "label": "Component",
        },
        "Supplier": {
            "construction_type": "node",
            "label": "Supplier",
        }
    }
}
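
The construction plan in this state carries only node entries and their labels, because that is all the NER agent needs to recognize "well-known" entities. As a hedged sketch of how a tool such as `get_approved_labels` might derive them (the function name and body here are assumptions, not the course implementation):

```python
from typing import Any, Dict, List


def approved_node_labels(state: Dict[str, Any]) -> List[str]:
    """Collect the labels of all node entries in the approved construction plan."""
    plan = state.get("approved_construction_plan", {})
    return [
        entry["label"]
        for entry in plan.values()
        if entry.get("construction_type") == "node"
    ]


# A trimmed-down state with the same shape as the initial state above.
example_state = {
    "approved_construction_plan": {
        "Product": {"construction_type": "node", "label": "Product"},
        "Supplier": {"construction_type": "node", "label": "Supplier"},
        "Supplied_By": {"construction_type": "relationship", "relationship_type": "Supplied_By"},
    }
}
labels = approved_node_labels(example_state)
print(labels)
```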

OK, let's run the agent. 

- use `make_agent_caller` to create an execution environment
- prompt the agent with a single message that should kick-off the analysis
- expect the result to be a proposed list of entity types
- but *not* a list of approved entity types

The entity types here may vary quite a bit. If you're not happy with the proposal,
you can run the cell again to get a new list.

In [None]:
from helper import make_agent_caller

ner_agent_caller = await make_agent_caller(ner_schema_agent, ner_agent_initial_state)

await ner_agent_caller.call("Add product reviews.")

session_end = await ner_agent_caller.get_session()

print("\n---\n")

print("\nSession state: ", session_end.state)

if PROPOSED_ENTITIES in session_end.state:
    print("\nProposed entities: ", session_end.state[PROPOSED_ENTITIES])

if APPROVED_ENTITIES in session_end.state:
    print("\nApproved entities: ", session_end.state[APPROVED_ENTITIES])
else:
    print("\nAwaiting approval.")

Once you're happy with the proposal, you can tell the agent that you approve.

In [None]:
await ner_agent_caller.call("Approve the proposed entities.")

session_end = await ner_agent_caller.get_session()

print("Session state: ", session_end.state)

if APPROVED_ENTITIES in session_end.state:
    print("\nApproved entities: ", session_end.state[APPROVED_ENTITIES])
else:
    print("\nAwaiting approval.")

## Fact Extraction


In [None]:
PROPOSED_FACTS = "proposed_facts"
APPROVED_FACTS = "approved_facts"

def add_proposed_fact(approved_subject_label:str,
                      proposed_predicate_label:str,
                      approved_object_label:str,
                      tool_context:ToolContext) -> dict:
    """Add a proposed type of fact that could be extracted from the files.

    A proposed fact type is a tuple of (subject, predicate, object) where
    the subject and object are approved entity types and the predicate 
    is a proposed relationship label.

    Args:
      approved_subject_label: approved label of the subject entity
      proposed_predicate_label: label of the predicate
      approved_object_label: approved label of the object entity
    """
    # Guard against invalid labels
    approved_entities = tool_context.state.get(APPROVED_ENTITIES, [])
    
    if approved_subject_label not in approved_entities:
        return tool_error(f"Approved subject label {approved_subject_label} not found. Try again.")
    if approved_object_label not in approved_entities:
        return tool_error(f"Approved object label {approved_object_label} not found. Try again.")
    
    current_predicates = tool_context.state.get(PROPOSED_FACTS, {})
    current_predicates[proposed_predicate_label] = {
        "subject_label": approved_subject_label,
        "predicate_label": proposed_predicate_label,
        "object_label": approved_object_label
    }
    tool_context.state[PROPOSED_FACTS] = current_predicates
    return tool_success(PROPOSED_FACTS, current_predicates)
    
def get_proposed_facts(tool_context:ToolContext) -> dict:
    """Get the proposed types of facts that could be extracted from the files."""
    return tool_context.state.get(PROPOSED_FACTS, {})


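def approve_proposed_facts(tool_context:ToolContext) -> dict:
    """Approve the proposed fact types."""
    # Guard against approving before anything has been proposed
    if PROPOSED_FACTS not in tool_context.state:
        return tool_error("No proposed fact types to approve. Propose fact types first.")
    tool_context.state[APPROVED_FACTS] = tool_context.state.get(PROPOSED_FACTS)
    return tool_success(APPROVED_FACTS, tool_context.state[APPROVED_FACTS])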
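
One property of `add_proposed_fact` worth noticing is that proposed facts are stored under the predicate label alone. The sketch below uses plain dictionaries to show the consequence, and one possible alternative key. Both helper functions are hypothetical and are not part of the notebook's tools.

```python
from typing import Dict


def record_by_predicate(store: Dict[str, dict], subject: str, predicate: str, obj: str) -> None:
    """Mirror the keying used above: one entry per predicate label."""
    store[predicate] = {
        "subject_label": subject,
        "predicate_label": predicate,
        "object_label": obj,
    }


def record_by_triple(store: Dict[str, dict], subject: str, predicate: str, obj: str) -> None:
    """Alternative keying (an assumption): one entry per full (subject, predicate, object)."""
    store[f"{subject}-{predicate}-{obj}"] = {
        "subject_label": subject,
        "predicate_label": predicate,
        "object_label": obj,
    }


by_predicate: Dict[str, dict] = {}
by_triple: Dict[str, dict] = {}
for triple in [("User", "mentions", "Product"), ("Review", "mentions", "Issue")]:
    record_by_predicate(by_predicate, *triple)
    record_by_triple(by_triple, *triple)

print(len(by_predicate), len(by_triple))
```

With predicate-only keys, the second `mentions` fact replaces the first. Whether that matters depends on whether the same predicate is expected to connect more than one pair of entity types.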


In [None]:
fact_agent_instruction = """
    You are a top-tier algorithm designed for analyzing text files and proposing
    the kind of facts that could be extracted from text that would be relevant 
    for a user's goal. 

    Do not propose specific individual facts, but instead propose the general kind 
    of facts that would be relevant for the user's goal. 

    For example, do not propose "ABK likes coffee" but the general type of fact "Person likes Beverage".
    
    Facts are triplets of (subject, predicate, object) where the subject and object are
    approved entity types, and the proposed predicate provides information about
    how they are related. For example, a fact type could be (Person, likes, Beverage).

    Design rules for facts:
    - only use approved entity types or node labels as subjects or objects. Do not propose new types of entities
    - the proposed predicate should describe the relationship between the approved subject and object
    - the predicate should optimize for information that is relevant to the user's goal
    - the predicate must appear in the source text. Do not guess.
    - use the 'add_proposed_fact' tool to record each proposed fact

    Prepare for the task:
    - use the 'get_approved_user_goal' tool to get the user goal
    - use the 'get_approved_files' tool to get the list of approved files
    - use the 'get_approved_entities' tool to get the list of approved entities

    Think step by step:
    1. Use the 'get_approved_user_goal' tool to get the user goal
    2. Sample some of the approved files using the 'sample_file' tool to understand the content
    3. Consider how subjects and objects are related in the text
    4. Call the 'add_proposed_fact' tool for each fact you propose
    5. Present the proposed types of facts to the user, along with an explanation
"""

In [None]:
fact_agent_tools = [
    get_approved_user_goal, get_approved_files, 
    get_approved_entities,
    sample_file,
    add_proposed_fact,
    get_proposed_facts,
    approve_proposed_facts
]

In [None]:
FACT_AGENT_NAME = "relevant_fact_agent_v1"
relevant_fact_agent = Agent(
    name=FACT_AGENT_NAME,
    description="Proposes the kind of relevant facts that could be extracted from text files.",
    model=llm,
    instruction=fact_agent_instruction,
    tools=fact_agent_tools, 
)


In [None]:
# extend the initial state of the fact agent
# (copy first, so the NER agent's initial state is not modified by the update below)
fact_agent_initial_state = dict(ner_agent_initial_state)

fact_agent_initial_state.update({
    APPROVED_ENTITIES: [
        'Product', 'Issue', 'User', 'Review'
    ]
})

In [None]:
fact_agent_caller = await make_agent_caller(relevant_fact_agent, fact_agent_initial_state)

await fact_agent_caller.call("Propose facts.", True)

session_end = await fact_agent_caller.get_session()

print("\n---\n")

print("\nSession state: ", session_end.state)

if PROPOSED_FACTS in session_end.state:
    print("\nProposed facts: ", session_end.state[PROPOSED_FACTS])
else:
    print("\nProposed facts not found in session state. Try again.")

if APPROVED_FACTS in session_end.state:
    print("\nApproved facts: ", session_end.state[APPROVED_FACTS])
else:
    print("\nApproved facts not found in session state.")

In [None]:
await fact_agent_caller.call("Approve the proposed fact types.")

session_end = await fact_agent_caller.get_session()

print("Session state: ", session_end.state)

if APPROVED_FACTS in session_end.state:
    print("\nApproved fact types: ", session_end.state[APPROVED_FACTS])
else:
    print("\nFailed to approve fact types.")