In [97]:
# Plain-text description of the Neo4j graph schema (node labels, relationships,
# attributes and vector indexes). It is interpolated verbatim into the system
# prompts built by generate_cypher_for_step and generate_reasoning_options_for_step,
# so any edit here changes what the LLM is told about the database.
graph_description = '''
- (:Bird) nodes have the following attributes: {order: STRING, genus: STRING, family: STRING, species: STRING, name: STRING}

- (:Image) nodes:
-- are connected to birds by (:Bird)-[:HAS_IMAGE]->(:Image)
-- have an {embedding: LIST} attribute, which allows the images to be matched to text or descriptions.
-- have a VECTOR INDEX called "image_embedding_idx" on the "embedding" field, which has cosine similarity, to search for images based on text.

- (:Genus), (:Order), and (:Family) nodes are connected to (:Bird) nodes by:
-- (:Bird)-[:IN_ORDER]->(:Order)
-- (:Bird)-[:IN_FAMILY]->(:Family)
-- (:Bird)-[:IN_GENUS]->(:Genus)

- (:Fact) nodes:
-- are connected to birds by (:Bird)-[:HAS_FACT]->(:Fact)
-- have the attributes: {bird_name: STRING, title: STRING, embedding: LIST, text: STRING}
-- have a VECTOR INDEX called "fact_embedding_idx" on the "embedding" field, which has cosine similarity, to search for fact text that is semantically similar to key words or phrases.
-- the "title" field describes what the fact is about.
-- here are the possible values for the "title" field: ['Introduction', 'Identification', 'Similar_Species', 'Systematics_History', 'Geographic_Variation', 'Subspecies', 'Distribution', 'General_Habitat', 'Movements_and_Migration', 'Diet_and_Foraging', 'Sounds_and_Vocal_Behavior', 'Breeding', 'Conservation_Status', 'Vernacular_Names', 'Hybridization', 'Eggs', 'Parental_Care', 'Plumages', 'Migration_Overview', 'Other', 'Nest', 'Predation', 'Measures_of_Breeding_Activity', 'Causes_of_Mortality', 'Population_Regulation', 'Historical_Changes_to_the_Distribution', 'Sexual_Behavior', 'Population_Spatial_Metrics', 'Social_and_Interspecific_Behavior', 'Diet', 'Agonistic_Behavior', 'Phenology', 'Nest_Site', 'Incubation', 'Hatching', 'Cooperative_Breeding', 'Population_Status', 'Effects_of_Human_Activity', 'Young_Birds', 'Life_Span_and_Survivorship', 'Bare_Parts', 'Feeding', 'Vocalizations', 'Fledgling_Stage', 'Management', 'Related_Species', 'Fossils', 'Locomotion', 'Molts', 'Measurements', 'Nonvocal_Sounds', 'Behavior', 'Demography_and_Populations', 'Priorities_for_Future_Research', 'Food_Selection_and_Storage', 'Nutrition_and_Energetics', 'Metabolism_and_Temperature_Regulation', 'Drinking_Pellet_Casting_and_Defecation', 'Brood_Parasitism', 'Similar_Species_Summary', 'Dispersal_and_Site_Fidelity', 'Pathogens_and_Parasites', 'Self_Maintenance', 'Habitat', 'Nonmigratory_Movements', 'Habitat_in_Breeding_Range', 'Habitat_in_Nonbreeding_Range', 'Timing_and_Routes_of_Migration', 'Migratory_Behavior', 'Control_and_Physiology_of_Migration', 'Field_Identification']
'''

In [47]:
from typing import List
import torch
from transformers.utils.import_utils import is_flash_attn_2_available

from colpali_engine.models import BiQwen2_5, BiQwen2_5_Processor

# Model identifier shared by the embedding model and its processor below.
embed_model_name = "nomic-ai/nomic-embed-multimodal-3b"

# Load the embedding model in bfloat16 on the first CUDA device and put it in
# eval mode. NOTE(review): the device is hard-coded; this cell fails on a
# machine without CUDA unless device_map is edited as the inline comment says.
embed_model = BiQwen2_5.from_pretrained(
    embed_model_name,
    torch_dtype=torch.bfloat16,
    device_map="cuda:0",  # or "mps" if on Apple Silicon
    # Request FlashAttention 2 only when transformers reports it as available.
    attn_implementation="flash_attention_2" if is_flash_attn_2_available() else None,
).eval()

# Processor used by generate_txt_embeddings to turn query strings into model inputs.
embed_processor = BiQwen2_5_Processor.from_pretrained(embed_model_name)

