Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 7 additions & 8 deletions neo4j/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -442,18 +442,17 @@ def _verify_routing_connectivity(self):
routing_info = {}
for ix in list(table.routers):
try:
routing_info[ix] = self._pool.fetch_routing_info(address=table.routers[0],
database=self._default_workspace_config.database,
timeout=self._default_workspace_config.connection_acquisition_timeout)
routing_info[ix] = self._pool.fetch_routing_info(
address=table.routers[0],
database=self._default_workspace_config.database,
bookmarks=None,
timeout=self._default_workspace_config
.connection_acquisition_timeout
)
except BoltHandshakeError as error:
routing_info[ix] = None

for key, val in routing_info.items():
if val is not None:
return routing_info
raise ServiceUnavailable("Could not connect to any routing servers.")

def update_routing_table(self, database=None):
if database is None:
database = self._pool.workspace_config.database
self._pool.update_routing_table(database=database)
72 changes: 51 additions & 21 deletions neo4j/io/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,14 +293,16 @@ def __enter__(self):
def __exit__(self, exc_type, exc_value, traceback):
self.close()

def route(self, database):
def route(self, database=None, bookmarks=None):
""" Fetch a routing table from the server for the given
`database`. For Bolt 4.3 and above, this appends a ROUTE
message; for earlier versions, a procedure call is made via
the regular Cypher execution mechanism. In all cases, this is
sent to the network, and a response is fetched.

:param database: database for which to fetch a routing table
:param bookmarks: iterable of bookmark values after which this
transaction should begin
:return: dictionary of raw routing data
"""

Expand Down Expand Up @@ -480,12 +482,14 @@ def time_remaining():
raise ClientError("Failed to obtain a connection from pool "
"within {!r}s".format(timeout))

def acquire(self, access_mode=None, timeout=None, database=None):
def acquire(self, access_mode=None, timeout=None, database=None,
bookmarks=None):
""" Acquire a connection to a server that can satisfy a set of parameters.

:param access_mode:
:param timeout:
:param database:
:param bookmarks:
"""

