From 1778bc22aab806310c603c162d716c1445990d50 Mon Sep 17 00:00:00 2001 From: David Ansari Date: Mon, 28 Oct 2024 16:46:55 +0100 Subject: [PATCH 1/2] Support AMQP 1.0 token renewal Closes #9259. ## What? Allow an AMQP 1.0 client to renew an OAuth 2.0 token before it expires. ## Why? This allows clients to keep the AMQP connection open instead of having to create a new connection whenever the token expires. ## How? As explained in https://github.com/rabbitmq/rabbitmq-server/issues/9259#issuecomment-2437602040 the client can `PUT` a new token on HTTP API v2 path `/auth/tokens`. RabbitMQ will then: 1. Store the new token on the given connection. 2. Recheck access to the connection's vhost. 3. Clear all permission caches in the AMQP sessions. 4. Recheck write permissions to exchanges for links publishing to RabbitMQ, and recheck read permissions from queues for links consuming from RabbitMQ. The latter complies with the user expectation in #11364. --- deps/rabbit/src/rabbit_access_control.erl | 2 +- deps/rabbit/src/rabbit_amqp_management.erl | 14 +- deps/rabbit/src/rabbit_amqp_reader.erl | 84 +++-- deps/rabbit/src/rabbit_amqp_session.erl | 47 ++- deps/rabbit/src/rabbit_channel.erl | 2 +- .../src/rabbitmq_amqp_client.erl | 21 +- .../test/system_SUITE.erl | 291 ++++++++++++++++-- release-notes/4.1.0.md | 5 + 8 files changed, 411 insertions(+), 55 deletions(-) diff --git a/deps/rabbit/src/rabbit_access_control.erl b/deps/rabbit/src/rabbit_access_control.erl index cfc8b591eb3f..305a3b743f0f 100644 --- a/deps/rabbit/src/rabbit_access_control.erl +++ b/deps/rabbit/src/rabbit_access_control.erl @@ -249,7 +249,7 @@ check_user_id0(ClaimedUserName, #user{username = ActualUserName, end. -spec update_state(User :: rabbit_types:user(), NewState :: term()) -> - {'ok', rabbit_types:auth_user()} | + {'ok', rabbit_types:user()} | {'refused', string()} | {'error', any()}. diff --git a/deps/rabbit/src/rabbit_amqp_management.erl b/deps/rabbit/src/rabbit_amqp_management.erl index e4555e806033..9cd2669f57b1 100644 --- a/deps/rabbit/src/rabbit_amqp_management.erl +++ b/deps/rabbit/src/rabbit_amqp_management.erl @@ -381,7 +381,19 @@ handle_http_req(<<"GET">>, Bindings0 = rabbit_binding:list_for_source_and_destination(SrcXName, DstName), Bindings = [B || B = #binding{key = K} <- Bindings0, K =:= Key], RespPayload = encode_bindings(Bindings), - {<<"200">>, RespPayload, PermCaches}. + {<<"200">>, RespPayload, PermCaches}; + +handle_http_req(<<"PUT">>, + [<<"auth">>, <<"tokens">>], + _Query, + ReqPayload, + _Vhost, + _User, + ConnPid, + PermCaches) -> + {binary, Token} = ReqPayload, + ok = rabbit_amqp_reader:set_credential(ConnPid, Token), + {<<"204">>, null, PermCaches}. decode_queue({map, KVList}) -> M = lists:foldl( diff --git a/deps/rabbit/src/rabbit_amqp_reader.erl b/deps/rabbit/src/rabbit_amqp_reader.erl index bcfa6a1dcc8c..9ae1c3e6eeae 100644 --- a/deps/rabbit/src/rabbit_amqp_reader.erl +++ b/deps/rabbit/src/rabbit_amqp_reader.erl @@ -13,7 +13,8 @@ -export([init/1, info/2, - mainloop/2]). + mainloop/2, + set_credential/2]). -export([system_continue/3, system_terminate/4, @@ -53,6 +54,7 @@ channel_max :: non_neg_integer(), auth_mechanism :: sasl_init_unprocessed | {binary(), module()}, auth_state :: term(), + credential_timer :: undefined | reference(), properties :: undefined | {map, list(tuple())} }). @@ -139,6 +141,11 @@ server_properties() -> Props = [{{symbol, <<"node">>}, {utf8, atom_to_binary(node())}} | Props1], {map, Props}. +-spec set_credential(pid(), binary()) -> ok. +set_credential(Pid, Credential) -> + Pid ! {set_credential, Credential}, + ok. + %%-------------------------------------------------------------------------- inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F). @@ -243,6 +250,8 @@ handle_other({'$gen_cast', {force_event_refresh, _Ref}}, State) -> State; handle_other(terminate_connection, _State) -> stop; +handle_other({set_credential, Cred}, State) -> + set_credential0(Cred, State); handle_other(credential_expired, State) -> Error = error_frame(?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS, "credential expired", []), handle_exception(State, 0, Error); @@ -416,15 +425,17 @@ handle_connection_frame( }, helper_sup = HelperSupPid, sock = Sock} = State0) -> - logger:update_process_metadata(#{amqp_container => ContainerId}), Vhost = vhost(Hostname), + logger:update_process_metadata(#{amqp_container => ContainerId, + vhost => Vhost, + user => Username}), ok = check_user_loopback(State0), ok = check_vhost_exists(Vhost, State0), ok = check_vhost_alive(Vhost), ok = rabbit_access_control:check_vhost_access(User, Vhost, {socket, Sock}, #{}), ok = check_vhost_connection_limit(Vhost, Username), ok = check_user_connection_limit(Username), - ok = ensure_credential_expiry_timer(User), + Timer = maybe_start_credential_expiry_timer(User), rabbit_core_metrics:auth_attempt_succeeded(<<>>, Username, amqp10), notify_auth(user_authentication_success, Username, State0), rabbit_log_connection:info( @@ -499,7 +510,8 @@ handle_connection_frame( outgoing_max_frame_size = OutgoingMaxFrameSize, channel_max = EffectiveChannelMax, properties = Properties, - timeout = ReceiveTimeoutMillis}, + timeout = ReceiveTimeoutMillis, + credential_timer = Timer}, heartbeater = Heartbeater}, State = start_writer(State1), HostnameVal = case Hostname of @@ -871,39 +883,57 @@ check_user_connection_limit(Username) -> end. -%% TODO Provide a means for the client to refresh the credential. -%% This could be either via: -%% 1. SASL (if multiple authentications are allowed on the same AMQP 1.0 connection), see -%% https://datatracker.ietf.org/doc/html/rfc4422#section-3.8 , or -%% 2. Claims Based Security (CBS) extension, see https://docs.oasis-open.org/amqp/amqp-cbs/v1.0/csd01/amqp-cbs-v1.0-csd01.html -%% and https://github.com/rabbitmq/rabbitmq-server/issues/9259 -%% 3. Simpler variation of 2. where a token is put to a special /token node. -%% -%% If the user does not refresh their credential on time (the only implementation currently), -%% close the entire connection as we must assume that vhost access could have been revoked. -%% -%% If the user refreshes their credential on time (to be implemented), the AMQP reader should -%% 1. rabbit_access_control:check_vhost_access/4 -%% 2. send a message to all its sessions which should then erase the permission caches and -%% re-check all link permissions (i.e. whether reading / writing to exchanges / queues is still allowed). -%% 3. cancel the current timer, and set a new timer -%% similary as done for Stream connections, see https://github.com/rabbitmq/rabbitmq-server/issues/10292 -ensure_credential_expiry_timer(User) -> +set_credential0(Cred, + State = #v1{connection = #v1_connection{ + user = User0, + vhost = Vhost, + credential_timer = OldTimer} = Conn, + tracked_channels = Chans, + sock = Sock}) -> + rabbit_log:info("updating credential", []), + case rabbit_access_control:update_state(User0, Cred) of + {ok, User} -> + try rabbit_access_control:check_vhost_access(User, Vhost, {socket, Sock}, #{}) of + ok -> + maps:foreach(fun(_ChanNum, Pid) -> + rabbit_amqp_session:reset_authz(Pid, User) + end, Chans), + case OldTimer of + undefined -> ok; + Ref -> ok = erlang:cancel_timer(Ref, [{info, false}]) + end, + NewTimer = maybe_start_credential_expiry_timer(User), + State#v1{connection = Conn#v1_connection{ + user = User, + credential_timer = NewTimer}} + catch _:Reason -> + Error = error_frame(?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS, + "access to vhost ~s failed for new credential: ~p", + [Vhost, Reason]), + handle_exception(State, 0, Error) + end; + Err -> + Error = error_frame(?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS, + "credential update failed: ~p", + [Err]), + handle_exception(State, 0, Error) + end. + +maybe_start_credential_expiry_timer(User) -> case rabbit_access_control:expiry_timestamp(User) of never -> - ok; + undefined; Ts when is_integer(Ts) -> Time = (Ts - os:system_time(second)) * 1000, rabbit_log:debug( - "Credential expires in ~b ms frow now (absolute timestamp = ~b seconds since epoch)", + "credential expires in ~b ms frow now (absolute timestamp = ~b seconds since epoch)", [Time, Ts]), case Time > 0 of true -> - _TimerRef = erlang:send_after(Time, self(), credential_expired), - ok; + erlang:send_after(Time, self(), credential_expired); false -> protocol_error(?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS, - "Credential expired ~b ms ago", [abs(Time)]) + "credential expired ~b ms ago", [abs(Time)]) end end. diff --git a/deps/rabbit/src/rabbit_amqp_session.erl b/deps/rabbit/src/rabbit_amqp_session.erl index 81e4d88d071d..a406de7c4277 100644 --- a/deps/rabbit/src/rabbit_amqp_session.erl +++ b/deps/rabbit/src/rabbit_amqp_session.erl @@ -90,7 +90,8 @@ list_local/0, conserve_resources/3, check_resource_access/4, - check_read_permitted_on_topic/4 + check_read_permitted_on_topic/4, + reset_authz/2 ]). -export([init/1, @@ -393,6 +394,10 @@ init({ReaderPid, WriterPid, ChannelNum, MaxFrameSize, User, Vhost, ConnName, handle_max = ClientHandleMax}}) -> process_flag(trap_exit, true), rabbit_process_flag:adjust_for_message_handling_proc(), + logger:update_process_metadata(#{channel_number => ChannelNum, + connection => ConnName, + vhost => Vhost, + user => User#user.username}), ok = pg:join(pg_scope(), self(), self()), Alarms0 = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}), @@ -480,6 +485,10 @@ list_local() -> conserve_resources(Pid, Source, {_, Conserve, _}) -> gen_server:cast(Pid, {conserve_resources, Source, Conserve}). +-spec reset_authz(pid(), rabbit_types:user()) -> ok. +reset_authz(Pid, User) -> + gen_server:cast(Pid, {reset_authz, User}). + handle_call(Msg, _From, State) -> Reply = {error, {not_understood, Msg}}, reply(Reply, State). @@ -574,7 +583,18 @@ handle_cast({conserve_resources, Alarm, Conserve}, noreply(State); handle_cast(refresh_config, #state{cfg = #cfg{vhost = Vhost} = Cfg} = State0) -> State = State0#state{cfg = Cfg#cfg{trace_state = rabbit_trace:init(Vhost)}}, - noreply(State). + noreply(State); +handle_cast({reset_authz, User}, #state{cfg = Cfg} = State0) -> + State1 = State0#state{ + permission_cache = [], + topic_permission_cache = [], + cfg = Cfg#cfg{user = User}}, + try recheck_authz(State1) of + State -> + noreply(State) + catch exit:#'v1_0.error'{} = Error -> + log_error_and_close_session(Error, State1) + end. log_error_and_close_session( Error, State = #state{cfg = #cfg{reader_pid = ReaderPid, @@ -3522,6 +3542,29 @@ check_topic_authorisation(#exchange{type = topic, check_topic_authorisation(_, _, _, _, Cache) -> Cache. +recheck_authz(#state{incoming_links = IncomingLinks, + outgoing_links = OutgoingLinks, + permission_cache = Cache0, + cfg = #cfg{user = User} + } = State) -> + rabbit_log:debug("rechecking link authorizations", []), + Cache1 = maps:fold( + fun(_Handle, #incoming_link{exchange = X}, Cache) -> + case X of + #exchange{name = XName} -> + check_resource_access(XName, write, User, Cache); + #resource{} = XName -> + check_resource_access(XName, write, User, Cache); + to -> + Cache + end + end, Cache0, IncomingLinks), + Cache2 = maps:fold( + fun(_Handle, #outgoing_link{queue_name = QName}, Cache) -> + check_resource_access(QName, read, User, Cache) + end, Cache1, OutgoingLinks), + State#state{permission_cache = Cache2}. + check_user_id(Mc, User) -> case rabbit_access_control:check_user_id(Mc, User) of ok -> diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl index 8688f5e5e679..0d7bd5bf45d7 100644 --- a/deps/rabbit/src/rabbit_channel.erl +++ b/deps/rabbit/src/rabbit_channel.erl @@ -470,7 +470,7 @@ force_event_refresh(Ref) -> list_queue_states(Pid) -> gen_server2:call(Pid, list_queue_states). --spec update_user_state(pid(), rabbit_types:auth_user()) -> 'ok' | {error, channel_terminated}. +-spec update_user_state(pid(), rabbit_types:user()) -> 'ok' | {error, channel_terminated}. update_user_state(Pid, UserState) when is_pid(Pid) -> case erlang:is_process_alive(Pid) of diff --git a/deps/rabbitmq_amqp_client/src/rabbitmq_amqp_client.erl b/deps/rabbitmq_amqp_client/src/rabbitmq_amqp_client.erl index 0fde808151d8..ef385b6162e3 100644 --- a/deps/rabbitmq_amqp_client/src/rabbitmq_amqp_client.erl +++ b/deps/rabbitmq_amqp_client/src/rabbitmq_amqp_client.erl @@ -28,7 +28,9 @@ declare_exchange/3, bind_exchange/5, unbind_exchange/5, - delete_exchange/2 + delete_exchange/2, + + set_token/2 ]. -define(TIMEOUT, 20_000). @@ -381,6 +383,23 @@ delete_exchange(LinkPair, ExchangeName) -> Err end. +%% Renew OAuth 2.0 token. +-spec set_token(link_pair(), binary()) -> + ok | {error, term()}. +set_token(LinkPair, Token) -> + Props = #{subject => <<"PUT">>, + to => <<"/auth/tokens">>}, + Body = {binary, Token}, + case request(LinkPair, Props, Body) of + {ok, Resp} -> + case is_success(Resp) of + true -> ok; + false -> {error, Resp} + end; + Err -> + Err + end. + -spec request(link_pair(), amqp10_msg:amqp10_properties(), amqp10_prim()) -> {ok, Response :: amqp10_msg:amqp10_msg()} | {error, term()}. request(#link_pair{session = Session, diff --git a/deps/rabbitmq_auth_backend_oauth2/test/system_SUITE.erl b/deps/rabbitmq_auth_backend_oauth2/test/system_SUITE.erl index e17a76281411..8ba8eb33575a 100644 --- a/deps/rabbitmq_auth_backend_oauth2/test/system_SUITE.erl +++ b/deps/rabbitmq_auth_backend_oauth2/test/system_SUITE.erl @@ -11,6 +11,7 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("amqp_client/include/amqp_client.hrl"). +-include_lib("amqp10_common/include/amqp10_framing.hrl"). -include_lib("eunit/include/eunit.hrl"). -import(rabbit_ct_client_helpers, [close_connection/1, close_channel/1, @@ -46,8 +47,7 @@ groups() -> more_than_one_resource_server_id_not_allowed_in_one_token, mqtt_expired_token, mqtt_expirable_token, - web_mqtt_expirable_token, - amqp_expirable_token + web_mqtt_expirable_token ]}, {token_refresh, [], [ @@ -73,7 +73,14 @@ groups() -> ]}, {rich_authorization_requests, [], [ test_successful_connection_with_rich_authorization_request_token - ]} + ]}, + {amqp, [shuffle], + [ + amqp_token_expire, + amqp_token_refresh_expire, + amqp_token_refresh_vhost_permission, + amqp_token_refresh_revoked_permissions + ]} ]. %% @@ -100,7 +107,9 @@ init_per_suite(Config) -> end_per_suite(Config) -> rabbit_ct_helpers:run_teardown_steps(Config, rabbit_ct_broker_helpers:teardown_steps()). - +init_per_group(amqp, Config) -> + {ok, _} = application:ensure_all_started(rabbitmq_amqp_client), + Config; init_per_group(_Group, Config) -> %% The broker is managed by {init,end}_per_testcase(). lists:foreach(fun(Value) -> @@ -109,6 +118,8 @@ init_per_group(_Group, Config) -> [<<"vhost1">>, <<"vhost2">>, <<"vhost3">>, <<"vhost4">>]), Config. +end_per_group(amqp, Config) -> + Config; end_per_group(_Group, Config) -> %% The broker is managed by {init,end}_per_testcase(). lists:foreach(fun(Value) -> @@ -500,29 +511,20 @@ mqtt_expirable_token0(Port, AdditionalOpts, Connect, Config) -> after Millis * 2 -> ct:fail("missing DISCONNECT packet from server") end. -amqp_expirable_token(Config) -> - {ok, _} = application:ensure_all_started(rabbitmq_amqp_client), - - Seconds = 4, +%% Test that RabbitMQ closes the AMQP 1.0 connection when the token expires. +amqp_token_expire(Config) -> + Seconds = 3, Millis = Seconds * 1000, {_Algo, Token} = generate_expirable_token(Config, - [<<"rabbitmq.configure:*/*">>, - <<"rabbitmq.write:*/*">>, - <<"rabbitmq.read:*/*">>], + [<<"rabbitmq.configure:%2F/*">>, + <<"rabbitmq.write:%2F/*">>, + <<"rabbitmq.read:%2F/*">>], Seconds), - %% Send and receive a message via AMQP 1.0. + %% Send and receive a message. + {Connection, Session, LinkPair} = amqp_init(Token, Config), QName = atom_to_binary(?FUNCTION_NAME), Address = rabbitmq_amqp_address:queue(QName), - Host = ?config(rmq_hostname, Config), - Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp), - OpnConf = #{address => Host, - port => Port, - container_id => <<"my container">>, - sasl => {plain, <<"">>, Token}}, - {ok, Connection} = amqp10_client:open_connection(OpnConf), - {ok, Session} = amqp10_client:begin_session_sync(Connection), - {ok, LinkPair} = rabbitmq_amqp_client:attach_management_link_pair_sync(Session, <<"my link pair">>), {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, #{}), {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"my sender">>, Address), receive {amqp10_event, {link, Sender, credited}} -> ok @@ -535,7 +537,53 @@ amqp_expirable_token(Config) -> {ok, Msg} = amqp10_client:get_msg(Receiver), ?assertEqual([Body], amqp10_msg:body(Msg)), - %% In 4 seconds from now, we expect that RabbitMQ disconnects us because our token expired. + %% In 3 seconds from now, we expect that RabbitMQ disconnects us because our token expired. + receive {amqp10_event, + {connection, Connection, + {closed, {unauthorized_access, <<"credential expired">>}}}} -> + ok + after Millis * 2 -> + ct:fail("server did not close our connection") + end. + +%% First, test the success case that an OAuth 2.0 token can be renewed via AMQP 1.0. +%% Second, test that the new token expires. +amqp_token_refresh_expire(Config) -> + Seconds = 3, + Millis = Seconds * 1000, + Scopes = [<<"rabbitmq.configure:%2F/*">>, + <<"rabbitmq.write:%2F/*">>, + <<"rabbitmq.read:%2F/*">>], + {_, Token1} = generate_expirable_token(Config, Scopes, Seconds), + + %% Send and receive a message. + {Connection, Session, LinkPair} = amqp_init(Token1, Config), + QName = atom_to_binary(?FUNCTION_NAME), + Address = rabbitmq_amqp_address:queue(QName), + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, #{}), + {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"my sender">>, Address), + receive {amqp10_event, {link, Sender, credited}} -> ok + after 5000 -> ct:fail({missing_event, ?LINE}) + end, + ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"t1">>, <<"m1">>, true)), + {ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"my receiver">>, Address), + {ok, Msg1} = amqp10_client:get_msg(Receiver), + ?assertEqual([<<"m1">>], amqp10_msg:body(Msg1)), + + %% Renew token before the old one expires. + {_, Token2} = generate_expirable_token(Config, Scopes, Seconds * 2), + ok = rabbitmq_amqp_client:set_token(LinkPair, Token2), + + %% Wait until old token would have expired. + timer:sleep(Millis + 500), + + %% We should still be able to send and receive a message thanks to the new token. + ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"t2">>, <<"m2">>, true)), + {ok, Msg2} = amqp10_client:get_msg(Receiver), + ?assertEqual([<<"m2">>], amqp10_msg:body(Msg2)), + + %% In 2.5 seconds from now, we expect that RabbitMQ + %% disconnects us because the new token should expire. receive {amqp10_event, {connection, Connection, {closed, {unauthorized_access, <<"credential expired">>}}}} -> @@ -544,6 +592,178 @@ amqp_expirable_token(Config) -> ct:fail("server did not close our connection") end. +%% Test that RabbitMQ closes the AMQP 1.0 connection if the client +%% submits a new token without any permission to the vhost. +amqp_token_refresh_vhost_permission(Config) -> + {_, Token1} = generate_valid_token(Config), + {Connection, _Session, LinkPair} = amqp_init(Token1, Config), + + {_, Token2} = generate_valid_token(Config, + [<<"rabbitmq.configure:wrongvhost/*">>, + <<"rabbitmq.write:wrongvhost/*">>, + <<"rabbitmq.read:wrongvhost/*">>]), + ok = rabbitmq_amqp_client:set_token(LinkPair, Token2), + receive {amqp10_event, + {connection, Connection, + {closed, {unauthorized_access, Reason}}}} -> + ?assertMatch(<<"access to vhost / failed for new credential:", _/binary>>, + Reason) + after 5000 -> ct:fail({missing_event, ?LINE}) + end. + +%% Test that RabbitMQ closes AMQP 1.0 sessions if the client +%% submits a new token with reduced permissions. +amqp_token_refresh_revoked_permissions(Config) -> + {_, Token1} = generate_expirable_token(Config, + [<<"rabbitmq.configure:%2F/*/*">>, + <<"rabbitmq.write:%2F/*/*">>, + <<"rabbitmq.read:%2F/*/*">>], + 30), + {Connection, Session1, LinkPair} = amqp_init(Token1, Config), + {ok, Session2} = amqp10_client:begin_session_sync(Connection), + {ok, Session3} = amqp10_client:begin_session_sync(Connection), + {ok, Session4} = amqp10_client:begin_session_sync(Connection), + {ok, Session5} = amqp10_client:begin_session_sync(Connection), + {ok, Session6} = amqp10_client:begin_session_sync(Connection), + + {ok, Sender2} = amqp10_client:attach_sender_link_sync( + Session2, <<"sender 2">>, + rabbitmq_amqp_address:exchange(<<"amq.fanout">>)), + receive {amqp10_event, {link, Sender2, credited}} -> ok + after 5000 -> ct:fail({missing_event, ?LINE}) + end, + + QName = <<"q1">>, + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, #{}), + ok = rabbitmq_amqp_client:bind_queue(LinkPair, QName, <<"amq.topic">>, <<"#">>, #{}), + {ok, Receiver3} = amqp10_client:attach_receiver_link( + Session3, <<"receiver 3">>, rabbitmq_amqp_address:queue(QName)), + receive {amqp10_event, {link, Receiver3, attached}} -> ok + after 5000 -> ct:fail({missing_event, ?LINE}) + end, + + {ok, Sender4} = amqp10_client:attach_sender_link_sync(Session4, <<"sender 4">>, null), + receive {amqp10_event, {link, Sender4, credited}} -> ok + after 5000 -> ct:fail({missing_event, ?LINE}) + end, + ok = amqp10_client:send_msg( + Sender4, + amqp10_msg:set_properties( + #{to => rabbitmq_amqp_address:queue(QName)}, + amqp10_msg:new(<<"t4">>, <<"m4a">>))), + receive {amqp10_disposition, {accepted, <<"t4">>}} -> ok + after 5000 -> ct:fail({settled_timeout, <<"t4">>}) + end, + + {ok, Sender5} = amqp10_client:attach_sender_link_sync(Session5, <<"sender 5">>, null), + receive {amqp10_event, {link, Sender5, credited}} -> ok + after 5000 -> ct:fail({missing_event, ?LINE}) + end, + ok = amqp10_client:send_msg( + Sender5, + amqp10_msg:set_properties( + #{to => rabbitmq_amqp_address:exchange(<<"amq.topic">>, <<"topic-1">>)}, + amqp10_msg:new(<<"t5">>, <<"m5a">>))), + receive {amqp10_disposition, {accepted, <<"t5">>}} -> ok + after 5000 -> ct:fail({settled_timeout, <<"t5">>}) + end, + + XName = <<"e1">>, + ok = rabbitmq_amqp_client:declare_exchange(LinkPair, XName, #{type => <<"fanout">>}), + {ok, Sender6} = amqp10_client:attach_sender_link_sync( + Session6, <<"sender 6">>, + rabbitmq_amqp_address:exchange(XName)), + receive {amqp10_event, {link, Sender6, credited}} -> ok + after 5000 -> ct:fail({missing_event, ?LINE}) + end, + + %% Revoke the previous granted permissions on the default vhost. + {_, Token2} = generate_expirable_token( + Config, + [ + %% Set configure access on q1 and e1 so that we can delete this queue and exchange later. + <<"rabbitmq.configure:%2F/*1/nope">>, + %% Set write access on amq.topic so that we can test the revoked topic permission. + <<"rabbitmq.write:%2F/amq.topic/nope">>, + <<"rabbitmq.read:%2F/nope/nope">>], + 30), + flush(<<"setting token...">>), + ok = rabbitmq_amqp_client:set_token(LinkPair, Token2), + + %% We expect RabbitMQ to close Session2 because we are no longer allowed to write to exchange amq.fanout. + receive + {amqp10_event, + {session, Session2, + {ended, + #'v1_0.error'{ + condition = ?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS, + description = {utf8, <<"write access to exchange 'amq.fanout' in vhost '/' refused", _/binary>>}}}}} -> ok + after 5000 -> ct:fail({missing_event, ?LINE}) + end, + + %% We expect RabbitMQ to close Session3 because we are no longer allowed to read from queue q1. + %% This complies with the user expectation in + %% https://github.com/rabbitmq/rabbitmq-server/discussions/11364 + receive + {amqp10_event, + {session, Session3, + {ended, + #'v1_0.error'{ + condition = ?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS, + description = {utf8, <<"read access to queue 'q1' in vhost '/' refused", _/binary>>}}}}} -> ok + after 5000 -> ct:fail({missing_event, ?LINE}) + end, + + ok = amqp10_client:send_msg( + Sender4, + amqp10_msg:set_properties( + #{to => rabbitmq_amqp_address:queue(QName)}, + amqp10_msg:new(<<"t4">>, <<"m4b">>))), + %% We expect RabbitMQ to close Session4 because we are no longer allowed to write to the default exchange. + receive + {amqp10_event, + {session, Session4, + {ended, + #'v1_0.error'{ + condition = ?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS, + description = {utf8, <<"write access to exchange 'amq.default' in vhost '/' refused", _/binary>>}}}}} -> ok + after 5000 -> ct:fail({missing_event, ?LINE}) + end, + + ok = amqp10_client:send_msg( + Sender5, + amqp10_msg:set_properties( + #{to => rabbitmq_amqp_address:exchange(<<"amq.topic">>, <<"topic-1">>)}, + amqp10_msg:new(<<"t5">>, <<"m5b">>))), + %% We expect RabbitMQ to close Session5 because we are no longer allowed to write to topic topic-1. + receive + {amqp10_event, + {session, Session5, + {ended, + #'v1_0.error'{ + condition = ?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS, + description = {utf8, <<"write access to topic 'topic-1' in exchange" + " 'amq.topic' in vhost '/' refused", _/binary>>}}}}} -> ok + after 5000 -> ct:fail({missing_event, ?LINE}) + end, + + %% We expect RabbitMQ to close Session6 because we are no longer allowed to write to exchange e1. + receive + {amqp10_event, + {session, Session6, + {ended, + #'v1_0.error'{ + condition = ?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS, + description = {utf8, <<"write access to exchange 'e1' in vhost '/' refused", _/binary>>}}}}} -> ok + after 5000 -> ct:fail({missing_event, ?LINE}) + end, + + ?assertMatch({ok, #{message_count := 2}}, + rabbitmq_amqp_client:delete_queue(LinkPair, QName)), + ok = rabbitmq_amqp_client:delete_exchange(LinkPair, XName), + ok = amqp10_client:end_session(Session1), + ok = amqp10_client:close_connection(Connection). + test_successful_connection_with_complex_claim_as_a_map(Config) -> {_Algo, Token} = generate_valid_token_with_extra_fields( Config, @@ -765,3 +985,30 @@ test_failed_connection_with_non_existent_scope_alias_in_scope_field(Config) -> more_than_one_resource_server_id_not_allowed_in_one_token(Config) -> {_Algo, Token} = generate_valid_token(Config, <<"rmq.configure:*/*">>, [<<"prod">>, <<"dev">>]), {error, _} = open_unmanaged_connection(Config, 0, <<"username">>, Token). + +amqp_init(Token, Config) -> + OpnConf = amqp_connection_config(Token, Config), + {ok, Connection} = amqp10_client:open_connection(OpnConf), + receive {amqp10_event, {connection, Connection, opened}} -> ok + after 5000 -> ct:fail({missing_event, ?LINE}) + end, + {ok, Session} = amqp10_client:begin_session_sync(Connection), + {ok, LinkPair} = rabbitmq_amqp_client:attach_management_link_pair_sync(Session, <<"my link pair">>), + {Connection, Session, LinkPair}. + +amqp_connection_config(Token, Config) -> + Host = proplists:get_value(rmq_hostname, Config), + Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp), + #{address => Host, + port => Port, + container_id => <<"my container">>, + sasl => {plain, <<>>, Token}}. + +flush(Prefix) -> + receive + Msg -> + ct:pal("~p flushed: ~p~n", [Prefix, Msg]), + flush(Prefix) + after 1 -> + ok + end. diff --git a/release-notes/4.1.0.md b/release-notes/4.1.0.md index b4fe0f8b56cc..294aabe37ffc 100644 --- a/release-notes/4.1.0.md +++ b/release-notes/4.1.0.md @@ -24,6 +24,11 @@ Each metric is labelled by protocol (AMQP 1.0, AMQP 0.9.1, MQTT 5.0, MQTT 3.1.1, [PR #12559](https://github.com/rabbitmq/rabbitmq-server/pull/12559) enables AMQP 1.0 publishers to set multiple routing keys by using the `x-cc` message annotation. This annotation allows publishers to specify a [list](https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-types-v1.0-os.html#type-list) of routing keys ([strings](https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-types-v1.0-os.html#type-string)) for more flexible message distribution, similar to the [CC](https://www.rabbitmq.com/docs/sender-selected) header in AMQP 0.9.1. +### OAuth 2.0 Token Renewal on AMQP 1.0 Connections +[PR #12599](https://github.com/rabbitmq/rabbitmq-server/pull/12599) introduces support for OAuth 2.0 token renewal on AMQP 1.0 connections. +This feature allows clients to set a new token proactively before the current one [expires](/docs/oauth2#token-expiration), ensuring uninterrupted connectivity. +If a client does not set a new token before the existing one expires, RabbitMQ will automatically close the AMQP 1.0 connection. + ## Potential incompatibilities * The default MQTT [Maximum Packet Size](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901086) changed from 256 MiB to 16 MiB. This default can be overridden by [configuring](https://www.rabbitmq.com/docs/configure#config-file) `mqtt.max_packet_size_authenticated`. Note that this value must not be greater than `max_message_size` (which also defaults to 16 MiB). From dbd9ede67b1dfea74a02320a4bacea2643e3341b Mon Sep 17 00:00:00 2001 From: David Ansari Date: Wed, 30 Oct 2024 14:50:05 +0100 Subject: [PATCH 2/2] Use log macros for AMQP Using a log macro has the benefit that location data is added as explained in https://www.erlang.org/doc/apps/kernel/logger.html#t:metadata/0 --- deps/rabbit/src/rabbit_amqp_reader.erl | 41 ++++++++++++------------- deps/rabbit/src/rabbit_amqp_session.erl | 23 +++++++------- 2 files changed, 32 insertions(+), 32 deletions(-) diff --git a/deps/rabbit/src/rabbit_amqp_reader.erl b/deps/rabbit/src/rabbit_amqp_reader.erl index 9ae1c3e6eeae..070205fa0b64 100644 --- a/deps/rabbit/src/rabbit_amqp_reader.erl +++ b/deps/rabbit/src/rabbit_amqp_reader.erl @@ -7,6 +7,7 @@ -module(rabbit_amqp_reader). +-include_lib("kernel/include/logger.hrl"). -include_lib("rabbit_common/include/rabbit.hrl"). -include_lib("amqp10_common/include/amqp10_types.hrl"). -include("rabbit_amqp.hrl"). @@ -329,16 +330,14 @@ error_frame(Condition, Fmt, Args) -> handle_exception(State = #v1{connection_state = closed}, Channel, #'v1_0.error'{description = {utf8, Desc}}) -> - rabbit_log_connection:error( - "Error on AMQP 1.0 connection ~tp (~tp), channel number ~b:~n~tp", - [self(), closed, Channel, Desc]), + ?LOG_ERROR("Error on AMQP 1.0 connection ~tp (~tp), channel number ~b:~n~tp", + [self(), closed, Channel, Desc]), State; handle_exception(State = #v1{connection_state = CS}, Channel, Error = #'v1_0.error'{description = {utf8, Desc}}) when ?IS_RUNNING(State) orelse CS =:= closing -> - rabbit_log_connection:error( - "Error on AMQP 1.0 connection ~tp (~tp), channel number ~b:~n~tp", - [self(), CS, Channel, Desc]), + ?LOG_ERROR("Error on AMQP 1.0 connection ~tp (~tp), channel number ~b:~n~tp", + [self(), CS, Channel, Desc]), close(Error, State); handle_exception(State, _Channel, Error) -> silent_close_delay(), @@ -438,10 +437,10 @@ handle_connection_frame( Timer = maybe_start_credential_expiry_timer(User), rabbit_core_metrics:auth_attempt_succeeded(<<>>, Username, amqp10), notify_auth(user_authentication_success, Username, State0), - rabbit_log_connection:info( - "Connection from AMQP 1.0 container '~ts': user '~ts' authenticated " - "using SASL mechanism ~s and granted access to vhost '~ts'", - [ContainerId, Username, Mechanism, Vhost]), + ?LOG_INFO( + "Connection from AMQP 1.0 container '~ts': user '~ts' authenticated " + "using SASL mechanism ~s and granted access to vhost '~ts'", + [ContainerId, Username, Mechanism, Vhost]), OutgoingMaxFrameSize = case ClientMaxFrame of undefined -> @@ -519,9 +518,9 @@ handle_connection_frame( null -> undefined; {utf8, Val} -> Val end, - rabbit_log:debug( - "AMQP 1.0 connection.open frame: hostname = ~ts, extracted vhost = ~ts, idle-time-out = ~p", - [HostnameVal, Vhost, IdleTimeout]), + ?LOG_DEBUG( + "AMQP 1.0 connection.open frame: hostname = ~ts, extracted vhost = ~ts, idle-time-out = ~p", + [HostnameVal, Vhost, IdleTimeout]), Infos = infos(?CONNECTION_EVENT_KEYS, State), ok = rabbit_core_metrics:connection_created( @@ -780,16 +779,16 @@ notify_auth(EventType, Username, State) -> rabbit_event:notify(EventType, EventProps). track_channel(ChannelNum, SessionPid, #v1{tracked_channels = Channels} = State) -> - rabbit_log:debug("AMQP 1.0 created session process ~p for channel number ~b", - [SessionPid, ChannelNum]), + ?LOG_DEBUG("AMQP 1.0 created session process ~p for channel number ~b", + [SessionPid, ChannelNum]), _Ref = erlang:monitor(process, SessionPid, [{tag, {'DOWN', ChannelNum}}]), State#v1{tracked_channels = maps:put(ChannelNum, SessionPid, Channels)}. untrack_channel(ChannelNum, SessionPid, #v1{tracked_channels = Channels0} = State) -> case maps:take(ChannelNum, Channels0) of {SessionPid, Channels} -> - rabbit_log:debug("AMQP 1.0 closed session process ~p with channel number ~b", - [SessionPid, ChannelNum]), + ?LOG_DEBUG("AMQP 1.0 closed session process ~p with channel number ~b", + [SessionPid, ChannelNum]), State#v1{tracked_channels = Channels}; _ -> State @@ -890,7 +889,7 @@ set_credential0(Cred, credential_timer = OldTimer} = Conn, tracked_channels = Chans, sock = Sock}) -> - rabbit_log:info("updating credential", []), + ?LOG_INFO("updating credential", []), case rabbit_access_control:update_state(User0, Cred) of {ok, User} -> try rabbit_access_control:check_vhost_access(User, Vhost, {socket, Sock}, #{}) of @@ -925,9 +924,9 @@ maybe_start_credential_expiry_timer(User) -> undefined; Ts when is_integer(Ts) -> Time = (Ts - os:system_time(second)) * 1000, - rabbit_log:debug( - "credential expires in ~b ms frow now (absolute timestamp = ~b seconds since epoch)", - [Time, Ts]), + ?LOG_DEBUG( + "credential expires in ~b ms frow now (absolute timestamp = ~b seconds since epoch)", + [Time, Ts]), case Time > 0 of true -> erlang:send_after(Time, self(), credential_expired); diff --git a/deps/rabbit/src/rabbit_amqp_session.erl b/deps/rabbit/src/rabbit_amqp_session.erl index a406de7c4277..8e965aa8c8ee 100644 --- a/deps/rabbit/src/rabbit_amqp_session.erl +++ b/deps/rabbit/src/rabbit_amqp_session.erl @@ -11,6 +11,7 @@ -behaviour(gen_server). +-include_lib("kernel/include/logger.hrl"). -include_lib("rabbit_common/include/rabbit.hrl"). -include_lib("amqp10_common/include/amqp10_types.hrl"). -include("rabbit_amqp.hrl"). @@ -601,8 +602,8 @@ log_error_and_close_session( writer_pid = WriterPid, channel_num = Ch}}) -> End = #'v1_0.end'{error = Error}, - rabbit_log:warning("Closing session for connection ~p: ~tp", - [ReaderPid, Error]), + ?LOG_WARNING("Closing session for connection ~p: ~tp", + [ReaderPid, Error]), ok = rabbit_amqp_writer:send_command_sync(WriterPid, Ch, End), {stop, {shutdown, Error}, State}. @@ -889,8 +890,8 @@ destroy_outgoing_link(_, _, _, Acc) -> Acc. detach(Handle, Link, Error = #'v1_0.error'{}) -> - rabbit_log:warning("Detaching link handle ~b due to error: ~tp", - [Handle, Error]), + ?LOG_WARNING("Detaching link handle ~b due to error: ~tp", + [Handle, Error]), publisher_or_consumer_deleted(Link), #'v1_0.detach'{handle = ?UINT(Handle), closed = true, @@ -981,8 +982,8 @@ handle_frame(#'v1_0.flow'{handle = Handle} = Flow, %% "If set to a handle that is not currently associated with %% an attached link, the recipient MUST respond by ending the %% session with an unattached-handle session error." [2.7.4] - rabbit_log:warning( - "Received Flow frame for unknown link handle: ~tp", [Flow]), + ?LOG_WARNING("Received Flow frame for unknown link handle: ~tp", + [Flow]), protocol_error( ?V_1_0_SESSION_ERROR_UNATTACHED_HANDLE, "Unattached link handle: ~b", [HandleInt]) @@ -2161,9 +2162,9 @@ handle_deliver(ConsumerTag, AckRequired, outgoing_links = OutgoingLinks}; _ -> %% TODO handle missing link -- why does the queue think it's there? - rabbit_log:warning( - "No link handle ~b exists for delivery with consumer tag ~p from queue ~tp", - [Handle, ConsumerTag, QName]), + ?LOG_WARNING( + "No link handle ~b exists for delivery with consumer tag ~p from queue ~tp", + [Handle, ConsumerTag, QName]), State end. @@ -3008,7 +3009,7 @@ credit_reply_timeout(QType, QName) -> Fmt = "Timed out waiting for credit reply from ~s ~s. " "Hint: Enable feature flag rabbitmq_4.0.0", Args = [QType, rabbit_misc:rs(QName)], - rabbit_log:error(Fmt, Args), + ?LOG_ERROR(Fmt, Args), protocol_error(?V_1_0_AMQP_ERROR_INTERNAL_ERROR, Fmt, Args). default(undefined, Default) -> Default; @@ -3547,7 +3548,7 @@ recheck_authz(#state{incoming_links = IncomingLinks, permission_cache = Cache0, cfg = #cfg{user = User} } = State) -> - rabbit_log:debug("rechecking link authorizations", []), + ?LOG_DEBUG("rechecking link authorizations", []), Cache1 = maps:fold( fun(_Handle, #incoming_link{exchange = X}, Cache) -> case X of