def generate_txt_embeddings(queries: List[str]) -> List[List[float]]:
	"""Embed a batch of text queries with the module-level embedding model.

	Args:
		queries: The strings to embed.

	Returns:
		One embedding (a list of floats) per input query, in input order.
		An empty input yields an empty list without touching the model.
	"""
	# Nothing to embed: skip the processor/model round trip entirely rather
	# than handing them an empty batch.
	if not queries:
		return []

	# Inference only, so gradient tracking is disabled.
	with torch.no_grad():
		batch_queries = embed_processor.process_queries(queries).to(embed_model.device)
		return embed_model(**batch_queries).tolist()

Fetching 2 files:   0%|          | 0/2 [00:00<?, ?it/s]

Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

In [109]:
import os
import re
import math
import random
import json
from dataclasses import dataclass, field
from pydantic import BaseModel, Field
from typing import (
	TypedDict,
	Annotated,
	Optional,
	Union,
	Any,
	Dict,
	List,
	Tuple
)
from langchain_core.messages import (
    BaseMessage,
	SystemMessage,
	AIMessage,
	HumanMessage
)
from neo4j import GraphDatabase
from neo4j.exceptions import CypherSyntaxError, ClientError
from langchain_ollama import ChatOllama
from langgraph.graph.state import CompiledStateGraph
from langgraph.graph import StateGraph, START, END
from dotenv import load_dotenv

load_dotenv()

import logging

# Model name passed to the ChatOllama instances created below; the
# commented-out line is an alternative that can be swapped in.
#MODEL_NAME = 'qwen3:30b'
MODEL_NAME = 'qwen3-coder:30b'

# Root logging setup: INFO level with timestamped records.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
    force=True,  # reset handlers if notebook re-runs
)

# Logger used by the functions and the retriever class defined in this cell.
logger = logging.getLogger("notebook")


@dataclass
class ReasoningNode:
	"""One node of the reasoning search tree.

	Each node holds a natural-language reasoning step, the Cypher generated
	for it, the rows that Cypher returned, the evaluator's feedback, and the
	visit/value statistics used to rank nodes with UCT.
	"""
	# Natural-language description of this reasoning step.
	reasoning_step: str
	# Cypher generated for this step (None until the node is simulated).
	cypher_query: Optional[str] = None
	# Rows returned by running ``cypher_query``.
	results: List[Dict[str, Any]] = field(default_factory=list)
	# Evaluator's textual feedback on the results (None until evaluated).
	execution_feedback: Optional[str] = None
	# True once the node's results have been scored.
	is_evaluated: bool = False

	# Tree links. ``parent`` is excluded from the generated ``__eq__`` and
	# ``__repr__``: comparing it would bounce between parent and children
	# forever (RecursionError) as soon as two non-trivial trees are compared.
	parent: Optional["ReasoningNode"] = field(default=None, repr=False, compare=False)
	children: List["ReasoningNode"] = field(default_factory=list)
	# Number of times a reward has been propagated through this node.
	visits: int = 0
	# Sum of the rewards propagated through this node (a float, not an int).
	value: float = 0.0


	@property
	def average_score(self):
		"""Mean reward per visit, or 0 when the node has never been visited."""
		return self.value / self.visits if self.visits > 0 else 0


	def get_lineage(self) -> List['ReasoningNode']:
		"""Return the path from the root down to this node (root first)."""
		chain = []
		curr = self
		while curr is not None:
			chain.append(curr)
			curr = curr.parent
		return list(reversed(chain))


	def uct_score(self, exploration_weight=1.41):
		"""Upper-confidence-bound score used to rank this node among its siblings.

		Args:
			exploration_weight: Multiplier of the exploration term.

		Returns:
			``inf`` for an unvisited node, otherwise the average score plus the
			exploration bonus. A node without a (visited) parent has no
			exploration term, so only its average score is returned.
		"""
		if self.visits == 0:
			return float('inf') # Force visitation of unvisited nodes

		exploitation = self.average_score

		# The root has no parent, and log(0) is undefined for an unvisited
		# parent; in both cases the exploration bonus cannot be computed.
		if self.parent is None or self.parent.visits < 1:
			return exploitation

		exploration = exploration_weight * math.sqrt(
			math.log(self.parent.visits) / self.visits
		)
		return exploitation + exploration


