Skip to content

Commit

Permalink
Merge pull request #10088 from rabbitmq/convert-rabbit_writer-to-gen_…
Browse files Browse the repository at this point in the history
…server

rabbit_writer: Convert to a regular gen_server
  • Loading branch information
dumbbell committed May 27, 2024
2 parents 19a7518 + c607eb0 commit a337684
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 71 deletions.
13 changes: 7 additions & 6 deletions deps/amqp_client/src/amqp_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -839,7 +839,11 @@ handle_channel_exit(Reason = #amqp_error{name = ErrorName, explanation = Expl},
handle_shutdown({connection_closing, ReportedReason}, State);
handle_channel_exit(Reason, State) ->
%% Unexpected death of a channel infrastructure process
{stop, {infrastructure_died, Reason}, State}.
Reason1 = case Reason of
{shutdown, R} -> R;
_ -> Reason
end,
{stop, {infrastructure_died, Reason1}, State}.

handle_shutdown({_, 200, _}, State) ->
{stop, normal, State};
Expand Down Expand Up @@ -872,11 +876,8 @@ do(Method, Content, Flow, #state{driver = direct, writer = W}) ->


flush_writer(#state{driver = network, writer = Writer}) ->
try
rabbit_writer:flush(Writer)
catch
exit:noproc -> ok
end;
_ = catch rabbit_writer:flush(Writer),
ok;
flush_writer(#state{driver = direct}) ->
ok.
amqp_msg(none) ->
Expand Down
22 changes: 19 additions & 3 deletions deps/amqp_client/test/system_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1347,7 +1347,13 @@ channel_writer_death(Config) ->
when ConnType =:= direct -> ok;

exit:{{infrastructure_died, {unknown_properties_record, <<>>}}, _}
when ConnType =:= network -> ok
when ConnType =:= network -> ok;

%% The writer process exited before the call and the amqp_channel_sup
%% supervisor shut the supervision tree down because the channel is
%% significant. The call happened at that shutdown time or just after.
exit:{shutdown, {gen_server, call, _}} -> ok;
exit:{noproc, {gen_server, call, _}} -> ok
end,
wait_for_death(Channel),
wait_for_death(Connection).
Expand Down Expand Up @@ -1435,7 +1441,12 @@ shortstr_overflow_property(Config) ->
Ret = amqp_channel:call(Channel, QoS),
throw({unexpected_success, Ret})
catch
exit:{{infrastructure_died, content_properties_shortstr_overflow}, _} -> ok
exit:{{infrastructure_died, content_properties_shortstr_overflow}, _} -> ok;
%% The writer process exited before the call and the amqp_channel_sup
%% supervisor shut the supervision tree down because the channel is
%% significant. The call happened at that shutdown time or just after.
exit:{shutdown, {gen_server, call, _}} -> ok;
exit:{noproc, {gen_server, call, _}} -> ok
end,
wait_for_death(Channel),
wait_for_death(Connection).
Expand All @@ -1457,7 +1468,12 @@ shortstr_overflow_field(Config) ->
consumer_tag = SentString}),
throw({unexpected_success, Ret})
catch
exit:{{infrastructure_died, method_field_shortstr_overflow}, _} -> ok
exit:{{infrastructure_died, method_field_shortstr_overflow}, _} -> ok;
%% The writer process exited before the call and the amqp_channel_sup
%% supervisor shut the supervision tree down because the channel is
%% significant. The call happened at that shutdown time or just after.
exit:{shutdown, {gen_server, call, _}} -> ok;
exit:{noproc, {gen_server, call, _}} -> ok
end,
wait_for_death(Channel),
wait_for_death(Connection).
Expand Down
126 changes: 64 additions & 62 deletions deps/rabbit_common/src/rabbit_writer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

-module(rabbit_writer).

-behavior(gen_server).

%% This module backs writer processes ("writers"). The responsibility of
%% a writer is to serialise protocol methods and write them to the socket.
%% Every writer is associated with a channel and normally it's the channel
Expand All @@ -27,7 +29,12 @@
-include("rabbit.hrl").
-export([start/6, start_link/6, start/7, start_link/7, start/8, start_link/8]).

-export([system_continue/3, system_terminate/4, system_code_change/4]).
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3]).