def release(self, *connections):
Expand Down Expand Up @@ -587,7 +591,8 @@ def __init__(self, opener, pool_config, workspace_config, routing_context, addre
def __repr__(self):
return "<{} address={!r}>".format(self.__class__.__name__, self.address)

def acquire(self, access_mode=None, timeout=None, database=None):
def acquire(self, access_mode=None, timeout=None, database=None,
bookmarks=None):
# The access_mode and database is not needed for a direct connection, its just there for consistency.
return self._acquire(self.address, timeout)

Expand Down Expand Up @@ -673,11 +678,13 @@ def create_routing_table(self, database):
if database not in self.routing_tables:
self.routing_tables[database] = RoutingTable(database=database, routers=self.get_default_database_initial_router_addresses())

def fetch_routing_info(self, address, database, timeout):
def fetch_routing_info(self, address, database, bookmarks, timeout):
""" Fetch raw routing info from a given router address.

:param address: router address
:param database: the database name to get routing table for
:param bookmarks: iterable of bookmark values after which the routing
info should be fetched
:param timeout: connection acquisition timeout in seconds

:return: list of routing records, or None if no connection
Expand All @@ -687,7 +694,9 @@ def fetch_routing_info(self, address, database, timeout):
"""
try:
with self._acquire(address, timeout) as cx:
routing_table = cx.route(database or self.workspace_config.database)
routing_table = cx.route(
database or self.workspace_config.database, bookmarks
)
except BoltRoutingError as error:
# Connection was successful, but routing support is
# broken. This may indicate that the routing procedure
Expand All @@ -709,21 +718,23 @@ def fetch_routing_info(self, address, database, timeout):
self.deactivate(address)
return routing_table

def fetch_routing_table(self, *, address, timeout, database):
def fetch_routing_table(self, *, address, timeout, database, bookmarks):
""" Fetch a routing table from a given router address.

:param address: router address
:param timeout: seconds
:param database: the database name
:type: str
:param bookmarks: bookmarks used when fetching routing table

:return: a new RoutingTable instance or None if the given router is
currently unable to provide routing information

:raise neo4j.exceptions.ServiceUnavailable: if no writers are available
:raise neo4j._exceptions.BoltProtocolError: if the routing information received is unusable
"""
new_routing_info = self.fetch_routing_info(address, database, timeout)
new_routing_info = self.fetch_routing_info(address, database, bookmarks,
timeout)
if new_routing_info is None:
return None
elif not new_routing_info:
Expand Down Expand Up @@ -752,26 +763,31 @@ def fetch_routing_table(self, *, address, timeout, database):
# At least one of each is fine, so return this table
return new_routing_table

def update_routing_table_from(self, *routers, database=None):
def update_routing_table_from(self, *routers, database=None,
bookmarks=None):
""" Try to update routing tables with the given routers.

:return: True if the routing table is successfully updated,
otherwise False
"""
log.debug("Attempting to update routing table from {}".format(", ".join(map(repr, routers))))
for router in routers:
new_routing_table = self.fetch_routing_table(address=router, timeout=self.pool_config.connection_timeout, database=database)
new_routing_table = self.fetch_routing_table(
address=router, timeout=self.pool_config.connection_timeout,
database=database, bookmarks=bookmarks
)
if new_routing_table is not None:
self.routing_tables[database].update(new_routing_table)
log.debug("[#0000] C: <UPDATE ROUTING TABLE> address={!r} ({!r})".format(router, self.routing_tables[database]))
return True
return False

def update_routing_table(self, *, database):
def update_routing_table(self, *, database, bookmarks):
""" Update the routing table from the first router able to provide
valid routing information.

:param database: The database name
:param bookmarks: bookmarks used when fetching routing table

:raise neo4j.exceptions.ServiceUnavailable:
"""
Expand All @@ -782,15 +798,22 @@ def update_routing_table(self, *, database):
if self.routing_tables[database].missing_fresh_writer():
# TODO: Test this state
has_tried_initial_routers = True
if self.update_routing_table_from(self.first_initial_routing_address, database=database):
if self.update_routing_table_from(
self.first_initial_routing_address, database=database,
bookmarks=bookmarks
):
# Why is only the first initial routing address used?
return

if self.update_routing_table_from(*existing_routers, database=database):
if self.update_routing_table_from(*existing_routers, database=database,
bookmarks=bookmarks):
return

if not has_tried_initial_routers and self.first_initial_routing_address not in existing_routers:
if self.update_routing_table_from(self.first_initial_routing_address, database=database):
if (not has_tried_initial_routers
and self.first_initial_routing_address not in existing_routers):
if self.update_routing_table_from(
self.first_initial_routing_address, database=database,
bookmarks=bookmarks
):
# Why is only the first initial routing address used?
return

Expand All @@ -804,7 +827,8 @@ def update_connection_pool(self, *, database):
if address not in servers:
super(Neo4jPool, self).deactivate(address)

def ensure_routing_table_is_fresh(self, *, access_mode, database):
def ensure_routing_table_is_fresh(self, *, access_mode, database,
bookmarks):
""" Update the routing table if stale.

This method performs two freshness checks, before and after acquiring
Expand All @@ -823,7 +847,7 @@ def ensure_routing_table_is_fresh(self, *, access_mode, database):
return False
with self.refresh_lock:

self.update_routing_table(database=database)
self.update_routing_table(database=database, bookmarks=bookmarks)
self.update_connection_pool(database=database)

for database in list(self.routing_tables.keys()):
Expand All @@ -835,12 +859,14 @@ def ensure_routing_table_is_fresh(self, *, access_mode, database):

return True

def _select_address(self, *, access_mode, database):
def _select_address(self, *, access_mode, database, bookmarks):
from neo4j.api import READ_ACCESS
""" Selects the address with the fewest in-use connections.
"""
self.create_routing_table(database)
self.ensure_routing_table_is_fresh(access_mode=access_mode, database=database)
self.ensure_routing_table_is_fresh(
access_mode=access_mode, database=database, bookmarks=bookmarks
)
log.debug("[#0000] C: <ROUTING TABLE ENSURE FRESH> %r", self.routing_tables)
if access_mode == READ_ACCESS:
addresses = self.routing_tables[database].readers
Expand All @@ -856,7 +882,8 @@ def _select_address(self, *, access_mode, database):
raise WriteServiceUnavailable("No write service currently available")
return choice(addresses_by_usage[min(addresses_by_usage)])

def acquire(self, access_mode=None, timeout=None, database=None):
def acquire(self, access_mode=None, timeout=None, database=None,
bookmarks=None):
if access_mode not in (WRITE_ACCESS, READ_ACCESS):
raise ClientError("Non valid 'access_mode'; {}".format(access_mode))
if not timeout:
Expand All @@ -867,7 +894,10 @@ def acquire(self, access_mode=None, timeout=None, database=None):
while True:
try:
# Get an address for a connection that have the fewest in-use connections.
address = self._select_address(access_mode=access_mode, database=database)
address = self._select_address(
access_mode=access_mode, database=database,
bookmarks=bookmarks
)
log.debug("[#0000] C: <ACQUIRE ADDRESS> database=%r address=%r", database, address)
except (ReadServiceUnavailable, WriteServiceUnavailable) as err:
raise SessionExpired("Failed to obtain connection towards '%s' server." % access_mode) from err
Expand Down
5 changes: 4 additions & 1 deletion neo4j/io/_bolt3.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ def hello(self):
self.fetch_all()
check_supported_server_product(self.server_info.agent)

def route(self, database):
def route(self, database=None, bookmarks=None):
if database is not None: # default database
raise ConfigurationError("Database name parameter for selecting database is not "
"supported in Bolt Protocol {!r}. Database name {!r}. "
Expand All @@ -176,6 +176,9 @@ def fail(md):
else:
raise BoltRoutingError("Routing support broken on server", self.unresolved_address)

# Ignoring database and bookmarks because there is no multi-db support.
# The bookmarks are only relevant for making sure a previously created
# db exists before querying a routing table for it.
self.run(
"CALL dbms.cluster.routing.getRoutingTable($context)", # This is an internal procedure call. Only available if the Neo4j 3.5 is setup with clustering.
{"context": self.routing_context},
Expand Down
12 changes: 9 additions & 3 deletions neo4j/io/_bolt4.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ def hello(self):
self.fetch_all()
check_supported_server_product(self.server_info.agent)

def route(self, database):
def route(self, database=None, bookmarks=None):
metadata = {}
records = []

Expand All @@ -174,6 +174,7 @@ def fail(md):
"CALL dbms.routing.getRoutingTable($context)",
{"context": self.routing_context},
mode="r",
bookmarks=bookmarks,
db=SYSTEM_DATABASE,
on_success=metadata.update, on_failure=fail
)
Expand All @@ -182,6 +183,7 @@ def fail(md):
"CALL dbms.routing.getRoutingTable($context, $database)",
{"context": self.routing_context, "database": database},
mode="r",
bookmarks=bookmarks,
db=SYSTEM_DATABASE,
on_success=metadata.update, on_failure=fail
)
Expand Down Expand Up @@ -508,7 +510,7 @@ class Bolt4x3(Bolt4x2):

PROTOCOL_VERSION = Version(4, 3)

def route(self, database):
def route(self, database=None, bookmarks=None):

def fail(md):
from neo4j._exceptions import BoltRoutingError
Expand All @@ -520,7 +522,11 @@ def fail(md):
routing_context = self.routing_context or {}
log.debug("[#%04X] C: ROUTE %r %r", self.local_port, routing_context, database)
metadata = {}
self._append(b"\x66", (routing_context, database),
if bookmarks is None:
bookmarks = []
else:
bookmarks = list(bookmarks)
self._append(b"\x66", (routing_context, bookmarks, database),
response=Response(self, on_success=metadata.update, on_failure=fail))
self.send_all()
self.fetch_all()
Expand Down
7 changes: 6 additions & 1 deletion neo4j/work/simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,12 @@ def _connect(self, access_mode, database):
self._connection.send_all()
self._connection.fetch_all()
self._disconnect()
self._connection = self._pool.acquire(access_mode=access_mode, timeout=self._config.connection_acquisition_timeout, database=database)
self._connection = self._pool.acquire(
access_mode=access_mode,
timeout=self._config.connection_acquisition_timeout,
database=database,
bookmarks=self._bookmarks
)

def _disconnect(self):
if self._connection:
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ def bolt_driver(target, auth):
def neo4j_driver(target, auth):
try:
driver = GraphDatabase.neo4j_driver(target, auth=auth)
driver.update_routing_table()
driver._pool.update_routing_table(database=None, bookmarks=None)
except ServiceUnavailable as error:
if isinstance(error.__cause__, BoltHandshakeError):
pytest.skip(error.args[0])
Expand Down
3 changes: 1 addition & 2 deletions tests/integration/test_autocommit.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ def test_can_run_simple_statement_with_params(session):
assert count == 1


@pytest.mark.skip(reason="BOOKMARK, AttributeError: 'Session' object has no attribute 'last_bookmark'")
def test_autocommit_transactions_use_bookmarks(neo4j_driver):
bookmarks = []
# Generate an initial bookmark
Expand All @@ -66,7 +65,7 @@ def test_autocommit_transactions_use_bookmarks(neo4j_driver):
bookmarks.append(bookmark)
# Propagate into another session
with neo4j_driver.session(bookmarks=bookmarks) as session:
assert list(session.next_bookmarks()) == bookmarks
assert list(session._bookmarks) == bookmarks
session.run("CREATE ()").consume()
bookmark = session.last_bookmark()
assert bookmark is not None
Expand Down
11 changes: 11 additions & 0 deletions tests/stub/scripts/v4x0/router_with_one_bookmark.script
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
!: BOLT 4
!: AUTO HELLO
!: AUTO GOODBYE
!: AUTO RESET
!: PORT 9001

C: RUN "CALL dbms.routing.getRoutingTable($context)" {"context": {"address": "localhost:9001"}} {"mode": "r", "db": "system", "bookmarks": ["bookmark:1"]}
PULL {"n": -1}
S: SUCCESS {"fields": ["ttl", "servers"]}
RECORD [300, [{"role":"ROUTE","addresses":["127.0.0.1:9001","127.0.0.1:9002","127.0.0.1:9003"]},{"role":"READ","addresses":["127.0.0.1:9004","127.0.0.1:9005"]},{"role":"WRITE","addresses":["127.0.0.1:9006"]}]]
SUCCESS {"bookmark": "neo4j:bookmark-test-1", "type": "s", "t_last": 15, "db": "system"}
11 changes: 11 additions & 0 deletions tests/stub/scripts/v4x0/router_with_two_bookmarks.script
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
!: BOLT 4
!: AUTO HELLO
!: AUTO GOODBYE
!: AUTO RESET
!: PORT 9001

C: RUN "CALL dbms.routing.getRoutingTable($context)" {"context": {"address": "localhost:9001"}} {"mode": "r", "db": "system", "bookmarks": ["bookmark:0", "bookmark:1"]}
PULL {"n": -1}
S: SUCCESS {"fields": ["ttl", "servers"]}
RECORD [300, [{"role":"ROUTE","addresses":["127.0.0.1:9001","127.0.0.1:9002","127.0.0.1:9003"]},{"role":"READ","addresses":["127.0.0.1:9004","127.0.0.1:9005"]},{"role":"WRITE","addresses":["127.0.0.1:9006"]}]]
SUCCESS {"bookmark": "neo4j:bookmark-test-1", "type": "s", "t_last": 15, "db": "system"}
10 changes: 8 additions & 2 deletions tests/stub/test_bookmarking.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,10 @@ def test_should_be_able_to_set_multiple_bookmarks(driver_info, test_script):
"test_scripts",
[
("v3/router.script", "v3/bookmark_chain.script"),
("v4x0/router.script", "v4x0/tx_bookmark_chain.script"),
(
"v4x0/router_with_two_bookmarks.script",
"v4x0/tx_bookmark_chain.script"
),
]
)
def test_should_automatically_chain_bookmarks(driver_info, test_scripts):
Expand All @@ -104,7 +107,10 @@ def test_should_automatically_chain_bookmarks(driver_info, test_scripts):
"test_scripts",
[
("v3/router.script", "v3/bookmark_chain_with_autocommit.script"),
("v4x0/router.script", "v4x0/tx_bookmark_chain_with_autocommit.script"),
(
"v4x0/router_with_one_bookmark.script",
"v4x0/tx_bookmark_chain_with_autocommit.script"
),
]
)
def test_autocommit_transaction_included_in_chain(driver_info, test_scripts):
Expand Down
Loading