class CypherGeneratorState(TypedDict):
	"""State schema handed to StateGraph and passed between the graph's node functions."""
	# The user's original question.
	query: str
	# Root of the reasoning tree.
	root: ReasoningNode
	# Node most recently chosen by the select step; later steps act on it.
	current_node: ReasoningNode
	# Number of completed backpropagation passes.
	iterations: int 
	# Iteration budget supplied at invoke time.
	max_iterations: int
	# How many alternative next steps are requested per expansion.
	num_leaves: int
	# Cache mapping embedded text to its embedding vector.
	embeddings: Dict[str, List[float]]
	# Score of the most recent evaluation (reset to 0 by an expansion).
	last_reward: float
	# Answer text produced by the final formatting step.
	final_answer: Optional[str]


# Structured-output schema for one query parameter that must be filled with an
# embedding: the Cypher parameter name and the text to embed for it.
# The Field descriptions are part of the schema given to the LLM.
class EmbeddingField(BaseModel):
	var_name: str = Field(
		description='The name of the variable, as used in the cypher query, e.g. "embed1"'
	)
	text_to_embed: str = Field(
		description='The text to be embedded that will be referenced by the variable in cypher, e.g. "text for semantic similarity search"'
	)


# Structured-output schema returned by generate_cypher_for_step: the Cypher
# text plus the list of parameters whose values must be computed as embeddings
# before the query is run.
class CypherOutput(BaseModel):
	cypher: str = Field(
		description=(
			'The cypher of the query you want to execute, based on the input and reasoning steps so far. '
			'Use any text/image embeddings here as variables, e.g. $embed1. '
			'All text and image embeddings will be generated with a multimodal embedding model. '
		)
	)
	fields_to_embed: List[EmbeddingField] = Field(
		description=(
      		'A list of the parameters in the query that require vector embeddings. '
			'The key of each field will be used as the parameter name, and the value will be what is embedded and passed into the query. '
        )
	)


def generate_cypher_for_step(node_lineage: List[ReasoningNode], retry_messages: Optional[List[BaseMessage]] = None) -> CypherOutput:
	"""Ask the LLM for the Cypher query that implements the latest reasoning step.

	The conversation sent to the model replays the lineage: for every node
	after the root, the previous node's Cypher (if any) is added as an AI
	message, followed by a human message carrying the previous node's
	execution feedback (if any) and the current node's reasoning.

	Args:
		node_lineage: Path of nodes from the root to the node being simulated.
		retry_messages: Extra messages appended after the lineage, used by the
			caller to feed back errors from earlier attempts. ``None`` (the
			default) means no extra messages.

	Returns:
		The structured output produced by the model.
	"""
	cypher_llm = ChatOllama(
		model=MODEL_NAME,
		temperature=0.2,
		base_url='http://127.0.0.1:11434',
		keep_alive='0',
		format='json'
	).with_structured_output(CypherOutput)

	messages = [
		SystemMessage(content=(
			'You are a cypher query generator. Your goal is to create a query that interacts with a Neo4j database, modifying it step-by-step based on user inputs.\n'
			'This is a description of the graph that you are interacting with:\n\n'
			f'{graph_description}\n\n'
			'Use Neo4j 5.0+, DO NOT USE APOC, and DO NOT embed any text directly in the query itself. Pass embeddings in as parameters.\n'
			'The ONLY vector search function you should use is db.index.vector.queryNodes.'
		))
	]

	# Pair every node with its predecessor; the root only ever acts as a predecessor.
	for prev_node, curr_node in zip(node_lineage, node_lineage[1:]):
		if prev_node.cypher_query:
			messages.append(AIMessage(content=prev_node.cypher_query))

		human_message = f'Execution Feedback: {prev_node.execution_feedback}\n\n' \
			if prev_node.execution_feedback \
			else ''
		human_message += f'Reasoning: {curr_node.reasoning_step}'

		messages.append(HumanMessage(content=human_message))

	# A None default replaces the shared mutable ``[]`` default, and a caller
	# passing None explicitly no longer crashes the concatenation.
	messages = messages + list(retry_messages or [])
	logger.info('\n\nCYPHER MESSAGES:\n')
	for message in messages:
		logger.info(f'\n{message}')

	return cypher_llm.invoke(messages)