-export([send_command/2, send_command/3,
send_command_sync/2, send_command_sync/3,
Expand All @@ -37,9 +44,6 @@
-export([internal_send_command/4, internal_send_command/6]).
-export([msg_size/1, maybe_gc_large_msg/1, maybe_gc_large_msg/2]).

%% internal
-export([enter_mainloop/2, mainloop/2, mainloop1/2]).

-record(wstate, {
%% socket (port)
sock,
Expand Down Expand Up @@ -97,10 +101,6 @@
rabbit_types:proc_name(), boolean(), undefined|non_neg_integer()) ->
rabbit_types:ok(pid()).

-spec system_code_change(_,_,_,_) -> {'ok',_}.
-spec system_continue(_,_,#wstate{}) -> any().
-spec system_terminate(_,_,_,_) -> no_return().

-spec send_command(pid(), rabbit_framing:amqp_method_record()) -> 'ok'.
-spec send_command
(pid(), rabbit_framing:amqp_method_record(), rabbit_types:content()) ->
Expand Down Expand Up @@ -161,13 +161,15 @@ start(Sock, Channel, FrameMax, Protocol, ReaderPid, Identity,
ReaderWantsStats, GCThreshold) ->
State = initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid,
ReaderWantsStats, GCThreshold),
{ok, proc_lib:spawn(?MODULE, enter_mainloop, [Identity, State])}.
Options = [{hibernate_after, ?HIBERNATE_AFTER}],
gen_server:start(?MODULE, [Identity, State], Options).

start_link(Sock, Channel, FrameMax, Protocol, ReaderPid, Identity,
ReaderWantsStats, GCThreshold) ->
State = initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid,
ReaderWantsStats, GCThreshold),
{ok, proc_lib:spawn_link(?MODULE, enter_mainloop, [Identity, State])}.
Options = [{hibernate_after, ?HIBERNATE_AFTER}],
gen_server:start_link(?MODULE, [Identity, State], Options).

initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats, GCThreshold) ->
(case ReaderWantsStats of
Expand All @@ -182,49 +184,57 @@ initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats, GC
writer_gc_threshold = GCThreshold},
#wstate.stats_timer).

system_continue(Parent, Deb, State) ->
mainloop(Deb, State#wstate{reader = Parent}).

system_terminate(Reason, _Parent, _Deb, _State) ->
exit(Reason).

system_code_change(Misc, _Module, _OldVsn, _Extra) ->
{ok, Misc}.

enter_mainloop(Identity, State) ->
init([Identity, State]) ->
?LG_PROCESS_TYPE(writer),
Deb = sys:debug_options([]),
?store_proc_name(Identity),
mainloop(Deb, State).
{ok, State}.

mainloop(Deb, State) ->
handle_call({send_command_sync, MethodRecord}, _From, State) ->
try
mainloop1(Deb, State)
State1 = internal_flush(
internal_send_command_async(MethodRecord, State)),
{reply, ok, State1, 0}
catch
exit:Error -> #wstate{reader = ReaderPid, channel = Channel} = State,
ReaderPid ! {channel_exit, Channel, Error}
end,
done.

mainloop1(Deb, State = #wstate{pending = []}) ->
receive
Message -> {Deb1, State1} = handle_message(Deb, Message, State),
?MODULE:mainloop1(Deb1, State1)
after ?HIBERNATE_AFTER ->
erlang:hibernate(?MODULE, mainloop, [Deb, State])
_Class:Reason ->
{stop, {shutdown, Reason}, State}
end;
mainloop1(Deb, State) ->
receive
Message -> {Deb1, State1} = handle_message(Deb, Message, State),
?MODULE:mainloop1(Deb1, State1)
after 0 ->
?MODULE:mainloop1(Deb, internal_flush(State))
handle_call({send_command_sync, MethodRecord, Content}, _From, State) ->
try
State1 = internal_flush(
internal_send_command_async(MethodRecord, Content, State)),
{reply, ok, State1, 0}
catch
_Class:Reason ->
{stop, {shutdown, Reason}, State}
end;
handle_call(flush, _From, State) ->
try
State1 = internal_flush(State),
{reply, ok, State1, 0}
catch
_Class:Reason ->
{stop, {shutdown, Reason}, State}
end.

handle_message(Deb, {system, From, Req}, State = #wstate{reader = Parent}) ->
sys:handle_system_msg(Req, From, Parent, ?MODULE, Deb, State);
handle_message(Deb, Message, State) ->
{Deb, handle_message(Message, State)}.
handle_cast(_Message, State) ->
{noreply, State, 0}.

handle_info(timeout, State) ->
try
State1 = internal_flush(State),
{noreply, State1}
catch
_Class:Reason ->
{stop, {shutdown, Reason}, State}
end;
handle_info(Message, State) ->
try
State1 = handle_message(Message, State),
{noreply, State1, 0}
catch
_Class:Reason ->
{stop, {shutdown, Reason}, State}
end.

handle_message({send_command, MethodRecord}, State) ->
internal_send_command_async(MethodRecord, State);
Expand All @@ -236,21 +246,6 @@ handle_message({send_command_flow, MethodRecord, Sender}, State) ->
handle_message({send_command_flow, MethodRecord, Content, Sender}, State) ->
credit_flow:ack(Sender),
internal_send_command_async(MethodRecord, Content, State);
handle_message({'$gen_call', From, {send_command_sync, MethodRecord}}, State) ->
State1 = internal_flush(
internal_send_command_async(MethodRecord, State)),
gen_server:reply(From, ok),
State1;
handle_message({'$gen_call', From, {send_command_sync, MethodRecord, Content}},
State) ->
State1 = internal_flush(
internal_send_command_async(MethodRecord, Content, State)),
gen_server:reply(From, ok),
State1;
handle_message({'$gen_call', From, flush}, State) ->
State1 = internal_flush(State),
gen_server:reply(From, ok),
State1;
handle_message({send_command_and_notify, QPid, ChPid, MethodRecord}, State) ->
State1 = internal_send_command_async(MethodRecord, State),
rabbit_amqqueue_common:notify_sent(QPid, ChPid),
Expand All @@ -277,6 +272,14 @@ handle_message({ok, _Ref} = Msg, State) ->
handle_message(Message, _State) ->
exit({writer, message_not_understood, Message}).

terminate(Reason, State) ->
#wstate{reader = ReaderPid, channel = Channel} = State,
ReaderPid ! {channel_exit, Channel, Reason},
ok.

code_change(_OldVsn, State, _Extra) ->
{ok, State}.

%%---------------------------------------------------------------------------

send_command(W, MethodRecord) ->
Expand Down Expand Up @@ -316,8 +319,7 @@ flush(W) -> call(W, flush).
%%---------------------------------------------------------------------------

call(Pid, Msg) ->
{ok, Res} = gen:call(Pid, '$gen_call', Msg, infinity),
Res.
gen_server:call(Pid, Msg, infinity).

%%---------------------------------------------------------------------------

Expand Down

0 comments on commit a337684

Please sign in to comment.