Skip to content

Commit

Permalink
Merge pull request #3413 from rabbitmq/stream-reader-close-in-terminate
Browse files Browse the repository at this point in the history
Stream reader: close osiris logs and sockets in terminate

(cherry picked from commit c915d2e)
  • Loading branch information
michaelklishin committed Sep 13, 2021
1 parent ba1635a commit be2d8a4
Showing 1 changed file with 14 additions and 30 deletions.
44 changes: 14 additions & 30 deletions deps/rabbitmq_stream/src/rabbit_stream_reader.erl
@@ -1,6 +1,4 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 2.0 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License
%% at https://www.mozilla.org/en-US/MPL/2.0/
%%
%% Software distributed under the License is distributed on an "AS IS"
Expand Down Expand Up @@ -173,11 +171,15 @@
callback_mode() ->
[state_functions, state_enter].

terminate(Reason, State, StatemData) ->
terminate(Reason, State,
#statem_data{transport = Transport,
connection = #stream_connection{socket = Socket},
connection_state = ConnectionState} = StatemData) ->
close(Transport, Socket, ConnectionState),
rabbit_networking:unregister_non_amqp_connection(self()),
notify_connection_closed(StatemData),
rabbit_log:debug("~p terminating in state '~s' with reason '~p'",
[?MODULE, State, Reason]).
rabbit_log:debug("~s terminating in state '~s' with reason '~W'",
[?MODULE, State, Reason, 10]).

start_link(KeepaliveSup, Transport, Ref, Opts) ->
{ok,
Expand Down Expand Up @@ -715,7 +717,6 @@ open(info, {OK, S, Data},
#stream_connection{connection_step = Step} = Connection1,
case Step of
closing ->
close(Transport, S, State),
stop;
close_sent ->
rabbit_log_connection:debug("Transitioned to close_sent"),
Expand Down Expand Up @@ -800,8 +801,7 @@ open(info, {'DOWN', MonitorRef, process, _OsirisPid, _Reason},
connection_state = State1}};
open(info, heartbeat_send,
#statem_data{transport = Transport,
connection = #stream_connection{socket = S} = Connection,
connection_state = State}) ->
connection = #stream_connection{socket = S} = Connection}) ->
Frame = rabbit_stream_core:frame(heartbeat),
case catch send(Transport, S, Frame) of
ok ->
Expand All @@ -810,16 +810,12 @@ open(info, heartbeat_send,
rabbit_log_connection:info("Heartbeat send error ~p, closing connection",
[Unexpected]),
_C1 = demonitor_all_streams(Connection),
close(Transport, S, State),
stop
end;
open(info, heartbeat_timeout,
#statem_data{transport = Transport,
connection = #stream_connection{socket = S} = Connection,
connection_state = State}) ->
#statem_data{connection = #stream_connection{} = Connection}) ->
rabbit_log_connection:debug("Heartbeat timeout, closing connection"),
_C1 = demonitor_all_streams(Connection),
close(Transport, S, State),
stop;
open(info, {infos, From},
#statem_data{connection =
Expand Down Expand Up @@ -852,14 +848,11 @@ open({call, From}, {publishers_info, Items},
{keep_state_and_data,
{reply, From, publishers_infos(Items, Connection)}};
open({call, From}, {shutdown, Explanation},
#statem_data{transport = Transport,
connection = #stream_connection{socket = S} = Connection,
connection_state = State}) ->
#statem_data{connection = Connection}) ->
% likely closing call from the management plugin
rabbit_log_connection:info("Forcing stream connection ~p closing: ~p",
[self(), Explanation]),
demonitor_all_streams(Connection),
close(Transport, S, State),
{stop_and_reply, normal, {reply, From, ok}};
open(cast,
{queue_event, _, {osiris_written, _, undefined, CorrelationList}},
Expand Down Expand Up @@ -1059,14 +1052,9 @@ close_sent(enter, _OldState,
#configuration{connection_negotiation_step_timeout =
StateTimeout}}) ->
{keep_state_and_data, {state_timeout, StateTimeout, close}};
close_sent(state_timeout, close,
#statem_data{transport = Transport,
connection = #stream_connection{socket = Socket},
connection_state = State}) ->
rabbit_log_connection:warning("Closing connection because of timeout in state "
"'~s' likely due to lack of client action.",
close_sent(state_timeout, close, #statem_data{}) ->
rabbit_log_connection:warning("Closing connection because of timeout in state '~s' likely due to lack of client action.",
[?FUNCTION_NAME]),
close(Transport, Socket, State),
stop;
close_sent(info, {tcp, S, Data},
#statem_data{transport = Transport,
Expand All @@ -1081,7 +1069,6 @@ close_sent(info, {tcp, S, Data},
[?FUNCTION_NAME, Step]),
case Step of
closing_done ->
close(Transport, S, State1),
stop;
_ ->
Transport:setopts(S, [{active, once}]),
Expand All @@ -1093,12 +1080,9 @@ close_sent(info, {tcp_closed, S}, _StatemData) ->
rabbit_log_connection:debug("Stream protocol connection socket ~w closed [~w]",
[S, self()]),
stop;
close_sent(info, {tcp_error, S, Reason},
#statem_data{transport = Transport, connection_state = State}) ->
rabbit_log_connection:error("Stream protocol connection socket error: ~p [~w] "
"[~w]",
close_sent(info, {tcp_error, S, Reason}, #statem_data{}) ->
rabbit_log_connection:error("Stream protocol connection socket error: ~p [~w] [~w]",
[Reason, S, self()]),
close(Transport, S, State),
stop;
close_sent(info, {resource_alarm, IsThereAlarm},
StatemData = #statem_data{connection = Connection}) ->
Expand Down

0 comments on commit be2d8a4

Please sign in to comment.