Skip to content

Commit

Permalink
Fix memory leak in Xandra.Connection (#355)
Browse files Browse the repository at this point in the history
  • Loading branch information
harunzengin committed Mar 4, 2024
1 parent b45e829 commit 33cd538
Show file tree
Hide file tree
Showing 2 changed files with 136 additions and 32 deletions.
157 changes: 125 additions & 32 deletions lib/xandra/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ defmodule Xandra.Connection do

@forced_transport_options [packet: :raw, mode: :binary, active: false]
@max_concurrent_requests 5000
@max_cassandra_stream_id 32_768
@timed_out_stream_id_timeout_minutes 5
@flush_timed_out_stream_id_interval 30 * 1000

# This record is used internally when we check out a "view" of the state of
# the connection. This holds all the necessary info to encode queries and more.
Expand Down Expand Up @@ -87,7 +90,8 @@ defmodule Xandra.Connection do

:telemetry.span([:xandra, :prepare_query], metadata, fn ->
with :ok <- send_prepare_frame(state, prepared, options),
{:ok, %Frame{} = frame} <- receive_response_frame(req_alias, state, timeout) do
{:ok, %Frame{} = frame} <-
receive_response_frame(conn_pid, req_alias, state, timeout, metadata) do
case protocol_module.decode_response(frame, prepared, options) do
{%Prepared{} = prepared, warnings} ->
Prepared.Cache.insert(prepared_cache, prepared)
Expand Down Expand Up @@ -159,11 +163,22 @@ defmodule Xandra.Connection do
timeout = Keyword.fetch!(options, :timeout)
payload = query_mod.encode(query, params, options)

telemetry_meta =
checked_out_state
|> telemetry_meta(conn_pid, %{query: query})
|> Map.put(:extra_metadata, options[:telemetry_metadata])

# This is in an anonymous function so that we can use it in a Telemetry span.
fun = fn ->
with :ok <- Transport.send(transport, payload),
{:ok, %Frame{} = frame} <-
receive_response_frame(req_alias, checked_out_state, timeout) do
receive_response_frame(
conn_pid,
req_alias,
checked_out_state,
timeout,
telemetry_meta
) do
case protocol_module.decode_response(frame, query, options) do
{%_{} = response, warnings} ->
maybe_execute_telemetry_for_warnings(checked_out_state, conn_pid, query, warnings)
Expand All @@ -190,11 +205,6 @@ defmodule Xandra.Connection do
end
end

telemetry_meta =
checked_out_state
|> telemetry_meta(conn_pid, %{query: query})
|> Map.put(:extra_metadata, options[:telemetry_metadata])

:telemetry.span([:xandra, :execute_query], telemetry_meta, fn ->
{fun.(), telemetry_meta}
end)
Expand Down Expand Up @@ -236,7 +246,13 @@ defmodule Xandra.Connection do
}
end

defp receive_response_frame(req_alias, checked_out_state(atom_keys?: atom_keys?), timeout) do
defp receive_response_frame(
conn_pid,
req_alias,
checked_out_state(atom_keys?: atom_keys?, stream_id: stream_id),
timeout,
telemetry_metadata
) do
receive do
{^req_alias, {:ok, %Frame{} = frame}} ->
frame = %Frame{frame | atom_keys?: atom_keys?}
Expand All @@ -249,7 +265,8 @@ defmodule Xandra.Connection do
{:error, {:connection_crashed, reason}}
after
timeout ->
Process.demonitor(req_alias, [:flush])
:telemetry.execute([:xandra, :client_timeout], %{}, telemetry_metadata)
:gen_statem.cast(conn_pid, {:timed_out_id, stream_id})
{:error, :timeout}
end
end
Expand Down Expand Up @@ -279,8 +296,8 @@ defmodule Xandra.Connection do
current_keyspace: String.t() | nil,
default_consistency: atom(),
disconnection_reason: term(),
free_stream_ids: MapSet.t(stream_id()),
in_flight_requests: %{optional(stream_id()) => term()},
timed_out_ids: %{optional(stream_id()) => DateTime.t()},
options: keyword(),
original_options: keyword(),
peername: {:inet.ip_address(), :inet.port_number()},
Expand Down Expand Up @@ -310,8 +327,8 @@ defmodule Xandra.Connection do
:protocol_module,
:protocol_version,
:transport,
free_stream_ids: MapSet.new(1..@max_concurrent_requests),
in_flight_requests: %{},
timed_out_ids: %{},
current_keyspace: nil,
buffer: <<>>
]
Expand All @@ -324,7 +341,13 @@ defmodule Xandra.Connection do
@impl true
def init(options) do
  # Start in the :disconnected state, remembering the options the connection was started with.
  data = %__MODULE__{original_options: options, configure: Keyword.get(options, :configure)}

  actions = [
    # Trigger the first connection attempt as soon as init/1 returns.
    {:next_event, :internal, :connect},
    # Arm the named timeout that periodically prunes old entries from timed_out_ids.
    {{:timeout, :flush_timed_out_stream_id}, @flush_timed_out_stream_id_interval, :flush}
  ]

  {:ok, :disconnected, data, actions}
