
ResponseFuture: do not return the stream ID on client timeout #106

Merged

Conversation


@piodul piodul commented Sep 27, 2021

When a timeout occurs, the ResponseFuture associated with the query
returns its stream ID to the associated connection's free stream ID pool,
so that the stream ID can be immediately reused by another query.

However, that is incorrect and dangerous. If query A times out before it
receives a response from the cluster, a different query B might be
issued on the same connection and stream. If response for query A
arrives earlier than the response for query B, the first one might be
misinterpreted as the response for query B.

This commit changes the logic so that stream IDs are not returned on
timeout - now, they are only returned after receiving a response.
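The fixed lifecycle can be sketched as a minimal model (hypothetical names, not the driver's actual classes): a stream ID borrowed for a request is parked as "orphaned" on a client-side timeout and only becomes reusable once the server actually responds.

```python
# Minimal sketch of the fix described above. Class and method names are
# illustrative, not the driver's real API.

class StreamIdPool:
    def __init__(self, max_streams):
        self.free_ids = list(range(max_streams))
        self.orphaned = set()  # timed out client-side, still awaiting a response

    def borrow(self):
        # Hand out a free stream ID, or None if all are in use.
        return self.free_ids.pop() if self.free_ids else None

    def on_client_timeout(self, stream_id):
        # Before the fix: self.free_ids.append(stream_id)  <- dangerous reuse
        self.orphaned.add(stream_id)

    def on_response(self, stream_id):
        # Only a server response makes the stream ID reusable again.
        self.orphaned.discard(stream_id)
        self.free_ids.append(stream_id)
```

With this model, a timed-out request can never share a stream ID with a new request, so a late response cannot be misattributed.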

@piodul piodul force-pushed the do-not-return-stream-ids-on-client-timeout branch from 1ee9c89 to 8f3add3 Compare September 27, 2021 16:30

psarna commented Sep 28, 2021

I tried the updated driver on the reproducer. What happens right now is that the third request, which previously incorrectly reused a stream id, now fails with 'Unable to complete the operation against any hosts', {<Host: 127.0.0.1:9042 datacenter1>: AssertionError()}. Is it the desired outcome? It's infinitely better than incorrect results, but perhaps it should be queued and run once there are enough stream ids available?


psarna commented Sep 28, 2021

edit: never mind! I tested based on 3.24, without this fix present:

commit a6a66cac2931c4f66d248ed7091d68f6932de5b0
Author: Alan Boudreault <alan@kovaro.ca>
Date:   Tue Jun 2 09:46:04 2020 -0400

    ensure the connection max request id's is respected

diff --git a/cassandra/connection.py b/cassandra/connection.py
index f30be682..3d154de0 100644
--- a/cassandra/connection.py
+++ b/cassandra/connection.py
@@ -1443,7 +1443,7 @@ class HeartbeatFuture(object):
         log.debug("Sending options message heartbeat on idle connection (%s) %s",
                   id(connection), connection.endpoint)
         with connection.lock:
-            if connection.in_flight <= connection.max_request_id:
+            if connection.in_flight < connection.max_request_id:
                 connection.in_flight += 1
                 connection.send_msg(OptionsMessage(), connection.get_request_id(), self._options_callback)
             else:
diff --git a/cassandra/pool.py b/cassandra/pool.py
index 87e8f037..cd276560 100644
--- a/cassandra/pool.py
+++ b/cassandra/pool.py
@@ -418,7 +418,7 @@ class HostConnection(object):
         remaining = timeout
         while True:
             with conn.lock:
-                if conn.in_flight <= conn.max_request_id:
+                if conn.in_flight < conn.max_request_id:
                     conn.in_flight += 1
                     return conn, conn.get_request_id()
             if timeout is not None:
diff --git a/tests/integration/simulacron/test_connection.py b/tests/integration/simulacron/test_connection.py
index afe2685d..11bfef7f 100644
--- a/tests/integration/simulacron/test_connection.py
+++ b/tests/integration/simulacron/test_connection.py
@@ -24,7 +24,7 @@ from mock import Mock, patch
 from cassandra import OperationTimedOut
 from cassandra.cluster import (EXEC_PROFILE_DEFAULT, Cluster, ExecutionProfile,
                                _Scheduler, NoHostAvailable)
-from cassandra.policies import HostStateListener, RoundRobinPolicy
+from cassandra.policies import HostStateListener, RoundRobinPolicy, WhiteListRoundRobinPolicy

... and then, an assertion is hit when trying to get another stream_id due to the off-by-one error. With the patch above applied, the error no longer happens.

An important conclusion is that if we backport the fix presented in this issue to previous versions, we should also ship the patch I posted above.


psarna commented Sep 28, 2021

And just to clarify - after both fixes are in place, the code works perfectly: with max concurrency == 2, two requests are performed first, and only then is the third one sent - even though one of the first two timed out.


piodul commented Sep 28, 2021

I think that the PR might still have some subtle synchronization issues. I'll look through the code once more - please do not merge yet.


@haaawk haaawk left a comment


LGTM in general, but we need a way to close the connection and open another one when we accumulate too many orphaned stream IDs, just like the Java driver does:
apache/cassandra-java-driver@4e680fb

Motivation:

Sometimes the driver might stop waiting for a response on a particular
channel (for example if the request timed out, or was completed by
another speculative execution). Before this change we just kept the
response callback in our inflight map, which is a problem if we never
get the response: it creates a memory leak, and if this keeps happening
we'll eventually run out of stream ids on the channel.

Modifications:

Add a channel method to cancel a response callback, indicating that the
caller is not interested in the response anymore. Modify existing
clients to cancel their callbacks.
Track the number of "orphan" stream ids (cancelled and have not yet
received a response from the server). Initiate a graceful shutdown if
this number exceeds a threshold.
Improve the way the channel pool manages channel shutdowns: start the
reconnection as soon as an orderly shutdown has *started*, not when it
finishes.

Result:

Cancelled callbacks are not leaked anymore. When the number of orphan
ids exceed the threshold, the channel is closed gracefully and the pool
starts replacing it immediately.
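The orphan-tracking policy described in the quoted commit can be sketched like this (Python, with hypothetical names and an assumed threshold; the Java driver's real implementation differs in detail):

```python
# Illustrative sketch of the orphan-threshold policy: count stream IDs that
# were cancelled client-side but have not yet received a server response,
# and trigger a graceful channel replacement once a threshold is crossed.
# All names and the threshold ratio are assumptions for illustration.

class ChannelOrphanTracker:
    def __init__(self, max_streams, threshold_ratio=0.75):
        self.orphaned = 0
        self.threshold = int(max_streams * threshold_ratio)
        self.replacing = False

    def on_cancel(self):
        # The caller stopped waiting (timeout, or a speculative execution
        # completed elsewhere); the stream ID stays occupied server-side.
        self.orphaned += 1
        if self.orphaned > self.threshold and not self.replacing:
            self.replacing = True
            return "start_replacement"  # pool opens a new channel now
        return None

    def on_late_response(self):
        # The server finally answered an orphaned stream.
        self.orphaned -= 1
```

The key design point is that replacement starts as soon as the threshold is crossed, while the old channel drains, rather than waiting for it to fully shut down.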

@piodul piodul force-pushed the do-not-return-stream-ids-on-client-timeout branch 2 times, most recently from b125e1f to 70ffe0e Compare September 28, 2021 18:13

piodul commented Sep 28, 2021

v2: Added commits with fixes to two issues I found by reading the code more thoroughly. One caused the connection to be re-created when it ran out of free stream IDs while some requests had timed out on the client side but had not yet been answered by Scylla. The other caused more timeouts than necessary right after the free stream pool became empty. More information about them in the commit descriptions.


piodul commented Sep 28, 2021

It looks like CI checks are skipped for now... For this PR, I ran the following tests locally:

  • Unit tests: nosetests -w tests/unit/
  • Integration tests specified in the .github/workflows/integration-tests.yaml file + tests/integration/standard/test_shard_aware.py
  • Some manual tests which hit the per-connection request limit and run enough requests concurrently that they time out

I can run more if requested, although it would be nice if the tests were run in a standardized environment...

@avelanarius avelanarius added the integration-tests If this tag, this PR won't run integration tests label Sep 28, 2021
@avelanarius (Member)

I can run more if requested, although it would be nice if the tests were ran in a standardized environment...

It seems you need to add the "integration-tests" label (which I just did) to run the tests - however, it still didn't help...

@piodul piodul force-pushed the do-not-return-stream-ids-on-client-timeout branch from 70ffe0e to 3210a9e Compare October 11, 2021 21:30

piodul commented Oct 11, 2021

v3:

  • Implemented excess connection pooling (improves robustness of connecting to the right shards, it is also needed for the connection replacement mechanism to work well),
  • Implemented connection replacement similar to the one mentioned here - when the number of orphaned stream IDs crosses a certain threshold, a new connection is opened which replaces the old one, and the old connection is closed after all its outstanding requests either finish or time out.

When a timeout occurs, the ResponseFuture associated with the query
returns its stream ID to the associated connection's free stream ID pool
- so that the stream ID can be immediately reused by another query.

However, that is incorrect and dangerous. If query A times out before it
receives a response from the cluster, a different query B might be
issued on the same connection and stream. If response for query A
arrives earlier than the response for query B, the first one might be
misinterpreted as the response for query B.

This commit changes the logic so that stream IDs are not returned on
timeout - now, they are only returned after receiving a response.
@piodul piodul force-pushed the do-not-return-stream-ids-on-client-timeout branch from 3210a9e to a04f44f Compare October 12, 2021 13:28

piodul commented Oct 12, 2021

v4: reverted to the state from v2. Excess connection pooling and connection replacing will be sent in a followup PR.


@psarna psarna left a comment


Looks good, left one question about a lock

cassandra/connection.py (review comment, resolved)
@piodul piodul force-pushed the do-not-return-stream-ids-on-client-timeout branch from a04f44f to e804539 Compare October 12, 2021 14:20
This commit fixes tracking of in_flight requests. Before it, in case of
a client-side timeout, the stream ID was not returned to the pool, but
the in_flight counter was decremented anyway. This counter is used to
determine if there is a need to wait for stream IDs to be freed -
without this patch, it could happen that the driver thought it could
initiate another request because the in_flight counter was low, but
there weren't any free stream IDs to allocate, so an assertion was
triggered and the connection was defuncted and opened again.

Now, requests timed out on the client side are tracked in the
orphaned_request_ids field, and the in_flight counter is decremented
only after the response is received.
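The corrected accounting can be modeled in a few lines (a hypothetical class; only the field name orphaned_request_ids follows the commit message): in_flight is decremented solely on response, while a client-side timeout merely marks the request as orphaned.

```python
# Minimal model of the fixed in_flight accounting. Names other than
# orphaned_request_ids are assumptions for illustration.

class ConnectionAccounting:
    def __init__(self, max_in_flight):
        self.max_in_flight = max_in_flight
        self.in_flight = 0
        self.orphaned_request_ids = set()

    def try_start_request(self):
        # Admit a new request only if a stream ID is genuinely available.
        if self.in_flight >= self.max_in_flight:
            return False
        self.in_flight += 1
        return True

    def on_client_timeout(self, request_id):
        # Do NOT touch in_flight: the stream is still occupied server-side.
        self.orphaned_request_ids.add(request_id)

    def on_response(self, request_id):
        # Capacity is released only when the server actually answers.
        self.orphaned_request_ids.discard(request_id)
        self.in_flight -= 1
```

Keeping in_flight in sync with the real number of occupied stream IDs is what prevents the assertion failure described above.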
Before this patch, the following situation could occur:

1. On a single connection, multiple requests are spawned up to the
   maximum concurrency,
2. We want to issue more requests but we need to wait on a condition
   variable because requests spawned in 1. took all stream IDs and we
   need to wait until some of them are freed,
3. All requests from point 1. time out on the client side - we cannot
   free their stream IDs until the database node responds,
4. Responses for requests issued in point 1. arrive, but the Connection
   class has no access to the condition variable mentioned in point 2.,
   so no requests from point 2. are admitted,
5. Requests from point 2. waiting on the condition variable time out
   even though stream IDs are available.

This commit adds an _owning_pool field to the Connection class, and now
it notifies the owning pool in case a timed out request receives a late
response and a stream ID is freed.
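The notification path this commit adds can be sketched as follows (hypothetical names apart from _owning_pool and _stream_available_condition, which the commit message mentions): the connection keeps a reference to its owning pool and wakes up waiters when a late response frees a stream ID.

```python
# Sketch of the pool-notification path. Class names are illustrative.
import threading

class OwningPoolSketch:
    def __init__(self):
        self._stream_available_condition = threading.Condition()
        self.freed_streams = 0

    def on_orphaned_stream_released(self):
        # Wake up any caller waiting in step 2. for a free stream ID.
        with self._stream_available_condition:
            self.freed_streams += 1
            self._stream_available_condition.notify()

class ConnectionSketch:
    def __init__(self, owning_pool):
        self._owning_pool = owning_pool

    def handle_late_response(self, stream_id):
        # Before this commit, the pool never learned about late responses,
        # so waiters could time out despite a stream ID becoming free.
        self._owning_pool.on_orphaned_stream_released()
```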
@piodul piodul force-pushed the do-not-return-stream-ids-on-client-timeout branch from e804539 to 5edce49 Compare October 12, 2021 14:31

@psarna psarna left a comment


I approve, because my comments were addressed, but please take it with a grain of salt as I'm no Python expert. Another ack would be welcome

When assigning shards to connections, Scylla chooses the shard with the
least number of connections. If the pool misses a single connection to
some shard which is not the shard with the least number of connections,
the pool will not be able to connect to it because the attempted
connection is immediately closed in case it gets the wrong shard.

In order to make this process more robust, now the pool can keep
"missed" connections in the excess connection pool. The excess
connections are not used to serve requests, they are kept only in order
to affect the algorithm on the Scylla side which chooses shards to
connections - and make it possible to connect to shards which are
currently not the least loaded with connections. When the pool becomes
full, excess connections are closed. Furthermore, the number of excess
connections is bounded by 3 * shard count; if it goes above this number,
all excess connections are closed.

This change is also necessary for the logic of replacing connections
with too many orphaned stream IDs, which is coming in later commits of
this PR.
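A rough sketch of the excess-connection bookkeeping described above (hypothetical names; the 3 * shard count bound comes from the commit message): connections that land on an already-filled shard are parked rather than closed, nudging Scylla's least-loaded-shard assignment toward the shards the pool still misses.

```python
# Illustrative sketch of the excess connection pool. Names are assumptions.

class DummyConn:  # stand-in for a real connection object
    def __init__(self):
        self.closed = False
    def close(self):
        self.closed = True

class ExcessConnections:
    def __init__(self, shard_count):
        self.limit = 3 * shard_count
        self.excess = []

    def park(self, conn):
        # Keep the "missed" connection open so Scylla's shard-assignment
        # algorithm sees it as load and steers new connections elsewhere.
        self.excess.append(conn)
        if len(self.excess) > self.limit:
            self.close_all()

    def close_all(self):
        # Called when the pool becomes full, or when the bound is exceeded.
        for c in self.excess:
            c.close()
        self.excess.clear()
```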

piodul commented Oct 14, 2021

v5: brought back improvements from v3 (keeping excess connections and overloaded connection reopening).

cassandra/pool.py (review comments, resolved)
@@ -521,6 +541,16 @@ def return_connection(self, connection, stream_was_orphaned=False):
return
self._is_replacing = True
self._session.submit(self._replace, connection)
else:
if connection in self._trash:

is it safe to access self._trash without a lock? I'm asking in broad terms of Python, is the access pattern atomic enough to never cause garbage to be read here if another thread writes to _trash? Otherwise, maybe you could just wrap this single check in a lock as well

piodul (Author):

I tried googling information about atomicity, and I found the following entry in an FAQ in the official Python documentation:

https://docs.python.org/3/faq/library.html#what-kinds-of-global-value-mutation-are-thread-safe

"In practice, it means that operations on shared variables of built-in data types (ints, lists, dicts, etc) that “look atomic” really are."

The article then lists some examples of operations on lists, dicts which are atomic (it even includes sorting a list!).

I'm not entirely satisfied by this explanation because it's vague (what does it mean to "look atomic"?). However, the existing code seems to already operate under that assumption - see the analogous variable _trash in the HostConnectionPool class. Moreover, set is a built-in data type and the in operation, at least to me, "looks atomic".


Make sure that you don't change the size of the trash in other threads; otherwise you will experience an issue similar to the one found in #111.
You can potentially get RuntimeError: Set changed size during iteration

piodul (Author):

@bentsi The size of the trash definitely can be changed, for example increased or decreased.

The issue #111 seems to be caused by iteration running concurrently with modification, but the line this comment thread is about uses the in operator. Does the in operator used on set perform iteration?

I copied the pattern used here from the other pool implementation (HostConnectionPool) - see here. Does that mean that there is an issue there as well?

@piodul piodul force-pushed the do-not-return-stream-ids-on-client-timeout branch from 1bb767e to b40c23b Compare October 15, 2021 13:10
In a situation of very high overload or poor networking conditions, it
might happen that there is a large number of outstanding requests on a
single connection. Each request reserves a stream ID which cannot be
reused until a response for it arrives, even if the request already
timed out on the client side. Because the pool of available stream IDs
for a single connection is limited, such a situation might cause the set
of free stream IDs to shrink to a very small size (including zero), which
will drastically reduce the available concurrency on the connection, or
even render it unusable for some time.

In order to prevent this, the following strategy is adopted: when the
number of orphaned stream IDs reaches a certain threshold (e.g. 75% of
all available stream IDs), the connection becomes marked as overloaded.
Meanwhile, a new connection is opened - when it becomes available, it
replaces the old one, and the old connection is moved to "trash" where
it waits until all its outstanding requests either respond or time out.

Because there is no guarantee that the new connection will have the same
shard assigned as the old connection, this strategy uses the excess
connection pool to increase the chances of getting the right shard after
several attempts.

This feature is implemented for HostConnection but not for
HostConnectionPool, which means that it will only work for clusters
which use protocol v3 or newer.

This fix is heavily inspired by the fix for JAVA-1519.
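The threshold logic from this commit message can be sketched as follows (hypothetical names; the 75% ratio is the example figure given above): once orphaned stream IDs cross the threshold, the connection is marked overloaded and trashed, while a replacement is opened elsewhere.

```python
# Sketch of the overload-detection and replacement trigger. All names are
# illustrative; the real HostConnection logic is more involved.

class OverloadTracker:
    ORPHAN_RATIO = 0.75  # example threshold from the commit message

    def __init__(self, max_streams):
        self.max_streams = max_streams
        self.orphaned = 0
        self.overloaded = False
        self.trash = set()

    def on_stream_orphaned(self, conn):
        self.orphaned += 1
        if (not self.overloaded
                and self.orphaned > self.ORPHAN_RATIO * self.max_streams):
            # Mark the connection overloaded and move it to the trash,
            # where it drains its outstanding requests. A replacement is
            # opened separately, possibly via the excess pool so it lands
            # on the right shard.
            self.overloaded = True
            self.trash.add(conn)
```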
@piodul piodul force-pushed the do-not-return-stream-ids-on-client-timeout branch from b40c23b to d780076 Compare October 15, 2021 13:26

piodul commented Oct 18, 2021

@fruch could you take a look?


psarna commented Oct 18, 2021

@piodul here's a neat trick for pinging: @scylladb/python-driver-maint


bentsi commented Oct 18, 2021

Per our conversation with @haaawk, my team is working on merging this PR and #112, testing, and releasing the new version


haaawk commented Oct 18, 2021

Thanks a lot @bentsi

@dkropachev dkropachev force-pushed the do-not-return-stream-ids-on-client-timeout branch from 479610f to 9bc49b9 Compare October 18, 2021 16:45
@dkropachev dkropachev merged commit d2ab67c into scylladb:master Oct 18, 2021
@@ -500,9 +506,6 @@ def borrow_connection(self, timeout, routing_key=None):
remaining = timeout - time.time() + start
if remaining < 0:
break
# The connection might have been closed in the meantime - if so, try again
if conn.is_closed:
return self.borrow_connection(timeout, routing_key)
with self._stream_available_condition:
self._stream_available_condition.wait(remaining)

@dkropachev I think this change is wrong. Now, when the connection is not closed in line 498 but becomes closed before line 495/501, the loop will stop at the condition variable _stream_available_condition in line 507/510.

It's not a huge deal because _stream_available_condition should be signaled frequently, but it can lead to corner cases where a call to borrow_connection gets stuck on this condition variable until the timeout.


@haaawk, that is totally correct; that is why there is another fix to address it: d40130f
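The race discussed in this thread can be illustrated with a small sketch (a hypothetical helper, not the driver's code): re-checking is_closed under the condition's lock narrows, though does not fully close, the window between the closed-check and the wait.

```python
# Sketch of the check-then-wait race. A connection closed between the
# earlier is_closed check and condition.wait() would otherwise leave the
# caller blocked until the timeout expires. Names are illustrative.
import threading

def wait_for_free_stream(conn, condition, remaining):
    with condition:
        # Re-check after acquiring the lock; a bare wait() here could
        # block on a connection that was closed a moment earlier.
        if conn.is_closed:
            return "retry"
        condition.wait(remaining)
        return "recheck_streams"

class ClosedConn:
    is_closed = True

class OpenConn:
    is_closed = False
```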

if connection in self._trash:
self._trash.remove(connection)
log.debug("Closing trashed connection (%s) to %s", id(connection), self.host)
connection.close()

@piodul why is this not under self._lock? If we assume something else could have removed the connection from self._trash, then it could just as well close the connection. Is it fine to close a connection twice?

piodul (Author):

It is safe to close a connection twice. All connection "backends" check the is_closed flag under the lock and return if it is set; e.g., see here.


ok. thanks.
