diff --git a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl index 3981d94bd055..e418dd1022ef 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl @@ -95,8 +95,8 @@ validate_stream_queue_arguments([{<<"x-initial-cluster-size">>, long, ClusterSiz error; validate_stream_queue_arguments([{<<"x-queue-leader-locator">>, longstr, Locator} | T]) -> case lists:member(Locator, [<<"client-local">>, - <<"random">>, - <<"least-leaders">>]) of + <<"random">>, + <<"least-leaders">>]) of true -> validate_stream_queue_arguments(T); false -> @@ -210,7 +210,25 @@ handle_call({topology, VirtualHost, Stream}, _From, State) -> {ok, Q} -> case is_stream_queue(Q) of true -> - {ok, maps:with([leader_node, replica_nodes], amqqueue:get_type_state(Q))}; + QState = amqqueue:get_type_state(Q), + ProcessAliveFun = fun(Pid) -> + rpc:call(node(Pid), erlang, is_process_alive, [Pid], 10000) + end, + LeaderNode = case ProcessAliveFun(maps:get(leader_pid, QState)) of + true -> + maps:get(leader_node, QState); + _ -> + undefined + end, + ReplicaNodes = lists:foldl(fun(Pid, Acc) -> + case ProcessAliveFun(Pid) of + true -> + Acc ++ [node(Pid)]; + _ -> + Acc + end + end, [], maps:get(replica_pids, QState)), + {ok, #{leader_node => LeaderNode, replica_nodes => ReplicaNodes}}; _ -> {error, stream_not_found} end; diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index 4507c98fe6a2..d3b48202561d 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -553,8 +553,8 @@ handle_frame_pre_auth(Transport, #stream_connection{socket = S} = Connection, St {Connection, State, Rest}; handle_frame_pre_auth(Transport, #stream_connection{socket = S, - authentication_state = AuthState0, - host = Host} = Connection0, State, + authentication_state = AuthState0, + host = Host} = Connection0, State, <>, Rest) -> @@ -578,39 +578,39 @@ handle_frame_pre_auth(Transport, C1 = Connection0#stream_connection{auth_mechanism = {Mechanism, AuthMechanism}}, {C2, FrameFragment} = case AuthMechanism:handle_response(SaslBin, AuthState) of - {refused, Username, Msg, Args} -> - rabbit_core_metrics:auth_attempt_failed(RemoteAddress, Username, stream), - auth_fail(Username, Msg, Args, C1, State), - rabbit_log:warning(Msg, Args), - {C1#stream_connection{connection_step = failure}, <>}; - {protocol_error, Msg, Args} -> - rabbit_core_metrics:auth_attempt_failed(RemoteAddress, <<>>, stream), - notify_auth_result(none, user_authentication_failure, - [{error, rabbit_misc:format(Msg, Args)}], - C1, State), - rabbit_log:warning(Msg, Args), - {C1#stream_connection{connection_step = failure}, <>}; - {challenge, Challenge, AuthState1} -> - rabbit_core_metrics:auth_attempt_succeeded(RemoteAddress, <<>>, stream), - ChallengeSize = byte_size(Challenge), - {C1#stream_connection{authentication_state = AuthState1, connection_step = authenticating}, - <> - }; - {ok, User = #user{username = Username}} -> - case rabbit_access_control:check_user_loopback(Username, S) of - ok -> - rabbit_core_metrics:auth_attempt_succeeded(RemoteAddress, Username, stream), - notify_auth_result(Username, user_authentication_success, - [], C1, State), - {C1#stream_connection{authentication_state = done, user = User, connection_step = authenticated}, - <> - }; - not_allowed -> - rabbit_core_metrics:auth_attempt_failed(RemoteAddress, Username, stream), - rabbit_log:warning("User '~s' can only connect via localhost~n", [Username]), - {C1#stream_connection{connection_step = failure}, <>} - end - end, + {refused, Username, Msg, Args} -> + rabbit_core_metrics:auth_attempt_failed(RemoteAddress, Username, stream), + auth_fail(Username, Msg, Args, C1, State), + rabbit_log:warning(Msg, Args), + {C1#stream_connection{connection_step = failure}, <>}; + {protocol_error, Msg, Args} -> + rabbit_core_metrics:auth_attempt_failed(RemoteAddress, <<>>, stream), + notify_auth_result(none, user_authentication_failure, + [{error, rabbit_misc:format(Msg, Args)}], + C1, State), + rabbit_log:warning(Msg, Args), + {C1#stream_connection{connection_step = failure}, <>}; + {challenge, Challenge, AuthState1} -> + rabbit_core_metrics:auth_attempt_succeeded(RemoteAddress, <<>>, stream), + ChallengeSize = byte_size(Challenge), + {C1#stream_connection{authentication_state = AuthState1, connection_step = authenticating}, + <> + }; + {ok, User = #user{username = Username}} -> + case rabbit_access_control:check_user_loopback(Username, S) of + ok -> + rabbit_core_metrics:auth_attempt_succeeded(RemoteAddress, Username, stream), + notify_auth_result(Username, user_authentication_success, + [], C1, State), + {C1#stream_connection{authentication_state = done, user = User, connection_step = authenticated}, + <> + }; + not_allowed -> + rabbit_core_metrics:auth_attempt_failed(RemoteAddress, Username, stream), + rabbit_log:warning("User '~s' can only connect via localhost~n", [Username]), + {C1#stream_connection{connection_step = failure}, <>} + end + end, Frame = <>, frame(Transport, C1, Frame), {C2, Rest}; @@ -689,9 +689,9 @@ handle_frame_post_auth(Transport, #stream_connection{socket = S, credits = Credi PublisherId:8/unsigned, MessageCount:32, Messages/binary>>, Rest) -> case rabbit_stream_utils:check_write_permitted( - #resource{name = Stream, kind = queue, virtual_host = VirtualHost}, - User, - #{}) of + #resource{name = Stream, kind = queue, virtual_host = VirtualHost}, + User, + #{}) of ok -> case lookup_leader(Stream, Connection) of cluster_not_found -> @@ -721,9 +721,9 @@ handle_frame_post_auth(Transport, #stream_connection{socket = Socket, <>, Rest) -> case rabbit_stream_utils:check_read_permitted( - #resource{name = Stream, kind = queue, virtual_host = VirtualHost}, - User, - #{}) of + #resource{name = Stream, kind = queue, virtual_host = VirtualHost}, + User, + #{}) of ok -> case rabbit_stream_manager:lookup_local_member(VirtualHost, Stream) of {error, not_available} -> @@ -851,13 +851,13 @@ handle_frame_post_auth(Transport, #stream_connection{socket = S, send_file_oct = handle_frame_post_auth(_Transport, #stream_connection{virtual_host = VirtualHost, user = User} = Connection, State, <>, Rest) -> + ReferenceSize:16, Reference:ReferenceSize/binary, + StreamSize:16, Stream:StreamSize/binary, Offset:64>>, Rest) -> case rabbit_stream_utils:check_write_permitted( - #resource{name = Stream, kind = queue, virtual_host = VirtualHost}, - User, - #{}) of + #resource{name = Stream, kind = queue, virtual_host = VirtualHost}, + User, + #{}) of ok -> case lookup_leader(Stream, Connection) of cluster_not_found -> @@ -880,24 +880,24 @@ handle_frame_post_auth(Transport, #stream_connection{socket = S, virtual_host = StreamSize:16, Stream:StreamSize/binary>>, Rest) -> FrameSize = ?RESPONSE_FRAME_SIZE + 8, {ResponseCode, Offset} = case rabbit_stream_utils:check_read_permitted( - #resource{name = Stream, kind = queue, virtual_host = VirtualHost}, - User, - #{}) of - ok -> - case rabbit_stream_manager:lookup_local_member(VirtualHost, Stream) of - {error, not_found} -> - {?RESPONSE_CODE_STREAM_DOES_NOT_EXIST, 0}; - {ok, LocalMemberPid} -> - {?RESPONSE_CODE_OK, case osiris:read_tracking(LocalMemberPid, Reference) of - undefined -> - 0; - Offt -> - Offt - end} - end; - error -> - {?RESPONSE_CODE_ACCESS_REFUSED, 0} - end, + #resource{name = Stream, kind = queue, virtual_host = VirtualHost}, + User, + #{}) of + ok -> + case rabbit_stream_manager:lookup_local_member(VirtualHost, Stream) of + {error, not_found} -> + {?RESPONSE_CODE_STREAM_DOES_NOT_EXIST, 0}; + {ok, LocalMemberPid} -> + {?RESPONSE_CODE_OK, case osiris:read_tracking(LocalMemberPid, Reference) of + undefined -> + 0; + Offt -> + Offt + end} + end; + error -> + {?RESPONSE_CODE_ACCESS_REFUSED, 0} + end, Transport:send(S, [<>, <>, <>, <>]), {Connection, State, Rest}; @@ -909,9 +909,9 @@ handle_frame_post_auth(Transport, #stream_connection{virtual_host = VirtualHost, {ok, StreamName} -> {Arguments, _Rest} = rabbit_stream_utils:parse_map(ArgumentsBinary, ArgumentsCount), case rabbit_stream_utils:check_configure_permitted( - #resource{name = StreamName, kind = queue, virtual_host = VirtualHost}, - User, - #{}) of + #resource{name = StreamName, kind = queue, virtual_host = VirtualHost}, + User, + #{}) of ok -> case rabbit_stream_manager:create(VirtualHost, StreamName, Arguments, Username) of {ok, #{leader_pid := LeaderPid, replica_pids := ReturnedReplicas}} -> @@ -940,9 +940,9 @@ handle_frame_post_auth(Transport, #stream_connection{socket = S, virtual_host = user = #user{username = Username} = User} = Connection, State, <>, Rest) -> case rabbit_stream_utils:check_configure_permitted( - #resource{name = Stream, kind = queue, virtual_host = VirtualHost}, - User, - #{}) of + #resource{name = Stream, kind = queue, virtual_host = VirtualHost}, + User, + #{}) of ok -> case rabbit_stream_manager:delete(VirtualHost, Stream, Username) of {ok, deleted} -> @@ -973,6 +973,8 @@ handle_frame_post_auth(Transport, #stream_connection{socket = S, virtual_host = %% get the nodes involved in the streams NodesMap = lists:foldl(fun(Stream, Acc) -> case rabbit_stream_manager:topology(VirtualHost, Stream) of + {ok, #{leader_node := undefined, replica_nodes := ReplicaNodes}} -> + lists:foldl(fun(ReplicaNode, NodesAcc) -> maps:put(ReplicaNode, ok, NodesAcc) end, Acc, ReplicaNodes); {ok, #{leader_node := LeaderNode, replica_nodes := ReplicaNodes}} -> Acc1 = maps:put(LeaderNode, ok, Acc), lists:foldl(fun(ReplicaNode, NodesAcc) -> maps:put(ReplicaNode, ok, NodesAcc) end, Acc1, ReplicaNodes);