Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 115 additions & 7 deletions cassandra/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -2615,6 +2615,12 @@ def __init__(self, cluster, hosts, keyspace=None):

self._lock = RLock()
self._pools = {}
# Tracks in-flight pool creation futures keyed by host, guarded by
# _lock. Used by add_or_renew_pool to detect and reuse concurrent
# creations so that update_created_pools does not schedule a duplicate
# run_add_or_renew_pool for a host whose pool creation is already
# in-flight (scylladb/python-driver#317).
self._pending_pool_futures = {}
self._profile_manager = cluster.profile_manager
self._metrics = cluster.metrics
self._request_init_callbacks = []
Expand Down Expand Up @@ -3240,23 +3246,37 @@ def add_or_renew_pool(self, host, is_host_addition):
if distance == HostDistance.IGNORED:
return None

# Mutable one-element list so the outer code can upgrade the flag
# after the closure has been submitted but before it reads it. This
# fixes the coalescing race where an in-flight future created with
# is_host_addition=False is reused by a later on_add() call that
# needs is_host_addition=True: the closure then passes the wrong flag
# to signal_connection_failure(), causing _HostReconnectionHandler to
# call on_up() instead of on_add() on reconnect (scylladb/python-driver#317).
is_host_addition_cell = [is_host_addition]

# Unique token for this submission. The closure checks it before
# installing its pool so that a stale task (whose entry was replaced by
# remove_pool + a fresh add_or_renew_pool) discards its pool rather
# than overwriting the freshly-started one (scylladb/python-driver#317).
creation_id = object()

def run_add_or_renew_pool():
try:
new_pool = HostConnection(host, distance, self)
except AuthenticationFailed as auth_exc:
conn_exc = ConnectionException(str(auth_exc), endpoint=host)
self.cluster.signal_connection_failure(host, conn_exc, is_host_addition)
self.cluster.signal_connection_failure(host, conn_exc, is_host_addition_cell[0])
return False
except Exception as conn_exc:
log.warning("Failed to create connection pool for new host %s:",
host, exc_info=conn_exc)
# the host itself will still be marked down, so we need to pass
# a special flag to make sure the reconnector is created
self.cluster.signal_connection_failure(
host, conn_exc, is_host_addition, expect_host_to_be_down=True)
host, conn_exc, is_host_addition_cell[0], expect_host_to_be_down=True)
return False

previous = self._pools.get(host)
with self._lock:
while new_pool._keyspace != self.keyspace:
self._lock.release()
Expand All @@ -3271,23 +3291,111 @@ def callback(pool, errors):
set_keyspace_event.wait(self.cluster.connect_timeout)
if not set_keyspace_event.is_set() or errors_returned:
log.warning("Failed setting keyspace for pool after keyspace changed during connect: %s", errors_returned)
self.cluster.on_down(host, is_host_addition)
self.cluster.on_down(host, is_host_addition_cell[0])
new_pool.shutdown()
self._lock.acquire()
return False
self._lock.acquire()
self._pools[host] = new_pool

# Identity guard: if _pending_pool_futures no longer holds our
# creation_id it means remove_pool() ran (and possibly a fresh
# add_or_renew_pool was submitted) while we were connecting.
# Discard our pool so the fresher task can install its own
# (scylladb/python-driver#317).
entry = self._pending_pool_futures.get(host)
if entry is None or entry['creation_id'] is not creation_id:
log.debug("Discarding stale connection pool for host %s "
"(superseded by a newer creation)", host)
discard_new_pool = True
else:
# Read the current pool state inside the lock so the check
# is atomic with the installation of our new pool.
previous = self._pools.get(host)
if previous is not None and not previous.is_shutdown:
# A concurrent add_or_renew_pool already installed a
# live pool for this host while we were connecting.
# Discard ours to avoid replacing it and dropping
# in-flight requests (scylladb/python-driver#317).
log.debug("Discarding duplicate connection pool for host %s "
"(live pool already present)", host)
discard_new_pool = True
else:
discard_new_pool = False
self._pools[host] = new_pool

if discard_new_pool:
new_pool.shutdown()
return True

log.debug("Added pool for host %s to session", host)
if previous:
previous.shutdown()

return True

return self.submit(run_add_or_renew_pool)
with self._lock:
if self.is_shutdown:
return None
# If there is already an in-flight pool creation for this host,
# return that future instead of scheduling a duplicate. This
# prevents update_created_pools from creating a second pool when
# the first one has not yet finished connecting
# (scylladb/python-driver#317).
entry = self._pending_pool_futures.get(host)
if entry is not None and entry['future'] is not None and not entry['future'].done():
if distance == entry['distance']:
# Same distance: safe to coalesce. Upgrade is_host_addition
# in the shared cell if the new caller needs the stricter
# on_add() reconnect path (scylladb/python-driver#317).
if is_host_addition:
entry['is_host_addition_cell'][0] = True
log.debug("Reusing in-flight pool creation for host %s", host)
return entry['future']
# Distance changed: the in-flight HostConnection was constructed
# with the old distance (e.g. REMOTE with connect_to_remote_hosts
# =False => no connections). Submit a fresh task; the creation_id
# guard below ensures it wins over the stale one
# (scylladb/python-driver#317).
log.debug("Distance changed for host %s while pool creation was "
"in-flight; submitting fresh creation", host)
# Store the entry BEFORE calling submit so the closure always
# finds a valid creation_id in _pending_pool_futures, even when
# the executor runs the task synchronously
# (scylladb/python-driver#317).
new_entry = {
'future': None, # filled in immediately after submit returns
'creation_id': creation_id,
'distance': distance,
'is_host_addition_cell': is_host_addition_cell,
}
self._pending_pool_futures[host] = new_entry
future = self.submit(run_add_or_renew_pool)
if future is None:
# Session is shutting down; clean up the placeholder entry.
self._pending_pool_futures.pop(host, None)
return None
new_entry['future'] = future
# Remove the entry once the future finishes, regardless of how
# run_add_or_renew_pool exits (including unhandled exceptions).
# The callback acquires _lock and only clears the entry if it
# still holds *this* creation_id, so a concurrent remove_pool
# followed by a new add_or_renew_pool is not affected
# (scylladb/python-driver#317).
def _clear_pending(f, _host=host, _creation_id=creation_id):
with self._lock:
e = self._pending_pool_futures.get(_host)
if e is not None and e['creation_id'] is _creation_id:
self._pending_pool_futures.pop(_host, None)
future.add_done_callback(_clear_pending)
return future
Comment thread
nyh marked this conversation as resolved.

def remove_pool(self, host):
pool = self._pools.pop(host, None)
with self._lock:
pool = self._pools.pop(host, None)
# Invalidate any in-flight pool creation for this host so that a
# subsequent update_created_pools call can schedule a fresh one if
# needed (scylladb/python-driver#317).
self._pending_pool_futures.pop(host, None)
if pool:
log.debug("Removed connection pool for %r", host)
return self.submit(pool.shutdown)
Expand Down
Loading
Loading