Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 62 additions & 19 deletions neo4j/v1/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from warnings import warn

from neo4j.bolt import ProtocolError, ServiceUnavailable
from neo4j.compat import urlparse
from neo4j.compat import unicode, urlparse
from neo4j.exceptions import CypherError, TransientError

from .exceptions import DriverError, SessionError, SessionExpired, TransactionError
Expand All @@ -42,6 +42,21 @@
RETRY_DELAY_JITTER_FACTOR = 0.2


def last_bookmark(b0, b1):
""" Return the latest of two bookmarks by looking for the maximum
integer value following the last colon in the bookmark string.
"""
n = [None, None]
_, _, n[0] = b0.rpartition(":")
_, _, n[1] = b1.rpartition(":")
for i in range(2):
try:
n[i] = int(n[i])
except ValueError:
raise ValueError("Invalid bookmark: {}".format(b0))
return b0 if n[0] > n[1] else b1


def retry_delay_generator(initial_delay, multiplier, jitter_factor):
delay = initial_delay
while True:
Expand Down Expand Up @@ -141,14 +156,25 @@ def __enter__(self):
def __exit__(self, exc_type, exc_value, traceback):
self.close()

def session(self, access_mode=None, bookmark=None):
def session(self, access_mode=None, **parameters):
""" Create a new session using a connection from the driver connection
pool. Session creation is a lightweight operation and sessions are
not thread safe, therefore a session should generally be short-lived
within a single thread.

:param access_mode:
:param bookmark:
:param access_mode: access mode for this session (read or write)
:param parameters: set of parameters for this session:

`bookmark`
A bookmark after which this session should begin.

`bookmarks`
A collection of bookmarks after which this session should begin.

`max_retry_time`
The maximum time after which to stop attempting retries of failed
transactions.

:returns: new :class:`.Session` object
"""
if self.closed():
Expand Down Expand Up @@ -190,28 +216,39 @@ class Session(object):

"""

#: The current connection.
# The current connection.
_connection = None

#: The access mode for the current connection.
# The access mode for the current connection.
_connection_access_mode = None

#: The current :class:`.Transaction` instance, if any.
# The current :class:`.Transaction` instance, if any.
_transaction = None

#: The last result received.
# The last result received.
_last_result = None

#: The bookmark received from completion of the last :class:`.Transaction`.
_bookmark = None
# The collection of bookmarks after which the next
# :class:`.Transaction` should be carried out.
_bookmarks = ()

# Default maximum time to keep retrying failed transactions.
_max_retry_time = DEFAULT_MAX_RETRY_TIME

_closed = False

def __init__(self, acquirer, max_retry_time=None, access_mode=None, bookmark=None):
def __init__(self, acquirer, access_mode, **parameters):
self._acquirer = acquirer
self._max_retry_time = DEFAULT_MAX_RETRY_TIME if max_retry_time is None else max_retry_time
self._default_access_mode = access_mode or WRITE_ACCESS
self._bookmark = bookmark
self._default_access_mode = access_mode
for key, value in parameters.items():
if key == "bookmark":
self._bookmarks = [value] if value else []
elif key == "bookmarks":
self._bookmarks = value or []
elif key == "max_retry_time":
self._max_retry_time = value
else:
pass # for compatibility

def __del__(self):
try:
Expand Down Expand Up @@ -354,7 +391,13 @@ def detach(self, result):
def last_bookmark(self):
""" The bookmark returned by the last :class:`.Transaction`.
"""
return self._bookmark
last = None
for bookmark in self._bookmarks:
if last is None:
last = bookmark
else:
last = last_bookmark(last, bookmark)
return last

def has_transaction(self):
return bool(self._transaction)
Expand Down Expand Up @@ -383,7 +426,7 @@ def begin_transaction(self, bookmark=None):
from warnings import warn
warn("Passing bookmarks at transaction level is deprecated", category=DeprecationWarning, stacklevel=2)
_warned_about_transaction_bookmarks = True
self._bookmark = bookmark
self._bookmarks = [bookmark]

self._create_transaction()
self._connect()
Expand All @@ -401,8 +444,9 @@ def commit_transaction(self):
self._transaction = None
result = self.__commit__()
result.consume()
self._bookmark = self.__bookmark__(result)
return self._bookmark
bookmark = self.__bookmark__(result)
self._bookmarks = [bookmark]
return bookmark

def rollback_transaction(self):
""" Rollback the current transaction.
Expand All @@ -412,7 +456,6 @@ def rollback_transaction(self):
if not self.has_transaction():
raise TransactionError("No transaction to rollback")
self._transaction = None
self._bookmark = None
self.__rollback__().consume()

