From 39f9f47efa25f1b226a51a97a4499f3bcf183e8b Mon Sep 17 00:00:00 2001 From: martin bendsoe Date: Fri, 21 Feb 2020 11:13:46 +0100 Subject: [PATCH 1/2] pruned asyncio --- neo4j/__init__.py | 106 --- neo4j/aio/__init__.py | 950 ---------------------- neo4j/aio/__main__.py | 66 -- neo4j/aio/_bolt3.py | 630 -------------- neo4j/aio/_collections.py | 43 - neo4j/aio/_mixins.py | 56 -- neo4j/packstream.py | 52 -- neo4j/work/__init__.py | 15 - neo4j/work/aio.py | 54 -- tests/integration/aio/__init__.py | 0 tests/integration/aio/conftest.py | 44 - tests/integration/aio/test_bolt.py | 510 ------------ tests/integration/aio/test_concurrency.py | 51 -- tests/integration/conftest.py | 23 - tests/integration/test_bolt_driver.py | 12 - tests/requirements.txt | 1 - tests/stub/aio/__init__.py | 0 tests/stub/aio/test_bolt.py | 188 ----- 18 files changed, 2801 deletions(-) delete mode 100644 neo4j/aio/__init__.py delete mode 100644 neo4j/aio/__main__.py delete mode 100644 neo4j/aio/_bolt3.py delete mode 100644 neo4j/aio/_collections.py delete mode 100644 neo4j/aio/_mixins.py delete mode 100644 neo4j/work/aio.py delete mode 100644 tests/integration/aio/__init__.py delete mode 100644 tests/integration/aio/conftest.py delete mode 100644 tests/integration/aio/test_bolt.py delete mode 100644 tests/integration/aio/test_concurrency.py delete mode 100644 tests/stub/aio/__init__.py delete mode 100644 tests/stub/aio/test_bolt.py diff --git a/neo4j/__init__.py b/neo4j/__init__.py index eb91d1cf4..51812e5c7 100644 --- a/neo4j/__init__.py +++ b/neo4j/__init__.py @@ -73,21 +73,6 @@ def driver(cls, uri, *, auth=None, acquire_timeout=None, **config): else: raise ValueError("Unknown URI scheme {!r}".format(parsed.scheme)) - @classmethod - async def async_driver(cls, uri, *, auth=None, loop=None, **config): - """ Create a Neo4j driver that uses async I/O and task-based - concurrency. - """ - parsed = urlparse(uri) - if parsed.scheme == "bolt": - return await cls.async_bolt_driver(parsed.netloc, auth=auth, loop=loop, **config) - elif parsed.scheme == "neo4j" or parsed.scheme == "bolt+routing": - rc = cls._parse_routing_context(parsed.query) - return await cls.async_neo4j_driver(parsed.netloc, auth=auth, routing_context=rc, - loop=loop, **config) - else: - raise ValueError("Unknown URI scheme {!r}".format(parsed.scheme)) - @classmethod def bolt_driver(cls, target, *, auth=None, acquire_timeout=None, **config): """ Create a driver for direct Bolt server access that uses @@ -95,13 +80,6 @@ def bolt_driver(cls, target, *, auth=None, acquire_timeout=None, **config): """ return BoltDriver.open(target, auth=auth, acquire_timeout=acquire_timeout, **config) - @classmethod - async def async_bolt_driver(cls, target, *, auth=None, loop=None, **config): - """ Create a driver for direct Bolt server access that uses - async I/O and task-based concurrency. - """ - return await AsyncBoltDriver.open(target, auth=auth, loop=loop, **config) - @classmethod def neo4j_driver(cls, *targets, auth=None, routing_context=None, acquire_timeout=None, **config): @@ -111,13 +89,6 @@ def neo4j_driver(cls, *targets, auth=None, routing_context=None, acquire_timeout return Neo4jDriver.open(*targets, auth=auth, routing_context=routing_context, acquire_timeout=acquire_timeout, **config) - @classmethod - async def async_neo4j_driver(cls, *targets, auth=None, loop=None, **config): - """ Create a driver for routing-capable Neo4j service access - that uses async I/O and task-based concurrency. - """ - return await AsyncNeo4jDriver.open(*targets, auth=auth, loop=loop, **config) - @classmethod def _parse_routing_context(cls, query): """ Parse the query portion of a URI to generate a routing @@ -254,34 +225,6 @@ def verify_connectivity(self, **config): raise NotImplementedError -class AsyncDriver: - - @classmethod - async def open(cls, uri, **config): - raise NotImplementedError - - def __init__(self, pool): - self._pool = pool - - async def __aenter__(self): - return self - - async def __aexit__(self, exc_type, exc_val, exc_tb): - await self.close() - - async def session(self, **config): - """ Create a reactive session. - - :param config: session configuration - (see :class:`.RxSessionConfig` for details) - :returns: new :class:`.RxSession` object - """ - raise NotImplementedError - - async def close(self): - await self._pool.close() - - class BoltDriver(Direct, Driver): """ A :class:`.BoltDriver` is created from a ``bolt`` URI and addresses a single database machine. This may be a standalone server or could be a @@ -380,52 +323,3 @@ def _verify_routing_connectivity(self): if val is not None: return routing_info raise ServiceUnavailable("Could not connect to any routing servers.") - - -class AsyncBoltDriver(Direct, AsyncDriver): - - @classmethod - async def open(cls, target, *, auth=None, loop=None, **config): - from neo4j.aio import BoltPool - from neo4j.work import WorkspaceConfig - address = cls.parse_target(target) - pool_config, default_workspace_config = Config.consume_chain(config, PoolConfig, - WorkspaceConfig) - pool = await BoltPool.open(address, auth=auth, loop=loop, **pool_config) - return cls(pool, default_workspace_config) - - def __init__(self, pool, default_workspace_config): - Direct.__init__(self, pool.address) - AsyncDriver.__init__(self, pool) - self._default_workspace_config = default_workspace_config - - async def session(self, **config): - from neo4j.work.aio import AsyncSession, AsyncSessionConfig - session_config = AsyncSessionConfig(self._default_workspace_config, - AsyncSessionConfig.consume(config)) - return AsyncSession(self._pool, session_config) - - -class AsyncNeo4jDriver(Routing, AsyncDriver): - - @classmethod - async def open(cls, *targets, auth=None, routing_context=None, loop=None, **config): - from neo4j.aio import Neo4jPool - from neo4j.work import WorkspaceConfig - addresses = cls.parse_targets(*targets) - pool_config, default_workspace_config = Config.consume_chain(config, PoolConfig, - WorkspaceConfig) - pool = await Neo4jPool.open(*addresses, auth=auth, routing_context=routing_context, - loop=loop, **pool_config) - return cls(pool, default_workspace_config) - - def __init__(self, pool, default_workspace_config): - Routing.__init__(self, pool.routing_table.initial_routers) - AsyncDriver.__init__(self, pool) - self._default_workspace_config = default_workspace_config - - async def session(self, **config): - from neo4j.work.aio import AsyncSession, AsyncSessionConfig - session_config = AsyncSessionConfig(self._default_workspace_config, - AsyncSessionConfig.consume(config)) - return AsyncSession(self._pool, session_config) diff --git a/neo4j/aio/__init__.py b/neo4j/aio/__init__.py deleted file mode 100644 index 56cbe4a59..000000000 --- a/neo4j/aio/__init__.py +++ /dev/null @@ -1,950 +0,0 @@ -#!/usr/bin/env python -# -*- encoding: utf-8 -*- - -# Copyright (c) 2002-2020 "Neo4j," -# Neo4j Sweden AB [http://neo4j.com] -# -# This file is part of Neo4j. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -""" -:mod:`.neo4j.aio` -- Asynchronous Bolt protocol I/O -=================================================== - -The :mod:`.neo4j.aio` module contains classes for working with the Bolt -protocol via Python's asynchronous I/O capabilities. - -Three classes are provided: - -- :class:`.Bolt` - single connection to a server -- :class:`.BoltPool` - pool of connections to a server -- :class:`.Neo4jPool` - pool of connections to a cluster of servers - -""" - - -__all__ = [ - "Bolt", - "BoltPool", - "Neo4jPool", -] - - -from asyncio import ( - IncompleteReadError, - Lock, - StreamReader, - StreamReaderProtocol, - StreamWriter, - get_event_loop, - wait, -) -from collections import deque -from logging import getLogger -from os import strerror -from random import choice -from ssl import SSLError -from sys import platform, version_info -from time import perf_counter - -from neo4j.addressing import Address -from neo4j.aio._collections import WaitingList -from neo4j.aio._mixins import Addressable, Breakable -from neo4j._exceptions import ( - BoltError, - BoltConnectionError, - BoltSecurityError, - BoltConnectionBroken, - BoltHandshakeError, -) -from neo4j.exceptions import ( - RoutingServiceUnavailable, - ReadServiceUnavailable, - WriteServiceUnavailable, -) -from neo4j.api import Version -from neo4j.conf import PoolConfig -from neo4j.meta import version as neo4j_version -from neo4j.routing import RoutingTable - - -log = getLogger(__name__) - - -class Bolt(Addressable, object): - """ Connection to a Bolt-enabled server. - - >>> bolt = await Bolt.open(("localhost", 7687), auth=("neo4j", "password")) - >>> result = await bolt.run("RETURN 'hello, world' AS greeting") - >>> record = await result.single() - >>> record["greeting"] - 'hello, world' - >>> await bolt.close() - - """ - - #: - MAGIC_PREAMBLE = b"\x60\x60\xB0\x17" - - #: True if this instance uses secure communication, false - #: otherwise. - secure = None - - #: As a class attribute, this denotes the version of Bolt handled - #: by that subclass. As an instance attribute, this represents the - #: version of the protocol in use. - protocol_version = () - - # Record of the time at which this connection was opened. - __t_opened = None - - # Handle to the StreamReader object. - __reader = None - - # Handle to the StreamWriter object, which can be used on close. - __writer = None - - # Flag to indicate that the connection is closed - __closed = False - - @classmethod - def default_user_agent(cls): - """ Return the default user agent string for a connection. - """ - template = "neo4j-python/{} Python/{}.{}.{}-{}-{} ({})" - fields = (neo4j_version,) + tuple(version_info) + (platform,) - return template.format(*fields) - - @classmethod - def protocol_handlers(cls, protocol_version=None): - """ Return a dictionary of available Bolt protocol handlers, - keyed by version tuple. If an explicit protocol version is - provided, the dictionary will contain either zero or one items, - depending on whether that version is supported. If no protocol - version is provided, all available versions will be returned. - - :param protocol_version: tuple identifying a specific protocol - version (e.g. (3, 5)) or None - :return: dictionary of version tuple to handler class for all - relevant and supported protocol versions - :raise TypeError: if protocol version is not passed in a tuple - """ - - # Carry out subclass imports locally to avoid circular - # dependency issues. - from neo4j.aio._bolt3 import Bolt3 - - handlers = {bolt.protocol_version: bolt for bolt in [ - # This list can be updated as protocol - # versions are added and removed. - Bolt3, - # TODO: Bolt4 - ]} - - if protocol_version is None: - return handlers - if not isinstance(protocol_version, tuple): - raise TypeError("Protocol version must be specified as a tuple") - return {version: handler - for version, handler in handlers.items() - if version == protocol_version} - - @classmethod - async def open(cls, address, *, auth=None, loop=None, **config): - """ Open a socket connection and perform protocol version - negotiation, in order to construct and return a Bolt client - instance for a supported Bolt protocol version. - - :param address: tuples of host and port, such as - ("127.0.0.1", 7687) - :param auth: - :param loop: - :param config: - :return: instance of a Bolt subclass - :raise BoltConnectionError: if a connection could not be - established - :raise BoltConnectionLost: if an I/O error occurs on the - underlying socket connection - :raise BoltHandshakeError: if handshake completes without a - successful negotiation - :raise TypeError: if any of the arguments provided are passed - as incompatible types - :raise ValueError: if any of the arguments provided are passed - with unsupported values - """ - - # Args - address = Address(address) - if loop is None: - loop = get_event_loop() - config = PoolConfig.consume(config) - - # Connect - reader, writer = await cls._connect(address, loop, config) - - try: - - # Handshake - subclass = await cls._handshake(reader, writer, config.protocol_version) - - # Instantiation - obj = subclass(reader, writer) - obj.secure = bool(config.secure) - assert hasattr(obj, "__ainit__") - await obj.__ainit__(auth) - return obj - - except BoltError: - writer.write_eof() - writer.close() - raise - - @classmethod - async def _connect(cls, address, loop, config): - """ Attempt to establish a TCP connection to the address - provided. - - :param address: - :param loop: - :param config: - :return: a 3-tuple of reader, writer and security settings for - the new connection - :raise BoltConnectionError: if a connection could not be - established - """ - assert isinstance(address, Address) - assert loop is not None - assert isinstance(config, PoolConfig) - connection_args = { - "host": address.host, - "port": address.port, - "family": address.family, - # TODO: other args - } - ssl_context = config.get_ssl_context() - if ssl_context: - connection_args["ssl"] = ssl_context - connection_args["server_hostname"] = address.host - log.debug("[#0000] C: %s", address) - try: - reader = BoltStreamReader(loop=loop) - protocol = StreamReaderProtocol(reader, loop=loop) - transport, _ = await loop.create_connection(lambda: protocol, **connection_args) - writer = BoltStreamWriter(transport, protocol, reader, loop) - except SSLError as err: - log.debug("[#%04X] S: %s (%d %s)", 0, address, - err.errno, strerror(err.errno)) - raise BoltSecurityError("Failed to establish a secure connection", address) from err - except OSError as err: - log.debug("[#%04X] S: %s (%d %s)", 0, address, - err.errno, strerror(err.errno)) - raise BoltConnectionError("Failed to establish a connection", address) from err - else: - local_address = Address(transport.get_extra_info("sockname")) - remote_address = Address(transport.get_extra_info("peername")) - log.debug("[#%04X] S: %s -> %s", - local_address.port_number, local_address, remote_address) - return reader, writer - - @classmethod - async def _handshake(cls, reader, writer, protocol_version): - """ Carry out a Bolt handshake, optionally requesting a - specific protocol version. - - :param reader: - :param writer: - :param protocol_version: - :return: - :raise BoltConnectionLost: if an I/O error occurs on the - underlying socket connection - :raise BoltHandshakeError: if handshake completes without a - successful negotiation - """ - local_address = Address(writer.transport.get_extra_info("sockname")) - remote_address = Address(writer.transport.get_extra_info("peername")) - - handlers = cls.protocol_handlers(protocol_version) - if not handlers: - raise ValueError("No protocol handlers available (requested Bolt %r)", protocol_version) - offered_versions = sorted(handlers.keys(), reverse=True)[:4] - - request_data = cls.MAGIC_PREAMBLE + b"".join( - v.to_bytes() for v in offered_versions).ljust(16, b"\x00") - log.debug("[#%04X] C: %r", local_address.port_number, request_data) - writer.write(request_data) - await writer.drain() - response_data = await reader.readexactly(4) - log.debug("[#%04X] S: %r", local_address.port_number, response_data) - try: - agreed_version = Version.from_bytes(response_data) - except ValueError as err: - writer.close() - raise BoltHandshakeError("Unexpected handshake response %r" % response_data, - remote_address, request_data, response_data) from err - try: - subclass = handlers[agreed_version] - except KeyError: - log.debug("Unsupported Bolt protocol version %s", agreed_version) - raise BoltHandshakeError("Unsupported Bolt protocol version", - remote_address, request_data, response_data) - else: - return subclass - - def __new__(cls, reader, writer): - obj = super().__new__(cls) - obj.__t_opened = perf_counter() - obj.__reader = reader - obj.__writer = writer - Addressable.set_transport(obj, writer.transport) - return obj - - def __repr__(self): - return "" % (self.remote_address, - self.protocol_version) - - async def __ainit__(self, auth): - """ Asynchronous initializer for implementation by subclasses. - - :param auth: - """ - - @property - def age(self): - """ The age of this connection in seconds. - """ - return perf_counter() - self.__t_opened - - @property - def broken(self): - """ Flag to indicate whether this connection has been broken - by the network or remote peer. - """ - return self.__reader.broken or self.__writer.broken - - @property - def closed(self): - """ Flag to indicate whether this connection has been closed - locally.""" - return self.__closed - - async def close(self): - """ Close the connection. - """ - if self.closed: - return - if not self.broken: - log.debug("[#%04X] S: ", self.local_address.port_number) - self.__writer.write_eof() - self.__writer.close() - try: - await self.__writer.wait_closed() - except BoltConnectionBroken: - pass - self.__closed = True - - async def reset(self, force=False): - """ Reset the connection to a clean state. - - By default, a RESET message will only be sent if required, i.e. - if the connection is not already in a clean state. If forced, - this check will be overridden and a RESET will be sent - regardless. - """ - - async def run(self, cypher, parameters=None, discard=False, readonly=False, - bookmarks=None, timeout=None, metadata=None): - """ Run an auto-commit transaction. - - :param cypher: - :param parameters: - :param discard: - :param readonly: - :param bookmarks: - :param timeout: - :param metadata: - :raise BoltTransactionError: if a transaction cannot be carried - out at this time - """ - - async def begin(self, readonly=False, bookmarks=None, - timeout=None, metadata=None): - """ Begin an explicit transaction. - - :param readonly: - :param bookmarks: - :param timeout: - :param metadata: - :return: - """ - - async def run_tx(self, f, args=None, kwargs=None, readonly=False, - bookmarks=None, timeout=None, metadata=None): - """ Run a transaction function and return the return value from - that function. - """ - - async def get_routing_table(self, context=None): - """ Fetch a new routing table. - - :param context: the routing context to use for this call - :return: a new RoutingTable instance or None if the given router is - currently unable to provide routing information - :raise ServiceUnavailable: if no writers are available - :raise BoltProtocolError: if the routing information received is unusable - """ - - -class BoltStreamReader(Addressable, Breakable, StreamReader): - """ Wrapper for asyncio.streams.StreamReader - """ - - def set_transport(self, transport): - Addressable.set_transport(self, transport) - StreamReader.set_transport(self, transport) - - async def readuntil(self, separator=b'\n'): # pragma: no cover - assert False # not used by current implementation - - async def read(self, n=-1): # pragma: no cover - assert False # not used by current implementation - - async def readexactly(self, n): - try: - return await super().readexactly(n) - except IncompleteReadError as err: - message = ("Network read incomplete (received {} of {} " - "bytes)".format(len(err.partial), err.expected)) - log.debug("[#%04X] S: ", self.local_address.port_number) - Breakable.set_broken(self) - raise BoltConnectionBroken(message, self.remote_address) from err - except OSError as err: - log.debug("[#%04X] S: %d %s", err.errno, strerror(err.errno)) - Breakable.set_broken(self) - raise BoltConnectionBroken("Network read failed", self.remote_address) from err - - -class BoltStreamWriter(Addressable, Breakable, StreamWriter): - """ Wrapper for asyncio.streams.StreamWriter - """ - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - Addressable.set_transport(self, self.transport) - - async def drain(self): - try: - await super().drain() - except OSError as err: - log.debug("[#%04X] S: (%s)", self.local_address.port_number, err) - Breakable.set_broken(self) - raise BoltConnectionBroken("Network write failed", self.remote_address) from err - - async def wait_closed(self): - try: - await super().wait_closed() - except AttributeError: # pragma: no cover - # This is a dirty hack for Python 3.6, which didn't include - # 'wait_closed'. The code polls waiting for the stream - # reader inside the protocol to go away which, by the - # implementation of 3.6, occurs on 'connection_lost'. This - # hack is likely safe unless the implementation of 3.6 - # changes in a subsequent patch, and can be removed when - # Python 3.6 support is no longer required. - # - from asyncio import sleep - try: - while self._protocol._stream_reader is not None: - await sleep(0.1) - except AttributeError: - pass - - -class BoltPool: - """ A pool of connections to a single address. - - :param opener: a function to which an address can be passed that - returns an open and ready Bolt connection - :param address: the remote address for which this pool operates - :param max_size: the maximum permitted number of simultaneous - connections that may be owned by this pool, both in-use and - free - :param max_age: the maximum permitted age, in seconds, for - connections to be retained in this pool - """ - - @classmethod - async def open(cls, address, *, auth=None, loop=None, **config): - """ Create a new connection pool, with an option to seed one - or more initial connections. - """ - pool_config = PoolConfig.consume(config) - - def opener(addr): - return Bolt.open(addr, auth=auth, loop=loop, **pool_config) - - pool = cls(loop, opener, pool_config, address) - seeds = [await pool.acquire() for _ in range(pool_config.init_size)] - for seed in seeds: - await pool.release(seed) - return pool - - def __init__(self, loop, opener, config, address): - if loop is None: - self._loop = get_event_loop() - else: - self._loop = loop - self._opener = opener - self._address = Address(address) - self._max_size = config.max_size - self._max_age = config.max_age - self._in_use_list = deque() - self._free_list = deque() - self._waiting_list = WaitingList(loop=self._loop) - - def __repr__(self): - return "<{} addr'{}' [{}{}{}]>".format( - self.__class__.__name__, - self.address, - "|" * len(self._in_use_list), - "." * len(self._free_list), - " " * (self.max_size - self.size), - ) - - def __contains__(self, cx): - return cx in self._in_use_list or cx in self._free_list - - def __len__(self): - return self.size - - @property - def address(self): - """ The remote address for which this pool operates. - """ - return self._address - - @property - def max_size(self): - """ The maximum permitted number of simultaneous connections - that may be owned by this pool, both in-use and free. - """ - return self._max_size - - @max_size.setter - def max_size(self, value): - old_value = self._max_size - self._max_size = value - if value > old_value: - # The maximum size has grown, so new slots have become - # available. Notify any waiting acquirers of this extra - # capacity. - self._waiting_list.notify() - - @property - def max_age(self): - """ The maximum permitted age, in seconds, for connections to - be retained in this pool. - """ - return self._max_age - - @property - def in_use(self): - """ The number of connections in this pool that are currently - in use. - """ - return len(self._in_use_list) - - @property - def size(self): - """ The total number of connections (both in-use and free) - currently owned by this connection pool. - """ - return len(self._in_use_list) + len(self._free_list) - - async def _sanitize(self, cx, *, force_reset): - """ Attempt to clean up a connection, such that it can be - reused. - - If the connection is broken or closed, it can be discarded. - Otherwise, the age of the connection is checked against the - maximum age permitted by this pool, consequently closing it - on expiry. - - Should the connection be neither broken, closed nor expired, - it will be reset (optionally forcibly so) and the connection - object will be returned, indicating success. - """ - if cx.broken or cx.closed: - return None - expired = self.max_age is not None and cx.age > self.max_age - if expired: - await cx.close() - return None - await cx.reset(force=force_reset) - return cx - - async def acquire(self, *, force_reset=False): - """ Acquire a connection from the pool. - - In the simplest case, this will return an existing open - connection, if one is free. If not, and the pool is not full, - a new connection will be created. If the pool is full and no - free connections are available, this will block until a - connection is released, or until the acquire call is cancelled. - - :param force_reset: if true, the connection will be forcibly - reset before being returned; if false, this will only occur - if the connection is not already in a clean state - :return: a Bolt connection object - """ - log.debug("Acquiring connection from pool %r", self) - cx = None - while cx is None or cx.broken or cx.closed: - try: - # Plan A: select a free connection from the pool - cx = self._free_list.popleft() - except IndexError: - if self.size < self.max_size: - # Plan B: if the pool isn't full, open - # a new connection - cx = await self._opener(self.address) - else: - # Plan C: wait for more capacity to become - # available, then try again - log.debug("Joining waiting list") - await self._waiting_list.join() - else: - cx = await self._sanitize(cx, force_reset=force_reset) - self._in_use_list.append(cx) - return cx - - async def release(self, cx, *, force_reset=False): - """ Release a Bolt connection, putting it back into the pool - if the connection is healthy and the pool is not already at - capacity. - - :param cx: the connection to release - :param force_reset: if true, the connection will be forcibly - reset before being released back into the pool; if false, - this will only occur if the connection is not already in a - clean state - :raise ValueError: if the connection is not currently in use, - or if it does not belong to this pool - """ - log.debug("Releasing connection %r", cx) - if cx in self._in_use_list: - self._in_use_list.remove(cx) - if self.size < self.max_size: - # If there is spare capacity in the pool, attempt to - # sanitize the connection and return it to the pool. - cx = await self._sanitize(cx, force_reset=force_reset) - if cx: - # Carry on only if sanitation succeeded. - if self.size < self.max_size: - # Check again if there is still capacity. - self._free_list.append(cx) - self._waiting_list.notify() - else: - # Otherwise, close the connection. - await cx.close() - else: - # If the pool is full, simply close the connection. - await cx.close() - elif cx in self._free_list: - raise ValueError("Connection is not in use") - else: - raise ValueError("Connection does not belong to this pool") - - async def prune(self): - """ Close all free connections. - """ - await self.__close(self._free_list) - - async def close(self): - """ Close all connections immediately. - - This does not permanently disable the connection pool, it - merely shuts down all open connections, including those in - use. Depending on the applications, it may be perfectly - acceptable to re-acquire connections after pool closure, - which will have the implicit affect of reopening the pool. - - To close gracefully, allowing work in progress to continue - until connections are released, use the following sequence - instead: - - pool.max_size = 0 - pool.prune() - - This will force all future connection acquisitions onto the - waiting list, and released connections will be closed instead - of being returned to the pool. - """ - await self.prune() - await self.__close(self._in_use_list) - - async def __close(self, connections): - """ Close all connections in the given list. - """ - closers = deque() - while True: - try: - cx = connections.popleft() - except IndexError: - break - else: - closers.append(cx.close()) - if closers: - await wait(closers, loop=self._loop) - - -class Neo4jPool: - """ Connection pool with routing table. - """ - - @classmethod - async def open(cls, *addresses, auth=None, routing_context=None, loop=None, **config): - pool_config = PoolConfig.consume(config) - - def opener(addr): - return Bolt.open(addr, auth=auth, **pool_config) - - obj = cls(loop, opener, config, addresses, routing_context) - await obj._ensure_routing_table_is_fresh() - return obj - - def __init__(self, loop, opener, config, addresses, routing_context): - if loop is None: - self._loop = get_event_loop() - else: - self._loop = loop - self._opener = opener - self._config = config - self._pools = {} - self._missing_writer = False - self._refresh_lock = Lock(loop=self._loop) - self._routing_context = routing_context - self._max_size_per_host = config.max_size - self._initial_routers = addresses - self._routing_table = RoutingTable(addresses) - self._activate_new_pools_in(self._routing_table) - - def _activate_new_pools_in(self, routing_table): - """ Add pools for addresses that exist in the given routing - table but which don't already have pools. - """ - for address in routing_table.servers(): - if address not in self._pools: - self._pools[address] = BoltPool(self._loop, self._opener, self._config, address) - - async def _deactivate_pools_not_in(self, routing_table): - """ Deactivate any pools that aren't represented in the given - routing table. - """ - for address in self._pools: - if address not in routing_table: - await self._deactivate(address) - - async def _get_routing_table_from(self, *routers): - """ Try to update routing tables with the given routers. - - :return: True if the routing table is successfully updated, - otherwise False - """ - log.debug("Attempting to update routing table from " - "{}".format(", ".join(map(repr, routers)))) - for router in routers: - pool = self._pools[router] - cx = await pool.acquire() - try: - new_routing_table = await cx.get_routing_table(self._routing_context) - except BoltError: - await self._deactivate(router) - else: - num_routers = len(new_routing_table.routers) - num_readers = len(new_routing_table.readers) - num_writers = len(new_routing_table.writers) - - # No writers are available. This likely indicates a temporary state, - # such as leader switching, so we should not signal an error. - # When no writers available, then we flag we are reading in absence of writer - self._missing_writer = (num_writers == 0) - - # No routers - if num_routers == 0: - continue - - # No readers - if num_readers == 0: - continue - - log.debug("Successfully updated routing table from " - "{!r} ({!r})".format(router, self._routing_table)) - return new_routing_table - finally: - await pool.release(cx) - return None - - async def _get_routing_table(self): - """ Update the routing table from the first router able to provide - valid routing information. - """ - # copied because it can be modified - existing_routers = list(self._routing_table.routers) - - has_tried_initial_routers = False - if self._missing_writer: - has_tried_initial_routers = True - rt = await self._get_routing_table_from(self._initial_routers) - if rt: - return rt - - rt = await self._get_routing_table_from(*existing_routers) - if rt: - return rt - - if not has_tried_initial_routers and self._initial_routers not in existing_routers: - rt = await self._get_routing_table_from(self._initial_routers) - if rt: - return rt - - # None of the routers have been successful, so just fail - log.error("Unable to retrieve routing information") - raise RoutingServiceUnavailable("Unable to retrieve routing information") - - async def _ensure_routing_table_is_fresh(self, readonly=False): - """ Update the routing table if stale. - - This method performs two freshness checks, before and after acquiring - the refresh lock. If the routing table is already fresh on entry, the - method exits immediately; otherwise, the refresh lock is acquired and - the second freshness check that follows determines whether an update - is still required. - """ - if self._routing_table.is_fresh(readonly=readonly): - return - async with self._refresh_lock: - if self._routing_table.is_fresh(readonly=readonly): - if readonly: - # if reader is fresh but writers are not, then - # we are reading in absence of writer - self._missing_writer = not self._routing_table.is_fresh(readonly=False) - else: - rt = await self._get_routing_table() - self._activate_new_pools_in(rt) - self._routing_table.update(rt) - await self._deactivate_pools_not_in(rt) - - async def _select_pool(self, readonly=False): - """ Selects the pool with the fewest in-use connections. - """ - await self._ensure_routing_table_is_fresh(readonly=readonly) - if readonly: - addresses = self._routing_table.readers - else: - addresses = self._routing_table.writers - pools = [pool for address, pool in self._pools.items() if address in addresses] - pools_by_usage = {} - for pool in pools: - pools_by_usage.setdefault(pool.in_use, []).append(pool) - if not pools_by_usage: - if readonly: - raise ReadServiceUnavailable("No read service currently available") - else: - raise WriteServiceUnavailable("No write service currently available") - return choice(pools_by_usage[min(pools_by_usage)]) - - async def acquire(self, *, readonly=False, force_reset=False): - """ Acquire a connection to a server that can satisfy a set of parameters. - - :param readonly: true if a readonly connection is required, - otherwise false - :param force_reset: - """ - while True: - pool = await self._select_pool(readonly=readonly) - try: - cx = await pool.acquire(force_reset=force_reset) - except BoltError: - await self._deactivate(pool.address) - else: - if not readonly: - # If we're not acquiring a connection as - # readonly, then intercept NotALeader and - # ForbiddenOnReadOnlyDatabase errors to - # invalidate the routing table. - from neo4j.exceptions import ForbiddenOnReadOnlyDatabaseError, NotALeaderError - - def handler(failure): - """ Invalidate the routing table before raising the failure. - """ - log.debug("[#0000] C: Invalidating routing table") - self._routing_table.ttl = 0 - raise failure - - cx.set_failure_handler(NotALeaderError, handler) - cx.set_failure_handler(ForbiddenOnReadOnlyDatabaseError, handler) - return cx - - async def release(self, connection, *, force_reset=False): - """ Release a connection back into the pool. - This method is thread safe. - """ - for pool in self._pools.values(): - try: - await pool.release(connection, force_reset=force_reset) - except ValueError: - pass - else: - # Unhook any custom error handling and exit. - from neo4j.exceptions import ForbiddenOnReadOnlyDatabaseError, NotALeaderError - connection.del_failure_handler(NotALeaderError) - connection.del_failure_handler(ForbiddenOnReadOnlyDatabaseError) - break - else: - raise ValueError("Connection does not belong to this pool") - - async def _deactivate(self, address): - """ Deactivate an address from the connection pool, - if present, remove from the routing table and also closing - all idle connections to that address. - """ - log.debug("[#0000] C: Deactivating address %r", address) - # We use `discard` instead of `remove` here since the former - # will not fail if the address has already been removed. - self._routing_table.routers.discard(address) - self._routing_table.readers.discard(address) - self._routing_table.writers.discard(address) - log.debug("[#0000] C: table=%r", self._routing_table) - try: - pool = self._pools.pop(address) - except KeyError: - pass # assume the address has already been removed - else: - pool.max_size = 0 - await pool.prune() - - async def close(self, force=False): - """ Close all connections and empty the pool. If forced, in-use - connections will be closed immediately; if not, they will - remain open until released. - """ - pools = dict(self._pools) - self._pools.clear() - for address, pool in pools.items(): - if force: - await pool.close() - else: - pool.max_size = 0 - await pool.prune() diff --git a/neo4j/aio/__main__.py b/neo4j/aio/__main__.py deleted file mode 100644 index 65b5d931c..000000000 --- a/neo4j/aio/__main__.py +++ /dev/null @@ -1,66 +0,0 @@ -#!/usr/bin/env python -# -*- encoding: utf-8 -*- - -# Copyright (c) 2002-2020 "Neo4j," -# Neo4j Sweden AB [http://neo4j.com] -# -# This file is part of Neo4j. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -from argparse import ArgumentParser -from asyncio import get_event_loop -from getpass import getpass -from os.path import basename -from sys import argv - -from neo4j.addressing import Address -from neo4j.aio import Bolt -from neo4j.debug import watch - - -async def a_main(prog): - parser = ArgumentParser(prog=prog) - parser.add_argument("cypher", - help="Cypher query to execute") - parser.add_argument("-a", "--auth", metavar="USER:PASSWORD", default="", - help="user name and password") - parser.add_argument("-s", "--server-addr", metavar="HOST:PORT", default=":7687", - help="address of server") - parser.add_argument("-v", "--verbose", action="store_true", - help="increase output verbosity") - parsed = parser.parse_args() - if parsed.verbose: - watch("neo4j") - addr = Address.parse(parsed.server_addr) - user, _, password = parsed.auth.partition(":") - if not password: - password = getpass() - auth = (user or "neo4j", password) - bolt = await Bolt.open(addr, auth=auth) - try: - result = await bolt.run(parsed.cypher) - print("\t".join(await result.fields())) - async for record in result: - print("\t".join(map(repr, record))) - finally: - await bolt.close() - - -def main(prog=None): - get_event_loop().run_until_complete(a_main(prog or basename(argv[0]))) - - -if __name__ == "__main__": - main("python -m neo4j.bolt") diff --git a/neo4j/aio/_bolt3.py b/neo4j/aio/_bolt3.py deleted file mode 100644 index 8e780fc9c..000000000 --- a/neo4j/aio/_bolt3.py +++ /dev/null @@ -1,630 +0,0 @@ -#!/usr/bin/env python -# -*- encoding: utf-8 -*- - -# Copyright (c) 2002-2020 "Neo4j," -# Neo4j Sweden AB [http://neo4j.com] -# -# This file is part of Neo4j. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -from collections import deque -from inspect import iscoroutinefunction -from logging import getLogger -from warnings import warn - -from neo4j.aio import Bolt -from neo4j.aio._mixins import Addressable -from neo4j.api import Bookmark, Version -from neo4j.data import Record -from neo4j._exceptions import ( - BoltError, - BoltFailure, - BoltConnectionBroken, - BoltConnectionClosed, - BoltTransactionError, - BoltRoutingError, -) -from neo4j.exceptions import Neo4jError -from neo4j.packstream import PackStream, Structure -from neo4j.routing import RoutingTable - - -log = getLogger("neo4j") - - -class IgnoredType: - - def __new__(cls): - return Ignored - - def __bool__(self): - return False - - def __repr__(self): - return "Ignored" - - -Ignored = object.__new__(IgnoredType) - - -class Summary: - - def __init__(self, metadata, success): - self._metadata = metadata - self._success = bool(success) - - def __bool__(self): - return self._success - - def __repr__(self): - return "<{} {}>".format( - self.__class__.__name__, - " ".join("{}={!r}".format(k, v) for k, v in sorted(self._metadata.items()))) - - @property - def metadata(self): - return self._metadata - - @property - def success(self): - return self._success - - -class Response: - """ Collector for response data, consisting of an optional - sequence of records and a mandatory summary. - """ - - result = None - - def __init__(self, courier): - self._courier = courier - self._records = deque() - self._summary = None - - def put_record(self, record): - """ Append a record to the end of the record deque. - - :param record: - """ - self._records.append(record) - - async def get_record(self): - """ Fetch and return the next record from the top of the - record deque. - - :return: - """ - - # R = has records - # S = has summary - # - # R=0, S=0 - fetch, check again - # R=1, S=0 - pop - # R=0, S=1 - raise stop - # R=1, S=1 - pop - while True: - try: - return self._records.popleft() - except IndexError: - if self._summary is None: - await self._courier.fetch(stop=lambda: bool(self._records)) - else: - return None - - def put_summary(self, summary): - """ Update the stored summary value. - - :param summary: - """ - self._summary = summary - - async def get_summary(self): - """ Fetch and return the summary value. - - :return: - """ - await self._courier.fetch(stop=lambda: self._summary is not None) - return self._summary - - -class Result: - """ The result of a Cypher execution. - """ - - def __init__(self, tx, head, body): - self._tx = tx - self._head = head - self._body = body - self._head.result = self - self._body.result = self - - def __aiter__(self): - return self - - async def __anext__(self): - try: - values = await self._body.get_record() - except BoltFailure as failure: - # FAILURE - await self._tx.fail(failure) - else: - # RECORD or end of records - if values is None: - raise StopAsyncIteration - else: - return Record(zip(await self.fields(), values)) - - @property - def transaction(self): - return self._tx - - async def get_header(self): - try: - header = await self._head.get_summary() - except BoltFailure as failure: - # FAILURE - await self._tx.fail(failure) - else: - # SUCCESS or IGNORED - return header - - async def consume(self): - try: - footer = await self._body.get_summary() - except BoltFailure as failure: - # FAILURE - await self._tx.fail(failure) - else: - # SUCCESS or IGNORED - # The return value of this function can be used as a - # predicate, since SUCCESS will return a Summary that - # coerces to True, and IGNORED will return Ignored, which - # coerces to False. - return footer - - async def fields(self): - header = await self.get_header() - return header.metadata.get("fields", ()) - - async def single(self): - """ Obtain the next and only remaining record from this result. - - A warning is generated if more than one record is available but - the first of these is still returned. - - :return: the next :class:`.Record` or :const:`None` if no - records remain - :warn: if more than one record is available - """ - records = [record async for record in self] - size = len(records) - if size == 0: - return None - if size != 1: - warn("Expected a result with a single record, but this result contains %d" % size) - return records[0] - - -class Transaction: - - @classmethod - async def begin(cls, courier, readonly=False, bookmarks=None, - timeout=None, metadata=None): - """ Begin an explicit transaction. - """ - tx = cls(courier, readonly=readonly, bookmarks=bookmarks, timeout=timeout, - metadata=metadata) - tx._autocommit = False - courier.write_begin(tx._extras) - if bookmarks: - # If bookmarks are passed, BEGIN should sync to the - # network. This ensures that any failures that occur are - # raised at an appropriate time, rather than later in the - # transaction. Conversely, if no bookmarks are passed, it - # should be fine to sync lazily. - await courier.send() - await courier.fetch() - return tx - - def _add_extra(self, key, coercion=lambda x: x, **values): - for name, value in values.items(): - if value: - try: - self._extras[key] = coercion(value) - except TypeError: - raise TypeError("Unsupported type for {} {!r}".format(name, value)) - - def __init__(self, courier, readonly=False, bookmarks=None, timeout=None, metadata=None): - """ - - :param courier: - :param readonly: if true, the transaction should be readonly, - otherwise it should have full read/write access - :param bookmarks: iterable of bookmarks which must all have - been seen by the server before this transaction begins - :param timeout: a transaction execution timeout, passed to the - database kernel on execution - :param metadata: application metadata tied to this transaction; - generally used for audit purposes - """ - self._courier = courier - self._autocommit = True - self._closed = False - self._failure = None - self._extras = {} - self._add_extra("mode", lambda x: "R" if x else None, readonly=readonly) - self._add_extra("bookmarks", list, bookmarks=bookmarks) - self._add_extra("tx_timeout", lambda x: int(1000 * x), timeout=timeout) - self._add_extra("tx_metadata", dict, metadata=metadata) - - @property - def autocommit(self): - return self._autocommit - - @property - def closed(self): - return self._closed - - @property - def failure(self): - return self._failure - - async def run(self, cypher, parameters=None, discard=False): - self._assert_open() - head = self._courier.write_run(cypher, dict(parameters or {}), - self._extras if self._autocommit else {}) - if discard: - body = self._courier.write_discard_all() - else: - body = self._courier.write_pull_all() - if self._autocommit: - try: - await self._courier.send() - finally: - self._closed = True - return Result(self, head, body) - - async def evaluate(self, cypher, parameters=None, key=0, default=None): - """ Run Cypher and return a single value (by default the first - value) from the first and only record. - """ - result = await self.run(cypher, parameters) - record = await result.single() - return record.value(key, default) - - async def commit(self): - self._assert_open() - if self._autocommit: - raise BoltTransactionError("Cannot explicitly commit an auto-commit " - "transaction", self._courier.remote_address) - try: - commit = self._courier.write_commit() - await self._courier.send() - await self._courier.fetch() - summary = await commit.get_summary() - return Bookmark(summary.metadata.get("bookmark")) - finally: - self._closed = True - - async def rollback(self): - self._assert_open() - if self._autocommit: - raise BoltTransactionError("Cannot explicitly rollback an auto-commit " - "transaction", self._courier.remote_address) - try: - self._courier.write_rollback() - await self._courier.send() - await self._courier.fetch() - finally: - self._closed = True - - async def fail(self, failure): - """ Called internally with a BoltFailure object when a FAILURE - message is received. This will reset the connection, close the - transaction and raise the failure exception. - - :param failure: - :return: - """ - if not self._failure: - self._courier.write_reset() - await self._courier.send() - await self._courier.fetch() - self._closed = True - self._failure = failure - raise self._failure - - def _assert_open(self): - if self.closed: - raise BoltTransactionError("Transaction is already " - "closed", self._courier.remote_address) - - -class Courier(Addressable, object): - - def __init__(self, reader, writer, on_failure): - self._stream = PackStream(reader, writer) - self._fail = on_failure - self._requests_to_send = 0 - self._responses = deque() - Addressable.set_transport(self, writer.transport) - - @property - def requests_to_send(self): - return self._requests_to_send - - @property - def responses_to_fetch(self): - return len(self._responses) - - @property - def connection_id(self): - return self.local_address.port_number - - def write_hello(self, extras): - logged_extras = dict(extras) - if "credentials" in logged_extras: - logged_extras["credentials"] = "*******" - log.debug("[#%04X] C: HELLO %r", self.connection_id, logged_extras) - return self._write(Structure(b"\x01", extras)) - - def write_goodbye(self): - log.debug("[#%04X] C: GOODBYE", self.connection_id) - return self._write(Structure(b"\x02")) - - def write_reset(self): - log.debug("[#%04X] C: RESET", self.connection_id) - return self._write(Structure(b"\x0F")) - - def write_run(self, cypher, parameters, extras): - parameters = dict(parameters or {}) - extras = dict(extras or {}) - log.debug("[#%04X] C: RUN %r %r %r", self.connection_id, cypher, parameters, extras) - return self._write(Structure(b"\x10", cypher, parameters, extras)) - - def write_begin(self, extras): - log.debug("[#%04X] C: BEGIN %r", self.connection_id, extras) - return self._write(Structure(b"\x11", extras)) - - def write_commit(self): - log.debug("[#%04X] C: COMMIT", self.connection_id) - return self._write(Structure(b"\x12")) - - def write_rollback(self): - log.debug("[#%04X] C: ROLLBACK", self.connection_id) - return self._write(Structure(b"\x13")) - - def write_discard_all(self): - log.debug("[#%04X] C: DISCARD_ALL", self.connection_id) - return self._write(Structure(b"\x2F")) - - def write_pull_all(self): - log.debug("[#%04X] C: PULL_ALL", self.connection_id) - return self._write(Structure(b"\x3F")) - - def _write(self, message): - self._stream.write_message(message) - self._requests_to_send += 1 - response = Response(self) - self._responses.append(response) - return response - - async def send(self): - log.debug("[#%04X] C: ", self.connection_id) - await self._stream.drain() - self._requests_to_send = 0 - - async def fetch(self, stop=lambda: None): - """ Fetch zero or more messages, stopping when no more pending - responses need to be populated, when the stop condition - is fulfilled, or when a failure is encountered (for which an - exception will be raised). - - :param stop: - """ - while self.responses_to_fetch and not stop(): - fetched = await self._read() - if isinstance(fetched, list): - self._responses[0].put_record(fetched) - else: - response = self._responses.popleft() - response.put_summary(fetched) - if isinstance(fetched, Summary) and not fetched.success: - code = fetched.metadata.get("code") - message = fetched.metadata.get("message") - failure = BoltFailure(message, self.remote_address, code, response) - self._fail(failure) - - async def _read(self): - message = await self._stream.read_message() - if not isinstance(message, Structure): - # TODO: log, signal defunct and close - raise BoltError("Received illegal message " - "type {}".format(type(message)), self.remote_address) - if message.tag == b"\x70": - metadata = message.fields[0] - log.debug("[#%04X] S: SUCCESS %r", self.connection_id, metadata) - return Summary(metadata, success=True) - elif message.tag == b"\x71": - data = message.fields[0] - log.debug("[#%04X] S: RECORD %r", self.connection_id, data) - return data - elif message.tag == b"\x7E": - log.debug("[#%04X] S: IGNORED", self.connection_id) - return Ignored - elif message.tag == b"\x7F": - metadata = message.fields[0] - log.debug("[#%04X] S: FAILURE %r", self.connection_id, metadata) - return Summary(metadata, success=False) - else: - # TODO: log, signal defunct and close - raise BoltError("Received illegal message structure " - "tag {}".format(message.tag), self.remote_address) - - -class Bolt3(Bolt): - - protocol_version = Version(3, 0) - - server_agent = None - - connection_id = None - - def __init__(self, reader, writer): - self._courier = Courier(reader, writer, self.fail) - self._tx = None - self._failure_handlers = {} - - async def __ainit__(self, auth): - args = { - "scheme": "none", - "user_agent": self.default_user_agent(), - } - if auth: - args.update({ - "scheme": "basic", - "principal": auth[0], # TODO - "credentials": auth[1], # TODO - }) - response = self._courier.write_hello(args) - await self._courier.send() - summary = await response.get_summary() - if summary.success: - self.server_agent = summary.metadata.get("server") - self.connection_id = summary.metadata.get("connection_id") - # TODO: verify genuine product - else: - await super().close() - code = summary.metadata.get("code") - message = summary.metadata.get("message") - failure = BoltFailure(message, self.remote_address, code, response) - self.fail(failure) - - async def close(self): - if self.closed: - return - if not self.broken: - self._courier.write_goodbye() - try: - await self._courier.send() - except BoltConnectionBroken: - pass - await super().close() - - @property - def ready(self): - """ If true, this flag indicates that there is no transaction - in progress, and one may be started. - """ - return not self._tx or self._tx.closed - - def _assert_open(self): - if self.closed: - raise BoltConnectionClosed("Connection has been closed", self.remote_address) - if self.broken: - raise BoltConnectionBroken("Connection is broken", self.remote_address) - - def _assert_ready(self): - self._assert_open() - if not self.ready: - # TODO: add transaction identifier - raise BoltTransactionError("A transaction is already in progress on " - "this connection", self.remote_address) - - async def reset(self, force=False): - self._assert_open() - if force or not self.ready: - self._courier.write_reset() - if self._courier.requests_to_send: - await self._courier.send() - if self._courier.responses_to_fetch: - await self._courier.fetch() - - async def run(self, cypher, parameters=None, discard=False, readonly=False, - bookmarks=None, timeout=None, metadata=None): - self._assert_ready() - self._tx = Transaction(self._courier, readonly=readonly, bookmarks=bookmarks, - timeout=timeout, metadata=metadata) - return await self._tx.run(cypher, parameters, discard=discard) - - async def begin(self, readonly=False, bookmarks=None, - timeout=None, metadata=None): - self._assert_ready() - self._tx = await Transaction.begin(self._courier, readonly=readonly, bookmarks=bookmarks, - timeout=timeout, metadata=metadata) - return self._tx - - async def run_tx(self, f, args=None, kwargs=None, readonly=False, - bookmarks=None, timeout=None, metadata=None): - self._assert_open() - tx = await self.begin(readonly=readonly, bookmarks=bookmarks, - timeout=None, metadata=metadata) - if not iscoroutinefunction(f): - raise TypeError("Transaction function must be awaitable") - try: - value = await f(tx, *(args or ()), **(kwargs or {})) - except Exception: - await tx.rollback() - raise - else: - await tx.commit() - return value - - async def get_routing_table(self, context=None): - try: - result = await self.run("CALL dbms.cluster.routing.getRoutingTable($context)", - {"context": dict(context or {})}) - record = await result.single() - if not record: - raise BoltRoutingError("Routing table call returned " - "no data", self.remote_address) - assert isinstance(record, Record) - servers = record["servers"] - ttl = record["ttl"] - log.debug("[#%04X] S: servers=%r ttl=%r", - self.local_address.port_number, servers, ttl) - return RoutingTable.parse_routing_info(servers, ttl) - except BoltFailure as error: - if error.title == "ProcedureNotFound": - raise BoltRoutingError("Server does not support " - "routing", self.remote_address) from error - else: - raise - except ValueError as error: - raise BoltRoutingError("Invalid routing table", self.remote_address) from error - - def fail(self, failure): - t = type(failure) - handler = self.get_failure_handler(t) - if callable(handler): - # TODO: fix "requires two params, only one was given" error - handler(failure) - else: - # TODO: fix correct error logic after error and exception refactoring - # raise failure # This is a BoltFailure - raise Neo4jError.hydrate(message=str(failure), code=failure.code) - - def get_failure_handler(self, cls): - return self._failure_handlers.get(cls) - - def set_failure_handler(self, cls, f): - self._failure_handlers[cls] = f - - def del_failure_handler(self, cls): - try: - del self._failure_handlers[cls] - except KeyError: - pass diff --git a/neo4j/aio/_collections.py b/neo4j/aio/_collections.py deleted file mode 100644 index 16fed54b1..000000000 --- a/neo4j/aio/_collections.py +++ /dev/null @@ -1,43 +0,0 @@ -#!/usr/bin/env python -# -*- encoding: utf-8 -*- - -# Copyright (c) 2002-2020 "Neo4j," -# Neo4j Sweden AB [http://neo4j.com] -# -# This file is part of Neo4j. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -from asyncio import Event -from collections import deque - - -class WaitingList: - - def __init__(self, *, loop=None): - self._loop = loop - self._wait_list = deque() - - async def join(self): - event = Event(loop=self._loop) - self._wait_list.append(event) - await event.wait() - - def notify(self): - try: - event = self._wait_list.popleft() - except IndexError: - pass - else: - event.set() diff --git a/neo4j/aio/_mixins.py b/neo4j/aio/_mixins.py deleted file mode 100644 index 280cb0dad..000000000 --- a/neo4j/aio/_mixins.py +++ /dev/null @@ -1,56 +0,0 @@ -#!/usr/bin/env python -# -*- encoding: utf-8 -*- - -# Copyright (c) 2002-2020 "Neo4j," -# Neo4j Sweden AB [http://neo4j.com] -# -# This file is part of Neo4j. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -from neo4j.addressing import Address - - -class Addressable: - """ Mixin for providing access to local and remote address - properties via an asyncio.Transport object. - """ - - __transport = None - - def set_transport(self, transport): - self.__transport = transport - - @property - def local_address(self): - return Address(self.__transport.get_extra_info("sockname")) - - @property - def remote_address(self): - return Address(self.__transport.get_extra_info("peername")) - - -class Breakable: - """ Mixin for objects that can break, resulting in an unusable, - unrecoverable state. - """ - - __broken = False - - def set_broken(self): - self.__broken = True - - @property - def broken(self): - return self.__broken diff --git a/neo4j/packstream.py b/neo4j/packstream.py index 680ba6ea5..42d6afcc6 100644 --- a/neo4j/packstream.py +++ b/neo4j/packstream.py @@ -607,55 +607,3 @@ def receive(self, sock, n_bytes): if n == 0: raise OSError("No data") self.used += n - - -class PackStream: - """ Asynchronous chunked message reader/writer for PackStream - messaging. - """ - - def __init__(self, reader, writer): - self._reader = reader - self._writer = writer - - async def read_message(self): - """ Read a chunked message. - - :return: - """ - data = [] - more = True - while more: - chunk_header = await self._reader.readexactly(2) - chunk_size, = struct_unpack(">H", chunk_header) - if chunk_size: - chunk_data = await self._reader.readexactly(chunk_size) - data.append(chunk_data) - else: - more = False - buffer = UnpackableBuffer(b"".join(data)) - unpacker = Unpacker(buffer) - return unpacker.unpack() - - def write_message(self, message): - """ Write a chunked message. - - :param message: - :return: - """ - if not isinstance(message, Structure): - raise TypeError("Message must be a Structure instance") - b = BytesIO() - packer = Packer(b) - packer.pack(message) - data = b.getvalue() - # TODO: multi-chunk messages - header = bytearray(divmod(len(data), 0x100)) - self._writer.write(header + data + b"\x00\x00") - - async def drain(self): - """ Flush the writer. - - :return: - """ - await self._writer.drain() diff --git a/neo4j/work/__init__.py b/neo4j/work/__init__.py index d48320bb3..df981b193 100644 --- a/neo4j/work/__init__.py +++ b/neo4j/work/__init__.py @@ -89,21 +89,6 @@ def close(self): self._disconnect(sync=True) -class AsyncWorkspace(Workspace): - - def __init__(self, pool, config): - super().__init__(pool, config) - - async def __aenter__(self): - return self - - async def __aexit__(self, exc_type, exc_val, exc_tb): - await self.close() - - async def close(self): - super().close() - - class WorkspaceError(Exception): pass diff --git a/neo4j/work/aio.py b/neo4j/work/aio.py deleted file mode 100644 index 03b816695..000000000 --- a/neo4j/work/aio.py +++ /dev/null @@ -1,54 +0,0 @@ -#!/usr/bin/env python -# -*- encoding: utf-8 -*- - -# Copyright (c) 2002-2020 "Neo4j," -# Neo4j Sweden AB [http://neo4j.com] -# -# This file is part of Neo4j. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -from neo4j.conf import DeprecatedAlias -from neo4j.work import AsyncWorkspace, WorkspaceConfig - - -class AsyncSessionConfig(WorkspaceConfig): - - #: - acquire_timeout = 30.0 # seconds - - #: - bookmarks = () - - #: - default_access_mode = "WRITE" - access_mode = DeprecatedAlias("default_access_mode") - - -class AsyncSession(AsyncWorkspace): - - # The set of bookmarks after which the next - # :class:`.Transaction` should be carried out. - _bookmarks_in = None - - # The bookmark returned from the last commit. - _bookmark_out = None - - def __init__(self, pool, config): - super().__init__(pool, config) - assert isinstance(config, AsyncSessionConfig) - self._bookmarks_in = tuple(config.bookmarks) - - async def run(self, cypher, parameters=None, **kwparameters): - raise NotImplementedError diff --git a/tests/integration/aio/__init__.py b/tests/integration/aio/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/tests/integration/aio/conftest.py b/tests/integration/aio/conftest.py deleted file mode 100644 index 30d7099d5..000000000 --- a/tests/integration/aio/conftest.py +++ /dev/null @@ -1,44 +0,0 @@ -#!/usr/bin/env python -# -*- encoding: utf-8 -*- - -# Copyright (c) 2002-2020 "Neo4j," -# Neo4j Sweden AB [http://neo4j.com] -# -# This file is part of Neo4j. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import pytest - -from neo4j.aio import Bolt, BoltPool -from neo4j._exceptions import BoltHandshakeError - - -@pytest.fixture -async def bolt(address, auth): - try: - bolt = await Bolt.open(address, auth=auth) - yield bolt - await bolt.close() - except BoltHandshakeError as error: - pytest.skip(error.args[0]) - - -@pytest.fixture -async def bolt_pool(address, auth): - try: - pool = await BoltPool.open(address, auth=auth) - yield pool - await pool.close() - except BoltHandshakeError as error: - pytest.skip(error.args[0]) diff --git a/tests/integration/aio/test_bolt.py b/tests/integration/aio/test_bolt.py deleted file mode 100644 index 594f3a05b..000000000 --- a/tests/integration/aio/test_bolt.py +++ /dev/null @@ -1,510 +0,0 @@ -#!/usr/bin/env python -# -*- encoding: utf-8 -*- - -# Copyright (c) 2002-2020 "Neo4j," -# Neo4j Sweden AB [http://neo4j.com] -# -# This file is part of Neo4j. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -from asyncio import sleep, wait, wait_for, TimeoutError - -import pytest -from pytest import mark, raises - - -from neo4j import PoolConfig -from neo4j.aio import Bolt, BoltPool -from neo4j._exceptions import ( - BoltConnectionError, - BoltTransactionError, - BoltHandshakeError, -) -from neo4j.exceptions import ClientError - -# python -m pytest tests/integration/aio/test_bolt.py -s -v - -@mark.asyncio -async def test_good_connectivity(address, auth): - try: - bolt = await Bolt.open(address, auth=auth) - assert bolt.protocol_version - await bolt.close() - except BoltHandshakeError as error: - pytest.skip(error.args[0]) - - - -@mark.asyncio -async def test_connectivity_over_ipv4(address, auth): - try: - bolt = await Bolt.open(("127.0.0.1", address[1]), auth=auth) - assert bolt.protocol_version - await bolt.close() - except BoltHandshakeError as error: - pytest.skip(error.args[0]) - -# @mark.asyncio -# async def test_connectivity_over_ipv6(address, auth): -# bolt = await Bolt.open(("::1", address[1], 0, 0), auth=auth) -# assert bolt.protocol_version -# await bolt.close() - - -@mark.asyncio -async def test_bad_connectivity(address, auth): - with raises(BoltConnectionError) as e: - _ = await Bolt.open(("localhost", 9999), auth=auth) - assert e.value.address == ("localhost", 9999) - - -@mark.asyncio -async def test_security_none(address, auth): - try: - bolt = await Bolt.open(address, auth=auth, secure=None) - assert not bolt.secure - await bolt.close() - except BoltHandshakeError as error: - pytest.skip(error.args[0]) - -@mark.asyncio -async def test_security_false(address, auth): - try: - bolt = await Bolt.open(address, auth=auth, secure=False) - assert not bolt.secure - await bolt.close() - except BoltHandshakeError as error: - pytest.skip(error.args[0]) - - -# TODO: re-enable when we have a way of testing against full certs -# @mark.asyncio -# async def test_security_true(address, auth): -# bolt = await Bolt.open(address, auth=auth, secure=True) -# assert bolt.security -# assert bolt.security == Security.default() -# await bolt.close() - - -# TODO: re-enable when we have a way of testing against full certs -# @mark.asyncio -# async def test_security_custom(address, auth): -# bolt = await Bolt.open(address, auth=auth, secure=...) -# assert bolt.security -# assert bolt.security == Security() -# await bolt.close() - - -@mark.asyncio -async def test_unsupported_protocol_version(address, auth): - with raises(ValueError): - _ = await Bolt.open(address, auth=auth, protocol_version=(1, 0)) - - -@mark.asyncio -async def test_bad_protocol_version_format(address, auth): - with raises(TypeError): - _ = await Bolt.open(address, auth=auth, protocol_version="4.0") - - -@mark.asyncio -async def test_bad_auth(address, auth): - try: - with raises(ClientError) as e: - _ = await Bolt.open(address, auth=("sneaky", "hacker")) - error = e.value - assert error.category == "Security" - assert error.title == "Unauthorized" - except BoltHandshakeError as error: - pytest.skip(error.args[0]) - - -@mark.asyncio -async def test_autocommit_transaction(address, auth): - try: - bolt = await Bolt.open(address, auth=auth) - values = [] - async for record in await bolt.run("UNWIND [2, 3, 5] AS n RETURN n"): - values.append(record[0]) - await bolt.close() - assert values == [2, 3, 5] - except BoltHandshakeError as error: - pytest.skip(error.args[0]) - - -@mark.asyncio -async def test_discarded_autocommit_transaction(address, auth): - try: - bolt = await Bolt.open(address, auth=auth) - values = [] - async for record in await bolt.run("UNWIND [2, 3, 5] AS n RETURN n", discard=True): - values.append(record[0]) - await bolt.close() - assert values == [] - except BoltHandshakeError as error: - pytest.skip(error.args[0]) - - -@mark.asyncio -async def test_explicit_transaction_with_commit(address, auth): - try: - bolt = await Bolt.open(address, auth=auth) - tx = await bolt.begin() - assert not tx.closed - values = [] - async for record in await tx.run("UNWIND [2, 3, 5] AS n RETURN n"): - values.append(record[0]) - bookmark = await tx.commit() - assert bookmark # We can't assert anything about the content - assert tx.closed - await bolt.close() - assert values == [2, 3, 5] - except BoltHandshakeError as error: - pytest.skip(error.args[0]) - - -@mark.asyncio -async def test_explicit_transaction_with_rollback(address, auth): - try: - bolt = await Bolt.open(address, auth=auth) - tx = await bolt.begin() - assert not tx.closed - values = [] - async for record in await tx.run("UNWIND [2, 3, 5] AS n RETURN n"): - values.append(record[0]) - await tx.rollback() - assert tx.closed - await bolt.close() - assert values == [2, 3, 5] - except BoltHandshakeError as error: - pytest.skip(error.args[0]) - - -@mark.asyncio -async def test_autocommit_in_autocommit(address, auth): - try: - bolt = await Bolt.open(address, auth=auth) - values = [] - async for r1 in await bolt.run("UNWIND [2, 3, 5] AS n RETURN n"): - async for r2 in await bolt.run("UNWIND [7, 11, 13] AS n RETURN n"): - values.append(r1[0] * r2[0]) - await bolt.close() - assert values == [14, 22, 26, 21, 33, 39, 35, 55, 65] - except BoltHandshakeError as error: - pytest.skip(error.args[0]) - - -@mark.asyncio -async def test_explicit_in_autocommit(address, auth): - try: - bolt = await Bolt.open(address, auth=auth) - tx = await bolt.begin() - with raises(BoltTransactionError): - _ = await bolt.run("UNWIND [2, 3, 5] AS n RETURN n") - await tx.rollback() - await bolt.close() - except BoltHandshakeError as error: - pytest.skip(error.args[0]) - - -@mark.asyncio -async def test_autocommit_in_explicit(address, auth): - try: - bolt = await Bolt.open(address, auth=auth) - tx = await bolt.begin() - async for _ in await tx.run("UNWIND [2, 3, 5] AS n RETURN n"): - with raises(BoltTransactionError): - _ = await bolt.run("UNWIND [7, 11, 13] AS n RETURN n") - await tx.commit() - await bolt.close() - except BoltHandshakeError as error: - pytest.skip(error.args[0]) - - -@mark.asyncio -async def test_explicit_in_explicit(address, auth): - try: - bolt = await Bolt.open(address, auth=auth) - tx = await bolt.begin() - with raises(BoltTransactionError): - _ = await bolt.begin() - await tx.rollback() - await bolt.close() - except BoltHandshakeError as error: - pytest.skip(error.args[0]) - - -@mark.asyncio -async def test_commit_is_non_idempotent(address, auth): - try: - bolt = await Bolt.open(address, auth=auth) - tx = await bolt.begin() - values = [] - async for record in await tx.run("UNWIND [2, 3, 5] AS n RETURN n"): - values.append(record[0]) - await tx.commit() - with raises(BoltTransactionError): - await tx.commit() - await bolt.close() - assert values == [2, 3, 5] - except BoltHandshakeError as error: - pytest.skip(error.args[0]) - - -@mark.asyncio -async def test_rollback_is_non_idempotent(address, auth): - try: - bolt = await Bolt.open(address, auth=auth) - tx = await bolt.begin() - values = [] - async for record in await tx.run("UNWIND [2, 3, 5] AS n RETURN n"): - values.append(record[0]) - await tx.rollback() - with raises(BoltTransactionError): - await tx.rollback() - await bolt.close() - assert values == [2, 3, 5] - except BoltHandshakeError as error: - pytest.skip(error.args[0]) - - -@mark.asyncio -async def test_cypher_error_in_autocommit_transaction(address, auth): - try: - bolt = await Bolt.open(address, auth=auth) - with raises(ClientError) as e: - async for _ in await bolt.run("X"): - pass - error = e.value - assert isinstance(error, ClientError) - assert error.category == "Statement" - assert error.title == "SyntaxError" - except BoltHandshakeError as error: - pytest.skip(error.args[0]) - - -@mark.skip(reason="TODO: fix correct error logic after error and exception refactoring") -@mark.asyncio -async def test_can_resume_after_error_in_autocommit_transaction(address, auth): - bolt = await Bolt.open(address, auth=auth) - with raises(ClientError): - async for _ in await bolt.run("X"): - pass - values = [] - async for record in await bolt.run("RETURN 1"): - values.append(record[0]) - await bolt.close() - assert values == [1] - - -@mark.asyncio -async def test_cypher_error_in_explicit_transaction(address, auth): - try: - bolt = await Bolt.open(address, auth=auth) - tx = await bolt.begin() - result1 = await tx.run("X") - result2 = await tx.run("RETURN 1") - with raises(ClientError) as e: - await tx.commit() - error = e.value - assert isinstance(error, ClientError) - assert error.category == "Statement" - assert error.title == "SyntaxError" - ok = await result1.consume() - assert not ok - ok = await result2.consume() - assert not ok - await bolt.close() - except BoltHandshakeError as error: - pytest.skip(error.args[0]) - - -@mark.asyncio -async def test_clean_transaction_function(bolt): - - async def work(tx): - product = 1 - async for record in await tx.run("UNWIND [2, 3, 5] AS n RETURN n"): - product *= record[0] - return product - - value = await bolt.run_tx(work) - assert value == 30 - - -@mark.asyncio -async def test_dirty_transaction_function(bolt): - await bolt.run("MATCH (_) DETACH DELETE _", discard=True) - - created = [] - - async def create_nodes(tx): - node_id = await tx.evaluate("CREATE (a) RETURN id(a)") - created.append(node_id) - raise RuntimeError("This should trigger a rollback") - - async def count_nodes(tx): - return await tx.evaluate("MATCH (a) WHERE id(a) = $x " - "RETURN count(a)", {"x": created[0]}) - - with raises(RuntimeError): - _ = await bolt.run_tx(create_nodes) - - assert len(created) == 1 - matched = await bolt.run_tx(count_nodes) - assert matched == 0 - - -@mark.asyncio -async def test_pool_exhaustion(address, auth): - try: - pool = await BoltPool.open(address, auth=auth, max_size=3) - first = await pool.acquire() - second = await pool.acquire() - third = await pool.acquire() - assert isinstance(first, Bolt) - assert isinstance(second, Bolt) - assert isinstance(third, Bolt) - with raises(TimeoutError): - _ = await wait_for(pool.acquire(), timeout=1) - except BoltHandshakeError as error: - pytest.skip(error.args[0]) - - -@mark.asyncio -async def test_pool_reuse(address, auth): - try: - pool = await BoltPool.open(address, auth=auth, max_size=3) - first = await pool.acquire() - second = await pool.acquire() - third = await pool.acquire() - assert first is not second and second is not third and first is not third - await pool.release(second) - fourth = await pool.acquire() - assert fourth is second - except BoltHandshakeError as error: - pytest.skip(error.args[0]) - -@mark.asyncio -async def test_pool_release_notifies_acquire(address, auth): - try: - pool = await BoltPool.open(address, auth=auth, max_size=1) - first = await pool.acquire() - - async def delayed_release(): - await sleep(1) - await pool.release(first) - - done, pending = await wait([ - delayed_release(), - pool.acquire(), - ]) - assert len(done) == 2 - assert len(pending) == 0 - assert pool.size == 1 - for future in done: - result = future.result() - assert result is None or result is first - except BoltHandshakeError as error: - pytest.skip(error.args[0]) - - -@mark.asyncio -async def test_default_pool_open_and_close(bolt_pool, address): - assert bolt_pool.address == address - - -@mark.asyncio -async def test_closing_pool_with_free_connections(address, auth): - try: - pool = await BoltPool.open(address, auth=auth, max_size=3) - first = await pool.acquire() - second = await pool.acquire() - third = await pool.acquire() - await pool.release(first) - await pool.release(second) - await pool.release(third) - await pool.close() - assert first.closed - assert second.closed - assert third.closed - except BoltHandshakeError as error: - pytest.skip(error.args[0]) - -@mark.asyncio -async def test_closing_pool_with_in_use_connections(address, auth): - try: - pool = await BoltPool.open(address, auth=auth, max_size=3) - first = await pool.acquire() - second = await pool.acquire() - third = await pool.acquire() - await pool.close() - assert first.closed - assert second.closed - assert third.closed - except BoltHandshakeError as error: - pytest.skip(error.args[0]) - -@mark.asyncio -async def test_expired_connections_are_not_returned_to_pool(address, auth): - try: - pool = await BoltPool.open(address, auth=auth, max_size=1, max_age=0.25) - assert pool.size == PoolConfig.init_size - assert pool.in_use == 0 - cx = await pool.acquire() - assert pool.size == 1 - assert pool.in_use == 1 - await sleep(0.5) - await pool.release(cx) - assert pool.size == 0 - assert pool.in_use == 0 - assert cx.closed - except BoltHandshakeError as error: - pytest.skip(error.args[0]) - - -@mark.asyncio -async def test_closed_connections_are_not_returned_to_pool(address, auth): - try: - pool = await BoltPool.open(address, auth=auth, max_size=1) - assert pool.size == PoolConfig.init_size - assert pool.in_use == 0 - cx = await pool.acquire() - assert pool.size == 1 - assert pool.in_use == 1 - await cx.close() - await pool.release(cx) - assert pool.size == 0 - assert pool.in_use == 0 - except BoltHandshakeError as error: - pytest.skip(error.args[0]) - - -@mark.asyncio -async def test_cannot_release_already_released_connection(bolt_pool): - cx = await bolt_pool.acquire() - await bolt_pool.release(cx) - with raises(ValueError): - await bolt_pool.release(cx) - - -@mark.asyncio -async def test_cannot_release_unowned_connection(bolt_pool, address, auth): - try: - cx = await Bolt.open(address, auth=auth) - with raises(ValueError): - await bolt_pool.release(cx) - except BoltHandshakeError as error: - pytest.skip(error.args[0]) \ No newline at end of file diff --git a/tests/integration/aio/test_concurrency.py b/tests/integration/aio/test_concurrency.py deleted file mode 100644 index acfc28083..000000000 --- a/tests/integration/aio/test_concurrency.py +++ /dev/null @@ -1,51 +0,0 @@ -#!/usr/bin/env python -# -*- encoding: utf-8 -*- - -# Copyright (c) 2002-2020 "Neo4j," -# Neo4j Sweden AB [http://neo4j.com] -# -# This file is part of Neo4j. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -from asyncio import sleep, wait -from random import random - -from pytest import mark - - -async def _run_queries(bolt_pool, d, values): - cx = await bolt_pool.acquire(force_reset=True) - for x in values: - await sleep(random()) - result = await cx.run("RETURN $x", {"x": x}) - record = await result.single() - assert record[0] == x - d.append(x) - await bolt_pool.release(cx, force_reset=True) - - -async def _run_tasks(bolt_pool, n_tasks, n_queries): - x_range = range(n_queries) - y_range = range(n_tasks) - data = [list() for _ in y_range] - cos = {_run_queries(bolt_pool, d, x_range) for d in data} - await wait(cos) - for d in data: - assert d == list(x_range) - - -@mark.asyncio -async def test_bolt_pool_should_allow_concurrent_async_usage(bolt_pool): - await _run_tasks(bolt_pool, 10, 50) diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 4e3dd3e35..4957e7615 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -288,15 +288,6 @@ def bolt_driver(target, auth): skip(error.args[0]) -@fixture(scope="session") -async def async_bolt_driver(target, auth): - driver = await GraphDatabase.async_bolt_driver(target, auth=auth) - try: - yield driver - finally: - driver.close() - - @fixture(scope="session") def neo4j_driver(target, auth): try: @@ -315,25 +306,11 @@ def neo4j_driver(target, auth): driver.close() -@fixture(scope="session") -async def async_neo4j_driver(target, auth): - driver = await GraphDatabase.async_neo4j_driver(target, auth=auth) - try: - yield driver - finally: - driver.close() - - @fixture(scope="session") def driver(neo4j_driver): return neo4j_driver -@fixture(scope="session") -async def async_driver(async_neo4j_driver): - return async_neo4j_driver - - @fixture() def session(bolt_driver): session = bolt_driver.session() diff --git a/tests/integration/test_bolt_driver.py b/tests/integration/test_bolt_driver.py index b7910bf08..57ab32d38 100644 --- a/tests/integration/test_bolt_driver.py +++ b/tests/integration/test_bolt_driver.py @@ -54,18 +54,6 @@ def test_neo4j_uri(neo4j_uri, auth): except BoltHandshakeError as error: skip(error.args[0]) - -# TODO -# @mark.asyncio -# async def test_async_bolt_uri(bolt_uri, auth): -# async with await GraphDatabase.async_driver(bolt_uri, auth=auth) as driver: -# async with await driver.session() as session: -# result = await session.run("RETURN 1") -# record = await result.single() -# value = record.value() -# assert value == 1 - - def test_normal_use_case(bolt_driver): session = bolt_driver.session() value = session.run("RETURN 1").single().value() diff --git a/tests/requirements.txt b/tests/requirements.txt index 4bb9b24ae..f254f73a3 100644 --- a/tests/requirements.txt +++ b/tests/requirements.txt @@ -1,7 +1,6 @@ -e git://github.com/neo4j-drivers/boltkit.git#egg=boltkit coverage pytest -pytest-asyncio pytest-benchmark pytest-cov teamcity-messages diff --git a/tests/stub/aio/__init__.py b/tests/stub/aio/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/tests/stub/aio/test_bolt.py b/tests/stub/aio/test_bolt.py deleted file mode 100644 index 8d22b8468..000000000 --- a/tests/stub/aio/test_bolt.py +++ /dev/null @@ -1,188 +0,0 @@ -#!/usr/bin/env python -# -*- encoding: utf-8 -*- - -# Copyright (c) 2002-2020 "Neo4j," -# Neo4j Sweden AB [http://neo4j.com] -# -# This file is part of Neo4j. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -from asyncio import IncompleteReadError - -from boltkit.server.stub import BoltStubService -from pytest import mark, raises - -from neo4j.aio import Bolt -from neo4j._exceptions import BoltConnectionError, BoltConnectionBroken, BoltHandshakeError - - -@mark.asyncio -async def test_good_connectivity(script): - async with BoltStubService.load(script("v3", "empty.script")) as stub: - bolt = await Bolt.open(stub.addresses[0], auth=stub.auth) - assert bolt.remote_address.host in ("localhost", "127.0.0.1", "::1") - assert bolt.remote_address.port_number == stub.addresses[0].port_number - assert bolt.local_address.host in ("localhost", "127.0.0.1", "::1") - await bolt.close() - - -@mark.asyncio -async def test_bad_connectivity(script): - with raises(BoltConnectionError): - _ = await Bolt.open(("localhost", 9999), auth=()) - - -@mark.asyncio -async def test_return_1(script): - async with BoltStubService.load(script("v3", "return_1.script")) as stub: - bolt = await Bolt.open(stub.addresses[0], auth=stub.auth) - values = [] - async for record in await bolt.run("RETURN $x", {"x": 1}): - values.append(record[0]) - await bolt.close() - assert values == [1] - - -@mark.asyncio -async def test_explicit_protocol_version(script): - async with BoltStubService.load(script("v3", "empty.script")) as stub: - bolt = await Bolt.open(stub.addresses[0], auth=stub.auth, protocol_version=(3, 0)) - assert bolt.protocol_version == (3, 0) - await bolt.close() - - -# TODO: re-enable this test when new boltkit stub server is working -# @mark.asyncio -# async def test_explicit_unsupported_protocol_version(script): -# async with BoltStubService.load(script("v3", "empty.script")) as stub: -# with raises(ValueError): -# _ = await Bolt.open(stub.addresses[0], auth=stub.auth, protocol_version=(0, 1)) - - -@mark.asyncio -async def test_illegal_protocol_version_type(script): - async with BoltStubService.load(script("v3", "empty.script")) as stub: - with raises(TypeError): - _ = await Bolt.open(stub.addresses[0], auth=stub.auth, protocol_version=object()) - - -@mark.asyncio -async def test_unusable_value_on_handshake(script): - async with BoltStubService.load(script("v3", "unusable_value_on_handshake.script")) as stub: - with raises(BoltHandshakeError) as e: - await Bolt.open(stub.addresses[0], auth=stub.auth) - assert isinstance(e.value.__cause__, ValueError) - - -@mark.asyncio -async def test_incomplete_read_on_handshake(script): - async with BoltStubService.load(script("v3", "incomplete_read_on_handshake.script")) as stub: - with raises(BoltConnectionBroken) as e: - await Bolt.open(stub.addresses[0], auth=stub.auth) - assert isinstance(e.value.__cause__, IncompleteReadError) - - -# TODO: re-enable this test when new boltkit stub server is working -# @mark.asyncio -# async def test_unsupported_old_protocol_version(script): -# # TODO: fix task pending in boltkit that arises from this test -# async with BoltStubService.load(script("v3", "old_protocol.script")) as stub: -# with raises(BoltHandshakeError) as e: -# await Bolt.open(stub.addresses[0], auth=stub.auth, protocol_version=(3, 0)) -# error = e.value -# assert isinstance(error, BoltHandshakeError) -# port = stub.primary_address.port_number -# assert error.address in {("localhost", port), ("127.0.0.1", port), ("::1", port)} -# assert error.request_data == (b"\x60\x60\xb0\x17" -# b"\x00\x00\x00\x03" -# b"\x00\x00\x00\x00" -# b"\x00\x00\x00\x00" -# b"\x00\x00\x00\x00") -# assert error.response_data == b"\x00\x00\x01\x00" - - -@mark.asyncio -async def test_incomplete_read_on_init(script): - async with BoltStubService.load(script("v3", "incomplete_read_on_init.script")) as stub: - with raises(BoltConnectionBroken) as e: - await Bolt.open(stub.addresses[0], auth=stub.auth) - assert isinstance(e.value.__cause__, IncompleteReadError) - - -@mark.asyncio -async def test_readonly_true(script): - async with BoltStubService.load(script("v3", "readonly_true.script")) as stub: - bolt = await Bolt.open(stub.addresses[0], auth=stub.auth) - await bolt.run("RETURN 1", readonly=True) - await bolt.close() - - -@mark.asyncio -async def test_readonly_false(script): - async with BoltStubService.load(script("v3", "readonly_false.script")) as stub: - bolt = await Bolt.open(stub.addresses[0], auth=stub.auth) - await bolt.run("RETURN 1", readonly=False) - await bolt.close() - - -@mark.asyncio -async def test_good_bookmarks_value(script): - async with BoltStubService.load(script("v3", "good_bookmarks.script")) as stub: - bolt = await Bolt.open(stub.addresses[0], auth=stub.auth) - await bolt.run("RETURN 1", bookmarks=["bookmark1"]) - await bolt.close() - - -@mark.asyncio -async def test_bad_bookmarks_value(script): - async with BoltStubService.load(script("v3", "empty.script")) as stub: - bolt = await Bolt.open(stub.addresses[0], auth=stub.auth) - with raises(TypeError): - await bolt.run("RETURN 1", bookmarks=object()) - await bolt.close() - - -@mark.asyncio -async def test_good_metadata_value(script): - async with BoltStubService.load(script("v3", "good_metadata.script")) as stub: - bolt = await Bolt.open(stub.addresses[0], auth=stub.auth) - await bolt.run("RETURN 1", metadata={"foo": "bar"}) - await bolt.close() - - -@mark.asyncio -async def test_bad_metadata_value(script): - async with BoltStubService.load(script("v3", "empty.script")) as stub: - bolt = await Bolt.open(stub.addresses[0], auth=stub.auth) - with raises(TypeError): - await bolt.run("RETURN 1", metadata=object()) - await bolt.close() - - -@mark.asyncio -async def test_good_timeout_value(script): - async with BoltStubService.load(script("v3", "good_timeout.script")) as stub: - bolt = await Bolt.open(stub.addresses[0], auth=stub.auth) - await bolt.run("RETURN 1", timeout=15) - await bolt.close() - - -@mark.asyncio -async def test_bad_timeout_value(script): - async with BoltStubService.load(script("v3", "empty.script")) as stub: - bolt = await Bolt.open(stub.addresses[0], auth=stub.auth) - with raises(TypeError): - await bolt.run("RETURN 1", timeout=object()) - await bolt.close() From c1e6baf3b3012de087d912ab9b6e949da4d219e1 Mon Sep 17 00:00:00 2001 From: martin bendsoe Date: Fri, 21 Feb 2020 11:55:07 +0100 Subject: [PATCH 2/2] added support for py35 in tox.ini --- tox.ini | 1 + 1 file changed, 1 insertion(+) diff --git a/tox.ini b/tox.ini index 2112b15d7..cb8b62c8f 100644 --- a/tox.ini +++ b/tox.ini @@ -1,5 +1,6 @@ [tox] envlist = + py35 py36 py37 py38