def generate_reasoning_options_for_step(query: str, num_leaves: int, node_lineage: List[ReasoningNode]) -> List[str]:
	"""Ask the LLM for ``num_leaves`` alternative next reasoning steps.

	Args:
		query: The user's original question.
		num_leaves: Exact number of alternatives requested from the model.
		node_lineage: Path of nodes from the root to the node being expanded;
			each node's reasoning and execution feedback is sent as history.

	Returns:
		The list of proposed next steps, as returned by the model.
	"""
	# Output schema built per call because the list length depends on num_leaves.
	class ReasoningOptions(BaseModel):
		possibilities: Annotated[List[str], Field(min_length=num_leaves, max_length=num_leaves)] = Field(
			description=(
				f'A list of {num_leaves} POSSIBILITIES for what the next step could be to edit the query further. '
				'Do not include labels like "1.", "Step 1.", bullet points, or any other label. '
				'Do not return cypher, only return high-level descriptions of the potential next steps. '
				'Next steps do not have to entirely answer the QUERY, they just need to be the next step towards the goal.'
			)
		)

	# Instructions plus the schema of the JSON payload sent as the human message.
	system_prompt = (
		'You are an expert graph database architect. You are working with a graph database with the following description:\n\n'
		f'{graph_description}\n\n'
		'You will accept the following JSON as input:\n'
		'{\n'
		'\t"query": str - the original user query that the reasoning steps are working towards,\n'
		'\t"history": list - the previous steps that have been taken, with this structure: [\n'
		'\t\t{\n'
		'\t\t\t"step_number": int - the number of the step\n'
		'\t\t\t"reasoning": str - the reasoning behind the step\n'
		'\t\t\t"execution_feedback": str - a description of what the result of the step was, after executing on the database\n'
		'\t\t}'
		'\t\t...\n'
		'\t]\n'
		'}\n\n'
		f'Return a list of {num_leaves} POSSIBILITIES for what the next step could be. '
		'Every item in this list should represent a DIFFERENT approach for continuing to reason through this problem. '
	)

	# One history entry per node on the path, numbered from 1.
	history = []
	for step_number, step in enumerate(node_lineage, start=1):
		history.append({
			'step_number': step_number,
			'reasoning': step.reasoning_step,
			'execution_feedback': step.execution_feedback
		})
	payload = json.dumps({'query': query, 'history': history})

	chat_model = ChatOllama(
		model=MODEL_NAME,
		temperature=0.5,
		base_url='http://127.0.0.1:11434',
		keep_alive='0',
		format='json'
	)
	structured_model = chat_model.with_structured_output(ReasoningOptions)

	response = structured_model.invoke([
		SystemMessage(content=system_prompt),
		HumanMessage(content=payload)
	])
	return response.possibilities


# Structured-output schema returned by generate_execution_feedback: a short
# textual assessment plus a score constrained to the range [0.0, 1.0].
# The Field descriptions are part of the schema given to the LLM.
class FeedbackOutput(BaseModel):
	feedback: str = Field(
		description=(
			'A short synapsis (less than 100 words) that describes if the results are expected, given the cypher and reasoning, and if there are any inconsistencies to note. '
			'Also add notes for anything that could be improved, or additional features that could be used to refine the results. '
			'Do NOT speculate on what the next steps should be, just explain how it is going so far.'
		)
	)
	score: float = Field(
		ge=0.0,
        le=1.0,
		description=(
			'A value between 0.0 and 1.0 that determines how well the reasoning is progressing towards its goal. '
			'A score of 0.0 means that the query is on the wrong track. It has failed in what it set out to do, and another approach should be used. '
			'A score of 1.0 means that the query has completely fulfilled its purpose, and COMPLETELY answers the user\'s query. '
			'A score between 0.0 and 1.0 meands that the reasoning may be on the right track, but the full question is not answered yet. '
			'Give a lower score if the results are unexpected or seem erroneous, given the reasoning and cypher. '
			'Give a higher score (but below 1.0) if the results seem sound, but the original question is not yet answered.'
		)
	)
		

def _serialize_results(node: ReasoningNode) -> Dict[str, Any]:
    return [
		({
			k: v
			for k, v in row.items()
			if k not in ['embedding', 'url', 'created_at', 'copyright', 'uuid']
		} # if nodes are returned, remove irrelevant fields that may confuse agent
   		if type(row) == dict else row) 
  		for row in node.results
	]


