diff --git a/neo4j/io/__init__.py b/neo4j/io/__init__.py index 2ed57bc78..716170e6d 100644 --- a/neo4j/io/__init__.py +++ b/neo4j/io/__init__.py @@ -847,7 +847,7 @@ def __init__(self, opener, pool_config, workspace_config, address): :param opener: :param pool_config: :param workspace_config: - :param addresses: + :param address: """ super(Neo4jPool, self).__init__(opener, pool_config, workspace_config) # Each database have a routing table, the default database is a special case. diff --git a/tests/integration/test_neo4j_driver.py b/tests/integration/test_neo4j_driver.py deleted file mode 100644 index 805e03dee..000000000 --- a/tests/integration/test_neo4j_driver.py +++ /dev/null @@ -1,211 +0,0 @@ -#!/usr/bin/env python -# -*- encoding: utf-8 -*- - -# Copyright (c) "Neo4j" -# Neo4j Sweden AB [http://neo4j.com] -# -# This file is part of Neo4j. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -import pytest - -from neo4j import ( - GraphDatabase, - Neo4jDriver, - Version, - READ_ACCESS, - ResultSummary, - ServerInfo, -) -from neo4j.exceptions import ( - ServiceUnavailable, - ConfigurationError, - ClientError, -) -from neo4j._exceptions import ( - BoltHandshakeError, -) -from neo4j.conf import ( - RoutingConfig, -) -from neo4j.io._bolt3 import Bolt3 - -# python -m pytest tests/integration/test_neo4j_driver.py -s -v - - -def test_neo4j_multi_database_test_routing_table_creates_new_if_deleted(neo4j_uri, auth, target, requires_bolt_4x): - # python -m pytest tests/integration/test_neo4j_driver.py -s -v -k test_neo4j_multi_database_test_routing_table_creates_new_if_deleted - with GraphDatabase.driver(neo4j_uri, auth=auth) as driver: - with driver.session(database="system") as session: - result = session.run("DROP DATABASE test IF EXISTS") - result.consume() - result = session.run("SHOW DATABASES") - databases = set() - for record in result: - databases.add(record.get("name")) - assert databases == {"system", "neo4j"} - - result = session.run("CREATE DATABASE test") - result.consume() - result = session.run("SHOW DATABASES") - for record in result: - databases.add(record.get("name")) - assert databases == {"system", "neo4j", "test"} - with driver.session(database="test") as session: - result = session.run("RETURN 1 AS x") - result.consume() - del driver._pool.routing_tables["test"] - with driver.session(database="test") as session: - result = session.run("RETURN 1 AS x") - result.consume() - with driver.session(database="system") as session: - result = session.run("DROP DATABASE test IF EXISTS") - result.consume() - - -def test_neo4j_multi_database_test_routing_table_updates_if_stale(neo4j_uri, auth, target, requires_bolt_4x): - # python -m pytest tests/integration/test_neo4j_driver.py -s -v -k test_neo4j_multi_database_test_routing_table_updates_if_stale - with GraphDatabase.driver(neo4j_uri, auth=auth) as driver: - with driver.session(database="system") as session: - result = session.run("DROP DATABASE test IF EXISTS") - result.consume() - result = session.run("SHOW DATABASES") - databases = set() - for record in result: - databases.add(record.get("name")) - assert databases == {"system", "neo4j"} - - result = session.run("CREATE DATABASE test") - result.consume() - result = session.run("SHOW DATABASES") - for record in result: - databases.add(record.get("name")) - assert databases == {"system", "neo4j", "test"} - with driver.session(database="test") as session: - result = session.run("RETURN 1 AS x") - result.consume() - driver._pool.routing_tables["test"].ttl = 0 - old_value = driver._pool.routing_tables["test"].last_updated_time - with driver.session(database="test") as session: - result = session.run("RETURN 1 AS x") - result.consume() - with driver.session(database="system") as session: - result = session.run("DROP DATABASE test IF EXISTS") - result.consume() - assert driver._pool.routing_tables["test"].last_updated_time > old_value - - -def test_neo4j_multi_database_test_routing_table_removes_aged(neo4j_uri, auth, target, requires_bolt_4x): - # python -m pytest tests/integration/test_neo4j_driver.py -s -v -k test_neo4j_multi_database_test_routing_table_removes_aged - with GraphDatabase.driver(neo4j_uri, auth=auth) as driver: - with driver.session(database="system") as session: - result = session.run("DROP DATABASE testa IF EXISTS") - result.consume() - result = session.run("DROP DATABASE testb IF EXISTS") - result.consume() - result = session.run("SHOW DATABASES") - databases = set() - for record in result: - databases.add(record.get("name")) - assert databases == {"system", "neo4j"} - - result = session.run("CREATE DATABASE testa") - result.consume() - result = session.run("CREATE DATABASE testb") - result.consume() - result = session.run("SHOW DATABASES") - for record in result: - databases.add(record.get("name")) - assert databases == {"system", "neo4j", "testa", "testb"} - with driver.session(database="testa") as session: - result = session.run("RETURN 1 AS x") - result.consume() - with driver.session(database="testb") as session: - result = session.run("RETURN 1 AS x") - result.consume() - driver._pool.routing_tables["testa"].ttl = 0 - driver._pool.routing_tables["testb"].ttl = -1 * RoutingConfig.routing_table_purge_delay - old_value = driver._pool.routing_tables["testa"].last_updated_time - with driver.session(database="testa") as session: - # This will refresh the routing table for "testa" and the refresh will trigger a cleanup of aged routing tables - result = session.run("RETURN 1 AS x") - result.consume() - with driver.session(database="system") as session: - result = session.run("DROP DATABASE testa IF EXISTS") - result.consume() - result = session.run("DROP DATABASE testb IF EXISTS") - result.consume() - assert driver._pool.routing_tables["testa"].last_updated_time > old_value - assert "testb" not in driver._pool.routing_tables - - -def test_neo4j_driver_fetch_size_config_autocommit_normal_case(neo4j_uri, auth): - # python -m pytest tests/integration/test_neo4j_driver.py -s -v -k test_neo4j_driver_fetch_size_config_autocommit_normal_case - try: - with GraphDatabase.driver(neo4j_uri, auth=auth, user_agent="test") as driver: - assert isinstance(driver, Neo4jDriver) - with driver.session(fetch_size=2, default_access_mode=READ_ACCESS) as session: - expected = [] - result = session.run("UNWIND [1,2,3,4] AS x RETURN x") - for record in result: - expected.append(record["x"]) - - assert expected == [1, 2, 3, 4] - except ServiceUnavailable as error: - if error.args[0] == "Server does not support routing": - # This is because a single instance Neo4j 3.5 does not have dbms.routing.cluster.getRoutingTable() call - pytest.skip(error.args[0]) - elif isinstance(error.__cause__, BoltHandshakeError): - pytest.skip(error.args[0]) - - -def test_neo4j_driver_fetch_size_config_autocommit_consume_case(neo4j_uri, auth): - # python -m pytest tests/integration/test_neo4j_driver.py -s -v -k test_neo4j_driver_fetch_size_config_autocommit_consume_case - try: - with GraphDatabase.driver(neo4j_uri, auth=auth, user_agent="test") as driver: - assert isinstance(driver, Neo4jDriver) - with driver.session(fetch_size=2, default_access_mode=READ_ACCESS) as session: - result = session.run("UNWIND [1,2,3,4] AS x RETURN x") - result_summary_consume = result.consume() - - assert isinstance(result_summary_consume, ResultSummary) - except ServiceUnavailable as error: - if error.args[0] == "Server does not support routing": - # This is because a single instance Neo4j 3.5 does not have dbms.routing.cluster.getRoutingTable() call - pytest.skip(error.args[0]) - elif isinstance(error.__cause__, BoltHandshakeError): - pytest.skip(error.args[0]) - - -def test_neo4j_driver_fetch_size_config_explicit_transaction(neo4j_uri, auth): - # python -m pytest tests/integration/test_neo4j_driver.py -s -v -k test_neo4j_driver_fetch_size_config_explicit_transaction - try: - with GraphDatabase.driver(neo4j_uri, auth=auth, user_agent="test") as driver: - assert isinstance(driver, Neo4jDriver) - with driver.session(fetch_size=2, default_access_mode=READ_ACCESS) as session: - expected = [] - tx = session.begin_transaction() - result = tx.run("UNWIND [1,2,3,4] AS x RETURN x") - for record in result: - expected.append(record["x"]) - tx.commit() - - assert expected == [1, 2, 3, 4] - except ServiceUnavailable as error: - if error.args[0] == "Server does not support routing": - # This is because a single instance Neo4j 3.5 does not have dbms.routing.cluster.getRoutingTable() call - pytest.skip(error.args[0]) - elif isinstance(error.__cause__, BoltHandshakeError): - pytest.skip(error.args[0]) diff --git a/tests/unit/io/test_neo4j_pool.py b/tests/unit/io/test_neo4j_pool.py new file mode 100644 index 000000000..0ce2af492 --- /dev/null +++ b/tests/unit/io/test_neo4j_pool.py @@ -0,0 +1,115 @@ +#!/usr/bin/env python +# -*- encoding: utf-8 -*- + +# Copyright (c) "Neo4j" +# Neo4j Sweden AB [http://neo4j.com] +# +# This file is part of Neo4j. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import inspect +from unittest.mock import Mock + +import pytest + +from ..work import FakeConnection + +from neo4j import READ_ACCESS +from neo4j.addressing import ResolvedAddress +from neo4j.conf import ( + PoolConfig, + RoutingConfig, + WorkspaceConfig +) +from neo4j.io import Neo4jPool + + +@pytest.fixture() +def opener(): + def open_(*_, **__): + connection = FakeConnection() + route_mock = Mock() + route_mock.return_value = [{ + "ttl": 1000, + "servers": [ + {"addresses": ["1.2.3.1:9001"], "role": "ROUTE"}, + { + "addresses": ["1.2.3.10:9010", "1.2.3.11:9011"], + "role": "READ" + }, + { + "addresses": ["1.2.3.20:9020", "1.2.3.21:9021"], + "role": "WRITE" + }, + ], + }] + connection.attach_mock(route_mock, "route") + opener_.connections.append(connection) + return connection + + opener_ = Mock() + opener_.connections = [] + opener_.side_effect = open_ + return opener_ + + +def test_acquires_new_routing_table_if_deleted(opener): + address = ResolvedAddress(("1.2.3.1", 9001), host_name="host") + pool = Neo4jPool(opener, PoolConfig(), WorkspaceConfig(), address) + cx = pool.acquire(READ_ACCESS, 30, "test_db", None) + pool.release(cx) + assert pool.routing_tables.get("test_db") + + del pool.routing_tables["test_db"] + + cx = pool.acquire(READ_ACCESS, 30, "test_db", None) + pool.release(cx) + assert pool.routing_tables.get("test_db") + + +def test_acquires_new_routing_table_if_stale(opener): + address = ResolvedAddress(("1.2.3.1", 9001), host_name="host") + pool = Neo4jPool(opener, PoolConfig(), WorkspaceConfig(), address) + cx = pool.acquire(READ_ACCESS, 30, "test_db", None) + pool.release(cx) + assert pool.routing_tables.get("test_db") + + old_value = pool.routing_tables["test_db"].last_updated_time + pool.routing_tables["test_db"].ttl = 0 + + cx = pool.acquire(READ_ACCESS, 30, "test_db", None) + pool.release(cx) + assert pool.routing_tables["test_db"].last_updated_time > old_value + + +def test_removes_old_routing_table(opener): + address = ResolvedAddress(("1.2.3.1", 9001), host_name="host") + pool = Neo4jPool(opener, PoolConfig(), WorkspaceConfig(), address) + cx = pool.acquire(READ_ACCESS, 30, "test_db1", None) + pool.release(cx) + assert pool.routing_tables.get("test_db1") + cx = pool.acquire(READ_ACCESS, 30, "test_db2", None) + pool.release(cx) + assert pool.routing_tables.get("test_db2") + + old_value = pool.routing_tables["test_db1"].last_updated_time + pool.routing_tables["test_db1"].ttl = 0 + pool.routing_tables["test_db2"].ttl = \ + -RoutingConfig.routing_table_purge_delay + + cx = pool.acquire(READ_ACCESS, 30, "test_db1", None) + pool.release(cx) + assert pool.routing_tables["test_db1"].last_updated_time > old_value + assert "test_db2" not in pool.routing_tables + diff --git a/tests/unit/work/__init__.py b/tests/unit/work/__init__.py index e69de29bb..238e61d3f 100644 --- a/tests/unit/work/__init__.py +++ b/tests/unit/work/__init__.py @@ -0,0 +1,24 @@ +#!/usr/bin/env python +# -*- encoding: utf-8 -*- + +# Copyright (c) "Neo4j" +# Neo4j Sweden AB [http://neo4j.com] +# +# This file is part of Neo4j. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._fake_connection import ( + FakeConnection, + fake_connection, +) diff --git a/tests/unit/work/_fake_connection.py b/tests/unit/work/_fake_connection.py index de0c49925..4d4550627 100644 --- a/tests/unit/work/_fake_connection.py +++ b/tests/unit/work/_fake_connection.py @@ -20,17 +20,29 @@ import inspect -from unittest.mock import NonCallableMagicMock +from unittest import mock import pytest from neo4j import ServerInfo -class FakeConnection(NonCallableMagicMock): +class FakeConnection(mock.NonCallableMagicMock): callbacks = [] server_info = ServerInfo("127.0.0.1", (4, 3)) + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.attach_mock(mock.PropertyMock(return_value=True), "is_reset") + self.attach_mock(mock.Mock(return_value=False), "defunct") + self.attach_mock(mock.Mock(return_value=False), "stale") + self.attach_mock(mock.Mock(return_value=False), "closed") + + def close_side_effect(): + self.closed.return_value = True + + self.attach_mock(mock.Mock(side_effect=close_side_effect), "close") + def fetch_message(self, *args, **kwargs): if self.callbacks: cb = self.callbacks.pop(0) @@ -74,8 +86,6 @@ def callback(): return build_message_handler(name) return parent.__getattr__(name) - def defunct(self): - return False @pytest.fixture