def _run_transaction(self, access_mode, unit_of_work, *args, **kwargs):
Expand Down
6 changes: 4 additions & 2 deletions neo4j/v1/direct.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,5 +65,7 @@ def __init__(self, uri, **config):
pool.acquire()
Driver.__init__(self, pool, **config)

def session(self, access_mode=None, bookmark=None):
return BoltSession(self._pool.acquire, self._max_retry_time, access_mode=access_mode, bookmark=bookmark)
def session(self, access_mode=None, **parameters):
if "max_retry_time" not in parameters:
parameters["max_retry_time"] = self._max_retry_time
return BoltSession(self._pool.acquire, access_mode, **parameters)
9 changes: 5 additions & 4 deletions neo4j/v1/routing.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ def fetch_routing_info(self, address):
if routing support is broken
"""
try:
with RoutingSession(lambda _: self.acquire_direct(address)) as session:
with RoutingSession(lambda _: self.acquire_direct(address), access_mode=None) as session:
return list(session.run("ignored", self.routing_context))
except CypherError as error:
if error.code == "Neo.ClientError.Procedure.ProcedureNotFound":
Expand Down Expand Up @@ -272,7 +272,6 @@ def update_routing_table(self):
if self.update_routing_table_with_routers(initial_routers):
return


# None of the routers have been successful, so just fail
raise ServiceUnavailable("Unable to retrieve routing information")

Expand Down Expand Up @@ -364,5 +363,7 @@ def connector(a):
else:
Driver.__init__(self, pool, **config)

def session(self, access_mode=None, bookmark=None):
return BoltSession(self._pool.acquire, self._max_retry_time, access_mode=access_mode, bookmark=bookmark)
def session(self, access_mode=None, **parameters):
if "max_retry_time" not in parameters:
parameters["max_retry_time"] = self._max_retry_time
return BoltSession(self._pool.acquire, access_mode, **parameters)
12 changes: 5 additions & 7 deletions neo4j/v1/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,6 @@


class BoltSession(Session):
"""

