Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 0 additions & 106 deletions neo4j/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,35 +73,13 @@ def driver(cls, uri, *, auth=None, acquire_timeout=None, **config):
else:
raise ValueError("Unknown URI scheme {!r}".format(parsed.scheme))

@classmethod
async def async_driver(cls, uri, *, auth=None, loop=None, **config):
""" Create a Neo4j driver that uses async I/O and task-based
concurrency.
"""
parsed = urlparse(uri)
if parsed.scheme == "bolt":
return await cls.async_bolt_driver(parsed.netloc, auth=auth, loop=loop, **config)
elif parsed.scheme == "neo4j" or parsed.scheme == "bolt+routing":
rc = cls._parse_routing_context(parsed.query)
return await cls.async_neo4j_driver(parsed.netloc, auth=auth, routing_context=rc,
loop=loop, **config)
else:
raise ValueError("Unknown URI scheme {!r}".format(parsed.scheme))

@classmethod
def bolt_driver(cls, target, *, auth=None, acquire_timeout=None, **config):
""" Create a driver for direct Bolt server access that uses
socket I/O and thread-based concurrency.
"""
return BoltDriver.open(target, auth=auth, acquire_timeout=acquire_timeout, **config)

@classmethod
async def async_bolt_driver(cls, target, *, auth=None, loop=None, **config):
""" Create a driver for direct Bolt server access that uses
async I/O and task-based concurrency.
"""
return await AsyncBoltDriver.open(target, auth=auth, loop=loop, **config)

@classmethod
def neo4j_driver(cls, *targets, auth=None, routing_context=None, acquire_timeout=None,
**config):
Expand All @@ -111,13 +89,6 @@ def neo4j_driver(cls, *targets, auth=None, routing_context=None, acquire_timeout
return Neo4jDriver.open(*targets, auth=auth, routing_context=routing_context,
acquire_timeout=acquire_timeout, **config)

@classmethod
async def async_neo4j_driver(cls, *targets, auth=None, loop=None, **config):
""" Create a driver for routing-capable Neo4j service access
that uses async I/O and task-based concurrency.
"""
return await AsyncNeo4jDriver.open(*targets, auth=auth, loop=loop, **config)

@classmethod
def _parse_routing_context(cls, query):
""" Parse the query portion of a URI to generate a routing
Expand Down Expand Up @@ -254,34 +225,6 @@ def verify_connectivity(self, **config):
raise NotImplementedError


class AsyncDriver:

@classmethod
async def open(cls, uri, **config):
raise NotImplementedError

def __init__(self, pool):
self._pool = pool

async def __aenter__(self):
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.close()

async def session(self, **config):
""" Create a reactive session.

:param config: session configuration
(see :class:`.RxSessionConfig` for details)
:returns: new :class:`.RxSession` object
"""
raise NotImplementedError

async def close(self):
await self._pool.close()


class BoltDriver(Direct, Driver):
""" A :class:`.BoltDriver` is created from a ``bolt`` URI and addresses
a single database machine. This may be a standalone server or could be a
Expand Down Expand Up @@ -380,52 +323,3 @@ def _verify_routing_connectivity(self):
if val is not None:
return routing_info
raise ServiceUnavailable("Could not connect to any routing servers.")


class AsyncBoltDriver(Direct, AsyncDriver):

@classmethod
async def open(cls, target, *, auth=None, loop=None, **config):
from neo4j.aio import BoltPool
from neo4j.work import WorkspaceConfig
address = cls.parse_target(target)
pool_config, default_workspace_config = Config.consume_chain(config, PoolConfig,
WorkspaceConfig)
pool = await BoltPool.open(address, auth=auth, loop=loop, **pool_config)
return cls(pool, default_workspace_config)

def __init__(self, pool, default_workspace_config):
Direct.__init__(self, pool.address)
AsyncDriver.__init__(self, pool)
self._default_workspace_config = default_workspace_config

async def session(self, **config):
from neo4j.work.aio import AsyncSession, AsyncSessionConfig
session_config = AsyncSessionConfig(self._default_workspace_config,
AsyncSessionConfig.consume(config))
return AsyncSession(self._pool, session_config)


class AsyncNeo4jDriver(Routing, AsyncDriver):

@classmethod
async def open(cls, *targets, auth=None, routing_context=None, loop=None, **config):
from neo4j.aio import Neo4jPool
from neo4j.work import WorkspaceConfig
addresses = cls.parse_targets(*targets)
pool_config, default_workspace_config = Config.consume_chain(config, PoolConfig,
WorkspaceConfig)
pool = await Neo4jPool.open(*addresses, auth=auth, routing_context=routing_context,
loop=loop, **pool_config)
return cls(pool, default_workspace_config)

def __init__(self, pool, default_workspace_config):
Routing.__init__(self, pool.routing_table.initial_routers)
AsyncDriver.__init__(self, pool)
self._default_workspace_config = default_workspace_config

async def session(self, **config):
from neo4j.work.aio import AsyncSession, AsyncSessionConfig
session_config = AsyncSessionConfig(self._default_workspace_config,
AsyncSessionConfig.consume(config))
return AsyncSession(self._pool, session_config)
Loading