Skip to content

Commit

Permalink
Check if member processes are alive in metadata command
Browse files Browse the repository at this point in the history
In case the Mnesia record is stale.
  • Loading branch information
acogoluegnes committed Nov 17, 2020
1 parent d0d901f commit 1ed8f76
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 73 deletions.
24 changes: 21 additions & 3 deletions deps/rabbitmq_stream/src/rabbit_stream_manager.erl
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ validate_stream_queue_arguments([{<<"x-initial-cluster-size">>, long, ClusterSiz
error;
validate_stream_queue_arguments([{<<"x-queue-leader-locator">>, longstr, Locator} | T]) ->
case lists:member(Locator, [<<"client-local">>,
<<"random">>,
<<"least-leaders">>]) of
<<"random">>,
<<"least-leaders">>]) of
true ->
validate_stream_queue_arguments(T);
false ->
Expand Down Expand Up @@ -210,7 +210,25 @@ handle_call({topology, VirtualHost, Stream}, _From, State) ->
{ok, Q} ->
case is_stream_queue(Q) of
true ->
{ok, maps:with([leader_node, replica_nodes], amqqueue:get_type_state(Q))};
QState = amqqueue:get_type_state(Q),
ProcessAliveFun = fun(Pid) ->
rpc:call(node(Pid), erlang, is_process_alive, [Pid], 10000)
end,
LeaderNode = case ProcessAliveFun(maps:get(leader_pid, QState)) of
true ->
maps:get(leader_node, QState);
_ ->
undefined
end,
ReplicaNodes = lists:foldl(fun(Pid, Acc) ->
case ProcessAliveFun(Pid) of
true ->
Acc ++ [node(Pid)];
_ ->
Acc
end
end, [], maps:get(replica_pids, QState)),
{ok, #{leader_node => LeaderNode, replica_nodes => ReplicaNodes}};
_ ->
{error, stream_not_found}
end;
Expand Down
142 changes: 72 additions & 70 deletions deps/rabbitmq_stream/src/rabbit_stream_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -553,8 +553,8 @@ handle_frame_pre_auth(Transport, #stream_connection{socket = S} = Connection, St
{Connection, State, Rest};
handle_frame_pre_auth(Transport,
#stream_connection{socket = S,
authentication_state = AuthState0,
host = Host} = Connection0, State,
authentication_state = AuthState0,
host = Host} = Connection0, State,
<<?COMMAND_SASL_AUTHENTICATE:16, ?VERSION_0:16, CorrelationId:32,
MechanismLength:16, Mechanism:MechanismLength/binary,
SaslFragment/binary>>, Rest) ->
Expand All @@ -578,39 +578,39 @@ handle_frame_pre_auth(Transport,
C1 = Connection0#stream_connection{auth_mechanism = {Mechanism, AuthMechanism}},
{C2, FrameFragment} =
case AuthMechanism:handle_response(SaslBin, AuthState) of
{refused, Username, Msg, Args} ->
rabbit_core_metrics:auth_attempt_failed(RemoteAddress, Username, stream),
auth_fail(Username, Msg, Args, C1, State),
rabbit_log:warning(Msg, Args),
{C1#stream_connection{connection_step = failure}, <<?RESPONSE_AUTHENTICATION_FAILURE:16>>};
{protocol_error, Msg, Args} ->
rabbit_core_metrics:auth_attempt_failed(RemoteAddress, <<>>, stream),
notify_auth_result(none, user_authentication_failure,
[{error, rabbit_misc:format(Msg, Args)}],
C1, State),
rabbit_log:warning(Msg, Args),
{C1#stream_connection{connection_step = failure}, <<?RESPONSE_SASL_ERROR:16>>};
{challenge, Challenge, AuthState1} ->
rabbit_core_metrics:auth_attempt_succeeded(RemoteAddress, <<>>, stream),
ChallengeSize = byte_size(Challenge),
{C1#stream_connection{authentication_state = AuthState1, connection_step = authenticating},
<<?RESPONSE_SASL_CHALLENGE:16, ChallengeSize:32, Challenge/binary>>
};
{ok, User = #user{username = Username}} ->
case rabbit_access_control:check_user_loopback(Username, S) of
ok ->
rabbit_core_metrics:auth_attempt_succeeded(RemoteAddress, Username, stream),
notify_auth_result(Username, user_authentication_success,
[], C1, State),
{C1#stream_connection{authentication_state = done, user = User, connection_step = authenticated},
<<?RESPONSE_CODE_OK:16>>
};
not_allowed ->
rabbit_core_metrics:auth_attempt_failed(RemoteAddress, Username, stream),
rabbit_log:warning("User '~s' can only connect via localhost~n", [Username]),
{C1#stream_connection{connection_step = failure}, <<?RESPONSE_SASL_AUTHENTICATION_FAILURE_LOOPBACK:16>>}
end
end,
{refused, Username, Msg, Args} ->
rabbit_core_metrics:auth_attempt_failed(RemoteAddress, Username, stream),
auth_fail(Username, Msg, Args, C1, State),
rabbit_log:warning(Msg, Args),
{C1#stream_connection{connection_step = failure}, <<?RESPONSE_AUTHENTICATION_FAILURE:16>>};
{protocol_error, Msg, Args} ->
rabbit_core_metrics:auth_attempt_failed(RemoteAddress, <<>>, stream),
notify_auth_result(none, user_authentication_failure,
[{error, rabbit_misc:format(Msg, Args)}],
C1, State),
rabbit_log:warning(Msg, Args),
{C1#stream_connection{connection_step = failure}, <<?RESPONSE_SASL_ERROR:16>>};
{challenge, Challenge, AuthState1} ->
rabbit_core_metrics:auth_attempt_succeeded(RemoteAddress, <<>>, stream),
ChallengeSize = byte_size(Challenge),
{C1#stream_connection{authentication_state = AuthState1, connection_step = authenticating},
<<?RESPONSE_SASL_CHALLENGE:16, ChallengeSize:32, Challenge/binary>>
};
{ok, User = #user{username = Username}} ->
case rabbit_access_control:check_user_loopback(Username, S) of
ok ->
rabbit_core_metrics:auth_attempt_succeeded(RemoteAddress, Username, stream),
notify_auth_result(Username, user_authentication_success,
[], C1, State),
{C1#stream_connection{authentication_state = done, user = User, connection_step = authenticated},
<<?RESPONSE_CODE_OK:16>>
};
not_allowed ->
rabbit_core_metrics:auth_attempt_failed(RemoteAddress, Username, stream),
rabbit_log:warning("User '~s' can only connect via localhost~n", [Username]),
{C1#stream_connection{connection_step = failure}, <<?RESPONSE_SASL_AUTHENTICATION_FAILURE_LOOPBACK:16>>}
end
end,
Frame = <<?COMMAND_SASL_AUTHENTICATE:16, ?VERSION_0:16, CorrelationId:32, FrameFragment/binary>>,
frame(Transport, C1, Frame),
{C2, Rest};
Expand Down Expand Up @@ -689,9 +689,9 @@ handle_frame_post_auth(Transport, #stream_connection{socket = S, credits = Credi
PublisherId:8/unsigned,
MessageCount:32, Messages/binary>>, Rest) ->
case rabbit_stream_utils:check_write_permitted(
#resource{name = Stream, kind = queue, virtual_host = VirtualHost},
User,
#{}) of
#resource{name = Stream, kind = queue, virtual_host = VirtualHost},
User,
#{}) of
ok ->
case lookup_leader(Stream, Connection) of
cluster_not_found ->
Expand Down Expand Up @@ -721,9 +721,9 @@ handle_frame_post_auth(Transport, #stream_connection{socket = Socket,
<<?COMMAND_SUBSCRIBE:16, ?VERSION_0:16, CorrelationId:32, SubscriptionId:8/unsigned, StreamSize:16, Stream:StreamSize/binary,
OffsetType:16/signed, OffsetAndCredit/binary>>, Rest) ->
case rabbit_stream_utils:check_read_permitted(
#resource{name = Stream, kind = queue, virtual_host = VirtualHost},
User,
#{}) of
#resource{name = Stream, kind = queue, virtual_host = VirtualHost},
User,
#{}) of
ok ->
case rabbit_stream_manager:lookup_local_member(VirtualHost, Stream) of
{error, not_available} ->
Expand Down Expand Up @@ -851,13 +851,13 @@ handle_frame_post_auth(Transport, #stream_connection{socket = S, send_file_oct =
handle_frame_post_auth(_Transport, #stream_connection{virtual_host = VirtualHost, user = User} = Connection,
State,
<<?COMMAND_COMMIT_OFFSET:16, ?VERSION_0:16, _CorrelationId:32,
ReferenceSize:16, Reference:ReferenceSize/binary,
StreamSize:16, Stream:StreamSize/binary, Offset:64>>, Rest) ->
ReferenceSize:16, Reference:ReferenceSize/binary,
StreamSize:16, Stream:StreamSize/binary, Offset:64>>, Rest) ->

case rabbit_stream_utils:check_write_permitted(
#resource{name = Stream, kind = queue, virtual_host = VirtualHost},
User,
#{}) of
#resource{name = Stream, kind = queue, virtual_host = VirtualHost},
User,
#{}) of
ok ->
case lookup_leader(Stream, Connection) of
cluster_not_found ->
Expand All @@ -880,24 +880,24 @@ handle_frame_post_auth(Transport, #stream_connection{socket = S, virtual_host =
StreamSize:16, Stream:StreamSize/binary>>, Rest) ->
FrameSize = ?RESPONSE_FRAME_SIZE + 8,
{ResponseCode, Offset} = case rabbit_stream_utils:check_read_permitted(
#resource{name = Stream, kind = queue, virtual_host = VirtualHost},
User,
#{}) of
ok ->
case rabbit_stream_manager:lookup_local_member(VirtualHost, Stream) of
{error, not_found} ->
{?RESPONSE_CODE_STREAM_DOES_NOT_EXIST, 0};
{ok, LocalMemberPid} ->
{?RESPONSE_CODE_OK, case osiris:read_tracking(LocalMemberPid, Reference) of
undefined ->
0;
Offt ->
Offt
end}
end;
error ->
{?RESPONSE_CODE_ACCESS_REFUSED, 0}
end,
#resource{name = Stream, kind = queue, virtual_host = VirtualHost},
User,
#{}) of
ok ->
case rabbit_stream_manager:lookup_local_member(VirtualHost, Stream) of
{error, not_found} ->
{?RESPONSE_CODE_STREAM_DOES_NOT_EXIST, 0};
{ok, LocalMemberPid} ->
{?RESPONSE_CODE_OK, case osiris:read_tracking(LocalMemberPid, Reference) of
undefined ->
0;
Offt ->
Offt
end}
end;
error ->
{?RESPONSE_CODE_ACCESS_REFUSED, 0}
end,
Transport:send(S, [<<FrameSize:32, ?COMMAND_QUERY_OFFSET:16, ?VERSION_0:16>>,
<<CorrelationId:32>>, <<ResponseCode:16>>, <<Offset:64>>]),
{Connection, State, Rest};
Expand All @@ -909,9 +909,9 @@ handle_frame_post_auth(Transport, #stream_connection{virtual_host = VirtualHost,
{ok, StreamName} ->
{Arguments, _Rest} = rabbit_stream_utils:parse_map(ArgumentsBinary, ArgumentsCount),
case rabbit_stream_utils:check_configure_permitted(
#resource{name = StreamName, kind = queue, virtual_host = VirtualHost},
User,
#{}) of
#resource{name = StreamName, kind = queue, virtual_host = VirtualHost},
User,
#{}) of
ok ->
case rabbit_stream_manager:create(VirtualHost, StreamName, Arguments, Username) of
{ok, #{leader_pid := LeaderPid, replica_pids := ReturnedReplicas}} ->
Expand Down Expand Up @@ -940,9 +940,9 @@ handle_frame_post_auth(Transport, #stream_connection{socket = S, virtual_host =
user = #user{username = Username} = User} = Connection, State,
<<?COMMAND_DELETE_STREAM:16, ?VERSION_0:16, CorrelationId:32, StreamSize:16, Stream:StreamSize/binary>>, Rest) ->
case rabbit_stream_utils:check_configure_permitted(
#resource{name = Stream, kind = queue, virtual_host = VirtualHost},
User,
#{}) of
#resource{name = Stream, kind = queue, virtual_host = VirtualHost},
User,
#{}) of
ok ->
case rabbit_stream_manager:delete(VirtualHost, Stream, Username) of
{ok, deleted} ->
Expand Down Expand Up @@ -973,6 +973,8 @@ handle_frame_post_auth(Transport, #stream_connection{socket = S, virtual_host =
%% get the nodes involved in the streams
NodesMap = lists:foldl(fun(Stream, Acc) ->
case rabbit_stream_manager:topology(VirtualHost, Stream) of
{ok, #{leader_node := undefined, replica_nodes := ReplicaNodes}} ->
lists:foldl(fun(ReplicaNode, NodesAcc) -> maps:put(ReplicaNode, ok, NodesAcc) end, Acc, ReplicaNodes);
{ok, #{leader_node := LeaderNode, replica_nodes := ReplicaNodes}} ->
Acc1 = maps:put(LeaderNode, ok, Acc),
lists:foldl(fun(ReplicaNode, NodesAcc) -> maps:put(ReplicaNode, ok, NodesAcc) end, Acc1, ReplicaNodes);
Expand Down

0 comments on commit 1ed8f76

Please sign in to comment.