def generate_execution_feedback(query: str, node_lineage: List[ReasoningNode]) -> FeedbackOutput:
	"""Have the LLM assess the latest node's query results and score the progress.

	Args:
		query: The user's original question.
		node_lineage: Path of nodes from the root to the node being evaluated;
			the last entry is the node whose Cypher and results are judged.

	Returns:
		The structured feedback (text and score) produced by the model.
	"""
	latest_results = _serialize_results(node_lineage[-1])
	# Only a small random sample of rows is shown to the model.
	num_sample = 5
	latest_sample = random.sample(latest_results, k=num_sample) \
		if len(latest_results) > num_sample \
		else latest_results

	eval_llm = ChatOllama(
		model=MODEL_NAME,
		temperature=0.1,
		base_url='http://127.0.0.1:11434',
		keep_alive='0',
		format='json'
	).with_structured_output(FeedbackOutput)

	payload = {
		'query': query,
		'history': [
			{
				'step_number': ind+1,
				'reasoning': step.reasoning_step,
				'execution_feedback': step.execution_feedback
			} for ind, step in enumerate(node_lineage)
		],
		'cypher': node_lineage[-1].cypher_query,
		'results_count': len(latest_results),
		'results_sample': latest_sample
	}

	return eval_llm.invoke([
		SystemMessage(content=(
			'You are an execution feedback machine. You have a series of reasoning steps that have lead to particular cypher being executed on a neo4j graph database.\n'
			'It is is your goal to evaluate whether or not the line of reasoning is on the right track.\n\n'
			'You will accept the following JSON as input:\n'
			'{\n'
			'\t"query": str - the original user query that the reasoning steps are working towards,\n'
			'\t"history": list - the previous steps that have been taken, with this structure: [\n'
			'\t\t{\n'
			'\t\t\t"step_number": int - the number of the step,\n'
			'\t\t\t"reasoning": str - the reasoning behind the step,\n'
			'\t\t\t"execution_feedback": str - a description of what the result of the step was, after executing on the database\n'
			'\t\t}'
			'\t\t...\n'
			'\t],\n'
			'\t"cypher": str - this is the latest cypher query that was run against the database, corresponding to the last reasoning step,\n'
			'\t"results_count": int - the number of results that the cypher query returned,\n'
			f'\t"results_sample": list - a sample of at maximum {num_sample} items that were returned by the cypher query,\n'
			'}\n\n'
			'Your goal is to evaluate these results, as it pertains to the reasoning steps so far, using a short description and a score between 0.0 and 1.0.'
		)),
		# default=str: query results can carry values json cannot encode
		# (presumably driver temporal/spatial types - TODO confirm); render
		# them as text instead of aborting the evaluation with a TypeError.
		HumanMessage(content=json.dumps(payload, default=str))
	])


def generate_final_answer(query: str, results_to_sample: int, node_lineage: List[ReasoningNode]) -> str:
	"""Produce the user-facing answer from the chosen node's query results.

	Args:
		query: The user's original question.
		results_to_sample: Maximum number of result rows shown to the model;
			a random sample is taken when more rows are available.
		node_lineage: Path of nodes from the root to the chosen node; the last
			entry supplies the Cypher and results, all entries supply reasoning.

	Returns:
		The text content of the model's reply.
	"""
	latest_results = _serialize_results(node_lineage[-1])
	latest_sample = random.sample(latest_results, k=results_to_sample) \
		if len(latest_results) > results_to_sample \
		else None

	final_answer_llm = ChatOllama(
		model=MODEL_NAME,
		temperature=0.2,
		keep_alive='0',
		base_url='http://127.0.0.1:11434'
	)

	payload = {
		'query': query,
		'num_results': len(latest_results),
		'reasoning': [ node.reasoning_step for node in node_lineage ],
		'cypher': node_lineage[-1].cypher_query,
		# Fall back to the full result list when no sample was drawn.
		'results': latest_sample if latest_sample else latest_results
	}

	return final_answer_llm.invoke([
		SystemMessage(content=(
			'You are a question answering chatbot for Bird Watcher\'s Emporium, a one-stop shop for anything and everything related to bird watching. '
			'After a user asked a question, results were queried from a graph database to help provide evidence to answer that question.\n\n'
			'You will receive the following JSON as input:\n'
			'{\n'
			'\t"query": str - the original question or message that the user asked\n'
			'\t"num_results": int - the number of results found by the database query\n'
			'\t"reasoning": list - the reasoning steps you took to get to the results.\n'
			'\t"cypher": str - the last cypher query you ran that returned the results.\n'
			f'\t"results": dict - a random sample of at max {results_to_sample} results from the query\n'
			'}\n\n'
			'Using the results, produce an answer for the query that is grounded in evidence found from the database. '
			'Make sure you stay faithful to the results. '
			'Be friendly and go above and beyond for the user, and be sure to offer any follow ups as needed. '
			'Only return your answer, address the user directly. '
		)),
		# default=str: query results can carry values json cannot encode
		# (presumably driver temporal/spatial types - TODO confirm); render
		# them as text instead of failing with a TypeError.
		HumanMessage(content=json.dumps(payload, default=str))
	]).content


