Fix memory leak in Xandra.Connection #355
Conversation
lib/xandra/connection.ex
@@ -249,7 +255,7 @@ defmodule Xandra.Connection do
      {:error, {:connection_crashed, reason}}
  after
    timeout ->
      Process.demonitor(req_alias, [:flush])
      :gen_statem.cast(conn_pid, {:release_stream_id, stream_id})
Aaah, I see, okay. A couple of questions, since it's gonna take me a sec to load all of Xandra's context back into my head 😄
- We should still demonitor the request alias, no?
- What happens if the response for this stream ID then comes after the timeout? Doesn't this happen?
- Yeah, the `req_alias` gets demonitored here.
- Good point, I thought that there was something fishy here, but didn't think it through.
- If the old stream_id that we made a request for but didn't get an answer from Cassandra within `timeout` ms isn't currently used by another request, we should raise that error, yes.
- More problematic: if the old stream_id that we made a request for but didn't get an answer from Cassandra within `timeout` ms is currently being used by another in_flight_request, we will be sending the answer to that new request, so we will be mixing up replies. And it is very likely that the stream_id gets used by another request, since we're fetching stream_ids with `Enum.at(MapSet.new(5000), 0)`, which is eventually ordered, and we're using the first elements of the MapSet.

Not sure how to solve this, though. Is it possible to somehow encode the req_alias into the Cassandra request?
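To make that race concrete, here is a minimal, self-contained sketch (plain Elixir, not Xandra's internals; the caller atoms and map shape are invented for illustration) of a freed stream ID being reused and the late reply landing at the wrong caller:

```elixir
# A connection tracks in-flight requests as stream_id => caller (simplified).
in_flight = %{}

# Request A is assigned stream ID 0.
in_flight = Map.put(in_flight, 0, :caller_a)

# Caller A times out, so the connection releases stream ID 0 right away...
in_flight = Map.delete(in_flight, 0)

# ...and, since free IDs are picked "from the front", request B reuses ID 0.
in_flight = Map.put(in_flight, 0, :caller_b)

# The late Cassandra reply for request A finally arrives on stream ID 0 and is
# delivered to caller B: the mixed-up reply described above.
{stream_id, payload} = {0, :result_for_request_a}
wrong_recipient = Map.fetch!(in_flight, stream_id)
IO.puts("delivering #{inspect(payload)} to #{inspect(wrong_recipient)}")
#=> delivering :result_for_request_a to :caller_b
```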
Another idea would be to free the stream ids waaay later after the timeout, so it is unlikely that we still get a reply from Cassandra.
Okay, the stream_id is a short, so a reference wouldn't fit.
https://github.com/apache/cassandra/blob/trunk/doc/native_protocol_v5.spec#L318
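Quick illustration of that constraint (plain Elixir, nothing Xandra-specific): the frame header carries the stream id as a 2-byte `[short]` field, so only a small integer fits, not an arbitrary term like a reference.

```elixir
# A stream id has to fit into the 2-byte ([short]) field of the frame header.
stream_id = 17
<<decoded::16>> = <<stream_id::16>>
decoded
#=> 17

# An Erlang reference is an opaque, variable-size term; even its external
# representation is far larger than 2 bytes, so it can't go in that field.
byte_size(:erlang.term_to_binary(make_ref()))
#=> typically well over 20
```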
@harunzengin what's the throughput you're testing at? Because ~30 unanswered responses in 5 minutes is a huge number; I'm pretty confused. Also, you could try asking in the C* community whether this is a possibility; they have a ton of ways to get in touch and people are really nice.
The rate is 1000 insertions per second. We have a cluster of 3 nodes, with 10 connections in each pool, so it's around 33 insertions per `Xandra.Connection` per second.

I have to emphasize that with DBConnection, we had 0 timeouts on our staging system for the same 1000 insertions per second. That's why I suspect the async protocol is the cause.

I'll ask in the Cassandra community, but in the meantime, can we agree on setting a second timeout for the `timed_out_ids`, let's say 30 minutes? I already implemented it and it is ready for review.
@harunzengin 30 minutes is too long. If you don't get a response within something like 5 minutes, I don't see how you ever would (for a single query at least).
@whatyouhide Reduced it to 5 minutes.
@whatyouhide This is ready for review if you have time :)
Left a few comments but this is looking really good.
Enum.each(data.in_flight_requests, fn {_stream_id, req_alias} ->
  send_reply(req_alias, {:error, :disconnected})
end)
I'm confused: now we send the reply to the caller, but we don't update `in_flight_requests`, which we were resetting to `%{}` before. How does this work?
True, I must've overlooked this, I deleted it in another iteration.
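For reference, a minimal self-contained sketch of what the resolved version would do (the `send_reply` function and the `data` shape here are simplified stand-ins for Xandra's internals): notify every in-flight caller, then reset the map so no stale request aliases stick around.

```elixir
# Simplified stand-in for Xandra's send_reply/2.
send_reply = fn req_alias, reply -> send(req_alias, {:reply, reply}) end

data = %{in_flight_requests: %{5 => self(), 9 => self()}}

# Tell every waiting caller that the connection dropped...
Enum.each(data.in_flight_requests, fn {_stream_id, req_alias} ->
  send_reply.(req_alias, {:error, :disconnected})
end)

# ...and then reset the map, which is the step that was accidentally dropped.
data = %{data | in_flight_requests: %{}}
IO.inspect(data.in_flight_requests, label: "in_flight_requests after reset")
```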
lib/xandra/connection.ex
data = update_in(data.timed_out_ids, &MapSet.put(&1, stream_id))

actions = [
  {{:timeout, {:stream_id, stream_id}}, @restore_timed_out_stream_id_timeout,
We should try to avoid setting potentially thousands of timeouts here. Instead, what we could do is store the timestamp that a stream ID timed out at and then periodically clean those up. For example, store `timed_out_ids` as `%{id => timed_out_at, ...}`. Then, every 30 seconds, flush the ones that are older than 5 minutes. Makes sense?
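A minimal sketch of that idea, with illustrative names (`@sweep_interval`, `@stream_id_ttl`, and the module itself are not Xandra's actual code): `timed_out_ids` becomes a map of `stream_id => timed_out_at`, and a single recurring sweep frees the old ones.

```elixir
defmodule TimedOutIdsSweepSketch do
  # Illustrative values; Xandra would pick its own constants.
  @sweep_interval :timer.seconds(30)
  @stream_id_ttl :timer.minutes(5)

  # Record when a stream ID timed out instead of arming a per-ID timeout.
  def mark_timed_out(timed_out_ids, stream_id) do
    Map.put(timed_out_ids, stream_id, System.monotonic_time(:millisecond))
  end

  # One periodic sweep: split off every ID that timed out more than
  # @stream_id_ttl ago; those can go back into the free pool.
  def sweep(timed_out_ids) do
    now = System.monotonic_time(:millisecond)

    {expired, kept} =
      Enum.split_with(timed_out_ids, fn {_id, timed_out_at} ->
        now - timed_out_at >= @stream_id_ttl
      end)

    {Enum.map(expired, fn {id, _at} -> id end), Map.new(kept)}
  end

  # In the :gen_statem this would be one recurring named timeout, e.g.
  # {{:timeout, :sweep_timed_out_ids}, @sweep_interval, nil}, re-armed after
  # every sweep, rather than one timeout action per stream ID.
  def sweep_interval, do: @sweep_interval
end
```

Usage on each sweep tick would be along the lines of `{freed_ids, timed_out_ids} = TimedOutIdsSweepSketch.sweep(timed_out_ids)`.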
Agreed
Co-authored-by: Andrea Leopardi <an.leopardi@gmail.com>
@whatyouhide ready for another pass
@whatyouhide Also added a telemetry event for a client timeout (3a5f2b2)
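If you want to observe those client timeouts, here is a hedged sketch of attaching a handler; the event name `[:xandra, :client_timeout]` and the metadata shape are guesses for illustration only, not necessarily what this PR uses (check the commit/docs for the real event):

```elixir
# Sketch only: the event name and metadata keys are assumptions, not Xandra's
# documented contract.
:telemetry.attach(
  "log-xandra-client-timeouts",
  [:xandra, :client_timeout],
  fn _event_name, _measurements, metadata, _config ->
    IO.puts("Xandra client timeout: #{inspect(metadata)}")
  end,
  nil
)
```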
There are some adjustments to make to the docs here, but this looks fantastic. I'll take care of those, @harunzengin. Thank you for all the great work and for the patience with this long review time 😄
@whatyouhide cool, and no worries. Should we release a patch then?
@harunzengin I opened #358 first with a couple of fixes. Before releasing a patch, would you have a chance to run this for a bit and see the impact? Especially around the timed-out IDs.
Closes #354

Fixed the memory leak in `Xandra.Connection` that was caused by not releasing stream ids. This is how the memory usage of our application looks after the fix, deployed on 08.02 at around 10:00:

[memory usage graph]