end

## "Disconnected" state
Expand All @@ -341,11 +364,9 @@ defmodule Xandra.Connection do
send(data.cluster_pid, {:xandra, :disconnected, data.peername, self()})
end

data =
Enum.reduce(data.in_flight_requests, data, fn {stream_id, req_alias}, data_acc ->
send_reply(req_alias, {:error, :disconnected})
update_in(data_acc.free_stream_ids, &MapSet.put(&1, stream_id))
end)
Enum.each(data.in_flight_requests, fn {_stream_id, req_alias} ->
send_reply(req_alias, {:error, :disconnected})
end)

data = put_in(data.in_flight_requests, %{})

Expand Down Expand Up @@ -509,11 +530,36 @@ defmodule Xandra.Connection do
end

# Releases a stream ID while disconnected: the request is simply forgotten. Free IDs are
# not tracked separately; an ID is free when it is in neither in_flight_requests nor
# timed_out_ids.
def disconnected(:cast, {:release_stream_id, stream_id}, %__MODULE__{} = data) do
  data = update_in(data.in_flight_requests, &Map.delete(&1, stream_id))
  {:keep_state, data}
end

# A caller gave up waiting on stream_id: stop treating the request as in flight and record
# when it timed out, so the ID stays reserved until it is flushed or a late response arrives.
def disconnected(:cast, {:timed_out_id, stream_id}, %__MODULE__{} = data) do
  still_in_flight = Map.delete(data.in_flight_requests, stream_id)
  timed_out = Map.put(data.timed_out_ids, stream_id, DateTime.utc_now())

  {:keep_state, %{data | in_flight_requests: still_in_flight, timed_out_ids: timed_out}}
end

# Periodic flush of timed-out stream IDs while disconnected. Entries whose age, in whole
# minutes, exceeds @timed_out_stream_id_timeout_minutes are dropped; the rest are kept.
# The same named timeout is re-armed so the flush keeps recurring.
def disconnected({:timeout, :flush_timed_out_stream_id}, :flush, %__MODULE__{} = data) do
  now = DateTime.utc_now()

  retained =
    for {stream_id, timed_out_at} <- data.timed_out_ids,
        DateTime.diff(now, timed_out_at, :minute) <= @timed_out_stream_id_timeout_minutes,
        into: %{} do
      {stream_id, timed_out_at}
    end

  next_flush =
    {{:timeout, :flush_timed_out_stream_id}, @flush_timed_out_stream_id_interval, :flush}

  {:keep_state, %{data | timed_out_ids: retained}, next_flush}
end

## "Connected" state

def connected(:enter, :disconnected, %__MODULE__{} = data) do
Expand Down Expand Up @@ -546,12 +592,15 @@ defmodule Xandra.Connection do
end
end

# Refuses a checkout when the number of in-flight requests has reached
# @max_concurrent_requests; state and data are left untouched.
def connected(
      {:call, caller},
      {:checkout_state_for_next_request, _req_alias},
      %__MODULE__{in_flight_requests: pending}
    )
    when map_size(pending) == @max_concurrent_requests do
  rejection = {:reply, caller, {:error, :too_many_concurrent_requests}}
  {:keep_state_and_data, rejection}
end

def connected({:call, from}, {:checkout_state_for_next_request, req_alias}, data) do
{stream_id, data} =
get_and_update_in(data.free_stream_ids, fn ids ->
id = Enum.at(ids, 0)
{id, MapSet.delete(ids, id)}
end)
stream_id = random_free_stream_id(data.in_flight_requests, data.timed_out_ids)