class BirdGraphRetriever:
	"""Tree-search retriever that iteratively builds Cypher queries for a question.

	A LangGraph state machine cycles through
	select -> (expand -> select | simulate -> evaluate -> backpropagate)
	over a tree of ReasoningNode objects until an evaluation scores 1.0 or the
	iteration budget is used up, then formats a final answer from the
	best-scoring path.
	"""

	def __init__(self,
		max_iterations: int,
		num_leaves: int
	):
		"""Open the Neo4j driver and compile the state graph.

		Connection settings are read from the environment variables
		NEO4J_URI, NEO4J_USER, NEO4J_PWD and NEO4J_DATABASE.

		Args:
			max_iterations: Number of backpropagation passes after which the
				search stops.
			num_leaves: Number of alternative next steps generated per expansion.
		"""
		self.max_iterations = max_iterations
		self.num_leaves = num_leaves

		self.graph = GraphDatabase.driver(
			os.environ['NEO4J_URI'],
			auth=(os.environ['NEO4J_USER'], os.environ['NEO4J_PWD'])
		)
		self.database = os.environ['NEO4J_DATABASE']
		self.app = self.compile_graph()


	def close(self) -> None:
		"""Close the Neo4j driver held by this retriever."""
		self.graph.close()


	def compile_graph(self) -> CompiledStateGraph:
		"""Build and compile the LangGraph state machine that drives the search."""
		builder = StateGraph(CypherGeneratorState)
		# Each step is registered exactly once (a misspelled duplicate of the
		# simulate step used to be added here as an unreachable node).
		builder.add_node('select', self._select)
		builder.add_node('simulate', self._simulate)
		builder.add_node('evaluate', self._evaluate)
		builder.add_node('expand', self._expand)
		builder.add_node('backpropagate', self._backpropagate)
		builder.add_node('format_final_answer', self._format_final_answer)

		builder.set_entry_point('select')
		# An already-evaluated selection is a leaf to expand; otherwise it
		# still has to be simulated and scored.
		builder.add_conditional_edges(
			'select',
			self._is_evaluated,
			{
				'True': 'expand',
				'False': 'simulate'
			}
		)
		builder.add_edge('expand', 'select')
		builder.add_edge('simulate', 'evaluate')
		builder.add_edge('evaluate', 'backpropagate')
		builder.add_conditional_edges(
			'backpropagate',
			self._is_done,
			{
				'True': 'format_final_answer',
				'False': 'select'
			}
		)
		builder.set_finish_point('format_final_answer')

		return builder.compile()


	def invoke(self, message: str, config: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
		"""Run the search for ``message`` and return the final graph state.

		Args:
			message: The user's question.
			config: Optional LangGraph run configuration; its entries override
				the default ``recursion_limit`` of 100.

		Returns:
			The state returned by the compiled graph; the answer text is stored
			under the ``'final_answer'`` key.
		"""
		# The root is a placeholder step that counts as already evaluated, so
		# the first selection goes straight to expansion.
		root = ReasoningNode(
			reasoning_step='Start',
			is_evaluated=True
		)

		default_config = {
			'recursion_limit': 100
		}

		initial_state = {
			'query': message,
			'root': root,
			'current_node': root,
			'iterations': 0,
			'max_iterations': self.max_iterations,
			'num_leaves': self.num_leaves,
			'embeddings': {},
			'last_reward': 0
		}

		# ``config`` defaults to None instead of a shared mutable dict.
		return self.app.invoke(initial_state, default_config | (config or {}))


	def _select(self, state: CypherGeneratorState) -> CypherGeneratorState:
		"""Walk down from the root by UCT until a node needs simulating or expanding."""
		logger.info(f'STARTING ITERATION {state["iterations"]}')
		current = state['root']

		while True:
			if not current.is_evaluated:
				logger.info(f'Node not yet evaluated, simulating: "{current.reasoning_step}"')
				return {'current_node': current}

			if not current.children:
				logger.info(f'Node not yet expanded, expanding: "{current.reasoning_step}"')
				return {'current_node': current}

			current = max(current.children, key=lambda node: node.uct_score())


	def _is_evaluated(self, state: CypherGeneratorState) -> str:
		"""Routing key after select: 'True' if the selected node is already evaluated."""
		current = state['current_node']
		return str(current.is_evaluated)


	def _expand(self, state: CypherGeneratorState) -> CypherGeneratorState:
		"""Attach one child node per LLM-proposed next step to the selected node."""
		curr_node = state['current_node']
		logger.info(f'\n\nExpanding node "{curr_node.reasoning_step}"...')

		possible_next_steps = generate_reasoning_options_for_step(
			query=state['query'],
			num_leaves=state['num_leaves'],
			node_lineage=curr_node.get_lineage()
		)
		# Built outside the f-string so the code also parses on Python < 3.12.
		steps_listing = '\n'.join([ ' - '+p for p in possible_next_steps])
		logger.info(f'\n\nPossible next steps for node:\n{steps_listing}\n\n')

		for step in possible_next_steps:
			curr_node.children.append(ReasoningNode(
				reasoning_step=step,
				parent=curr_node
			))

		return { 'current_node': curr_node, 'last_reward': 0 }


	def _simulate(self, state: CypherGeneratorState) -> CypherGeneratorState:
		"""Generate Cypher for the selected node, embed its parameters and run it.

		Up to three attempts are made; after a Cypher/client error the failing
		query and its error message are fed back to the generator.

		Raises:
			CypherSyntaxError: When every attempt failed.
		"""
		node = state['current_node']
		logger.info(f'Simulating node: "{node.reasoning_step}"')

		retry_messages = []
		tries = 3

		while tries > 0:
			try:
				cypher_output = generate_cypher_for_step(
					node_lineage=node.get_lineage(),
					retry_messages=retry_messages
				)

				node.cypher_query = cypher_output.cypher
				logger.info(f'\n\nCypher:\n{cypher_output.cypher}\n\n')

				query_parameters = {}
				logger.info(f'Embedding text: {[ dict(e) for e in cypher_output.fields_to_embed ]}...')

				for embed_field in cypher_output.fields_to_embed:
					text_to_embed = embed_field.text_to_embed
					# Embeddings are cached per text so repeated phrases are embedded once.
					if text_to_embed not in state['embeddings']:
						state['embeddings'][text_to_embed] = generate_txt_embeddings([text_to_embed])[0]
					query_parameters[embed_field.var_name] = state['embeddings'][text_to_embed]

				logger.info('Fields embedded')

				with self.graph.session(database=self.database) as session:
					# Parameters go in as one dict, not as **kwargs: the names are
					# chosen by the LLM and could otherwise collide with run()'s
					# own keyword arguments.
					records = session.execute_read(
						lambda tx: tx.run(cypher_output.cypher, parameters=query_parameters).data()
					)
				node.results = records
				logger.info(f'Graph queried, found {len(records)} results\n\n')

				return { 'current_node': node, 'embeddings': state['embeddings'] }

			except (CypherSyntaxError, ClientError) as cypher_err:
				logger.warning(f'Error with cypher syntax (retries: {tries})\n{cypher_err}\n\n')
				tries -= 1
				retry_messages.append(AIMessage(content=node.cypher_query))
				retry_messages.append(HumanMessage(content=f'That query throws the following error: "{cypher_err}"'))

		raise CypherSyntaxError('Retry Limit for Cypher Query Hit')


	def _evaluate(self, state: CypherGeneratorState) -> CypherGeneratorState:
		"""Score the selected node's results and record the evaluator's feedback."""
		logger.info('Evaluating node...')
		node = state['current_node']
		feedback = generate_execution_feedback(
			query=state['query'],
			node_lineage=node.get_lineage()
		)
		logger.info(f'Feedback: {feedback.feedback}\n\tScore: {feedback.score}\n\n')

		node.execution_feedback = feedback.feedback
		# node.value is deliberately left alone: _backpropagate adds the reward
		# to this node and its ancestors. Setting it here as well counted the
		# score twice for the evaluated node and doubled its average score.
		node.is_evaluated = True

		return { 'current_node': node, 'last_reward': feedback.score }


	def _backpropagate(self, state: CypherGeneratorState) -> CypherGeneratorState:
		"""Add the last reward and one visit to the selected node and all its ancestors."""
		node = state['current_node']
		logger.info('Backpropagating node:...')
		while node is not None:
			node.visits += 1
			node.value += state['last_reward']
			node = node.parent

		return {
			'iterations': state['iterations'] + 1,
			'root': state['root']
		}


	def _is_done(self, state: CypherGeneratorState) -> str:
		"""Routing key after backpropagation: 'True' on a full score or an exhausted budget."""
		logger.info(f'LAST REWARD: {state["last_reward"]}, ITERATIONS: {state["iterations"]}/{self.max_iterations}')
		is_done = str(state['last_reward'] >= 1.0 or state['iterations'] >= self.max_iterations)
		logger.info(f'Checking if done: {is_done}\n\n')
		return is_done


	def _format_final_answer(self, state: CypherGeneratorState) -> CypherGeneratorState:
		"""Follow the best evaluated children from the root and answer from that path."""
		best_node = state['root']
		known_children = [ c for c in best_node.children if c.is_evaluated ]
		while known_children:
			# Rank by average score only: the UCT exploration bonus favours
			# rarely visited nodes, which is wrong when picking the final result.
			best_node = max(known_children, key=lambda node: node.average_score)
			known_children = [ c for c in best_node.children if c.is_evaluated ]

		final_answer = generate_final_answer(
			query=state['query'],
			results_to_sample=10,
			node_lineage=best_node.get_lineage()
		)

		return { 'final_answer': final_answer }

In [110]:
# Sample user questions for exercising the retriever; commented-out entries
# are kept as alternatives.
questions = [
	#'What are the closest blue birds near me?',
	'What do I need to go to see an Ostrich?',
	#'What are some easy birds to start off with in my area?',
	'What are some easy birds to start off with?',
	'How many red birds are there in Australia?',
	'What equipment will I need to start bird watching?'
]

# Run the search for one sample question (index 2) with a budget of 5
# iterations and 2 candidate steps per expansion, then display the answer text.
retriever = BirdGraphRetriever(max_iterations=5, num_leaves=2)
retriever.invoke(
    #'What is a bright yellow bird I can watch near Boulder, CO?',
	questions[2]
)['final_answer']

2025-12-08 13:06:25 INFO notebook: STARTING ITERATION 0
2025-12-08 13:06:25 INFO notebook: Node not yet expanded, expanding: "Start"
2025-12-08 13:06:25 INFO notebook: 

Expanding node "Start"...
2025-12-08 13:06:28 INFO httpx: HTTP Request: POST http://127.0.0.1:11434/api/chat "HTTP/1.1 200 OK"
2025-12-08 13:06:32 INFO notebook: 

Possible next steps for node:
 - Use the VECTOR INDEX 'fact_embedding_idx' to search for facts related to 'red birds in Australia' by creating a text embedding of the query and performing a cosine similarity search on the Fact nodes' embedding field. This will identify relevant facts about red birds specifically in Australia.
 - First, identify all bird species that are red by searching for birds with 'red' in their name or description, then filter these birds by their distribution to find those in Australia, using the Distribution fact nodes and geographic information in the database.


2025-12-08 13:06:32 INFO notebook: STARTING ITERATION 0
2025-12-08 13:0

'Hey there! Great question about red birds in Australia!\n\nBased on the information I found, there is at least one red bird species mentioned in the context of Australia: the **Black-shouldered Kite**. While this bird is also known as the "Australian Black-shouldered Kite," it\'s worth noting that its name includes "black-shouldered" rather than "red." \n\nHowever, the database search specifically looked for facts mentioning "red" birds in Australia, and this was the only result that matched that criteria. It\'s possible there might be other red-colored birds in Australia that weren\'t captured in this particular dataset or search.\n\nWould you like me to look for more specific information about the Black-shouldered Kite or check for other red-colored birds in Australia? I\'m happy to help you explore further!'

In [11]:
# Ad-hoc sanity check: open a driver from the NEO4J_* environment variables
# and print three Bird nodes to inspect the shape of returned rows.
# NOTE(review): this driver is never closed in the visible cells - confirm
# whether later cells reuse `graph` before adding a close().
graph = GraphDatabase.driver(
	os.environ['NEO4J_URI'],
	auth=(os.environ['NEO4J_USER'], os.environ['NEO4J_PWD'])
)
database = os.environ['NEO4J_DATABASE']

with graph.session(database=database) as session:
	# .data() converts the result records into a list of plain dicts.
	records = session.execute_read(
		lambda tx: tx.run('MATCH (b:Bird) RETURN b LIMIT 3').data()
	)
	print(records)

[{'b': {'genus': 'Struthio', 'species': 'Camelus', 'name': 'Common Ostrich', 'family': 'Struthionidae', 'url': 'https://birdsoftheworld.org/bow/species/ostric2', 'order': 'Struthioniformes'}}, {'b': {'genus': 'Struthio', 'species': 'Molybdophanes', 'name': 'Somali Ostrich', 'family': 'Struthionidae', 'url': 'https://birdsoftheworld.org/bow/species/ostric3', 'order': 'Struthioniformes'}}, {'b': {'genus': 'Casuarius', 'species': 'Casuarius', 'name': 'Southern Cassowary', 'family': 'Casuariidae', 'url': 'https://birdsoftheworld.org/bow/species/soucas1', 'order': 'Casuariiformes'}}]
