Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 1 addition & 32 deletions neo4j/io/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -686,43 +686,12 @@ def fail(md):

try:
with self._acquire(address, timeout) as cx:
_, _, server_version = (cx.server_info.agent or "").partition("/")
log.debug("[#%04X] C: <ROUTING> query=%r", cx.local_port, self.routing_context or {})

if database is None:
database = self.workspace_config.database

# TODO: This logic should be inside the Bolt subclasses, because it can change depending on Bolt Protocol Version.
if cx.PROTOCOL_VERSION == Bolt3.PROTOCOL_VERSION:
if database != DEFAULT_DATABASE:
raise ConfigurationError("Database name parameter for selecting database is not supported in Bolt Protocol {!r}. Database name {!r}. Server Agent {!r}.".format(
Bolt3.PROTOCOL_VERSION, database, cx.server_info.agent))
cx.run(
"CALL dbms.cluster.routing.getRoutingTable($context)", # This is an internal procedure call. Only available if the Neo4j 3.5 is setup with clustering.
{"context": self.routing_context},
mode="r", # Bolt Protocol Version(3, 0) supports mode
on_success=metadata.update,
on_failure=fail,
)
elif cx.PROTOCOL_VERSION in (Bolt4x0.PROTOCOL_VERSION, Bolt4x1.PROTOCOL_VERSION):
if database == DEFAULT_DATABASE:
cx.run(
"CALL dbms.routing.getRoutingTable($context)",
{"context": self.routing_context},
mode="r",
db=SYSTEM_DATABASE,
on_success=metadata.update,
on_failure=fail,
)
else:
cx.run(
"CALL dbms.routing.getRoutingTable($context, $database)",
{"context": self.routing_context, "database": database},
mode="r",
db=SYSTEM_DATABASE,
on_success=metadata.update,
on_failure=fail,
)
cx.run_get_routing_table(on_success=metadata.update, on_failure=fail, database=database)
cx.pull(on_success=metadata.update, on_records=records.extend)
cx.send_all()
cx.fetch_all()
Expand Down
12 changes: 12 additions & 0 deletions neo4j/io/_bolt3.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
Version,
READ_ACCESS,
WRITE_ACCESS,
DEFAULT_DATABASE,
)
from neo4j.io._courier import MessageInbox
from neo4j.meta import get_user_agent
Expand Down Expand Up @@ -177,6 +178,17 @@ def run(self, query, parameters=None, mode=None, bookmarks=None, metadata=None,
self._append(b"\x10", fields, Response(self, **handlers))
self._is_reset = False

def run_get_routing_table(self, on_success, on_failure, database=DEFAULT_DATABASE):
if database != DEFAULT_DATABASE:
raise ConfigurationError("Database name parameter for selecting database is not supported in Bolt Protocol {!r}. Database name {!r}. Server Agent {!r}.".format(Bolt3.PROTOCOL_VERSION, database, self.server_info.agent))
self.run(
"CALL dbms.cluster.routing.getRoutingTable($context)", # This is an internal procedure call. Only available if the Neo4j 3.5 is setup with clustering.
{"context": self.routing_context},
mode="r", # Bolt Protocol Version(3, 0) supports mode="r"
on_success=on_success,
on_failure=on_failure,
)

def discard(self, n=-1, qid=-1, **handlers):
# Just ignore n and qid, it is not supported in the Bolt 3 Protocol.
log.debug("[#%04X] C: DISCARD_ALL", self.local_port)
Expand Down
22 changes: 22 additions & 0 deletions neo4j/io/_bolt4x0.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
Version,
READ_ACCESS,
WRITE_ACCESS,
DEFAULT_DATABASE,
SYSTEM_DATABASE,
)
from neo4j.io._courier import MessageInbox
from neo4j.meta import get_user_agent
Expand Down Expand Up @@ -176,6 +178,26 @@ def run(self, query, parameters=None, mode=None, bookmarks=None, metadata=None,
self._append(b"\x10", fields, Response(self, **handlers))
self._is_reset = False

def run_get_routing_table(self, on_success, on_failure, database=DEFAULT_DATABASE):
if database == DEFAULT_DATABASE:
self.run(
"CALL dbms.routing.getRoutingTable($context)",
{"context": self.routing_context},
mode="r",
db=SYSTEM_DATABASE,
on_success=on_success,
on_failure=on_failure,
)
else:
self.run(
"CALL dbms.routing.getRoutingTable($context, $database)",
{"context": self.routing_context, "database": database},
mode="r",
db=SYSTEM_DATABASE,
on_success=on_success,
on_failure=on_failure,
)

def discard(self, n=-1, qid=-1, **handlers):
extra = {"n": n}
if qid != -1:
Expand Down
22 changes: 22 additions & 0 deletions neo4j/io/_bolt4x1.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
Version,
READ_ACCESS,
WRITE_ACCESS,
DEFAULT_DATABASE,
SYSTEM_DATABASE,
)
from neo4j.io._courier import MessageInbox
from neo4j.meta import get_user_agent
Expand Down Expand Up @@ -177,6 +179,26 @@ def run(self, query, parameters=None, mode=None, bookmarks=None, metadata=None,
self._append(b"\x10", fields, Response(self, **handlers))
self._is_reset = False

def run_get_routing_table(self, on_success, on_failure, database=DEFAULT_DATABASE):
if database == DEFAULT_DATABASE:
self.run(
"CALL dbms.routing.getRoutingTable($context)",
{"context": self.routing_context},
mode="r",
db=SYSTEM_DATABASE,
on_success=on_success,
on_failure=on_failure,
)
else:
self.run(
"CALL dbms.routing.getRoutingTable($context, $database)",
{"context": self.routing_context, "database": database},
mode="r",
db=SYSTEM_DATABASE,
on_success=on_success,
on_failure=on_failure,
)

def discard(self, n=-1, qid=-1, **handlers):
extra = {"n": n}
if qid != -1:
Expand Down