diff --git a/neo4j/__init__.py b/neo4j/__init__.py index d087a54b0..9aa75c0a3 100644 --- a/neo4j/__init__.py +++ b/neo4j/__init__.py @@ -442,9 +442,13 @@ def _verify_routing_connectivity(self): routing_info = {} for ix in list(table.routers): try: - routing_info[ix] = self._pool.fetch_routing_info(address=table.routers[0], - database=self._default_workspace_config.database, - timeout=self._default_workspace_config.connection_acquisition_timeout) + routing_info[ix] = self._pool.fetch_routing_info( + address=table.routers[0], + database=self._default_workspace_config.database, + bookmarks=None, + timeout=self._default_workspace_config + .connection_acquisition_timeout + ) except BoltHandshakeError as error: routing_info[ix] = None @@ -452,8 +456,3 @@ def _verify_routing_connectivity(self): if val is not None: return routing_info raise ServiceUnavailable("Could not connect to any routing servers.") - - def update_routing_table(self, database=None): - if database is None: - database = self._pool.workspace_config.database - self._pool.update_routing_table(database=database) diff --git a/neo4j/io/__init__.py b/neo4j/io/__init__.py index 51c3db4a9..934cf4927 100644 --- a/neo4j/io/__init__.py +++ b/neo4j/io/__init__.py @@ -293,7 +293,7 @@ def __enter__(self): def __exit__(self, exc_type, exc_value, traceback): self.close() - def route(self, database): + def route(self, database=None, bookmarks=None): """ Fetch a routing table from the server for the given `database`. For Bolt 4.3 and above, this appends a ROUTE message; for earlier versions, a procedure call is made via @@ -301,6 +301,8 @@ def route(self, database): sent to the network, and a response is fetched. :param database: database for which to fetch a routing table + :param bookmarks: iterable of bookmark values after which this + transaction should begin :return: dictionary of raw routing data """ @@ -480,12 +482,14 @@ def time_remaining(): raise ClientError("Failed to obtain a connection from pool " "within {!r}s".format(timeout)) - def acquire(self, access_mode=None, timeout=None, database=None): + def acquire(self, access_mode=None, timeout=None, database=None, + bookmarks=None): """ Acquire a connection to a server that can satisfy a set of parameters. :param access_mode: :param timeout: :param database: + :param bookmarks: """ def release(self, *connections): @@ -587,7 +591,8 @@ def __init__(self, opener, pool_config, workspace_config, routing_context, addre def __repr__(self): return "<{} address={!r}>".format(self.__class__.__name__, self.address) - def acquire(self, access_mode=None, timeout=None, database=None): + def acquire(self, access_mode=None, timeout=None, database=None, + bookmarks=None): # The access_mode and database is not needed for a direct connection, its just there for consistency. return self._acquire(self.address, timeout) @@ -673,11 +678,13 @@ def create_routing_table(self, database): if database not in self.routing_tables: self.routing_tables[database] = RoutingTable(database=database, routers=self.get_default_database_initial_router_addresses()) - def fetch_routing_info(self, address, database, timeout): + def fetch_routing_info(self, address, database, bookmarks, timeout): """ Fetch raw routing info from a given router address. :param address: router address :param database: the database name to get routing table for + :param bookmarks: iterable of bookmark values after which the routing + info should be fetched :param timeout: connection acquisition timeout in seconds :return: list of routing records, or None if no connection @@ -687,7 +694,9 @@ def fetch_routing_info(self, address, database, timeout): """ try: with self._acquire(address, timeout) as cx: - routing_table = cx.route(database or self.workspace_config.database) + routing_table = cx.route( + database or self.workspace_config.database, bookmarks + ) except BoltRoutingError as error: # Connection was successful, but routing support is # broken. This may indicate that the routing procedure @@ -709,13 +718,14 @@ def fetch_routing_info(self, address, database, timeout): self.deactivate(address) return routing_table - def fetch_routing_table(self, *, address, timeout, database): + def fetch_routing_table(self, *, address, timeout, database, bookmarks): """ Fetch a routing table from a given router address. :param address: router address :param timeout: seconds :param database: the database name :type: str + :param bookmarks: bookmarks used when fetching routing table :return: a new RoutingTable instance or None if the given router is currently unable to provide routing information @@ -723,7 +733,8 @@ def fetch_routing_table(self, *, address, timeout, database): :raise neo4j.exceptions.ServiceUnavailable: if no writers are available :raise neo4j._exceptions.BoltProtocolError: if the routing information received is unusable """ - new_routing_info = self.fetch_routing_info(address, database, timeout) + new_routing_info = self.fetch_routing_info(address, database, bookmarks, + timeout) if new_routing_info is None: return None elif not new_routing_info: @@ -752,7 +763,8 @@ def fetch_routing_table(self, *, address, timeout, database): # At least one of each is fine, so return this table return new_routing_table - def update_routing_table_from(self, *routers, database=None): + def update_routing_table_from(self, *routers, database=None, + bookmarks=None): """ Try to update routing tables with the given routers. :return: True if the routing table is successfully updated, @@ -760,18 +772,22 @@ def update_routing_table_from(self, *routers, database=None): """ log.debug("Attempting to update routing table from {}".format(", ".join(map(repr, routers)))) for router in routers: - new_routing_table = self.fetch_routing_table(address=router, timeout=self.pool_config.connection_timeout, database=database) + new_routing_table = self.fetch_routing_table( + address=router, timeout=self.pool_config.connection_timeout, + database=database, bookmarks=bookmarks + ) if new_routing_table is not None: self.routing_tables[database].update(new_routing_table) log.debug("[#0000] C: address={!r} ({!r})".format(router, self.routing_tables[database])) return True return False - def update_routing_table(self, *, database): + def update_routing_table(self, *, database, bookmarks): """ Update the routing table from the first router able to provide valid routing information. :param database: The database name + :param bookmarks: bookmarks used when fetching routing table :raise neo4j.exceptions.ServiceUnavailable: """ @@ -782,15 +798,22 @@ def update_routing_table(self, *, database): if self.routing_tables[database].missing_fresh_writer(): # TODO: Test this state has_tried_initial_routers = True - if self.update_routing_table_from(self.first_initial_routing_address, database=database): + if self.update_routing_table_from( + self.first_initial_routing_address, database=database, + bookmarks=bookmarks + ): # Why is only the first initial routing address used? return - - if self.update_routing_table_from(*existing_routers, database=database): + if self.update_routing_table_from(*existing_routers, database=database, + bookmarks=bookmarks): return - if not has_tried_initial_routers and self.first_initial_routing_address not in existing_routers: - if self.update_routing_table_from(self.first_initial_routing_address, database=database): + if (not has_tried_initial_routers + and self.first_initial_routing_address not in existing_routers): + if self.update_routing_table_from( + self.first_initial_routing_address, database=database, + bookmarks=bookmarks + ): # Why is only the first initial routing address used? return @@ -804,7 +827,8 @@ def update_connection_pool(self, *, database): if address not in servers: super(Neo4jPool, self).deactivate(address) - def ensure_routing_table_is_fresh(self, *, access_mode, database): + def ensure_routing_table_is_fresh(self, *, access_mode, database, + bookmarks): """ Update the routing table if stale. This method performs two freshness checks, before and after acquiring @@ -823,7 +847,7 @@ def ensure_routing_table_is_fresh(self, *, access_mode, database): return False with self.refresh_lock: - self.update_routing_table(database=database) + self.update_routing_table(database=database, bookmarks=bookmarks) self.update_connection_pool(database=database) for database in list(self.routing_tables.keys()): @@ -835,12 +859,14 @@ def ensure_routing_table_is_fresh(self, *, access_mode, database): return True - def _select_address(self, *, access_mode, database): + def _select_address(self, *, access_mode, database, bookmarks): from neo4j.api import READ_ACCESS """ Selects the address with the fewest in-use connections. """ self.create_routing_table(database) - self.ensure_routing_table_is_fresh(access_mode=access_mode, database=database) + self.ensure_routing_table_is_fresh( + access_mode=access_mode, database=database, bookmarks=bookmarks + ) log.debug("[#0000] C: %r", self.routing_tables) if access_mode == READ_ACCESS: addresses = self.routing_tables[database].readers @@ -856,7 +882,8 @@ def _select_address(self, *, access_mode, database): raise WriteServiceUnavailable("No write service currently available") return choice(addresses_by_usage[min(addresses_by_usage)]) - def acquire(self, access_mode=None, timeout=None, database=None): + def acquire(self, access_mode=None, timeout=None, database=None, + bookmarks=None): if access_mode not in (WRITE_ACCESS, READ_ACCESS): raise ClientError("Non valid 'access_mode'; {}".format(access_mode)) if not timeout: @@ -867,7 +894,10 @@ def acquire(self, access_mode=None, timeout=None, database=None): while True: try: # Get an address for a connection that have the fewest in-use connections. - address = self._select_address(access_mode=access_mode, database=database) + address = self._select_address( + access_mode=access_mode, database=database, + bookmarks=bookmarks + ) log.debug("[#0000] C: database=%r address=%r", database, address) except (ReadServiceUnavailable, WriteServiceUnavailable) as err: raise SessionExpired("Failed to obtain connection towards '%s' server." % access_mode) from err diff --git a/neo4j/io/_bolt3.py b/neo4j/io/_bolt3.py index e81c6aea9..e2fd362f6 100644 --- a/neo4j/io/_bolt3.py +++ b/neo4j/io/_bolt3.py @@ -159,7 +159,7 @@ def hello(self): self.fetch_all() check_supported_server_product(self.server_info.agent) - def route(self, database): + def route(self, database=None, bookmarks=None): if database is not None: # default database raise ConfigurationError("Database name parameter for selecting database is not " "supported in Bolt Protocol {!r}. Database name {!r}. " @@ -176,6 +176,9 @@ def fail(md): else: raise BoltRoutingError("Routing support broken on server", self.unresolved_address) + # Ignoring database and bookmarks because there is no multi-db support. + # The bookmarks are only relevant for making sure a previously created + # db exists before querying a routing table for it. self.run( "CALL dbms.cluster.routing.getRoutingTable($context)", # This is an internal procedure call. Only available if the Neo4j 3.5 is setup with clustering. {"context": self.routing_context}, diff --git a/neo4j/io/_bolt4.py b/neo4j/io/_bolt4.py index 4968332c3..2e3849fca 100644 --- a/neo4j/io/_bolt4.py +++ b/neo4j/io/_bolt4.py @@ -158,7 +158,7 @@ def hello(self): self.fetch_all() check_supported_server_product(self.server_info.agent) - def route(self, database): + def route(self, database=None, bookmarks=None): metadata = {} records = [] @@ -174,6 +174,7 @@ def fail(md): "CALL dbms.routing.getRoutingTable($context)", {"context": self.routing_context}, mode="r", + bookmarks=bookmarks, db=SYSTEM_DATABASE, on_success=metadata.update, on_failure=fail ) @@ -182,6 +183,7 @@ def fail(md): "CALL dbms.routing.getRoutingTable($context, $database)", {"context": self.routing_context, "database": database}, mode="r", + bookmarks=bookmarks, db=SYSTEM_DATABASE, on_success=metadata.update, on_failure=fail ) @@ -508,7 +510,7 @@ class Bolt4x3(Bolt4x2): PROTOCOL_VERSION = Version(4, 3) - def route(self, database): + def route(self, database=None, bookmarks=None): def fail(md): from neo4j._exceptions import BoltRoutingError @@ -520,7 +522,11 @@ def fail(md): routing_context = self.routing_context or {} log.debug("[#%04X] C: ROUTE %r %r", self.local_port, routing_context, database) metadata = {} - self._append(b"\x66", (routing_context, database), + if bookmarks is None: + bookmarks = [] + else: + bookmarks = list(bookmarks) + self._append(b"\x66", (routing_context, bookmarks, database), response=Response(self, on_success=metadata.update, on_failure=fail)) self.send_all() self.fetch_all() diff --git a/neo4j/work/simple.py b/neo4j/work/simple.py index 0ee46e7b8..47bc13b2a 100644 --- a/neo4j/work/simple.py +++ b/neo4j/work/simple.py @@ -113,7 +113,12 @@ def _connect(self, access_mode, database): self._connection.send_all() self._connection.fetch_all() self._disconnect() - self._connection = self._pool.acquire(access_mode=access_mode, timeout=self._config.connection_acquisition_timeout, database=database) + self._connection = self._pool.acquire( + access_mode=access_mode, + timeout=self._config.connection_acquisition_timeout, + database=database, + bookmarks=self._bookmarks + ) def _disconnect(self): if self._connection: diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 952fa0c29..671fe5070 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -299,7 +299,7 @@ def bolt_driver(target, auth): def neo4j_driver(target, auth): try: driver = GraphDatabase.neo4j_driver(target, auth=auth) - driver.update_routing_table() + driver._pool.update_routing_table(database=None, bookmarks=None) except ServiceUnavailable as error: if isinstance(error.__cause__, BoltHandshakeError): pytest.skip(error.args[0]) diff --git a/tests/integration/test_autocommit.py b/tests/integration/test_autocommit.py index a3c738a49..deabb54eb 100644 --- a/tests/integration/test_autocommit.py +++ b/tests/integration/test_autocommit.py @@ -55,7 +55,6 @@ def test_can_run_simple_statement_with_params(session): assert count == 1 -@pytest.mark.skip(reason="BOOKMARK, AttributeError: 'Session' object has no attribute 'last_bookmark'") def test_autocommit_transactions_use_bookmarks(neo4j_driver): bookmarks = [] # Generate an initial bookmark @@ -66,7 +65,7 @@ def test_autocommit_transactions_use_bookmarks(neo4j_driver): bookmarks.append(bookmark) # Propagate into another session with neo4j_driver.session(bookmarks=bookmarks) as session: - assert list(session.next_bookmarks()) == bookmarks + assert list(session._bookmarks) == bookmarks session.run("CREATE ()").consume() bookmark = session.last_bookmark() assert bookmark is not None diff --git a/tests/stub/scripts/v4x0/router_with_one_bookmark.script b/tests/stub/scripts/v4x0/router_with_one_bookmark.script new file mode 100644 index 000000000..6ec5dc8e3 --- /dev/null +++ b/tests/stub/scripts/v4x0/router_with_one_bookmark.script @@ -0,0 +1,11 @@ +!: BOLT 4 +!: AUTO HELLO +!: AUTO GOODBYE +!: AUTO RESET +!: PORT 9001 + +C: RUN "CALL dbms.routing.getRoutingTable($context)" {"context": {"address": "localhost:9001"}} {"mode": "r", "db": "system", "bookmarks": ["bookmark:1"]} + PULL {"n": -1} +S: SUCCESS {"fields": ["ttl", "servers"]} + RECORD [300, [{"role":"ROUTE","addresses":["127.0.0.1:9001","127.0.0.1:9002","127.0.0.1:9003"]},{"role":"READ","addresses":["127.0.0.1:9004","127.0.0.1:9005"]},{"role":"WRITE","addresses":["127.0.0.1:9006"]}]] + SUCCESS {"bookmark": "neo4j:bookmark-test-1", "type": "s", "t_last": 15, "db": "system"} diff --git a/tests/stub/scripts/v4x0/router_with_two_bookmarks.script b/tests/stub/scripts/v4x0/router_with_two_bookmarks.script new file mode 100644 index 000000000..f87befa63 --- /dev/null +++ b/tests/stub/scripts/v4x0/router_with_two_bookmarks.script @@ -0,0 +1,11 @@ +!: BOLT 4 +!: AUTO HELLO +!: AUTO GOODBYE +!: AUTO RESET +!: PORT 9001 + +C: RUN "CALL dbms.routing.getRoutingTable($context)" {"context": {"address": "localhost:9001"}} {"mode": "r", "db": "system", "bookmarks": ["bookmark:0", "bookmark:1"]} + PULL {"n": -1} +S: SUCCESS {"fields": ["ttl", "servers"]} + RECORD [300, [{"role":"ROUTE","addresses":["127.0.0.1:9001","127.0.0.1:9002","127.0.0.1:9003"]},{"role":"READ","addresses":["127.0.0.1:9004","127.0.0.1:9005"]},{"role":"WRITE","addresses":["127.0.0.1:9006"]}]] + SUCCESS {"bookmark": "neo4j:bookmark-test-1", "type": "s", "t_last": 15, "db": "system"} diff --git a/tests/stub/test_bookmarking.py b/tests/stub/test_bookmarking.py index 0aee5757a..ed87daa6b 100644 --- a/tests/stub/test_bookmarking.py +++ b/tests/stub/test_bookmarking.py @@ -83,7 +83,10 @@ def test_should_be_able_to_set_multiple_bookmarks(driver_info, test_script): "test_scripts", [ ("v3/router.script", "v3/bookmark_chain.script"), - ("v4x0/router.script", "v4x0/tx_bookmark_chain.script"), + ( + "v4x0/router_with_two_bookmarks.script", + "v4x0/tx_bookmark_chain.script" + ), ] ) def test_should_automatically_chain_bookmarks(driver_info, test_scripts): @@ -104,7 +107,10 @@ def test_should_automatically_chain_bookmarks(driver_info, test_scripts): "test_scripts", [ ("v3/router.script", "v3/bookmark_chain_with_autocommit.script"), - ("v4x0/router.script", "v4x0/tx_bookmark_chain_with_autocommit.script"), + ( + "v4x0/router_with_one_bookmark.script", + "v4x0/tx_bookmark_chain_with_autocommit.script" + ), ] ) def test_autocommit_transaction_included_in_chain(driver_info, test_scripts): diff --git a/tests/stub/test_routingdriver.py b/tests/stub/test_routingdriver.py index 877c4db72..da0483476 100644 --- a/tests/stub/test_routingdriver.py +++ b/tests/stub/test_routingdriver.py @@ -201,7 +201,7 @@ def test_cannot_discover_servers_on_non_router(driver_info, test_script): with pytest.raises(ServiceUnavailable): with GraphDatabase.driver(driver_info["uri_neo4j"], auth=driver_info["auth_token"]) as driver: assert isinstance(driver, Neo4jDriver) - driver.update_routing_table() + driver._pool.update_routing_table(database=None, bookmarks=None) @pytest.mark.parametrize( @@ -217,7 +217,7 @@ def test_cannot_discover_servers_on_silent_router(driver_info, test_script): with pytest.raises(BoltRoutingError): with GraphDatabase.driver(driver_info["uri_neo4j"], auth=driver_info["auth_token"]) as driver: assert isinstance(driver, Neo4jDriver) - driver.update_routing_table() + driver._pool.update_routing_table(database=None, bookmarks=None) @pytest.mark.parametrize( @@ -232,7 +232,7 @@ def test_should_discover_servers_on_driver_construction(driver_info, test_script with StubCluster(test_script): with GraphDatabase.driver(driver_info["uri_neo4j"], auth=driver_info["auth_token"]) as driver: assert isinstance(driver, Neo4jDriver) - driver.update_routing_table() + driver._pool.update_routing_table(database=None, bookmarks=None) table = driver._pool.routing_tables[DEFAULT_DATABASE] assert table.routers == {('127.0.0.1', 9001), ('127.0.0.1', 9002), ('127.0.0.1', 9003)} @@ -535,7 +535,7 @@ def test_should_error_when_missing_reader(driver_info, test_script): with pytest.raises(BoltRoutingError): with GraphDatabase.driver(driver_info["uri_neo4j"], auth=driver_info["auth_token"]) as driver: assert isinstance(driver, Neo4jDriver) - driver.update_routing_table() + driver._pool.update_routing_table(database=None, bookmarks=None) @pytest.mark.parametrize( @@ -602,7 +602,7 @@ def test_forgets_address_on_service_unavailable_error(driver_info, test_scripts, with StubCluster(*test_scripts): with GraphDatabase.driver(driver_info["uri_neo4j"], auth=driver_info["auth_token"]) as driver: assert isinstance(driver, Neo4jDriver) - driver.update_routing_table() + driver._pool.update_routing_table(database=None, bookmarks=None) with driver.session(default_access_mode=READ_ACCESS, fetch_size=-1) as session: pool = driver._pool @@ -638,7 +638,7 @@ def test_forgets_address_on_database_unavailable_error(driver_info, test_scripts with StubCluster(*test_scripts): with GraphDatabase.driver(driver_info["uri_neo4j"], auth=driver_info["auth_token"]) as driver: assert isinstance(driver, Neo4jDriver) - driver.update_routing_table() + driver._pool.update_routing_table(database=None, bookmarks=None) with driver.session(default_access_mode=READ_ACCESS, fetch_size=-1) as session: pool = driver._pool