Skip to content

Commit

Permalink
Merge pull request #3908 from rabbitmq/stream-coord-mnesia-update-crash
Browse files Browse the repository at this point in the history
Stream coordinator: avoid mnesia update process crashing after delete
  • Loading branch information
kjnilsson committed Dec 17, 2021
2 parents 249e8c8 + d72719a commit e6108e8
Showing 1 changed file with 15 additions and 11 deletions.
26 changes: 15 additions & 11 deletions deps/rabbit/src/rabbit_stream_coordinator.erl
Original file line number Diff line number Diff line change
Expand Up @@ -846,18 +846,22 @@ phase_update_mnesia(StreamId, Args, #{reference := QName,
%% This can happen during recovery
%% we need to re-initialise the queue record
%% if the stream id is a match
[Q] = mnesia:dirty_read(rabbit_durable_queue, QName),
case amqqueue:get_type_state(Q) of
#{name := S} when S == StreamId ->
rabbit_log:debug("~s: initializing queue record for stream id ~s",
[?MODULE, StreamId]),
_ = rabbit_amqqueue:ensure_rabbit_queue_record_is_initialized(Fun(Q)),
case mnesia:dirty_read(rabbit_durable_queue, QName) of
[] ->
%% queue not found at all, it must have been deleted
ok;
_ ->
ok
end,

send_self_command({mnesia_updated, StreamId, Args});
[Q] ->
case amqqueue:get_type_state(Q) of
#{name := S} when S == StreamId ->
rabbit_log:debug("~s: initializing queue record for stream id ~s",
[?MODULE, StreamId]),
_ = rabbit_amqqueue:ensure_rabbit_queue_record_is_initialized(Fun(Q)),
ok;
_ ->
ok
end,
send_self_command({mnesia_updated, StreamId, Args})
end;
_ ->
send_self_command({mnesia_updated, StreamId, Args})
catch _:E ->
Expand Down

0 comments on commit e6108e8

Please sign in to comment.