diff --git a/deps/rabbit/src/rabbit_stream_coordinator.erl b/deps/rabbit/src/rabbit_stream_coordinator.erl index 2db20aec7ae3..64a9a8171707 100644 --- a/deps/rabbit/src/rabbit_stream_coordinator.erl +++ b/deps/rabbit/src/rabbit_stream_coordinator.erl @@ -931,18 +931,27 @@ send_self_command(Cmd) -> phase_delete_member(StreamId, #{node := Node} = Arg, Conf) -> fun() -> - try osiris_server_sup:delete_child(Node, Conf) of - ok -> - rabbit_log:info("~ts: Member deleted for ~ts : on node ~ts", + case rabbit_nodes:is_member(Node) of + true -> + try osiris_server_sup:delete_child(Node, Conf) of + ok -> + rabbit_log:info("~ts: Member deleted for ~ts : on node ~ts", + [?MODULE, StreamId, Node]), + send_self_command({member_deleted, StreamId, Arg}); + _ -> + send_action_failed(StreamId, deleting, Arg) + catch _:E -> + rabbit_log:warning("~ts: Error while deleting member for ~ts : on node ~ts ~W", + [?MODULE, StreamId, Node, E, 10]), + maybe_sleep(E), + send_action_failed(StreamId, deleting, Arg) + end; + false -> + %% node is no longer a cluster member, we return success to avoid + %% trying to delete the member indefinitely + rabbit_log:info("~ts: Member deleted/forgotten for ~ts : node ~ts is no longer a cluster member", [?MODULE, StreamId, Node]), - send_self_command({member_deleted, StreamId, Arg}); - _ -> - send_action_failed(StreamId, deleting, Arg) - catch _:E -> - rabbit_log:warning("~ts: Error while deleting member for ~ts : on node ~ts ~W", - [?MODULE, StreamId, Node, E, 10]), - maybe_sleep(E), - send_action_failed(StreamId, deleting, Arg) + send_self_command({member_deleted, StreamId, Arg}) end end. diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl index 35eb5ab49e1e..880f35097081 100644 --- a/deps/rabbit/src/rabbit_stream_queue.erl +++ b/deps/rabbit/src/rabbit_stream_queue.erl @@ -926,14 +926,9 @@ delete_replica(VHost, Name, Node) -> {ok, Q} when ?amqqueue_is_quorum(Q) -> {error, quorum_queue_not_supported}; {ok, Q} when ?amqqueue_is_stream(Q) -> - case lists:member(Node, rabbit_nodes:list_running()) of - false -> - {error, node_not_running}; - true -> - #{name := StreamId} = amqqueue:get_type_state(Q), - {ok, Reply, _} = rabbit_stream_coordinator:delete_replica(StreamId, Node), - Reply - end; + #{name := StreamId} = amqqueue:get_type_state(Q), + {ok, Reply, _} = rabbit_stream_coordinator:delete_replica(StreamId, Node), + Reply; E -> E end. diff --git a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl index 257d9de7ea11..c31327b9b469 100644 --- a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl +++ b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl @@ -16,6 +16,8 @@ -compile(nowarn_export_all). -compile(export_all). +-define(WAIT, 5000). + suite() -> [{timetrap, 15 * 60000}]. @@ -583,7 +585,7 @@ delete_replica(Config) -> declare(Config, Server0, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), check_leader_and_replicas(Config, [Server0, Server1, Server2]), %% Not a member of the cluster, what would happen? - ?assertEqual({error, node_not_running}, + ?assertEqual(ok, rpc:call(Server0, rabbit_stream_queue, delete_replica, [<<"/">>, Q, 'zen@rabbit'])), ?assertEqual(ok, @@ -725,17 +727,20 @@ delete_down_replica(Config) -> declare(Config, Server0, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), check_leader_and_replicas(Config, [Server0, Server1, Server2]), ok = rabbit_ct_broker_helpers:stop_node(Config, Server1), - ?assertEqual({error, node_not_running}, + ?assertEqual(ok, rpc:call(Server0, rabbit_stream_queue, delete_replica, [<<"/">>, Q, Server1])), - %% check it isn't gone - check_leader_and_replicas(Config, [Server0, Server1, Server2], members), + %% check it's gone + check_leader_and_replicas(Config, [Server0, Server2], members), ok = rabbit_ct_broker_helpers:start_node(Config, Server1), - rabbit_ct_helpers:await_condition( - fun() -> - ok == rpc:call(Server0, rabbit_stream_queue, delete_replica, - [<<"/">>, Q, Server1]) - end), + check_leader_and_replicas(Config, [Server0, Server2], members), + %% check the folder was deleted + QName = rabbit_misc:r(<<"/">>, queue, Q), + StreamId = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, get_stream_id, [QName]), + Server1DataDir = rabbit_ct_broker_helpers:get_node_config(Config, 1, data_dir), + DeletedReplicaDir = filename:join([Server1DataDir, "stream", StreamId]), + timer:sleep(1000), + ?awaitMatch(false, filelib:is_dir(DeletedReplicaDir), ?WAIT), rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). publish_coordinator_unavailable(Config) -> @@ -1294,6 +1299,12 @@ get_leader_info(QName) -> {error, not_found} end. +get_stream_id(QName) -> + {ok, Q} = rabbit_amqqueue:lookup(QName), + QState = amqqueue:get_type_state(Q), + #{name := StreamId} = QState, + StreamId. + kill_process(Config, Node, Pid) -> rabbit_ct_broker_helpers:rpc(Config, Node, ?MODULE, do_kill_process, [Pid]).