response =
checked_out_state(
Expand Down Expand Up @@ -598,11 +647,36 @@ defmodule Xandra.Connection do
end

# Releases a stream ID while connected: the request is simply forgotten. Free IDs are not
# tracked separately; an ID is free when it is in neither in_flight_requests nor
# timed_out_ids.
def connected(:cast, {:release_stream_id, stream_id}, %__MODULE__{} = data) do
  data = update_in(data.in_flight_requests, &Map.delete(&1, stream_id))
  {:keep_state, data}
end

# A caller gave up waiting on stream_id: stop treating the request as in flight and record
# when it timed out, so the ID stays reserved until it is flushed or a late response arrives.
def connected(:cast, {:timed_out_id, stream_id}, %__MODULE__{} = data) do
  still_in_flight = Map.delete(data.in_flight_requests, stream_id)
  timed_out = Map.put(data.timed_out_ids, stream_id, DateTime.utc_now())

  {:keep_state, %{data | in_flight_requests: still_in_flight, timed_out_ids: timed_out}}
end

# Periodic flush of timed-out stream IDs while connected. Entries whose age, in whole
# minutes, exceeds @timed_out_stream_id_timeout_minutes are dropped; the rest are kept.
# The same named timeout is re-armed so the flush keeps recurring.
def connected({:timeout, :flush_timed_out_stream_id}, :flush, %__MODULE__{} = data) do
  now = DateTime.utc_now()

  retained =
    for {stream_id, timed_out_at} <- data.timed_out_ids,
        DateTime.diff(now, timed_out_at, :minute) <= @timed_out_stream_id_timeout_minutes,
        into: %{} do
      {stream_id, timed_out_at}
    end

  next_flush =
    {{:timeout, :flush_timed_out_stream_id}, @flush_timed_out_stream_id_interval, :flush}

  {:keep_state, %{data | timed_out_ids: retained}, next_flush}
end

## Helpers

defp startup_connection(
Expand Down Expand Up @@ -685,18 +759,27 @@ defmodule Xandra.Connection do

# Dispatches a frame received from the server to whoever is waiting on its stream ID and
# returns the updated connection data.
#
#   * If a request is in flight for the stream ID, the frame is sent to that request's
#     alias and the request is removed from in_flight_requests.
#   * If the stream ID is recorded in timed_out_ids, the late response is reported through
#     the [:xandra, :timed_out_response] telemetry event and the ID is removed from
#     timed_out_ids.
#   * Otherwise the frame is unexpected and this function raises.
defp handle_frame(%__MODULE__{} = data, %Frame{stream_id: stream_id} = frame) do
  case pop_in(data.in_flight_requests[stream_id]) do
    {nil, data} ->
      if Map.has_key?(data.timed_out_ids, stream_id) do
        # Use the 3-arity form: the second argument is the measurements map, so the
        # stream ID has to go in the third (metadata) argument for handlers to see it.
        :telemetry.execute(
          [:xandra, :timed_out_response],
          %{},
          telemetry_meta(data, %{stream_id: stream_id})
        )

        update_in(data.timed_out_ids, &Map.delete(&1, stream_id))
      else
        raise """
        internal error in Xandra connection, we received a frame from the server with \
        stream ID #{stream_id}, but there was no in-flight request for this stream ID. \
        The frame is:
        #{inspect(frame)}
        """
      end

    {req_alias, data} ->
      send_reply(req_alias, {:ok, frame})
      data
  end
end

Expand Down Expand Up @@ -808,4 +891,14 @@ defmodule Xandra.Connection do
:telemetry.execute([:xandra, :server_warnings], %{warnings: warnings}, metadata)
end
end

# Returns a random stream ID that is a key of neither `in_flight_requests` nor
# `timed_out_ids`, retrying with a new random candidate on every collision.
#
# NOTE(review): this only terminates while at least one ID in the range is free. The
# number of in-flight requests is capped elsewhere, but nothing visible here caps
# `timed_out_ids` - confirm that it cannot fill the whole range between flushes.
defp random_free_stream_id(in_flight_requests, timed_out_ids) do
  # The native protocol carries the stream ID as a signed 16-bit [short] and reserves
  # negative values for server-initiated streams, so 32_767 is the largest ID a client
  # may pick. Clamp in case the configured maximum is one past that.
  highest_id = min(@max_cassandra_stream_id, 32_767)
  candidate = Enum.random(1..highest_id)

  if Map.has_key?(timed_out_ids, candidate) or Map.has_key?(in_flight_requests, candidate) do
    random_free_stream_id(in_flight_requests, timed_out_ids)
  else
    candidate
  end
end
end
11 changes: 11 additions & 0 deletions lib/xandra/telemetry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ defmodule Xandra.Telemetry do
[:xandra, :prepared_cache, :miss],
[:xandra, :prepare_query, :stop],
[:xandra, :execute_query, :stop],
[:xandra, :client_timeout],
[:xandra, :timed_out_response],
[:xandra, :server_warnings],
[:xandra, :cluster, :change_event],
[:xandra, :cluster, :control_connection, :connected],
Expand Down Expand Up @@ -170,6 +172,15 @@ defmodule Xandra.Telemetry do
[:server_warnings] ->
Logger.warning("Received warnings: #{inspect(measurements.warnings)}", logger_meta)

[:client_timeout] ->
Logger.error("Client timeout for query: #{inspect(metadata.query)}")

[:timed_out_response] ->
Logger.warning(
"Received response for stream id #{metadata.stream_id}, but request had already timed out",
logger_meta
)

[:prepared_cache, status] when status in [:hit, :miss] ->
query = inspect(metadata.query)
Logger.debug("Prepared cache #{status} for query: #{query}", logger_meta)
Expand Down

0 comments on commit 33cd538

Please sign in to comment.