:param acquirer: function that can accept an access mode and return a connection
:param access_mode:
:param bookmark:
"""

def _run(self, statement, parameters):
assert isinstance(statement, unicode)
Expand All @@ -59,7 +53,11 @@ def __run__(self, statement, parameters):
return self._run(statement, parameters)

def __begin__(self):
return self.__run__(u"BEGIN", {"bookmark": self._bookmark} if self._bookmark else {})
if self._bookmarks:
parameters = {"bookmark": self.last_bookmark(), "bookmarks": self._bookmarks}
else:
parameters = {}
return self.__run__(u"BEGIN", parameters)

def __commit__(self):
return self.__run__(u"COMMIT", {})
Expand Down
8 changes: 4 additions & 4 deletions test/stub/scripts/bookmark_chain.script
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
!: AUTO INIT
!: AUTO RESET

C: RUN "BEGIN" {"bookmark": "bookmark:1"}
C: RUN "BEGIN" {"bookmark": "bookmark:1", "bookmarks": ["bookmark:0", "bookmark:1"]}
PULL_ALL
S: SUCCESS {}
SUCCESS {}
C: RUN "COMMIT" {}
PULL_ALL
S: SUCCESS {"bookmark": "bookmark:2"}
S: SUCCESS {"bookmark": "bookmark:2", "bookmarks": ["bookmark:2"]}
SUCCESS {}

C: RUN "BEGIN" {"bookmark": "bookmark:2"}
C: RUN "BEGIN" {"bookmark": "bookmark:2", "bookmarks": ["bookmark:2"]}
PULL_ALL
S: SUCCESS {}
SUCCESS {}
C: RUN "COMMIT" {}
PULL_ALL
S: SUCCESS {"bookmark": "bookmark:3"}
S: SUCCESS {"bookmark": "bookmark:3", "bookmarks": ["bookmark:3"]}
SUCCESS {}
10 changes: 5 additions & 5 deletions test/stub/scripts/bookmark_chain_with_autocommit.script
Original file line number Diff line number Diff line change
@@ -1,25 +1,25 @@
!: AUTO INIT
!: AUTO RESET

C: RUN "BEGIN" {"bookmark": "bookmark:1"}
C: RUN "BEGIN" {"bookmark": "bookmark:1", "bookmarks": ["bookmark:1"]}
PULL_ALL
S: SUCCESS {}
SUCCESS {}
C: RUN "COMMIT" {}
PULL_ALL
S: SUCCESS {"bookmark": "bookmark:2"}
S: SUCCESS {"bookmark": "bookmark:2", "bookmarks": ["bookmark:2"]}
SUCCESS {}

C: RUN "RETURN 1" {}
PULL_ALL
S: SUCCESS {"bookmark": "bookmark:x"}
S: SUCCESS {"bookmark": "bookmark:x", "bookmarks": ["bookmark:x"]}
SUCCESS {}

C: RUN "BEGIN" {"bookmark": "bookmark:2"}
C: RUN "BEGIN" {"bookmark": "bookmark:2", "bookmarks": ["bookmark:2"]}
PULL_ALL
S: SUCCESS {}
SUCCESS {}
C: RUN "COMMIT" {}
PULL_ALL
S: SUCCESS {"bookmark": "bookmark:3"}
S: SUCCESS {"bookmark": "bookmark:3", "bookmarks": ["bookmark:3"]}
SUCCESS {}
2 changes: 1 addition & 1 deletion test/stub/scripts/return_1_in_tx.script
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,5 @@ S: SUCCESS {"fields": ["1"]}

C: RUN "COMMIT" {}
PULL_ALL
S: SUCCESS {"bookmark": "bookmark:1"}
S: SUCCESS {"bookmark": "bookmark:1", "bookmarks": ["bookmark:1"]}
SUCCESS {}
6 changes: 3 additions & 3 deletions test/stub/scripts/return_1_in_tx_twice.script
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ S: SUCCESS {"fields": ["1"]}

C: RUN "COMMIT" {}
PULL_ALL
S: SUCCESS {"bookmark": "bookmark:1"}
S: SUCCESS {"bookmark": "bookmark:1", "bookmarks": ["bookmark:1"]}
SUCCESS {}

C: RUN "BEGIN" {"bookmark": "bookmark:1"}
C: RUN "BEGIN" {"bookmark": "bookmark:1", "bookmarks": ["bookmark:1"]}
PULL_ALL
S: SUCCESS {"fields": []}
SUCCESS {}
Expand All @@ -30,5 +30,5 @@ S: SUCCESS {"fields": ["1"]}

C: RUN "COMMIT" {}
PULL_ALL
S: SUCCESS {"bookmark": "bookmark:2"}
S: SUCCESS {"bookmark": "bookmark:2", "bookmarks": ["bookmark:2"]}
SUCCESS {}
2 changes: 1 addition & 1 deletion test/stub/scripts/return_1_twice_in_tx.script
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,5 @@ S: SUCCESS {"fields": ["x"]}

C: RUN "COMMIT" {}
PULL_ALL
S: SUCCESS {"bookmark": "bookmark:1"}
S: SUCCESS {"bookmark": "bookmark:1", "bookmarks": ["bookmark:1"]}
SUCCESS {}
4 changes: 2 additions & 2 deletions test/stub/scripts/return_2_in_tx.script
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
!: AUTO INIT
!: AUTO RESET

C: RUN "BEGIN" {"bookmark": "bookmark:1"}
C: RUN "BEGIN" {"bookmark": "bookmark:1", "bookmarks": ["bookmark:1"]}
PULL_ALL
S: SUCCESS {"fields": []}
SUCCESS {}
Expand All @@ -14,5 +14,5 @@ S: SUCCESS {"fields": ["2"]}

C: RUN "COMMIT" {}
PULL_ALL
S: SUCCESS {"bookmark": "bookmark:2"}
S: SUCCESS {"bookmark": "bookmark:2", "bookmarks": ["bookmark:2"]}
SUCCESS {}
9 changes: 8 additions & 1 deletion test/stub/test_bookmarking.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,18 @@ def test_should_be_able_to_set_bookmark(self):
with driver.session(bookmark="X") as session:
assert session.last_bookmark() == "X"

def test_should_be_able_to_set_multiple_bookmarks(self):
with StubCluster({9001: "router.script"}):
uri = "bolt+routing://localhost:9001"
with GraphDatabase.driver(uri, auth=self.auth_token, encrypted=False) as driver:
with driver.session(bookmarks=[":1", ":2"]) as session:
assert session.last_bookmark() == ":2"

def test_should_automatically_chain_bookmarks(self):
with StubCluster({9001: "router.script", 9004: "bookmark_chain.script"}):
uri = "bolt+routing://localhost:9001"
with GraphDatabase.driver(uri, auth=self.auth_token, encrypted=False) as driver:
with driver.session(access_mode=READ_ACCESS, bookmark="bookmark:1") as session:
with driver.session(access_mode=READ_ACCESS, bookmarks=["bookmark:0", "bookmark:1"]) as session:
with session.begin_transaction():
pass
assert session.last_bookmark() == "bookmark:2"
Expand Down