## Agent With Text Cypher Retrieval Via MCP

In [1]:
import os
import json
import getpass
from dotenv import load_dotenv
from expert_tools import ExpertTools

#get env setup
load_dotenv('.env', override=True)

if not os.environ.get('NEO4J_URI'):
    os.environ['NEO4J_URI'] = getpass.getpass('NEO4J_URI:\n')
if not os.environ.get('NEO4J_USERNAME'):
    os.environ['NEO4J_USERNAME'] = getpass.getpass('NEO4J_USERNAME:\n')
if not os.environ.get('NEO4J_PASSWORD'):
    os.environ['NEO4J_PASSWORD'] = getpass.getpass('NEO4J_PASSWORD:\n')

NEO4J_URI = os.getenv('NEO4J_URI')
NEO4J_USERNAME = os.getenv('NEO4J_USERNAME')
NEO4J_PASSWORD = os.getenv('NEO4J_PASSWORD')

In [3]:
from AgentRunner import AgentRunner
from expert_tools import ExpertTools
# build adk agent with neo4j mcp
from neo4j_podcast_episode import PersonRel, PodcastEpisodeRel, EpisodeTopicRel, EpisodeChunkRel, TopicConceptRel, TopicTechnologyRel
from google.adk.models.lite_llm import LiteLlm
from google.adk.agents import Agent
from google.adk.tools.mcp_tool.mcp_toolset import MCPToolset, StdioServerParameters
from prompts import MENTAL_MODEL_AI_INSTRUCTION
from vector_graph_tool import VectorGraphSearchTool

expert = ExpertTools()
database_agent = Agent(
    name="graph_database_agent",
    # model="gemini-2.0-flash-exp",
    model=LiteLlm(model="openai/gpt-4.1-mini"),
    # model=LiteLlm(model="anthropic/claude-sonnet-4-20250514"),
    description="""
    Agent to access knowledge graph stored in graph database
    """,
    instruction=MENTAL_MODEL_AI_INSTRUCTION,
    tools=[expert.find_episodes_by_people,
           expert.find_episodes_by_concept,
           expert.find_episodes_by_technology,
           expert.find_episodes_by_topic,
           MCPToolset(
        connection_params=StdioServerParameters(
            command='uvx',
            args=[
                "mcp-neo4j-cypher",
            ],
            env={ k: os.environ[k] for k in ["NEO4J_URI","NEO4J_USERNAME","NEO4J_PASSWORD"] }
        ),
        tool_filter=['get_neo4j_schema','read_neo4j_cypher']
    )]
)

db_agent_runner = AgentRunner(app_name='db_agent', user_id='Sang', agent=database_agent)
await db_agent_runner.start_session()

  MCPToolset(
StdioServerParameters is not recommended. Please use StdioConnectionParams.


Session started successfully with ID: b1938784-874e-43cb-a902-7298520cea7a


True

In [4]:
res = await db_agent_runner.run("what is Sangeetha Ramadurai learning now?")

  super().__init__(
auth_config or auth_config.auth_scheme is missing. Will skip authentication.Using FunctionTool instead if authentication is not required.
auth_config or auth_config.auth_scheme is missing. Will skip authentication.Using FunctionTool instead if authentication is not required.
auth_config or auth_config.auth_scheme is missing. Will skip authentication.Using FunctionTool instead if authentication is not required.


None id='call_pPbKhiW95xfb3klEPoZFSh3e' args={'question': 'Sangeetha Ramadurai'} name='find_episodes_by_people' None


auth_config or auth_config.auth_scheme is missing. Will skip authentication.Using FunctionTool instead if authentication is not required.
auth_config or auth_config.auth_scheme is missing. Will skip authentication.Using FunctionTool instead if authentication is not required.
auth_config or auth_config.auth_scheme is missing. Will skip authentication.Using FunctionTool instead if authentication is not required.


None None will_continue=None scheduling=None id='call_pPbKhiW95xfb3klEPoZFSh3e' name='find_episodes_by_people' response={'result': '[\n  {\n    "person_name": "Sangeetha Ramadurai",\n    "relationship_type": "LISTENS_TO_EPISODE",\n    "episode_name": "High Performance And Low Overhead Graphs With KuzuDB",\n    "episode_number": 477,\n    "episode_link": "https://www.dataengineeringpodcast.com/episodepage/kuzudb-embedded-graph-database-episode-477"\n  },\n  {\n    "person_name": "Sangeetha Ramadurai",\n    "relationship_type": "LEARNING_FROM",\n    "episode_name": "High Performance And Low Overhead Graphs With KuzuDB",\n    "episode_number": 477,\n    "episode_link": "https://www.dataengineeringpodcast.com/episodepage/kuzudb-embedded-graph-database-episode-477"\n  },\n  {\n    "person_name": "Sangeetha Ramadurai",\n    "relationship_type": "LISTENS_TO_EPISODE",\n    "episode_name": "Warehouse Native Incremental Data Processing With Dynamic Tables And Delayed View Semantics",\n    "episo

In [4]:
res = await db_agent_runner.run("What topics are discussed in the episodes?")

  super().__init__(
auth_config or auth_config.auth_scheme is missing. Will skip authentication.Using FunctionTool instead if authentication is not required.
auth_config or auth_config.auth_scheme is missing. Will skip authentication.Using FunctionTool instead if authentication is not required.
auth_config or auth_config.auth_scheme is missing. Will skip authentication.Using FunctionTool instead if authentication is not required.


None id='call_PC3OPtINaDzOCbh5PCT0IQuF' args={'query': "MATCH (p:Person {name: 'Sangeetha Ramadurai'})-[:LEARNING_FROM]->(e:Episode)-[:HAS_TOPIC]->(t:Topic) RETURN e.name AS episodeName, collect(t.name) AS topics ORDER BY episodeName LIMIT 10"} name='read_neo4j_cypher' None


auth_config or auth_config.auth_scheme is missing. Will skip authentication.Using FunctionTool instead if authentication is not required.
auth_config or auth_config.auth_scheme is missing. Will skip authentication.Using FunctionTool instead if authentication is not required.
auth_config or auth_config.auth_scheme is missing. Will skip authentication.Using FunctionTool instead if authentication is not required.


None None will_continue=None scheduling=None id='call_PC3OPtINaDzOCbh5PCT0IQuF' name='read_neo4j_cypher' response={'result': CallToolResult(meta=None, content=[TextContent(type='text', text='[{"episodeName": "High Performance And Low Overhead Graphs With KuzuDB", "topics": ["Embeddable Graph Databases with KuzuDB"]}, {"episodeName": "Warehouse Native Incremental Data Processing With Dynamic Tables And Delayed View Semantics", "topics": ["Delayed View Semantics and Incremental Processing"]}]', annotations=None, meta=None)], structuredContent=None, isError=False)}
The topics discussed in the episodes that Sangeetha Ramadurai is learning from are:

1. "Warehouse Native Incremental Data Processing With Dynamic Tables And Delayed View Semantics"  
   - Topic: Delayed View Semantics and Incremental Processing

2. "High Performance And Low Overhead Graphs With KuzuDB"  
   - Topic: Embeddable Graph Databases with KuzuDB

These topics were retrieved by finding the topics linked to the episodes

In [4]:
res = await db_agent_runner.run("How many episodes do I have?")

  super().__init__(
auth_config or auth_config.auth_scheme is missing. Will skip authentication.Using FunctionTool instead if authentication is not required.
auth_config or auth_config.auth_scheme is missing. Will skip authentication.Using FunctionTool instead if authentication is not required.
auth_config or auth_config.auth_scheme is missing. Will skip authentication.Using FunctionTool instead if authentication is not required.


None id='call_IJXnn2mRKAcbIgsaGMK9HFSb' args={} name='get_neo4j_schema' None


auth_config or auth_config.auth_scheme is missing. Will skip authentication.Using FunctionTool instead if authentication is not required.
auth_config or auth_config.auth_scheme is missing. Will skip authentication.Using FunctionTool instead if authentication is not required.
auth_config or auth_config.auth_scheme is missing. Will skip authentication.Using FunctionTool instead if authentication is not required.


None None will_continue=None scheduling=None id='call_IJXnn2mRKAcbIgsaGMK9HFSb' name='get_neo4j_schema' response={'result': CallToolResult(meta=None, content=[TextContent(type='text', text='{"Characteristic": {"type": "node", "count": 291, "properties": {"id": {"indexed": true, "type": "STRING"}, "embedding": {"indexed": false, "type": "LIST"}}, "relationships": {"USES_STORAGE_TYPE": {"direction": "in", "labels": ["__Entity__", "Product", "Company", "Database", "System", "GraphDatabase"]}, "STANDS_FOR": {"direction": "in", "labels": ["__Entity__", "Protocol"]}, "HAS_APPROACH": {"direction": "in", "labels": ["Document", "__Entity__"]}, "STORES": {"direction": "in", "labels": ["__Entity__", "Product", "Technology"]}, "HAS_IMPLEMENTATION": {"direction": "in", "labels": ["__Entity__", "Product"]}, "HAS_FUNCTION": {"direction": "in", "labels": ["__Entity__", "Product", "System"]}, "SOLVES_PROBLEM": {"direction": "in", "labels": ["__Entity__", "Product", "Project", "Platform", "Protocol"]}, 

auth_config or auth_config.auth_scheme is missing. Will skip authentication.Using FunctionTool instead if authentication is not required.
auth_config or auth_config.auth_scheme is missing. Will skip authentication.Using FunctionTool instead if authentication is not required.
auth_config or auth_config.auth_scheme is missing. Will skip authentication.Using FunctionTool instead if authentication is not required.


None id='call_5e1P2d3DXu4iLI0IeC3EURjD' args={'query': 'MATCH (e:Episode) RETURN COUNT(e) AS episodeCount'} name='read_neo4j_cypher' None
None None will_continue=None scheduling=None id='call_5e1P2d3DXu4iLI0IeC3EURjD' name='read_neo4j_cypher' response={'result': CallToolResult(meta=None, content=[TextContent(type='text', text='[{"episodeCount": 2}]', annotations=None, meta=None)], structuredContent=None, isError=False)}
You have 2 episodes in your knowledge graph. This count was obtained by querying nodes labeled as "Episode" in the database. Let me know if you want details about these episodes or anything else! None None


In [6]:
res = await db_agent_runner.run("Summarize snowflake capabilities discussed in the episodes?")

  super().__init__(
auth_config or auth_config.auth_scheme is missing. Will skip authentication.Using FunctionTool instead if authentication is not required.
auth_config or auth_config.auth_scheme is missing. Will skip authentication.Using FunctionTool instead if authentication is not required.
auth_config or auth_config.auth_scheme is missing. Will skip authentication.Using FunctionTool instead if authentication is not required.


None id='call_m2W9UqOERTBNnL0SXNh4ctk4' args={'query': "MATCH (p:Person {name: 'Sangeetha Ramadurai'})-[:LEARNING_FROM]->(e:Episode)-[:HAS_CHUNK]->(c:Chunk) WHERE e.name CONTAINS 'Snowflake' RETURN c.text AS content LIMIT 10"} name='read_neo4j_cypher' None


auth_config or auth_config.auth_scheme is missing. Will skip authentication.Using FunctionTool instead if authentication is not required.
auth_config or auth_config.auth_scheme is missing. Will skip authentication.Using FunctionTool instead if authentication is not required.
auth_config or auth_config.auth_scheme is missing. Will skip authentication.Using FunctionTool instead if authentication is not required.


None None will_continue=None scheduling=None id='call_m2W9UqOERTBNnL0SXNh4ctk4' name='read_neo4j_cypher' response={'result': CallToolResult(meta=None, content=[TextContent(type='text', text='[]', annotations=None, meta=None)], structuredContent=None, isError=False)}


auth_config or auth_config.auth_scheme is missing. Will skip authentication.Using FunctionTool instead if authentication is not required.
auth_config or auth_config.auth_scheme is missing. Will skip authentication.Using FunctionTool instead if authentication is not required.
auth_config or auth_config.auth_scheme is missing. Will skip authentication.Using FunctionTool instead if authentication is not required.


None id='call_gwSaE9wmRSxojrVuhJKucxy9' args={'query': "MATCH (e:Episode)-[:HAS_CHUNK]->(c:Chunk) WHERE e.name = 'Warehouse Native Incremental Data Processing With Dynamic Tables And Delayed View Semantics' RETURN c.text AS content LIMIT 10"} name='read_neo4j_cypher' None
None None will_continue=None scheduling=None id='call_gwSaE9wmRSxojrVuhJKucxy9' name='read_neo4j_cypher' response={'result': CallToolResult(meta=None, content=[TextContent(type='text', text='[{"content": "Tobias Macey:  Hello, and welcome to the Data Engineering Podcast, the show about modern data management.   Data migrations are brutal. They drag on for months, sometimes years, burning through resources and crushing team morale.   DataFolds AI powered migration agent changes all that.   Their unique combination of AI code translation and automated data validation has helped companies complete migrations up to 10 times faster than manual approaches.   And theyre so confident in their solution, theyll actually guarant

In [None]:
# find_episodes_discussing_topics
res = await db_agent_runner.run("Find episodes discussing database and return one liner about the episode")

  super().__init__(
auth_config or auth_config.auth_scheme is missing. Will skip authentication.Using FunctionTool instead if authentication is not required.
auth_config or auth_config.auth_scheme is missing. Will skip authentication.Using FunctionTool instead if authentication is not required.
auth_config or auth_config.auth_scheme is missing. Will skip authentication.Using FunctionTool instead if authentication is not required.


None id='call_PPyTtLBTnxU9Jgb58N39cEgh' args={'query': "MATCH (e:Episode)-[:HAS_TOPIC]->(t:Topic) WHERE toLower(t.name) CONTAINS 'database' OR toLower(t.name) CONTAINS 'dynamic views' RETURN DISTINCT e.name AS episodeName, e.description AS oneLiner LIMIT 10"} name='read_neo4j_cypher' None


auth_config or auth_config.auth_scheme is missing. Will skip authentication.Using FunctionTool instead if authentication is not required.
auth_config or auth_config.auth_scheme is missing. Will skip authentication.Using FunctionTool instead if authentication is not required.
auth_config or auth_config.auth_scheme is missing. Will skip authentication.Using FunctionTool instead if authentication is not required.


None None will_continue=None scheduling=None id='call_PPyTtLBTnxU9Jgb58N39cEgh' name='read_neo4j_cypher' response={'result': CallToolResult(meta=None, content=[TextContent(type='text', text='[{"episodeName": "High Performance And Low Overhead Graphs With KuzuDB", "oneLiner": "In this episode of the Data Engineering Podcast Prashanth Rao, an AI engineer at KuzuDB, talks about their embeddable graph database. Prashanth explains how KuzuDB addresses performance shortcomings in existing solutions through columnar storage and novel join algorithms. He discusses the usability and scalability of KuzuDB, emphasizing its open-source nature and potential for various graph applications. The conversation explores the growing interest in graph databases due to their AI and data engineering applications, and Prashanth highlights KuzuDB\'s potential in edge computing, ephemeral workloads, and integration with other formats like Iceberg and Parquet."}]', annotations=None, meta=None)], structuredConten

In [15]:
res = await db_agent_runner.run("Find episodes with reference linking them to any database e.g. snowflake, kuzu, etc. and return one liner about the episode")

  super().__init__(
auth_config or auth_config.auth_scheme is missing. Will skip authentication.Using FunctionTool instead if authentication is not required.
auth_config or auth_config.auth_scheme is missing. Will skip authentication.Using FunctionTool instead if authentication is not required.


auth_config or auth_config.auth_scheme is missing. Will skip authentication.Using FunctionTool instead if authentication is not required.


None id='call_wvZUjrdnm0PToBQiqKOidgHo' args={'query': "MATCH (e:Episode)-[:HAS_REFERENCE_LINK]->(r:ReferenceLink) WHERE toLower(r.text) CONTAINS 'snowflake' OR toLower(r.text) CONTAINS 'kuzu' RETURN e.name AS episodeName, e.description AS oneLiner LIMIT 10"} name='read_neo4j_cypher' None


auth_config or auth_config.auth_scheme is missing. Will skip authentication.Using FunctionTool instead if authentication is not required.
auth_config or auth_config.auth_scheme is missing. Will skip authentication.Using FunctionTool instead if authentication is not required.
auth_config or auth_config.auth_scheme is missing. Will skip authentication.Using FunctionTool instead if authentication is not required.


None None will_continue=None scheduling=None id='call_wvZUjrdnm0PToBQiqKOidgHo' name='read_neo4j_cypher' response={'result': CallToolResult(meta=None, content=[TextContent(type='text', text='[{"episodeName": "Warehouse Native Incremental Data Processing With Dynamic Tables And Delayed View Semantics", "oneLiner": "In this episode of the Data Engineering Podcast Dan Sotolongo from Snowflake talks about the complexities of incremental data processing in warehouse environments. Dan discusses the challenges of handling continuously evolving datasets and the importance of incremental data processing for optimized resource use and reduced latency. He explains how delayed view semantics can address these challenges by maintaining up-to-date results with minimal work, leveraging Snowflake\'s dynamic tables feature. The conversation also explores the broader landscape of data processing, comparing batch and streaming systems, and highlights the trade-offs between them. Dan emphasizes the need f

In [5]:
res = await db_agent_runner.run("Find episodes where snowflake is referenced")

  super().__init__(
auth_config or auth_config.auth_scheme is missing. Will skip authentication.Using FunctionTool instead if authentication is not required.
auth_config or auth_config.auth_scheme is missing. Will skip authentication.Using FunctionTool instead if authentication is not required.
auth_config or auth_config.auth_scheme is missing. Will skip authentication.Using FunctionTool instead if authentication is not required.


None id='call_CCadOd1D1XIpNFHC1n73L9im' args={'question': 'Snowflake'} name='find_episodes_by_technology' None


auth_config or auth_config.auth_scheme is missing. Will skip authentication.Using FunctionTool instead if authentication is not required.
auth_config or auth_config.auth_scheme is missing. Will skip authentication.Using FunctionTool instead if authentication is not required.
auth_config or auth_config.auth_scheme is missing. Will skip authentication.Using FunctionTool instead if authentication is not required.


None None will_continue=None scheduling=None id='call_CCadOd1D1XIpNFHC1n73L9im' name='find_episodes_by_technology' response={'result': '[]'}
I searched for episodes referencing the technology "Snowflake" but did not find any in your current knowledge base. It may be that Snowflake is not explicitly tagged or discussed in the episodes indexed here.

If you want, I can try to find episodes discussing related topics or technologies around data warehousing or incremental processing, which might implicitly relate to Snowflake. Would you like me to do that? None None


In [None]:
from IPython.core.display import Markdown

display(Markdown(res))

## Adding Expert Tools

### Tool - Ask free form question for Vector + graph response

In [17]:
# Test the new Vector + Graph Search Tool
print("Testing Vector + Graph Search Tool...")
res = await db_agent_runner.run("How does KuzuDB improve performance in graph databases? Use the vector search tool to find relevant content.")


  super().__init__(
auth_config or auth_config.auth_scheme is missing. Will skip authentication.Using FunctionTool instead if authentication is not required.
auth_config or auth_config.auth_scheme is missing. Will skip authentication.Using FunctionTool instead if authentication is not required.
auth_config or auth_config.auth_scheme is missing. Will skip authentication.Using FunctionTool instead if authentication is not required.


Testing Vector + Graph Search Tool...
None id='call_3xIgfXmVemU4n62EBMTzqqGN' args={'query': "MATCH (p:Person {name: 'Sangeetha Ramadurai'})-[:LEARNING_FROM]->(e:Episode {name: 'High Performance And Low Overhead Graphs With KuzuDB'})-[:HAS_CHUNK]->(c:Chunk) RETURN c.text AS content LIMIT 10"} name='read_neo4j_cypher' None


auth_config or auth_config.auth_scheme is missing. Will skip authentication.Using FunctionTool instead if authentication is not required.
auth_config or auth_config.auth_scheme is missing. Will skip authentication.Using FunctionTool instead if authentication is not required.
auth_config or auth_config.auth_scheme is missing. Will skip authentication.Using FunctionTool instead if authentication is not required.


None None will_continue=None scheduling=None id='call_3xIgfXmVemU4n62EBMTzqqGN' name='read_neo4j_cypher' response={'result': CallToolResult(meta=None, content=[TextContent(type='text', text='[{"content": "Summary In this episode of the Data Engineering Podcast Prashanth Rao, an AI engineer at KuzuDB, talks about their embeddable graph database. Prashanth explains how KuzuDB addresses performance shortcomings in existing solutions through columnar storage and novel join algorithms. He discusses the usability and scalability of KuzuDB, emphasizing its open-source nature and potential for various graph applications. The conversation explores the growing interest in graph databases due to their AI and data engineering applications, and Prashanth highlights KuzuDBs potential in edge computing, ephemeral workloads, and integration with other formats like Iceberg and Parquet.   Hello, and welcome to the Data Engineering podcast, the show about modern data management.   Are you tired of data m

In [12]:
# Test search methods with JSON formatting
import json
from expert_tools import ExpertTools

expert = ExpertTools()




In [13]:
# Test the specific method you're using
print("=== Testing find_episodes_by_technology ===")
technology_res = expert.find_episodes_by_technology("Dynamic Tables, View")
print(technology_res)





=== Testing find_episodes_by_technology ===
[
  {
    "episode_name": "Warehouse Native Incremental Data Processing With Dynamic Tables And Delayed View Semantics",
    "episode_number": 473,
    "episode_link": "https://www.dataengineeringpodcast.com/episodepage/delayed-view-semantics-incremental-data-processing-episode-473",
    "topic_name": "Delayed View Semantics and Incremental Processing",
    "technology_name": "Materialized Views",
    "matched_term": "View"
  },
  {
    "episode_name": "Warehouse Native Incremental Data Processing With Dynamic Tables And Delayed View Semantics",
    "episode_number": 473,
    "episode_link": "https://www.dataengineeringpodcast.com/episodepage/delayed-view-semantics-incremental-data-processing-episode-473",
    "topic_name": "Delayed View Semantics and Incremental Processing",
    "technology_name": "Dynamic Tables",
    "matched_term": "Dynamic Tables"
  },
  {
    "episode_name": "Warehouse Native Incremental Data Processing With Dynamic Tab

In [14]:
# Test other methods with JSON formatting
print("=== Testing find_episodes_by_topic ===")
topic_res = expert.find_episodes_by_topic("database, Graph")
print(topic_res)





=== Testing find_episodes_by_topic ===
[
  {
    "episode_name": "High Performance And Low Overhead Graphs With KuzuDB",
    "episode_number": 477,
    "episode_link": "https://www.dataengineeringpodcast.com/episodepage/kuzudb-embedded-graph-database-episode-477",
    "description": "In this episode of the Data Engineering Podcast Prashanth Rao, an AI engineer at KuzuDB, talks about their embeddable graph database. Prashanth explains how KuzuDB addresses performance shortcomings in existing solutions through columnar storage and novel join algorithms. He discusses the usability and scalability of KuzuDB, emphasizing its open-source nature and potential for various graph applications. The conversation explores the growing interest in graph databases due to their AI and data engineering applications, and Prashanth highlights KuzuDB's potential in edge computing, ephemeral workloads, and integration with other formats like Iceberg and Parquet.",
    "topics": [
      "Embeddable Graph Dat

In [15]:
print("=== Testing find_episodes_by_people ===")
people_res = expert.find_episodes_by_people("Prashanth, Sangeetha")
print(people_res)





=== Testing find_episodes_by_people ===
[
  {
    "person_name": "Sangeetha Ramadurai",
    "relationship_type": "LISTENS_TO_EPISODE",
    "episode_name": "High Performance And Low Overhead Graphs With KuzuDB",
    "episode_number": 477,
    "episode_link": "https://www.dataengineeringpodcast.com/episodepage/kuzudb-embedded-graph-database-episode-477",
    "matched_term": "Sangeetha"
  },
  {
    "person_name": "Sangeetha Ramadurai",
    "relationship_type": "LEARNING_FROM",
    "episode_name": "High Performance And Low Overhead Graphs With KuzuDB",
    "episode_number": 477,
    "episode_link": "https://www.dataengineeringpodcast.com/episodepage/kuzudb-embedded-graph-database-episode-477",
    "matched_term": "Sangeetha"
  },
  {
    "person_name": "Prashanth Rao",
    "relationship_type": "IS_A_GUEST",
    "episode_name": "High Performance And Low Overhead Graphs With KuzuDB",
    "episode_number": 477,
    "episode_link": "https://www.dataengineeringpodcast.com/episodepage/kuzudb-em

In [16]:
print("=== Testing find_episodes_by_concept ===")
concept_res = expert.find_episodes_by_concept("graph database, database")
print(concept_res)



=== Testing find_episodes_by_concept ===
[
  {
    "episode_name": "High Performance And Low Overhead Graphs With KuzuDB",
    "episode_number": 477,
    "episode_link": "https://www.dataengineeringpodcast.com/episodepage/kuzudb-embedded-graph-database-episode-477",
    "topic_name": "Embeddable Graph Databases with KuzuDB",
    "concept_name": "Embeddable Graph Database",
    "concept_description": "The concept discussed in the Data Engineering Podcast episode about KuzuDB centers on its role as a modern, embeddable graph database designed to address performance shortcomings in existing graph solutions. The conversation highlights KuzuDB\u2019s use of columnar storage, novel join algorithms, and vectorization to improve performance and scalability. It also covers KuzuDB\u2019s open-source nature, ease of use for developers, and its applications in areas like AI, data engineering, edge computing, and ephemeral workloads. The discussion further explores KuzuDB\u2019s integration with fo

In [17]:
# Clean up
expert.close()

## Standalone Expert Tools Teting

In [None]:
from vector_graph_tool import VectorGraphSearchTool

# Direct usage of the Vector + Graph Search Tool
vector_tool = VectorGraphSearchTool()

# Test different types of questions
questions = [
    "How does KuzuDB improve performance in graph databases?",
    "What are the key features of Snowflake's dynamic tables?",
    "Who are the experts discussing data processing optimization?",
    "What technologies are mentioned in the episodes?"
]

for question in questions:
    print(f"\n{'='*60}")
    print(f"Question: {question}")
    print(f"{'='*60}")
    answer = vector_tool(question, top_k=3)
    print(answer)

# Clean up
vector_tool.close()


In [None]:
await db_agent_runner.end_session()