From 7bbc7962e718853858b830945f2c4d65233a0a64 Mon Sep 17 00:00:00 2001 From: David Ansari Date: Sun, 9 Feb 2025 13:28:41 +0100 Subject: [PATCH 1/5] Orderly shutdown of sessions Make AMQP 1.0 connection shut down its sessions before sending the close frame to the client similar to how the AMQP 0.9.1 connection shuts down its channels before closing the connection. This commit avoids concurrent deletion of exclusive queues by the session process and the classic queue process. This commit should also fix https://github.com/rabbitmq/rabbitmq-server/issues/2596 (cherry picked from commit 06ec8f0342ae120a7a6b48a90392df052555d4e8) --- deps/rabbit/include/rabbit_amqp_reader.hrl | 2 ++ deps/rabbit/src/rabbit_amqp_reader.erl | 36 ++++++++++++++++++++-- deps/rabbit/src/rabbit_amqp_session.erl | 4 ++- 3 files changed, 39 insertions(+), 3 deletions(-) diff --git a/deps/rabbit/include/rabbit_amqp_reader.hrl b/deps/rabbit/include/rabbit_amqp_reader.hrl index 0077a9c9c2be..732bc9f04398 100644 --- a/deps/rabbit/include/rabbit_amqp_reader.hrl +++ b/deps/rabbit/include/rabbit_amqp_reader.hrl @@ -3,6 +3,8 @@ -define(CLOSING_TIMEOUT, 30_000). -define(SILENT_CLOSE_DELAY, 3_000). +-define(SHUTDOWN_SESSIONS_TIMEOUT, 10_000). + %% Allow for potentially large sets of tokens during the SASL exchange. %% https://docs.oasis-open.org/amqp/amqp-cbs/v1.0/csd01/amqp-cbs-v1.0-csd01.html#_Toc67999915 -define(INITIAL_MAX_FRAME_SIZE, 8192). diff --git a/deps/rabbit/src/rabbit_amqp_reader.erl b/deps/rabbit/src/rabbit_amqp_reader.erl index 423aa84ed829..f18387fb0a47 100644 --- a/deps/rabbit/src/rabbit_amqp_reader.erl +++ b/deps/rabbit/src/rabbit_amqp_reader.erl @@ -220,10 +220,17 @@ terminate(_, _) -> %%-------------------------------------------------------------------------- %% error handling / termination -close(Error, State = #v1{connection = #v1_connection{timeout = Timeout}}) -> +close(Error, State0 = #v1{connection = #v1_connection{timeout = Timeout}}) -> %% Client properties will be emitted in the connection_closed event by rabbit_reader. - ClientProperties = i(client_properties, State), + ClientProperties = i(client_properties, State0), put(client_properties, ClientProperties), + + %% "It is illegal to send any more frames (or bytes of any other kind) + %% after sending a close frame." [2.7.9] + %% Sessions might send frames via the writer proc. + %% Therefore, let's first try to orderly shutdown our sessions. + State = shutdown_sessions(State0), + Time = case Timeout > 0 andalso Timeout < ?CLOSING_TIMEOUT of true -> Timeout; @@ -233,6 +240,31 @@ close(Error, State = #v1{connection = #v1_connection{timeout = Timeout}}) -> ok = send_on_channel0(State, #'v1_0.close'{error = Error}, amqp10_framing), State#v1{connection_state = closed}. +shutdown_sessions(#v1{tracked_channels = Channels} = State) -> + maps:foreach(fun(_ChannelNum, Pid) -> + gen_server:cast(Pid, shutdown) + end, Channels), + TimerRef = erlang:send_after(?SHUTDOWN_SESSIONS_TIMEOUT, + self(), + shutdown_sessions_timeout), + wait_for_shutdown_sessions(TimerRef, State). + +wait_for_shutdown_sessions(TimerRef, #v1{tracked_channels = Channels} = State) + when map_size(Channels) =:= 0 -> + ok = erlang:cancel_timer(TimerRef, [{async, false}, + {info, false}]), + State; +wait_for_shutdown_sessions(TimerRef, #v1{tracked_channels = Channels} = State0) -> + receive + {{'DOWN', ChannelNum}, _MRef, process, SessionPid, _Reason} -> + State = untrack_channel(ChannelNum, SessionPid, State0), + wait_for_shutdown_sessions(TimerRef, State); + shutdown_sessions_timeout -> + ?LOG_INFO("sessions not shut down after ~b ms: ~p", + [?SHUTDOWN_SESSIONS_TIMEOUT, Channels]), + State0 + end. + handle_session_exit(ChannelNum, SessionPid, Reason, State0) -> State = untrack_channel(ChannelNum, SessionPid, State0), S = case terminated_normally(Reason) of diff --git a/deps/rabbit/src/rabbit_amqp_session.erl b/deps/rabbit/src/rabbit_amqp_session.erl index b23c492d3bfe..2ecc5728b531 100644 --- a/deps/rabbit/src/rabbit_amqp_session.erl +++ b/deps/rabbit/src/rabbit_amqp_session.erl @@ -602,7 +602,9 @@ handle_cast({reset_authz, User}, #state{cfg = Cfg} = State0) -> noreply(State) catch exit:#'v1_0.error'{} = Error -> log_error_and_close_session(Error, State1) - end. + end; +handle_cast(shutdown, State) -> + {stop, normal, State}. log_error_and_close_session( Error, State = #state{cfg = #cfg{reader_pid = ReaderPid, From 7245f57c1990713786d826ddde8c7df76b01cc3e Mon Sep 17 00:00:00 2001 From: David Ansari Date: Tue, 4 Feb 2025 18:45:24 +0100 Subject: [PATCH 2/5] Support dynamic creation of queues MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## What? Support the `dynamic` field of sources and targets. ## Why? 1. This allows AMQP clients to dynamically create exclusive queues, which can be useful for RPC workloads. 2. Support creation of JMS temporary queues over AMQP using the Qpid JMS client. Exclusive queues map very nicely to JMS temporary queues because: > Although sessions are used to create temporary destinations, this is only for convenience. Their scope is actually the entire connection. Their lifetime is that of their connection and any of the connection’s sessions are allowed to create a consumer for them. https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#creating-temporary-destinations ## How? If the terminus contains the capability `temporary-queue` as defined in [amqp-bindmap-jms-v1.0-wd10](https://groups.oasis-open.org/higherlogic/ws/public/document?document_id=67638) [5.2] and as sent by Qpid JMS client, RabbitMQ will create an exclusive queue. (This allows a future commit to take other actions if capability `temporary-topic` will be used, such as the additional creation of bindings.) No matter what the desired node properties are, RabbitMQ will set the lifetime policy delete-on-close deleting the exclusive queue when the link which caused its creation ceases to exist. This means the exclusive queue will be deleted if either: * the link gets detached, or * the session ends, or * the connection closes Although the AMQP JMS Mapping and Qpid JMS create only a **sending** link with `dynamic=true`, this commit also supports **receiving** links with `dynamic=true` for non-JMS AMQP clients. RabbitMQ is free to choose the generated queue name. As suggested by the AMQP spec, the generated queue name will contain the container-id and link name unless they are very long. Co-authored-by: Arnaud Cogoluègnes (cherry picked from commit 9062476a180ee1e167a9ecd27025eaffe6f84186) --- .../src/amqp10_client_session.erl | 36 +- deps/rabbit/src/rabbit_amqp_reader.erl | 6 +- deps/rabbit/src/rabbit_amqp_session.erl | 313 +++++++++++++----- deps/rabbit/test/amqp_auth_SUITE.erl | 91 +++++ deps/rabbit/test/amqp_client_SUITE.erl | 228 +++++++++++++ deps/rabbit/test/amqp_jms_SUITE.erl | 50 ++- .../java/com/rabbitmq/amqp/tests/jms/Cli.java | 163 +++++++++ .../amqp/tests/jms/JmsConnectionTest.java | 199 +++++++++++ .../amqp/tests/jms/JmsTemporaryQueueTest.java | 135 ++++++++ .../com/rabbitmq/amqp/tests/jms/JmsTest.java | 57 +++- .../rabbitmq/amqp/tests/jms/TestUtils.java | 66 ++++ 11 files changed, 1243 insertions(+), 101 deletions(-) create mode 100644 deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/Cli.java create mode 100644 deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/JmsConnectionTest.java create mode 100644 deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/JmsTemporaryQueueTest.java create mode 100644 deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/TestUtils.java diff --git a/deps/amqp10_client/src/amqp10_client_session.erl b/deps/amqp10_client/src/amqp10_client_session.erl index 7b7418058714..435cce8aed61 100644 --- a/deps/amqp10_client/src/amqp10_client_session.erl +++ b/deps/amqp10_client/src/amqp10_client_session.erl @@ -698,23 +698,39 @@ build_frames(Channel, Trf, Payload, MaxPayloadSize, Acc) -> make_source(#{role := {sender, _}}) -> #'v1_0.source'{}; -make_source(#{role := {receiver, #{address := Address} = Source, _Pid}, filter := Filter}) -> +make_source(#{role := {receiver, Source, _Pid}, + filter := Filter}) -> Durable = translate_terminus_durability(maps:get(durable, Source, none)), + Dynamic = maps:get(dynamic, Source, false), TranslatedFilter = translate_filters(Filter), - #'v1_0.source'{address = {utf8, Address}, + #'v1_0.source'{address = make_address(Source), durable = {uint, Durable}, - filter = TranslatedFilter}. + dynamic = Dynamic, + filter = TranslatedFilter, + capabilities = make_capabilities(Source)}. make_target(#{role := {receiver, _Source, _Pid}}) -> #'v1_0.target'{}; -make_target(#{role := {sender, #{address := Address} = Target}}) -> +make_target(#{role := {sender, Target}}) -> Durable = translate_terminus_durability(maps:get(durable, Target, none)), - TargetAddr = case is_binary(Address) of - true -> {utf8, Address}; - false -> Address - end, - #'v1_0.target'{address = TargetAddr, - durable = {uint, Durable}}. + Dynamic = maps:get(dynamic, Target, false), + #'v1_0.target'{address = make_address(Target), + durable = {uint, Durable}, + dynamic = Dynamic, + capabilities = make_capabilities(Target)}. + +make_address(#{address := Addr}) -> + if is_binary(Addr) -> + {utf8, Addr}; + is_atom(Addr) -> + Addr + end. + +make_capabilities(#{capabilities := Caps0}) -> + Caps = [{symbol, C} || C <- Caps0], + {array, symbol, Caps}; +make_capabilities(_) -> + undefined. max_message_size(#{max_message_size := Size}) when is_integer(Size) andalso diff --git a/deps/rabbit/src/rabbit_amqp_reader.erl b/deps/rabbit/src/rabbit_amqp_reader.erl index f18387fb0a47..3e5d5cc08dd7 100644 --- a/deps/rabbit/src/rabbit_amqp_reader.erl +++ b/deps/rabbit/src/rabbit_amqp_reader.erl @@ -260,8 +260,8 @@ wait_for_shutdown_sessions(TimerRef, #v1{tracked_channels = Channels} = State0) State = untrack_channel(ChannelNum, SessionPid, State0), wait_for_shutdown_sessions(TimerRef, State); shutdown_sessions_timeout -> - ?LOG_INFO("sessions not shut down after ~b ms: ~p", - [?SHUTDOWN_SESSIONS_TIMEOUT, Channels]), + ?LOG_INFO("sessions running ~b ms after requested to be shut down: ~p", + [?SHUTDOWN_SESSIONS_TIMEOUT, maps:values(Channels)]), State0 end. @@ -792,6 +792,7 @@ send_to_new_session( connection = #v1_connection{outgoing_max_frame_size = MaxFrame, vhost = Vhost, user = User, + container_id = ContainerId, name = ConnName}, writer = WriterPid} = State) -> %% Subtract fixed frame header size. @@ -804,6 +805,7 @@ send_to_new_session( OutgoingMaxFrameSize, User, Vhost, + ContainerId, ConnName, BeginFrame], case rabbit_amqp_session_sup:start_session(SessionSup, ChildArgs) of diff --git a/deps/rabbit/src/rabbit_amqp_session.erl b/deps/rabbit/src/rabbit_amqp_session.erl index 2ecc5728b531..4ad681707a25 100644 --- a/deps/rabbit/src/rabbit_amqp_session.erl +++ b/deps/rabbit/src/rabbit_amqp_session.erl @@ -85,8 +85,10 @@ -define(MAX_PERMISSION_CACHE_SIZE, 12). -define(HIBERNATE_AFTER, 6_000). -define(CREDIT_REPLY_TIMEOUT, 30_000). +%% Capability defined in amqp-bindmap-jms-v1.0-wd10 [5.2] and sent by Qpid JMS client. +-define(CAP_TEMPORARY_QUEUE, <<"temporary-queue">>). --export([start_link/8, +-export([start_link/9, process_frame/2, list_local/0, conserve_resources/3, @@ -163,6 +165,7 @@ routing_key :: rabbit_types:routing_key() | to | subject, %% queue_name_bin is only set if the link target address refers to a queue. queue_name_bin :: undefined | rabbit_misc:resource_name(), + dynamic :: boolean(), max_message_size :: pos_integer(), delivery_count :: sequence_no(), credit :: rabbit_queue_type:credit(), @@ -206,6 +209,7 @@ %% or a topic filter, an outgoing link will always consume from a queue. queue_name :: rabbit_amqqueue:name(), queue_type :: rabbit_queue_type:queue_type(), + dynamic :: boolean(), send_settled :: boolean(), max_message_size :: unlimited | pos_integer(), @@ -260,6 +264,7 @@ -record(cfg, { outgoing_max_frame_size :: unlimited | pos_integer(), + container_id :: binary(), reader_pid :: rabbit_types:connection(), writer_pid :: pid(), user :: rabbit_types:user(), @@ -382,15 +387,17 @@ -type state() :: #state{}. -start_link(ReaderPid, WriterPid, ChannelNum, FrameMax, User, Vhost, ConnName, BeginFrame) -> - Args = {ReaderPid, WriterPid, ChannelNum, FrameMax, User, Vhost, ConnName, BeginFrame}, +start_link(ReaderPid, WriterPid, ChannelNum, FrameMax, + User, Vhost, ContainerId, ConnName, BeginFrame) -> + Args = {ReaderPid, WriterPid, ChannelNum, FrameMax, + User, Vhost, ContainerId, ConnName, BeginFrame}, Opts = [{hibernate_after, ?HIBERNATE_AFTER}], gen_server:start_link(?MODULE, Args, Opts). process_frame(Pid, FrameBody) -> gen_server:cast(Pid, {frame_body, FrameBody}). -init({ReaderPid, WriterPid, ChannelNum, MaxFrameSize, User, Vhost, ConnName, +init({ReaderPid, WriterPid, ChannelNum, MaxFrameSize, User, Vhost, ContainerId, ConnName, #'v1_0.begin'{ %% "If a session is locally initiated, the remote-channel MUST NOT be set." [2.7.2] remote_channel = undefined, @@ -401,6 +408,7 @@ init({ReaderPid, WriterPid, ChannelNum, MaxFrameSize, User, Vhost, ConnName, process_flag(trap_exit, true), rabbit_process_flag:adjust_for_message_handling_proc(), logger:update_process_metadata(#{channel_number => ChannelNum, + amqp_container => ContainerId, connection => ConnName, vhost => Vhost, user => User#user.username}), @@ -453,7 +461,8 @@ init({ReaderPid, WriterPid, ChannelNum, MaxFrameSize, User, Vhost, ConnName, remote_incoming_window = RemoteIncomingWindow, remote_outgoing_window = RemoteOutgoingWindow, outgoing_delivery_id = ?INITIAL_OUTGOING_DELIVERY_ID, - cfg = #cfg{reader_pid = ReaderPid, + cfg = #cfg{container_id = ContainerId, + reader_pid = ReaderPid, writer_pid = WriterPid, outgoing_max_frame_size = MaxFrameSize, user = User, @@ -470,14 +479,17 @@ init({ReaderPid, WriterPid, ChannelNum, MaxFrameSize, User, Vhost, ConnName, terminate(_Reason, #state{incoming_links = IncomingLinks, outgoing_links = OutgoingLinks, - queue_states = QStates}) -> + queue_states = QStates, + cfg = Cfg}) -> maps:foreach( - fun (_, _) -> - rabbit_global_counters:publisher_deleted(?PROTOCOL) + fun (_, Link) -> + rabbit_global_counters:publisher_deleted(?PROTOCOL), + maybe_delete_dynamic_queue(Link, Cfg) end, IncomingLinks), maps:foreach( - fun (_, _) -> - rabbit_global_counters:consumer_deleted(?PROTOCOL) + fun (_, Link) -> + rabbit_global_counters:consumer_deleted(?PROTOCOL), + maybe_delete_dynamic_queue(Link, Cfg) end, OutgoingLinks), ok = rabbit_queue_type:close(QStates). @@ -1094,39 +1106,52 @@ handle_frame(#'v1_0.attach'{handle = ?UINT(Handle)} = Attach, end; handle_frame(Detach = #'v1_0.detach'{handle = ?UINT(HandleInt)}, - State0 = #state{incoming_links = IncomingLinks, + State0 = #state{incoming_links = IncomingLinks0, outgoing_links = OutgoingLinks0, outgoing_unsettled_map = Unsettled0, outgoing_pending = Pending0, queue_states = QStates0, - cfg = #cfg{user = #user{username = Username}}}) -> + cfg = Cfg = #cfg{user = #user{username = Username}}}) -> {OutgoingLinks, Unsettled, Pending, QStates} = case maps:take(HandleInt, OutgoingLinks0) of - {#outgoing_link{queue_name = QName}, OutgoingLinks1} -> + {#outgoing_link{queue_name = QName, + dynamic = Dynamic}, OutgoingLinks1} -> Ctag = handle_to_ctag(HandleInt), {Unsettled1, Pending1} = remove_outgoing_link(Ctag, Unsettled0, Pending0), - case rabbit_amqqueue:lookup(QName) of - {ok, Q} -> - Spec = #{consumer_tag => Ctag, - reason => remove, - user => Username}, - case rabbit_queue_type:cancel(Q, Spec, QStates0) of - {ok, QStates1} -> - {OutgoingLinks1, Unsettled1, Pending1, QStates1}; - {error, Reason} -> - protocol_error( - ?V_1_0_AMQP_ERROR_INTERNAL_ERROR, - "Failed to remove consumer from ~s: ~tp", - [rabbit_misc:rs(amqqueue:get_name(Q)), Reason]) - end; - {error, not_found} -> - {OutgoingLinks1, Unsettled1, Pending1, QStates0} + case Dynamic of + true -> + delete_dynamic_queue(QName, Cfg), + {OutgoingLinks1, Unsettled1, Pending1, QStates0}; + false -> + case rabbit_amqqueue:lookup(QName) of + {ok, Q} -> + Spec = #{consumer_tag => Ctag, + reason => remove, + user => Username}, + case rabbit_queue_type:cancel(Q, Spec, QStates0) of + {ok, QStates1} -> + {OutgoingLinks1, Unsettled1, Pending1, QStates1}; + {error, Reason} -> + protocol_error( + ?V_1_0_AMQP_ERROR_INTERNAL_ERROR, + "Failed to remove consumer from ~s: ~tp", + [rabbit_misc:rs(amqqueue:get_name(Q)), Reason]) + end; + {error, not_found} -> + {OutgoingLinks1, Unsettled1, Pending1, QStates0} + end end; error -> {OutgoingLinks0, Unsettled0, Pending0, QStates0} end, - - State1 = State0#state{incoming_links = maps:remove(HandleInt, IncomingLinks), + IncomingLinks = case maps:take(HandleInt, IncomingLinks0) of + {IncomingLink, IncomingLinks1} -> + maybe_delete_dynamic_queue(IncomingLink, Cfg), + IncomingLinks1; + error -> + IncomingLinks0 + end, + State1 = State0#state{incoming_links = IncomingLinks, outgoing_links = OutgoingLinks, outgoing_unsettled_map = Unsettled, outgoing_pending = Pending, @@ -1271,29 +1296,33 @@ handle_attach(#'v1_0.attach'{ reply_frames([Reply], State); handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_SENDER, - name = LinkName = {utf8, LinkName0}, + name = LinkName = {utf8, LinkNameBin}, handle = Handle = ?UINT(HandleInt), source = Source, snd_settle_mode = MaybeSndSettleMode, - target = Target = #'v1_0.target'{address = TargetAddress}, + target = Target0, initial_delivery_count = DeliveryCount = ?UINT(DeliveryCountInt) }, State0 = #state{incoming_links = IncomingLinks0, permission_cache = PermCache0, - cfg = #cfg{max_link_credit = MaxLinkCredit, + cfg = #cfg{container_id = ContainerId, + reader_pid = ReaderPid, + max_link_credit = MaxLinkCredit, vhost = Vhost, user = User}}) -> - case ensure_target(Target, Vhost, User, PermCache0) of - {ok, Exchange, RoutingKey, QNameBin, PermCache} -> + case ensure_target(Target0, LinkNameBin, Vhost, User, + ContainerId, ReaderPid, PermCache0) of + {ok, Exchange, RoutingKey, QNameBin, Target, PermCache} -> SndSettleMode = snd_settle_mode(MaybeSndSettleMode), MaxMessageSize = persistent_term:get(max_message_size), IncomingLink = #incoming_link{ - name = LinkName0, + name = LinkNameBin, snd_settle_mode = SndSettleMode, - target_address = address(TargetAddress), + target_address = address(Target#'v1_0.target'.address), exchange = Exchange, routing_key = RoutingKey, queue_name_bin = QNameBin, + dynamic = default(Target#'v1_0.target'.dynamic, false), max_message_size = MaxMessageSize, delivery_count = DeliveryCountInt, credit = MaxLinkCredit}, @@ -1327,10 +1356,9 @@ handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_SENDER, end; handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER, - name = LinkName = {utf8, LinkName0}, + name = LinkName = {utf8, LinkNameBin}, handle = Handle = ?UINT(HandleInt), - source = Source = #'v1_0.source'{address = SourceAddress, - filter = DesiredFilter}, + source = Source0 = #'v1_0.source'{filter = DesiredFilter}, snd_settle_mode = SndSettleMode, rcv_settle_mode = RcvSettleMode, max_message_size = MaybeMaxMessageSize, @@ -1341,6 +1369,7 @@ handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER, topic_permission_cache = TopicPermCache0, cfg = #cfg{vhost = Vhost, user = User = #user{username = Username}, + container_id = ContainerId, reader_pid = ReaderPid}}) -> {SndSettled, EffectiveSndSettleMode} = case SndSettleMode of @@ -1352,10 +1381,11 @@ handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER, %% client only for durable messages. {false, ?V_1_0_SENDER_SETTLE_MODE_UNSETTLED} end, - case ensure_source(Source, Vhost, User, PermCache0, TopicPermCache0) of + case ensure_source(Source0, LinkNameBin, Vhost, User, ContainerId, + ReaderPid, PermCache0, TopicPermCache0) of {error, Reason} -> protocol_error(?V_1_0_AMQP_ERROR_INVALID_FIELD, "Attach rejected: ~tp", [Reason]); - {ok, QName = #resource{name = QNameBin}, PermCache1, TopicPermCache} -> + {ok, QName = #resource{name = QNameBin}, Source, PermCache1, TopicPermCache} -> PermCache = check_resource_access(QName, read, User, PermCache1), case rabbit_amqqueue:with( QName, @@ -1441,12 +1471,14 @@ handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER, %% Echo back that we will respect the client's requested max-message-size. max_message_size = MaybeMaxMessageSize, offered_capabilities = OfferedCaps}, + {utf8, SourceAddress} = Source#'v1_0.source'.address, MaxMessageSize = max_message_size(MaybeMaxMessageSize), Link = #outgoing_link{ - name = LinkName0, - source_address = address(SourceAddress), + name = LinkNameBin, + source_address = SourceAddress, queue_name = queue_resource(Vhost, QNameBin), queue_type = QType, + dynamic = default(Source#'v1_0.source'.dynamic, false), send_settled = SndSettled, max_message_size = MaxMessageSize, credit_api_version = CreditApiVsn, @@ -2616,17 +2648,53 @@ maybe_grant_mgmt_link_credit(Credit, _, _) -> {Credit, []}. -spec ensure_source(#'v1_0.source'{}, + binary(), rabbit_types:vhost(), rabbit_types:user(), + binary(), + rabbit_types:connection(), permission_cache(), topic_permission_cache()) -> - {ok, rabbit_amqqueue:name(), permission_cache(), topic_permission_cache()} | + {ok, + rabbit_amqqueue:name(), + #'v1_0.source'{}, + permission_cache(), + topic_permission_cache()} | {error, term()}. -ensure_source(#'v1_0.source'{dynamic = true}, _, _, _, _) -> - exit_not_implemented("Dynamic sources not supported"); -ensure_source(#'v1_0.source'{address = Address, - durable = Durable}, - Vhost, User, PermCache, TopicPermCache) -> +ensure_source(#'v1_0.source'{ + address = undefined, + dynamic = true, + %% We will reply with the actual node properties. + dynamic_node_properties = _IgnoreDesiredProperties, + capabilities = {array, symbol, Caps} + } = Source0, + LinkName, Vhost, User, ContainerId, + ConnPid, PermCache0, TopicPermCache) -> + case lists:member({symbol, ?CAP_TEMPORARY_QUEUE}, Caps) of + true -> + {QNameBin, Address, Props, PermCache} = + declare_dynamic_queue(ContainerId, LinkName, Vhost, User, ConnPid, PermCache0), + Source = Source0#'v1_0.source'{ + address = {utf8, Address}, + %% While Khepri stores queue records durably, the terminus + %% - i.e. the existence of this receiver - is not stored durably. + durable = ?V_1_0_TERMINUS_DURABILITY_NONE, + expiry_policy = ?V_1_0_TERMINUS_EXPIRY_POLICY_LINK_DETACH, + timeout = {uint, 0}, + dynamic_node_properties = Props, + distribution_mode = ?V_1_0_STD_DIST_MODE_MOVE, + capabilities = rabbit_amqp_util:capabilities([?CAP_TEMPORARY_QUEUE]) + }, + QName = queue_resource(Vhost, QNameBin), + {ok, QName, Source, PermCache, TopicPermCache}; + false -> + exit_not_implemented("Dynamic source not supported: ~p", [Source0]) + end; +ensure_source(Source = #'v1_0.source'{dynamic = true}, _, _, _, _, _, _, _) -> + exit_not_implemented("Dynamic source not supported: ~p", [Source]); +ensure_source(Source = #'v1_0.source'{address = Address, + durable = Durable}, + _LinkName, Vhost, User, _ContainerId, _ConnPid, PermCache, TopicPermCache) -> case Address of {utf8, <<"/queues/", QNameBinQuoted/binary>>} -> %% The only possible v2 source address format is: @@ -2635,15 +2703,20 @@ ensure_source(#'v1_0.source'{address = Address, QNameBin -> QName = queue_resource(Vhost, QNameBin), ok = exit_if_absent(QName), - {ok, QName, PermCache, TopicPermCache} + {ok, QName, Source, PermCache, TopicPermCache} catch error:_ -> {error, {bad_address, Address}} end; {utf8, SourceAddr} -> case address_v1_permitted() of true -> - ensure_source_v1(SourceAddr, Vhost, User, Durable, - PermCache, TopicPermCache); + case ensure_source_v1(SourceAddr, Vhost, User, Durable, + PermCache, TopicPermCache) of + {ok, QName, PermCache1, TopicPermCache1} -> + {ok, QName, Source, PermCache1, TopicPermCache1}; + Err -> + Err + end; false -> {error, {amqp_address_v1_not_permitted, Address}} end; @@ -2689,42 +2762,71 @@ ensure_source_v1(Address, Err end. -address(undefined) -> - null; -address({utf8, String}) -> - String. - -spec ensure_target(#'v1_0.target'{}, + binary(), rabbit_types:vhost(), rabbit_types:user(), + binary(), + rabbit_types:connection(), permission_cache()) -> {ok, rabbit_types:exchange() | rabbit_exchange:name() | to, rabbit_types:routing_key() | to | subject, rabbit_misc:resource_name() | undefined, + #'v1_0.target'{}, permission_cache()} | {error, term()}. -ensure_target(#'v1_0.target'{dynamic = true}, _, _, _) -> - exit_not_implemented("Dynamic targets not supported"); -ensure_target(#'v1_0.target'{address = Address, - durable = Durable}, - Vhost, User, PermCache) -> +ensure_target(#'v1_0.target'{ + address = undefined, + dynamic = true, + %% We will reply with the actual node properties. + dynamic_node_properties = _IgnoreDesiredProperties, + capabilities = {array, symbol, Caps} + } = Target0, + LinkName, Vhost, User, ContainerId, ConnPid, PermCache0) -> + case lists:member({symbol, ?CAP_TEMPORARY_QUEUE}, Caps) of + true -> + {QNameBin, Address, Props, PermCache1} = + declare_dynamic_queue(ContainerId, LinkName, Vhost, User, ConnPid, PermCache0), + {ok, Exchange, PermCache} = check_exchange(?DEFAULT_EXCHANGE_NAME, User, Vhost, PermCache1), + Target = #'v1_0.target'{ + address = {utf8, Address}, + %% While Khepri stores queue records durably, + %% the terminus - i.e. the existence of this producer - is not stored durably. + durable = ?V_1_0_TERMINUS_DURABILITY_NONE, + expiry_policy = ?V_1_0_TERMINUS_EXPIRY_POLICY_LINK_DETACH, + timeout = {uint, 0}, + dynamic = true, + dynamic_node_properties = Props, + capabilities = rabbit_amqp_util:capabilities([?CAP_TEMPORARY_QUEUE]) + }, + {ok, Exchange, QNameBin, QNameBin, Target, PermCache}; + false -> + exit_not_implemented("Dynamic target not supported: ~p", [Target0]) + end; +ensure_target(Target = #'v1_0.target'{dynamic = true}, _, _, _, _, _, _) -> + exit_not_implemented("Dynamic target not supported: ~p", [Target]); +ensure_target(Target = #'v1_0.target'{address = Address, + durable = Durable}, + _LinkName, Vhost, User, _ContainerId, _ConnPid, PermCache0) -> case target_address_version(Address) of 2 -> case ensure_target_v2(Address, Vhost) of {ok, to, RKey, QNameBin} -> - {ok, to, RKey, QNameBin, PermCache}; + {ok, to, RKey, QNameBin, Target, PermCache0}; {ok, XNameBin, RKey, QNameBin} -> - check_exchange(XNameBin, RKey, QNameBin, User, Vhost, PermCache); + {ok, Exchange, PermCache} = check_exchange(XNameBin, User, Vhost, PermCache0), + {ok, Exchange, RKey, QNameBin, Target, PermCache}; {error, _} = Err -> Err end; 1 -> case address_v1_permitted() of true -> - case ensure_target_v1(Address, Vhost, User, Durable, PermCache) of + case ensure_target_v1(Address, Vhost, User, Durable, PermCache0) of {ok, XNameBin, RKey, QNameBin, PermCache1} -> - check_exchange(XNameBin, RKey, QNameBin, User, Vhost, PermCache1); + {ok, Exchange, PermCache} = check_exchange(XNameBin, User, Vhost, PermCache1), + {ok, Exchange, RKey, QNameBin, Target, PermCache}; {error, _} = Err -> Err end; @@ -2733,7 +2835,7 @@ ensure_target(#'v1_0.target'{address = Address, end end. -check_exchange(XNameBin, RKey, QNameBin, User, Vhost, PermCache0) -> +check_exchange(XNameBin, User, Vhost, PermCache0) -> XName = exchange_resource(Vhost, XNameBin), PermCache = check_resource_access(XName, write, User, PermCache0), case rabbit_exchange:lookup(XName) of @@ -2747,7 +2849,7 @@ check_exchange(XNameBin, RKey, QNameBin, User, Vhost, PermCache0) -> <<"amq.", _/binary>> -> X; _ -> XName end, - {ok, Exchange, RKey, QNameBin, PermCache}; + {ok, Exchange, PermCache}; {error, not_found} -> exit_not_found(XName) end. @@ -3035,7 +3137,10 @@ credit_reply_timeout(QType, QName) -> protocol_error(?V_1_0_AMQP_ERROR_INTERNAL_ERROR, Fmt, Args). default(undefined, Default) -> Default; -default(Thing, _Default) -> Thing. +default(Thing, _Default) -> Thing. + +address(undefined) -> null; +address({utf8, String}) -> String. snd_settle_mode({ubyte, Val}) -> case Val of @@ -3249,20 +3354,20 @@ ensure_terminus(Type, {exchange, {XNameList, _RoutingKey}}, Vhost, User, Durabil ok = exit_if_absent(exchange, Vhost, XNameList), case Type of target -> {undefined, PermCache}; - source -> declare_queue(generate_queue_name(), Vhost, User, Durability, PermCache) + source -> declare_queue_v1(generate_queue_name_v1(), Vhost, User, Durability, PermCache) end; ensure_terminus(target, {topic, _bindingkey}, _, _, _, PermCache) -> %% exchange amq.topic exists {undefined, PermCache}; ensure_terminus(source, {topic, _BindingKey}, Vhost, User, Durability, PermCache) -> %% exchange amq.topic exists - declare_queue(generate_queue_name(), Vhost, User, Durability, PermCache); + declare_queue_v1(generate_queue_name_v1(), Vhost, User, Durability, PermCache); ensure_terminus(target, {queue, undefined}, _, _, _, PermCache) -> %% Target "/queue" means publish to default exchange with message subject as routing key. %% Default exchange exists. {undefined, PermCache}; ensure_terminus(_, {queue, QNameList}, Vhost, User, Durability, PermCache) -> - declare_queue(unicode:characters_to_binary(QNameList), Vhost, User, Durability, PermCache); + declare_queue_v1(unicode:characters_to_binary(QNameList), Vhost, User, Durability, PermCache); ensure_terminus(_, {amqqueue, QNameList}, Vhost, _, _, PermCache) -> %% Target "/amq/queue/" is handled specially due to AMQP legacy: %% "Queue names starting with "amq." are reserved for pre-declared and @@ -3287,22 +3392,39 @@ exit_if_absent(ResourceName = #resource{kind = Kind}) -> false -> exit_not_found(ResourceName) end. -generate_queue_name() -> +generate_queue_name_v1() -> rabbit_guid:binary(rabbit_guid:gen_secure(), "amq.gen"). +%% "The generated name of the address SHOULD include the link name and the +%% container-id of the remote container to allow for ease of identification." [3.5.4] +%% Let's include container-id and link name if they are not very long +%% because the generated address might be sent in every message. +generate_queue_name_dynamic(ContainerId, LinkName) + when byte_size(ContainerId) + byte_size(LinkName) < 150 -> + Prefix = <<"amq.dyn-", ContainerId/binary, "-", LinkName/binary>>, + rabbit_guid:binary(rabbit_guid:gen_secure(), Prefix); +generate_queue_name_dynamic(_, _) -> + rabbit_guid:binary(rabbit_guid:gen_secure(), "amq.dyn.gen"). + +declare_queue_v1(QNameBin, Vhost, User, TerminusDurability, PermCache0) -> + Durable = queue_is_durable(TerminusDurability), + {ok, PermCache} = declare_queue(QNameBin, Vhost, User, Durable, none, PermCache0), + {QNameBin, PermCache}. + declare_queue(QNameBin, Vhost, User = #user{username = Username}, - TerminusDurability, + Durable, + QOwner, PermCache0) -> QName = queue_resource(Vhost, QNameBin), PermCache = check_resource_access(QName, configure, User, PermCache0), rabbit_core_metrics:queue_declared(QName), Q0 = amqqueue:new(QName, _Pid = none, - queue_is_durable(TerminusDurability), + Durable, _AutoDelete = false, - _QOwner = none, + QOwner, _QArgs = [], Vhost, #{user => Username}, @@ -3322,7 +3444,40 @@ declare_queue(QNameBin, "Failed to declare ~s: ~p", [rabbit_misc:rs(QName), Other]) end, - {QNameBin, PermCache}. + {ok, PermCache}. + +declare_dynamic_queue(ContainerId, LinkName, Vhost, User, ConnPid, PermCache0) -> + QNameBin = generate_queue_name_dynamic(ContainerId, LinkName), + {ok, PermCache} = declare_queue(QNameBin, Vhost, User, true, ConnPid, PermCache0), + QNameBinQuoted = uri_string:quote(QNameBin), + Address = <<"/queues/", QNameBinQuoted/binary>>, + Props = {map, [{{symbol, <<"lifetime-policy">>}, + {described, ?V_1_0_SYMBOL_DELETE_ON_CLOSE, {list, []}}}, + {{symbol, <<"supported-dist-modes">>}, + {array, symbol, [?V_1_0_STD_DIST_MODE_MOVE]}}]}, + {QNameBin, Address, Props, PermCache}. + +maybe_delete_dynamic_queue(#incoming_link{dynamic = true, + queue_name_bin = QNameBin}, + Cfg = #cfg{vhost = Vhost}) -> + QName = queue_resource(Vhost, QNameBin), + delete_dynamic_queue(QName, Cfg); +maybe_delete_dynamic_queue(#outgoing_link{dynamic = true, + queue_name = QName}, + Cfg) -> + delete_dynamic_queue(QName, Cfg); +maybe_delete_dynamic_queue(_, _) -> + ok. + +delete_dynamic_queue(QName, #cfg{user = #user{username = Username}}) -> + %% No real need to check for 'configure' access again since this queue is owned by + %% this connection and the user had 'configure' access when the queue got declared. + _ = rabbit_amqqueue:with( + QName, + fun(Q) -> + rabbit_queue_type:delete(Q, false, false, Username) + end), + ok. outcomes(#'v1_0.source'{outcomes = undefined}) -> {array, symbol, ?OUTCOMES}; diff --git a/deps/rabbit/test/amqp_auth_SUITE.erl b/deps/rabbit/test/amqp_auth_SUITE.erl index 581351c462ed..5889cbdd5003 100644 --- a/deps/rabbit/test/amqp_auth_SUITE.erl +++ b/deps/rabbit/test/amqp_auth_SUITE.erl @@ -55,9 +55,12 @@ groups() -> [ %% authz attach_source_queue, + attach_source_queue_dynamic, attach_target_exchange, attach_target_topic_exchange, attach_target_queue, + attach_target_queue_dynamic_exchange_write, + attach_target_queue_dynamic_queue_configure, target_per_message_exchange, target_per_message_internal_exchange, target_per_message_topic, @@ -437,6 +440,39 @@ attach_source_queue(Config) -> end, ok = close_connection_sync(Conn). +attach_source_queue_dynamic(Config) -> + OpnConf = connection_config(Config), + {ok, Connection} = amqp10_client:open_connection(OpnConf), + {ok, Session} = amqp10_client:begin_session_sync(Connection), + + %% missing configure permission to queue + ok = set_permissions(Config, <<>>, <<".*">>, <<".*">>), + + Source = #{address => undefined, + dynamic => true, + capabilities => [<<"temporary-queue">>], + durable => none}, + AttachArgs = #{name => <<"my link">>, + role => {receiver, Source, self()}, + snd_settle_mode => unsettled, + rcv_settle_mode => first, + filter => #{}}, + {ok, _Recv} = amqp10_client:attach_link(Session, AttachArgs), + receive {amqp10_event, + {session, Session, + {ended, Error}}} -> + #'v1_0.error'{condition = ?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS, + description = {utf8, Description}} = Error, + ?assertEqual( + match, + re:run(Description, + <<"^configure access to queue 'amq\.dyn-.*' in vhost " + "'test vhost' refused for user 'test user'$">>, + [{capture, none}])) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) + end, + ok = close_connection_sync(Connection). + attach_target_exchange(Config) -> XName = <<"amq.fanout">>, Address1 = rabbitmq_amqp_address:exchange(XName), @@ -485,6 +521,61 @@ attach_target_queue(Config) -> end, ok = amqp10_client:close_connection(Conn). +attach_target_queue_dynamic_exchange_write(Config) -> + OpnConf = connection_config(Config), + {ok, Connection} = amqp10_client:open_connection(OpnConf), + {ok, Session} = amqp10_client:begin_session_sync(Connection), + + %% missing write permission to default exchange + ok = set_permissions(Config, <<".*">>, <<>>, <<".*">>), + + Target = #{address => undefined, + dynamic => true, + capabilities => [<<"temporary-queue">>]}, + AttachArgs = #{name => <<"my link">>, + role => {sender, Target}, + snd_settle_mode => mixed, + rcv_settle_mode => first}, + {ok, _Recv} = amqp10_client:attach_link(Session, AttachArgs), + ExpectedErr = error_unauthorized( + <<"write access to exchange 'amq.default' ", + "in vhost 'test vhost' refused for user 'test user'">>), + receive {amqp10_event, {session, Session, {ended, ExpectedErr}}} -> ok + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) + end, + ok = close_connection_sync(Connection). + +attach_target_queue_dynamic_queue_configure(Config) -> + OpnConf = connection_config(Config), + {ok, Connection} = amqp10_client:open_connection(OpnConf), + {ok, Session} = amqp10_client:begin_session_sync(Connection), + + %% missing configure permission to queue + ok = set_permissions(Config, <<>>, <<".*">>, <<".*">>), + + Target = #{address => undefined, + dynamic => true, + capabilities => [<<"temporary-queue">>]}, + AttachArgs = #{name => <<"my link">>, + role => {sender, Target}, + snd_settle_mode => mixed, + rcv_settle_mode => first}, + {ok, _Recv} = amqp10_client:attach_link(Session, AttachArgs), + receive {amqp10_event, + {session, Session, + {ended, Error}}} -> + #'v1_0.error'{condition = ?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS, + description = {utf8, Description}} = Error, + ?assertEqual( + match, + re:run(Description, + <<"^configure access to queue 'amq\.dyn-.*' in vhost " + "'test vhost' refused for user 'test user'$">>, + [{capture, none}])) + after ?TIMEOUT -> ct:fail({missing_event, ?LINE}) + end, + ok = close_connection_sync(Connection). + target_per_message_exchange(Config) -> TargetAddress = null, To1 = rabbitmq_amqp_address:exchange(<<"amq.fanout">>), diff --git a/deps/rabbit/test/amqp_client_SUITE.erl b/deps/rabbit/test/amqp_client_SUITE.erl index 17d997a78a55..3c3f47574d57 100644 --- a/deps/rabbit/test/amqp_client_SUITE.erl +++ b/deps/rabbit/test/amqp_client_SUITE.erl @@ -130,6 +130,10 @@ groups() -> handshake_timeout, credential_expires, attach_to_exclusive_queue, + dynamic_target_short_link_name, + dynamic_target_long_link_name, + dynamic_source_rpc, + dynamic_terminus_delete, modified_classic_queue, modified_quorum_queue, modified_dead_letter_headers_exchange, @@ -4762,6 +4766,230 @@ attach_to_exclusive_queue(Config) -> #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), ok = rabbit_ct_client_helpers:close_channel(Ch). +dynamic_target_short_link_name(Config) -> + OpnConf0 = connection_config(Config), + OpnConf = OpnConf0#{container_id := <<"my-container">>, + notify_with_performative => true}, + {ok, Connection} = amqp10_client:open_connection(OpnConf), + {ok, Session} = amqp10_client:begin_session_sync(Connection), + + %% "The address of the target MUST NOT be set" [3.5.4] + Target = #{address => undefined, + dynamic => true, + capabilities => [<<"temporary-queue">>]}, + ShortLinkName = <<"my/sender">>, + AttachArgs = #{name => ShortLinkName, + role => {sender, Target}, + snd_settle_mode => mixed, + rcv_settle_mode => first}, + {ok, Sender} = amqp10_client:attach_link(Session, AttachArgs), + Addr = receive {amqp10_event, {link, Sender, {attached, Attach}}} -> + #'v1_0.attach'{ + target = #'v1_0.target'{ + address = {utf8, Address}, + dynamic = true}} = Attach, + Address + after 30000 -> ct:fail({missing_event, ?LINE}) + end, + %% The client doesn't really care what the address looks like. + %% However let's do whitebox testing here and check the address format. + %% We expect the address to contain both container ID and link name since they are short. + ?assertMatch(<<"/queues/amq.dyn-my-container-my%2Fsender-", _GUID/binary>>, Addr), + ok = wait_for_credit(Sender), + ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"t1">>, <<"m1">>)), + ok = wait_for_accepted(<<"t1">>), + + {ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"my-receiver">>, Addr, unsettled), + {ok, Msg} = amqp10_client:get_msg(Receiver), + ?assertEqual(<<"m1">>, amqp10_msg:body_bin(Msg)), + ok = amqp10_client:accept_msg(Receiver, Msg), + + %% The exclusive queue should be deleted when we close our connection. + ?assertMatch([_ExclusiveQueue], rpc(Config, rabbit_amqqueue, list, [])), + ok = close_connection_sync(Connection), + eventually(?_assertEqual([], rpc(Config, rabbit_amqqueue, list, []))), + ok. + +dynamic_target_long_link_name(Config) -> + OpnConf0 = connection_config(Config), + OpnConf = OpnConf0#{container_id := <<"my-container">>, + notify_with_performative => true}, + {ok, Connection} = amqp10_client:open_connection(OpnConf), + {ok, Session} = amqp10_client:begin_session_sync(Connection), + + %% "The address of the target MUST NOT be set" [3.5.4] + Target = #{address => undefined, + dynamic => true, + capabilities => [<<"temporary-queue">>]}, + LongLinkName = binary:copy(<<"z">>, 200), + AttachArgs = #{name => LongLinkName, + role => {sender, Target}, + snd_settle_mode => mixed, + rcv_settle_mode => first}, + {ok, Sender} = amqp10_client:attach_link(Session, AttachArgs), + Addr = receive {amqp10_event, {link, Sender, {attached, Attach}}} -> + #'v1_0.attach'{ + target = #'v1_0.target'{ + address = {utf8, Address}, + dynamic = true}} = Attach, + Address + after 30000 -> ct:fail({missing_event, ?LINE}) + end, + %% The client doesn't really care what the address looks like. + %% However let's do whitebox testing here and check the address format. + %% We expect the address to not contain the long link name. + ?assertMatch(<<"/queues/amq.dyn.gen-", _GUID/binary>>, Addr), + ok = wait_for_credit(Sender), + ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"t1">>, <<"m1">>)), + ok = wait_for_accepted(<<"t1">>), + + {ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"my-receiver">>, Addr, unsettled), + {ok, Msg} = amqp10_client:get_msg(Receiver), + ?assertEqual(<<"m1">>, amqp10_msg:body_bin(Msg)), + ok = amqp10_client:accept_msg(Receiver, Msg), + flush(accepted), + + %% Since RabbitMQ uses the delete-on-close lifetime policy, the exclusive queue should be + %% "deleted at the point that the link which caused its creation ceases to exist" [3.5.10] + ok = amqp10_client:detach_link(Sender), + receive {amqp10_event, {link, Receiver, {detached, Detach}}} -> + ?assertMatch( + #'v1_0.detach'{error = #'v1_0.error'{condition = ?V_1_0_AMQP_ERROR_RESOURCE_DELETED}}, + Detach) + after 5000 -> ct:fail({missing_event, ?LINE}) + end, + ok = close_connection_sync(Connection). + +%% Test the following RPC workflow: +%% RPC client -> queue -> RPC server +%% RPC server -> dynamic queue -> RPC client +dynamic_source_rpc(Config) -> + OpnConf0 = connection_config(Config), + OpnConf = OpnConf0#{container_id := <<"rpc-client">>, + notify_with_performative => true}, + {ok, ConnectionClient} = amqp10_client:open_connection(OpnConf), + {ok, SessionClient} = amqp10_client:begin_session_sync(ConnectionClient), + + %% "The address of the source MUST NOT be set" [3.5.3] + Source = #{address => undefined, + dynamic => true, + capabilities => [<<"temporary-queue">>], + durable => none}, + AttachArgs = #{name => <<"rpc-client-receiver🥕"/utf8>>, + role => {receiver, Source, self()}, + snd_settle_mode => unsettled, + rcv_settle_mode => first, + filter => #{}}, + {ok, ReceiverClient} = amqp10_client:attach_link(SessionClient, AttachArgs), + RespAddr = receive {amqp10_event, {link, ReceiverClient, {attached, Attach}}} -> + #'v1_0.attach'{ + source = #'v1_0.source'{ + address = {utf8, Address}, + dynamic = true}} = Attach, + Address + after 30000 -> ct:fail({missing_event, ?LINE}) + end, + %% The client doesn't really care what the address looks like. + %% However let's do whitebox testing here and check the address format. + %% We expect the address to contain both container ID and link name since they are short. + ?assertMatch(<<"/queues/amq.dyn-rpc-client-rpc-client-receiver", _CarrotAndGUID/binary>>, + RespAddr), + + %% Let's use a separate connection for the RPC server. + {_, SessionServer, LinkPair} = RpcServer = init(Config), + ReqQName = atom_to_binary(?FUNCTION_NAME), + ReqAddr = rabbitmq_amqp_address:queue(ReqQName), + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, ReqQName, #{}), + {ok, ReceiverServer} = amqp10_client:attach_receiver_link(SessionServer, <<"rpc-server-receiver">>, ReqAddr, unsettled), + {ok, SenderServer} = amqp10_client:attach_sender_link(SessionServer, <<"rpc-server-sender">>, null), + ok = wait_for_credit(SenderServer), + + {ok, SenderClient} = amqp10_client:attach_sender_link(SessionClient, <<"rpc-client-sender">>, ReqAddr), + wait_for_credit(SenderClient), + flush(attached), + + ok = amqp10_client:send_msg( + SenderClient, + amqp10_msg:set_properties( + #{reply_to => RespAddr}, + amqp10_msg:new(<<"t1">>, <<"hello">>))), + ok = wait_for_accepted(<<"t1">>), + + {ok, ReqMsg} = amqp10_client:get_msg(ReceiverServer), + ReqBody = amqp10_msg:body_bin(ReqMsg), + RespBody = string:uppercase(ReqBody), + #{reply_to := ReplyTo} = amqp10_msg:properties(ReqMsg), + ok = amqp10_client:send_msg( + SenderServer, + amqp10_msg:set_properties( + #{to => ReplyTo}, + amqp10_msg:new(<<"t2">>, RespBody))), + ok = wait_for_accepted(<<"t2">>), + ok = amqp10_client:accept_msg(ReceiverServer, ReqMsg), + + {ok, RespMsg} = amqp10_client:get_msg(ReceiverClient), + ?assertEqual(<<"HELLO">>, amqp10_msg:body_bin(RespMsg)), + ok = amqp10_client:accept_msg(ReceiverClient, RespMsg), + + ok = detach_link_sync(ReceiverServer), + ok = detach_link_sync(SenderClient), + {ok, #{message_count := 0}} = rabbitmq_amqp_client:delete_queue(LinkPair, ReqQName), + ok = detach_link_sync(SenderServer), + ok = close(RpcServer), + ok = close_connection_sync(ConnectionClient). + +dynamic_terminus_delete(Config) -> + OpnConf = connection_config(Config), + {ok, Connection} = amqp10_client:open_connection(OpnConf), + {ok, Session1} = amqp10_client:begin_session_sync(Connection), + {ok, Session2} = amqp10_client:begin_session_sync(Connection), + + Terminus = #{address => undefined, + dynamic => true, + capabilities => [<<"temporary-queue">>], + durable => none}, + RcvAttachArgs = #{role => {receiver, Terminus, self()}, + snd_settle_mode => unsettled, + rcv_settle_mode => first, + filter => #{}}, + SndAttachArgs = #{role => {sender, Terminus}, + snd_settle_mode => mixed, + rcv_settle_mode => first}, + RcvAttachArgs1 = RcvAttachArgs#{name => <<"receiver 1">>}, + RcvAttachArgs2 = RcvAttachArgs#{name => <<"receiver 2">>}, + RcvAttachArgs3 = RcvAttachArgs#{name => <<"receiver 3">>}, + SndAttachArgs1 = SndAttachArgs#{name => <<"sender 1">>}, + SndAttachArgs2 = SndAttachArgs#{name => <<"sender 2">>}, + SndAttachArgs3 = SndAttachArgs#{name => <<"sender 3">>}, + {ok, _R1} = amqp10_client:attach_link(Session1, RcvAttachArgs1), + {ok, _R2} = amqp10_client:attach_link(Session2, RcvAttachArgs2), + {ok, R3} = amqp10_client:attach_link(Session2, RcvAttachArgs3), + {ok, _S1} = amqp10_client:attach_link(Session1, SndAttachArgs1), + {ok, _S2} = amqp10_client:attach_link(Session2, SndAttachArgs2), + {ok, S3} = amqp10_client:attach_link(Session2, SndAttachArgs3), + [receive {amqp10_event, {link, _LinkRef, attached}} -> ok + after 30000 -> ct:fail({missing_event, ?LINE}) + end + || _ <- lists:seq(1, 6)], + + %% We should now have 6 exclusive queues. + ?assertEqual(6, rpc(Config, rabbit_amqqueue, count, [])), + + %% Since RabbitMQ uses the delete-on-close lifetime policy, the exclusive queue should be + %% "deleted at the point that the link which caused its creation ceases to exist" [3.5.10] + ok = detach_link_sync(R3), + ok = detach_link_sync(S3), + ?assertEqual(4, rpc(Config, rabbit_amqqueue, count, [])), + + %% When a session is ended, the sessions's links cease to exist. + ok = end_session_sync(Session2), + eventually(?_assertEqual(2, rpc(Config, rabbit_amqqueue, count, []))), + + %% When a connection is closed, the connection's links cease to exist. + ok = close_connection_sync(Connection), + eventually(?_assertEqual(0, rpc(Config, rabbit_amqqueue, count, []))), + ok. + priority_classic_queue(Config) -> QArgs = #{<<"x-queue-type">> => {utf8, <<"classic">>}, <<"x-max-priority">> => {ulong, 10}}, diff --git a/deps/rabbit/test/amqp_jms_SUITE.erl b/deps/rabbit/test/amqp_jms_SUITE.erl index a97bd5d68b0e..baad72b01465 100644 --- a/deps/rabbit/test/amqp_jms_SUITE.erl +++ b/deps/rabbit/test/amqp_jms_SUITE.erl @@ -14,6 +14,10 @@ -compile(nowarn_export_all). -compile(export_all). +-import(rabbit_ct_broker_helpers, + [rpc/4]). +-import(rabbit_ct_helpers, + [eventually/3]). -import(amqp_utils, [init/1, close/1, @@ -30,8 +34,15 @@ all() -> groups() -> [{cluster_size_1, [shuffle], [ + %% CT test case per Java class + jms_connection, + jms_temporary_queue, + + %% CT test case per test in Java class JmsTest message_types_jms_to_jms, - message_types_jms_to_amqp + message_types_jms_to_amqp, + temporary_queue_rpc, + temporary_queue_delete ] }]. @@ -54,7 +65,9 @@ end_per_suite(Config) -> init_per_group(cluster_size_1, Config) -> Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"), - Config1 = rabbit_ct_helpers:set_config(Config, {rmq_nodename_suffix, Suffix}), + Config1 = rabbit_ct_helpers:set_config( + Config, + {rmq_nodename_suffix, Suffix}), Config2 = rabbit_ct_helpers:merge_app_env( Config1, {rabbit, @@ -82,6 +95,9 @@ init_per_testcase(Testcase, Config) -> rabbit_ct_helpers:testcase_started(Config, Testcase). end_per_testcase(Testcase, Config) -> + %% Assert that every testcase cleaned up. + eventually(?_assertEqual([], rpc(Config, rabbit_amqqueue, list, [])), 1000, 5), + eventually(?_assertEqual([], rpc(Config, rabbit_amqp_session, list_local, [])), 1000, 5), rabbit_ct_helpers:testcase_finished(Config, Testcase). build_maven_test_project(Config) -> @@ -98,11 +114,17 @@ build_maven_test_project(Config) -> %% Testcases. %% ------------------------------------------------------------------- +jms_connection(Config) -> + ok = run(?FUNCTION_NAME, [{"-Dtest=~s", [<<"JmsConnectionTest">>]}], Config). + +jms_temporary_queue(Config) -> + ok = run(?FUNCTION_NAME, [{"-Dtest=~s", [<<"JmsTemporaryQueueTest">>]}], Config). + %% Send different message types from JMS client to JMS client. message_types_jms_to_jms(Config) -> TestName = QName = atom_to_binary(?FUNCTION_NAME), ok = declare_queue(QName, <<"quorum">>, Config), - ok = run(TestName, [{"-Dqueue=~ts", [rabbitmq_amqp_address:queue(QName)]}], Config), + ok = run_jms_test(TestName, [{"-Dqueue=~ts", [rabbitmq_amqp_address:queue(QName)]}], Config), ok = delete_queue(QName, Config). %% Send different message types from JMS client to Erlang AMQP 1.0 client. @@ -112,7 +134,7 @@ message_types_jms_to_amqp(Config) -> Address = rabbitmq_amqp_address:queue(QName), %% The JMS client sends messaegs. - ok = run(TestName, [{"-Dqueue=~ts", [Address]}], Config), + ok = run_jms_test(TestName, [{"-Dqueue=~ts", [Address]}], Config), %% The Erlang AMQP 1.0 client receives messages. OpnConf = connection_config(Config), @@ -120,6 +142,7 @@ message_types_jms_to_amqp(Config) -> {ok, Session} = amqp10_client:begin_session_sync(Connection), {ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"receiver">>, Address, settled), {ok, Msg1} = amqp10_client:get_msg(Receiver), + ?assertEqual( #'v1_0.amqp_value'{content = {utf8, <<"msg1🥕"/utf8>>}}, amqp10_msg:body(Msg1)), @@ -149,16 +172,31 @@ message_types_jms_to_amqp(Config) -> ok = close_connection_sync(Connection), ok = delete_queue(QName, Config). +temporary_queue_rpc(Config) -> + TestName = QName = atom_to_binary(?FUNCTION_NAME), + ok = declare_queue(QName, <<"classic">>, Config), + ok = run_jms_test(TestName, [{"-Dqueue=~ts", [rabbitmq_amqp_address:queue(QName)]}], Config), + ok = delete_queue(QName, Config). + +temporary_queue_delete(Config) -> + TestName = atom_to_binary(?FUNCTION_NAME), + ok = run_jms_test(TestName, [], Config). + %% ------------------------------------------------------------------- %% Helpers %% ------------------------------------------------------------------- +run_jms_test(TestName, JavaProps, Config) -> + run(TestName, [{"-Dtest=JmsTest#~ts", [TestName]} | JavaProps], Config). + run(TestName, JavaProps, Config) -> TestProjectDir = ?config(data_dir, Config), + Cmd = [filename:join([TestProjectDir, "mvnw"]), "test", - {"-Dtest=JmsTest#~ts", [TestName]}, - {"-Drmq_broker_uri=~ts", [rabbit_ct_broker_helpers:node_uri(Config, 0)]} + {"-Drmq_broker_uri=~ts", [rabbit_ct_broker_helpers:node_uri(Config, 0)]}, + {"-Dnodename=~ts", [rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename)]}, + {"-Drabbitmqctl.bin=~ts", [rabbit_ct_helpers:get_config(Config, rabbitmqctl_cmd)]} ] ++ JavaProps, case rabbit_ct_helpers:exec(Cmd, [{cd, TestProjectDir}]) of {ok, _Stdout_} -> diff --git a/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/Cli.java b/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/Cli.java new file mode 100644 index 000000000000..2dc08413eae4 --- /dev/null +++ b/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/Cli.java @@ -0,0 +1,163 @@ +// The contents of this file are subject to the Mozilla Public License +// Version 2.0 (the "License"); you may not use this file except in +// compliance with the License. You may obtain a copy of the License +// at https://www.mozilla.org/en-US/MPL/2.0/ +// +// Software distributed under the License is distributed on an "AS IS" +// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +// the License for the specific language governing rights and +// limitations under the License. +// +// The Original Code is RabbitMQ. +// +// The Initial Developer of the Original Code is Pivotal Software, Inc. +// Copyright (c) 2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +// +package com.rabbitmq.amqp.tests.jms; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.net.InetAddress; +import java.net.UnknownHostException; + +final class Cli { + + private Cli() {} + + static void startBroker() { + rabbitmqctl("start_app"); + } + + static void stopBroker() { + rabbitmqctl("stop_app"); + } + + private static ProcessState rabbitmqctl(String command) { + return rabbitmqctl(command, nodename()); + } + + private static ProcessState rabbitmqctl(String command, String nodename) { + return executeCommand(rabbitmqctlCommand() + " -n '" + nodename + "'" + " " + command); + } + + private static String rabbitmqctlCommand() { + return System.getProperty("rabbitmqctl.bin"); + } + + public static String nodename() { + return System.getProperty("nodename", "rabbit@" + hostname()); + } + + public static String hostname() { + try { + return InetAddress.getLocalHost().getHostName(); + } catch (UnknownHostException e) { + try { + return executeCommand("hostname").output(); + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + } + + private static ProcessState executeCommand(String command) { + return executeCommand(command, false); + } + + private static ProcessState executeCommand(String command, boolean ignoreError) { + Process pr = executeCommandProcess(command); + InputStreamPumpState inputState = new InputStreamPumpState(pr.getInputStream()); + InputStreamPumpState errorState = new InputStreamPumpState(pr.getErrorStream()); + + int ev = waitForExitValue(pr, inputState, errorState); + inputState.pump(); + errorState.pump(); + if (ev != 0 && !ignoreError) { + throw new RuntimeException( + "unexpected command exit value: " + + ev + + "\ncommand: " + + command + + "\n" + + "\nstdout:\n" + + inputState.buffer.toString() + + "\nstderr:\n" + + errorState.buffer.toString() + + "\n"); + } + return new ProcessState(inputState); + } + + private static int waitForExitValue( + Process pr, InputStreamPumpState inputState, InputStreamPumpState errorState) { + while (true) { + try { + inputState.pump(); + errorState.pump(); + pr.waitFor(); + break; + } catch (InterruptedException ignored) { + } + } + return pr.exitValue(); + } + + private static Process executeCommandProcess(String command) { + String[] finalCommand; + if (System.getProperty("os.name").toLowerCase().contains("windows")) { + finalCommand = new String[4]; + finalCommand[0] = "C:\\winnt\\system32\\cmd.exe"; + finalCommand[1] = "/y"; + finalCommand[2] = "/c"; + finalCommand[3] = command; + } else { + finalCommand = new String[3]; + finalCommand[0] = "/bin/sh"; + finalCommand[1] = "-c"; + finalCommand[2] = command; + } + try { + return Runtime.getRuntime().exec(finalCommand); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + static class ProcessState { + + private final InputStreamPumpState inputState; + + ProcessState(InputStreamPumpState inputState) { + this.inputState = inputState; + } + + String output() { + return inputState.buffer.toString(); + } + } + + private static class InputStreamPumpState { + + private final BufferedReader reader; + private final StringBuilder buffer; + + private InputStreamPumpState(InputStream in) { + this.reader = new BufferedReader(new InputStreamReader(in)); + this.buffer = new StringBuilder(); + } + + void pump() { + String line; + while (true) { + try { + if ((line = reader.readLine()) == null) break; + } catch (IOException e) { + throw new RuntimeException(e); + } + buffer.append(line).append("\n"); + } + } + } +} diff --git a/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/JmsConnectionTest.java b/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/JmsConnectionTest.java new file mode 100644 index 000000000000..210f28c043c1 --- /dev/null +++ b/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/JmsConnectionTest.java @@ -0,0 +1,199 @@ +// The contents of this file are subject to the Mozilla Public License +// Version 2.0 (the "License"); you may not use this file except in +// compliance with the License. You may obtain a copy of the License +// at https://www.mozilla.org/en-US/MPL/2.0/ +// +// Software distributed under the License is distributed on an "AS IS" +// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +// the License for the specific language governing rights and +// limitations under the License. +// +// The Original Code is RabbitMQ. +// +// The Initial Developer of the Original Code is Pivotal Software, Inc. +// Copyright (c) 2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +// + +package com.rabbitmq.amqp.tests.jms; + +import static com.rabbitmq.amqp.tests.jms.Cli.startBroker; +import static com.rabbitmq.amqp.tests.jms.Cli.stopBroker; +import static com.rabbitmq.amqp.tests.jms.TestUtils.*; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +import jakarta.jms.*; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.apache.qpid.jms.JmsConnection; +import org.apache.qpid.jms.JmsConnectionFactory; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.api.Timeout; + +/** + * Based on https://github.com/apache/qpid-jms/tree/main/qpid-jms-interop-tests/qpid-jms-activemq-tests. + */ +public class JmsConnectionTest { + + @Test + @Timeout(30) + public void testCreateConnection() throws Exception { + JmsConnectionFactory factory = new JmsConnectionFactory(brokerUri()); + try (Connection connection = factory.createConnection()) { + assertNotNull(connection); + } + } + + @Test + @Timeout(30) + public void testCreateConnectionAndStart() throws Exception { + JmsConnectionFactory factory = new JmsConnectionFactory(brokerUri()); + try (Connection connection = factory.createConnection()) { + assertNotNull(connection); + connection.start(); + } + } + + @Test + @Timeout(30) + // Currently not supported by RabbitMQ. + @Disabled + public void testCreateWithDuplicateClientIdFails() throws Exception { + JmsConnectionFactory factory = new JmsConnectionFactory(brokerUri()); + JmsConnection connection1 = (JmsConnection) factory.createConnection(); + connection1.setClientID("Test"); + assertNotNull(connection1); + connection1.start(); + JmsConnection connection2 = (JmsConnection) factory.createConnection(); + try { + connection2.setClientID("Test"); + fail("should have thrown a JMSException"); + } catch (InvalidClientIDException ex) { + // OK + } catch (Exception unexpected) { + fail("Wrong exception type thrown: " + unexpected); + } + + connection1.close(); + connection2.close(); + } + + @Test + public void testSetClientIdAfterStartedFails() { + assertThrows( + JMSException.class, + () -> { + JmsConnectionFactory factory = new JmsConnectionFactory(brokerUri()); + try (Connection connection = factory.createConnection()) { + connection.setClientID("Test"); + connection.start(); + connection.setClientID("NewTest"); + } + }); + } + + @Test + @Timeout(30) + public void testCreateConnectionAsSystemAdmin() throws Exception { + JmsConnectionFactory factory = new JmsConnectionFactory(brokerUri()); + factory.setUsername(adminUsername()); + factory.setPassword(adminPassword()); + try (Connection connection = factory.createConnection()) { + assertNotNull(connection); + connection.start(); + } + } + + @Test + @Timeout(30) + public void testCreateConnectionCallSystemAdmin() throws Exception { + JmsConnectionFactory factory = new JmsConnectionFactory(brokerUri()); + try (Connection connection = factory.createConnection(adminUsername(), adminPassword())) { + assertNotNull(connection); + connection.start(); + } + } + + @Test + @Timeout(30) + public void testCreateConnectionAsUnknwonUser() { + assertThrows( + JMSSecurityException.class, + () -> { + JmsConnectionFactory factory = new JmsConnectionFactory(TestUtils.brokerUri()); + factory.setUsername("unknown"); + factory.setPassword("unknown"); + try (Connection connection = factory.createConnection()) { + assertNotNull(connection); + connection.start(); + } + }); + } + + @Test + @Timeout(30) + public void testCreateConnectionCallUnknwonUser() { + assertThrows( + JMSSecurityException.class, + () -> { + JmsConnectionFactory factory = new JmsConnectionFactory(brokerUri()); + try (Connection connection = factory.createConnection("unknown", "unknown")) { + assertNotNull(connection); + connection.start(); + } + }); + } + + @Test + @Timeout(30) + public void testBrokerStopWontHangConnectionClose(TestInfo info) throws Exception { + Connection connection = new JmsConnectionFactory(brokerUri()).createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // TODO use a "regular" queue + TemporaryQueue queue = session.createTemporaryQueue(); + // String destinationName = name(info); + // Queue queue = session.createQueue("/queues/" + destinationName); + connection.start(); + + MessageProducer producer = session.createProducer(queue); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + + Message m = session.createTextMessage("Sample text"); + producer.send(m); + + try { + stopBroker(); + try { + connection.close(); + } catch (Exception ex) { + fail("Should not have thrown an exception."); + } + } finally { + startBroker(); + } + } + + @Test + @Timeout(60) + public void testConnectionExceptionBrokerStop() throws Exception { + final CountDownLatch latch = new CountDownLatch(1); + try (Connection connection = new JmsConnectionFactory(brokerUri()).createConnection()) { + connection.setExceptionListener(exception -> latch.countDown()); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + assertNotNull(session); + + try { + stopBroker(); + assertTrue(latch.await(10, TimeUnit.SECONDS)); + } finally { + startBroker(); + } + } + } +} diff --git a/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/JmsTemporaryQueueTest.java b/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/JmsTemporaryQueueTest.java new file mode 100644 index 000000000000..3da83a90664e --- /dev/null +++ b/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/JmsTemporaryQueueTest.java @@ -0,0 +1,135 @@ +// The contents of this file are subject to the Mozilla Public License +// Version 2.0 (the "License"); you may not use this file except in +// compliance with the License. You may obtain a copy of the License +// at https://www.mozilla.org/en-US/MPL/2.0/ +// +// Software distributed under the License is distributed on an "AS IS" +// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +// the License for the specific language governing rights and +// limitations under the License. +// +// The Original Code is RabbitMQ. +// +// The Initial Developer of the Original Code is Pivotal Software, Inc. +// Copyright (c) 2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +// + +package com.rabbitmq.amqp.tests.jms; + +import static com.rabbitmq.amqp.tests.jms.TestUtils.brokerUri; +import static org.junit.jupiter.api.Assertions.*; +import static org.junit.jupiter.api.Assertions.fail; + +import jakarta.jms.*; +import jakarta.jms.IllegalStateException; +import java.util.UUID; +import org.apache.qpid.jms.JmsConnectionFactory; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +/** + * Based on https://github.com/apache/qpid-jms/tree/main/qpid-jms-interop-tests/qpid-jms-activemq-tests. + */ +public class JmsTemporaryQueueTest { + + Connection connection; + + @AfterEach + void tearDown() throws JMSException { + connection.close(); + } + + @Test + @Timeout(60) + public void testCreatePublishConsumeTemporaryQueue() throws Exception { + connection = new JmsConnectionFactory(brokerUri()).createConnection(); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + assertNotNull(session); + TemporaryQueue queue = session.createTemporaryQueue(); + MessageConsumer consumer = session.createConsumer(queue); + + MessageProducer producer = session.createProducer(queue); + String body = UUID.randomUUID().toString(); + producer.send(session.createTextMessage(body)); + assertEquals(body, consumer.receive(60_000).getBody(String.class)); + } + + @Test + @Timeout(60) + public void testCantConsumeFromTemporaryQueueCreatedOnAnotherConnection() throws Exception { + connection = new JmsConnectionFactory(brokerUri()).createConnection(); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + TemporaryQueue tempQueue = session.createTemporaryQueue(); + session.createConsumer(tempQueue); + + Connection connection2 = new JmsConnectionFactory(brokerUri()).createConnection(); + try { + Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE); + try { + session2.createConsumer(tempQueue); + fail("should not be able to consumer from temporary queue from another connection"); + } catch (InvalidDestinationException ide) { + // expected + } + } finally { + connection2.close(); + } + } + + @Test + @Timeout(60) + public void testCantSendToTemporaryQueueFromClosedConnection() throws Exception { + connection = new JmsConnectionFactory(brokerUri()).createConnection(); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + TemporaryQueue tempQueue = session.createTemporaryQueue(); + + Connection connection2 = new JmsConnectionFactory(brokerUri()).createConnection(); + try { + Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE); + Message msg = session2.createMessage(); + MessageProducer producer = session2.createProducer(tempQueue); + + // Close the original connection + connection.close(); + + try { + producer.send(msg); + fail("should not be able to send to temporary queue from closed connection"); + } catch (jakarta.jms.IllegalStateException ide) { + // expected + } + } finally { + connection2.close(); + } + } + + @Test + @Timeout(60) + public void testCantDeleteTemporaryQueueWithConsumers() throws Exception { + connection = new JmsConnectionFactory(brokerUri()).createConnection(); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + TemporaryQueue tempQueue = session.createTemporaryQueue(); + MessageConsumer consumer = session.createConsumer(tempQueue); + + try { + tempQueue.delete(); + fail("should not be able to delete temporary queue with active consumers"); + } catch (IllegalStateException ide) { + // expected + } + + consumer.close(); + + // Now it should be allowed + tempQueue.delete(); + } +} diff --git a/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/JmsTest.java b/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/JmsTest.java index f5c5bffba2b2..23b66512fa3a 100644 --- a/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/JmsTest.java +++ b/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/JmsTest.java @@ -1,7 +1,6 @@ package com.rabbitmq.amqp.tests.jms; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.*; import jakarta.jms.*; import java.util.*; @@ -104,8 +103,6 @@ public void message_types_jms_to_amqp() throws Exception { Session session = connection.createSession(); Destination queue = (Destination) context.lookup("myQueue"); MessageProducer producer = session.createProducer(queue); - MessageConsumer consumer = session.createConsumer(queue); - connection.start(); // TextMessage String msg1 = "msg1🥕"; @@ -128,5 +125,57 @@ public void message_types_jms_to_amqp() throws Exception { streamMessage.writeLong(-1L); producer.send(streamMessage); } + + } + + // Test that Request/reply pattern using a TemporaryQueue works. + // https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#requestreply-pattern-using-a-temporaryqueue-jakarta-ee + @Test + public void temporary_queue_rpc() throws Exception { + Context context = getContext(); + ConnectionFactory factory = (ConnectionFactory) context.lookup("myConnection"); + + try (JMSContext clientContext = factory.createContext()) { + Destination responseQueue = clientContext.createTemporaryQueue(); + JMSConsumer clientConsumer = clientContext.createConsumer(responseQueue); + + Destination requestQueue = (Destination) context.lookup("myQueue"); + TextMessage clientRequestMessage = clientContext.createTextMessage("hello"); + clientContext.createProducer(). + setJMSReplyTo(responseQueue). + send(requestQueue, clientRequestMessage); + + // Let's open a new connection to simulate the RPC server. + try (JMSContext serverContext = factory.createContext()) { + JMSConsumer serverConsumer = serverContext.createConsumer(requestQueue); + TextMessage serverRequestMessage = (TextMessage) serverConsumer.receive(5000); + + TextMessage serverResponseMessage = serverContext.createTextMessage( + serverRequestMessage.getText().toUpperCase()); + serverContext.createProducer(). + send(serverRequestMessage.getJMSReplyTo(), serverResponseMessage); + } + + TextMessage clientResponseMessage = (TextMessage) clientConsumer.receive(5000); + assertEquals("HELLO", clientResponseMessage.getText()); + } + } + + // Test that a temporary queue can be deleted. + @Test + public void temporary_queue_delete() throws Exception { + Context context = getContext(); + ConnectionFactory factory = (ConnectionFactory) context.lookup("myConnection"); + + try (JMSContext clientContext = factory.createContext()) { + TemporaryQueue queue = clientContext.createTemporaryQueue(); + queue.delete(); + try { + clientContext.createProducer().send(queue, "hello"); + fail("should not be able to create producer for deleted temporary queue"); + } catch (IllegalStateRuntimeException expectedException) { + assertEquals("Temporary destination has been deleted", expectedException.getMessage()); + } + } } } diff --git a/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/TestUtils.java b/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/TestUtils.java new file mode 100644 index 000000000000..d53a6bd26fd7 --- /dev/null +++ b/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/TestUtils.java @@ -0,0 +1,66 @@ +// The contents of this file are subject to the Mozilla Public License +// Version 2.0 (the "License"); you may not use this file except in +// compliance with the License. You may obtain a copy of the License +// at https://www.mozilla.org/en-US/MPL/2.0/ +// +// Software distributed under the License is distributed on an "AS IS" +// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +// the License for the specific language governing rights and +// limitations under the License. +// +// The Original Code is RabbitMQ. +// +// The Initial Developer of the Original Code is Pivotal Software, Inc. +// Copyright (c) 2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +// + +package com.rabbitmq.amqp.tests.jms; + +import static java.lang.String.format; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; +import java.lang.reflect.Method; +import java.util.UUID; + +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.api.condition.EnabledIfSystemProperty; + +final class TestUtils { + + private static final String DEFAULT_BROKER_URI = "amqp://localhost:5672"; + + private TestUtils() { } + + static String brokerUri() { + String uri = System.getProperty("rmq_broker_uri", "amqp://localhost:5672"); + return uri == null || uri.isEmpty() ? DEFAULT_BROKER_URI : uri; + } + + static String adminUsername() { + return "guest"; + } + + static String adminPassword() { + return "guest"; + } + + static String name(TestInfo info) { + return name(info.getTestClass().get(), info.getTestMethod().get()); + } + + + private static String name(Class testClass, Method testMethod) { + return name(testClass, testMethod.getName()); + } + + private static String name(Class testClass, String testMethod) { + String uuid = UUID.randomUUID().toString(); + return format( + "%s_%s%s", testClass.getSimpleName(), testMethod, uuid.substring(uuid.length() / 2)); + } + +} From 5c4ef25b57c83b059dba06e6a9027906f2e2ec03 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Mon, 10 Feb 2025 11:45:58 +0100 Subject: [PATCH 3/5] Add helpers for JMS tests (cherry picked from commit fd350386a9b298866e9e336a86853d8eb3d2654c) --- deps/rabbit/test/amqp_jms_SUITE_data/pom.xml | 20 ++++- .../amqp/tests/jms/JmsConnectionTest.java | 43 +++++----- .../amqp/tests/jms/JmsTemporaryQueueTest.java | 17 ++-- .../amqp/tests/jms/JmsTestInfrastructure.java | 26 ++++++ .../jms/JmsTestInfrastructureExtension.java | 83 +++++++++++++++++++ .../rabbitmq/amqp/tests/jms/TestUtils.java | 36 +++++--- 6 files changed, 185 insertions(+), 40 deletions(-) create mode 100644 deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/JmsTestInfrastructure.java create mode 100644 deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/JmsTestInfrastructureExtension.java diff --git a/deps/rabbit/test/amqp_jms_SUITE_data/pom.xml b/deps/rabbit/test/amqp_jms_SUITE_data/pom.xml index cce3ecb58f45..ff312c90a8dc 100644 --- a/deps/rabbit/test/amqp_jms_SUITE_data/pom.xml +++ b/deps/rabbit/test/amqp_jms_SUITE_data/pom.xml @@ -10,6 +10,7 @@ 5.10.2 2.6.1 + [0.5.0-SNAPSHOT,) 1.2.13 2.43.0 1.25.2 @@ -30,13 +31,18 @@ ${qpid-jms-client.version} test - ch.qos.logback logback-classic ${logback.version} test + + com.rabbitmq.client + amqp-client + ${amqp-client.version} + test + @@ -81,4 +87,16 @@ + + + + + ossrh + https://oss.sonatype.org/content/repositories/snapshots + true + false + + + + diff --git a/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/JmsConnectionTest.java b/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/JmsConnectionTest.java index 210f28c043c1..d526cbbee4ff 100644 --- a/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/JmsConnectionTest.java +++ b/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/JmsConnectionTest.java @@ -11,7 +11,8 @@ // The Original Code is RabbitMQ. // // The Initial Developer of the Original Code is Pivotal Software, Inc. -// Copyright (c) 2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +// Copyright (c) 2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. +// and/or its subsidiaries. All rights reserved. // package com.rabbitmq.amqp.tests.jms; @@ -31,19 +32,21 @@ import org.apache.qpid.jms.JmsConnectionFactory; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.Timeout; /** - * Based on https://github.com/apache/qpid-jms/tree/main/qpid-jms-interop-tests/qpid-jms-activemq-tests. + * Based on + * https://github.com/apache/qpid-jms/tree/main/qpid-jms-interop-tests/qpid-jms-activemq-tests. */ +@JmsTestInfrastructure public class JmsConnectionTest { + String destination; + @Test @Timeout(30) public void testCreateConnection() throws Exception { - JmsConnectionFactory factory = new JmsConnectionFactory(brokerUri()); - try (Connection connection = factory.createConnection()) { + try (Connection connection = connection()) { assertNotNull(connection); } } @@ -51,8 +54,7 @@ public void testCreateConnection() throws Exception { @Test @Timeout(30) public void testCreateConnectionAndStart() throws Exception { - JmsConnectionFactory factory = new JmsConnectionFactory(brokerUri()); - try (Connection connection = factory.createConnection()) { + try (Connection connection = connection()) { assertNotNull(connection); connection.start(); } @@ -63,7 +65,7 @@ public void testCreateConnectionAndStart() throws Exception { // Currently not supported by RabbitMQ. @Disabled public void testCreateWithDuplicateClientIdFails() throws Exception { - JmsConnectionFactory factory = new JmsConnectionFactory(brokerUri()); + JmsConnectionFactory factory = (JmsConnectionFactory) connectionFactory(); JmsConnection connection1 = (JmsConnection) factory.createConnection(); connection1.setClientID("Test"); assertNotNull(connection1); @@ -87,8 +89,7 @@ public void testSetClientIdAfterStartedFails() { assertThrows( JMSException.class, () -> { - JmsConnectionFactory factory = new JmsConnectionFactory(brokerUri()); - try (Connection connection = factory.createConnection()) { + try (Connection connection = connection()) { connection.setClientID("Test"); connection.start(); connection.setClientID("NewTest"); @@ -99,7 +100,7 @@ public void testSetClientIdAfterStartedFails() { @Test @Timeout(30) public void testCreateConnectionAsSystemAdmin() throws Exception { - JmsConnectionFactory factory = new JmsConnectionFactory(brokerUri()); + JmsConnectionFactory factory = (JmsConnectionFactory) connectionFactory(); factory.setUsername(adminUsername()); factory.setPassword(adminPassword()); try (Connection connection = factory.createConnection()) { @@ -111,8 +112,8 @@ public void testCreateConnectionAsSystemAdmin() throws Exception { @Test @Timeout(30) public void testCreateConnectionCallSystemAdmin() throws Exception { - JmsConnectionFactory factory = new JmsConnectionFactory(brokerUri()); - try (Connection connection = factory.createConnection(adminUsername(), adminPassword())) { + try (Connection connection = + connectionFactory().createConnection(adminUsername(), adminPassword())) { assertNotNull(connection); connection.start(); } @@ -124,7 +125,7 @@ public void testCreateConnectionAsUnknwonUser() { assertThrows( JMSSecurityException.class, () -> { - JmsConnectionFactory factory = new JmsConnectionFactory(TestUtils.brokerUri()); + JmsConnectionFactory factory = (JmsConnectionFactory) connectionFactory(); factory.setUsername("unknown"); factory.setPassword("unknown"); try (Connection connection = factory.createConnection()) { @@ -140,8 +141,7 @@ public void testCreateConnectionCallUnknwonUser() { assertThrows( JMSSecurityException.class, () -> { - JmsConnectionFactory factory = new JmsConnectionFactory(brokerUri()); - try (Connection connection = factory.createConnection("unknown", "unknown")) { + try (Connection connection = connectionFactory().createConnection("unknown", "unknown")) { assertNotNull(connection); connection.start(); } @@ -150,14 +150,11 @@ public void testCreateConnectionCallUnknwonUser() { @Test @Timeout(30) - public void testBrokerStopWontHangConnectionClose(TestInfo info) throws Exception { - Connection connection = new JmsConnectionFactory(brokerUri()).createConnection(); + public void testBrokerStopWontHangConnectionClose() throws Exception { + Connection connection = connection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - // TODO use a "regular" queue - TemporaryQueue queue = session.createTemporaryQueue(); - // String destinationName = name(info); - // Queue queue = session.createQueue("/queues/" + destinationName); + Queue queue = queue(destination); connection.start(); MessageProducer producer = session.createProducer(queue); @@ -182,7 +179,7 @@ public void testBrokerStopWontHangConnectionClose(TestInfo info) throws Exceptio @Timeout(60) public void testConnectionExceptionBrokerStop() throws Exception { final CountDownLatch latch = new CountDownLatch(1); - try (Connection connection = new JmsConnectionFactory(brokerUri()).createConnection()) { + try (Connection connection = connection()) { connection.setExceptionListener(exception -> latch.countDown()); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); diff --git a/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/JmsTemporaryQueueTest.java b/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/JmsTemporaryQueueTest.java index 3da83a90664e..ae60fa4b8a31 100644 --- a/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/JmsTemporaryQueueTest.java +++ b/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/JmsTemporaryQueueTest.java @@ -11,12 +11,14 @@ // The Original Code is RabbitMQ. // // The Initial Developer of the Original Code is Pivotal Software, Inc. -// Copyright (c) 2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +// Copyright (c) 2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. +// and/or its subsidiaries. All rights reserved. // package com.rabbitmq.amqp.tests.jms; import static com.rabbitmq.amqp.tests.jms.TestUtils.brokerUri; +import static com.rabbitmq.amqp.tests.jms.TestUtils.connection; import static org.junit.jupiter.api.Assertions.*; import static org.junit.jupiter.api.Assertions.fail; @@ -25,16 +27,23 @@ import java.util.UUID; import org.apache.qpid.jms.JmsConnectionFactory; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; /** - * Based on https://github.com/apache/qpid-jms/tree/main/qpid-jms-interop-tests/qpid-jms-activemq-tests. + * Based on + * https://github.com/apache/qpid-jms/tree/main/qpid-jms-interop-tests/qpid-jms-activemq-tests. */ public class JmsTemporaryQueueTest { Connection connection; + @BeforeEach + void init() throws JMSException { + connection = connection(); + } + @AfterEach void tearDown() throws JMSException { connection.close(); @@ -43,7 +52,6 @@ void tearDown() throws JMSException { @Test @Timeout(60) public void testCreatePublishConsumeTemporaryQueue() throws Exception { - connection = new JmsConnectionFactory(brokerUri()).createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -60,7 +68,6 @@ public void testCreatePublishConsumeTemporaryQueue() throws Exception { @Test @Timeout(60) public void testCantConsumeFromTemporaryQueueCreatedOnAnotherConnection() throws Exception { - connection = new JmsConnectionFactory(brokerUri()).createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -84,7 +91,6 @@ public void testCantConsumeFromTemporaryQueueCreatedOnAnotherConnection() throws @Test @Timeout(60) public void testCantSendToTemporaryQueueFromClosedConnection() throws Exception { - connection = new JmsConnectionFactory(brokerUri()).createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -113,7 +119,6 @@ public void testCantSendToTemporaryQueueFromClosedConnection() throws Exception @Test @Timeout(60) public void testCantDeleteTemporaryQueueWithConsumers() throws Exception { - connection = new JmsConnectionFactory(brokerUri()).createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); diff --git a/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/JmsTestInfrastructure.java b/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/JmsTestInfrastructure.java new file mode 100644 index 000000000000..0fbb689eb83b --- /dev/null +++ b/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/JmsTestInfrastructure.java @@ -0,0 +1,26 @@ +// The contents of this file are subject to the Mozilla Public License +// Version 2.0 (the "License"); you may not use this file except in +// compliance with the License. You may obtain a copy of the License +// at https://www.mozilla.org/en-US/MPL/2.0/ +// +// Software distributed under the License is distributed on an "AS IS" +// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +// the License for the specific language governing rights and +// limitations under the License. +// +// The Original Code is RabbitMQ. +// +// The Initial Developer of the Original Code is Pivotal Software, Inc. +// Copyright (c) 2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. +// and/or its subsidiaries. All rights reserved. +// +package com.rabbitmq.amqp.tests.jms; + +import java.lang.annotation.*; +import org.junit.jupiter.api.extension.ExtendWith; + +@Target(ElementType.TYPE) +@Retention(RetentionPolicy.RUNTIME) +@Documented +@ExtendWith(JmsTestInfrastructureExtension.class) +public @interface JmsTestInfrastructure {} diff --git a/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/JmsTestInfrastructureExtension.java b/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/JmsTestInfrastructureExtension.java new file mode 100644 index 000000000000..2254b00ab278 --- /dev/null +++ b/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/JmsTestInfrastructureExtension.java @@ -0,0 +1,83 @@ +// The contents of this file are subject to the Mozilla Public License +// Version 2.0 (the "License"); you may not use this file except in +// compliance with the License. You may obtain a copy of the License +// at https://www.mozilla.org/en-US/MPL/2.0/ +// +// Software distributed under the License is distributed on an "AS IS" +// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +// the License for the specific language governing rights and +// limitations under the License. +// +// The Original Code is RabbitMQ. +// +// The Initial Developer of the Original Code is Pivotal Software, Inc. +// Copyright (c) 2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +// +package com.rabbitmq.amqp.tests.jms; + + +import com.rabbitmq.client.amqp.Connection; +import com.rabbitmq.client.amqp.Environment; +import com.rabbitmq.client.amqp.impl.AmqpEnvironmentBuilder; +import java.lang.reflect.Field; +import org.junit.jupiter.api.extension.*; + +final class JmsTestInfrastructureExtension + implements BeforeAllCallback, AfterAllCallback, BeforeEachCallback, AfterEachCallback { + + private static final ExtensionContext.Namespace NAMESPACE = + ExtensionContext.Namespace.create(JmsTestInfrastructureExtension.class); + + private static ExtensionContext.Store store(ExtensionContext extensionContext) { + return extensionContext.getRoot().getStore(NAMESPACE); + } + + private static Field field(Class cls, String name) { + Field field = null; + while (field == null && cls != null) { + try { + field = cls.getDeclaredField(name); + } catch (NoSuchFieldException e) { + cls = cls.getSuperclass(); + } + } + return field; + } + + @Override + public void beforeAll(ExtensionContext context) { + + } + + @Override + public void beforeEach(ExtensionContext context) throws Exception { + Field field = field(context.getTestInstance().get().getClass(), "destination"); + if (field != null) { + field.setAccessible(true); + String destination = TestUtils.name(context); + field.set(context.getTestInstance().get(), destination); + try (Environment environment = new AmqpEnvironmentBuilder().build(); + Connection connection = environment.connectionBuilder().uri(TestUtils.brokerUri()).build()) { + connection.management().queue(destination).declare(); + } + } + } + + @Override + public void afterEach(ExtensionContext context) throws Exception { + Field field = field(context.getTestInstance().get().getClass(), "destination"); + if (field != null) { + field.setAccessible(true); + String destination = (String) field.get(context.getTestInstance().get()); + try (Environment environment = new AmqpEnvironmentBuilder().build(); + Connection connection = environment.connectionBuilder().uri(TestUtils.brokerUri()).build()) { + connection.management().queueDelete(destination); + } + } + } + + @Override + public void afterAll(ExtensionContext context) { + + } +} diff --git a/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/TestUtils.java b/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/TestUtils.java index d53a6bd26fd7..192babb84ddf 100644 --- a/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/TestUtils.java +++ b/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/TestUtils.java @@ -11,29 +11,30 @@ // The Original Code is RabbitMQ. // // The Initial Developer of the Original Code is Pivotal Software, Inc. -// Copyright (c) 2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +// Copyright (c) 2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. +// and/or its subsidiaries. All rights reserved. // package com.rabbitmq.amqp.tests.jms; import static java.lang.String.format; -import java.lang.annotation.ElementType; -import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; -import java.lang.annotation.Target; +import jakarta.jms.Connection; +import jakarta.jms.ConnectionFactory; +import jakarta.jms.JMSException; +import jakarta.jms.Queue; import java.lang.reflect.Method; import java.util.UUID; - -import org.junit.jupiter.api.Tag; +import org.apache.qpid.jms.JmsConnectionFactory; +import org.apache.qpid.jms.JmsQueue; import org.junit.jupiter.api.TestInfo; -import org.junit.jupiter.api.condition.EnabledIfSystemProperty; +import org.junit.jupiter.api.extension.ExtensionContext; final class TestUtils { private static final String DEFAULT_BROKER_URI = "amqp://localhost:5672"; - private TestUtils() { } + private TestUtils() {} static String brokerUri() { String uri = System.getProperty("rmq_broker_uri", "amqp://localhost:5672"); @@ -48,10 +49,26 @@ static String adminPassword() { return "guest"; } + static ConnectionFactory connectionFactory() { + return new JmsConnectionFactory(brokerUri()); + } + + static Connection connection() throws JMSException { + return connectionFactory().createConnection(); + } + + static Queue queue(String name) { + // no path encoding, use names with e.g. ASCII characters only + return new JmsQueue("/queues/" + name); + } + static String name(TestInfo info) { return name(info.getTestClass().get(), info.getTestMethod().get()); } + static String name(ExtensionContext context) { + return name(context.getTestInstance().get().getClass(), context.getTestMethod().get()); + } private static String name(Class testClass, Method testMethod) { return name(testClass, testMethod.getName()); @@ -62,5 +79,4 @@ private static String name(Class testClass, String testMethod) { return format( "%s_%s%s", testClass.getSimpleName(), testMethod, uuid.substring(uuid.length() / 2)); } - } From 34a963e366bb1136b941402aab3d107024e91833 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Tue, 11 Feb 2025 15:47:01 +0100 Subject: [PATCH 4/5] Use ProtonJ2 in JMS-to-AMQP interop test (cherry picked from commit 4ec2b755eec918e599373be13cd73956298bea5c) --- deps/rabbit/test/amqp_jms_SUITE.erl | 44 +------------------ deps/rabbit/test/amqp_jms_SUITE_data/pom.xml | 7 +++ .../com/rabbitmq/amqp/tests/jms/JmsTest.java | 44 ++++++++++++++++--- .../rabbitmq/amqp/tests/jms/TestUtils.java | 37 ++++++++++++++++ 4 files changed, 84 insertions(+), 48 deletions(-) diff --git a/deps/rabbit/test/amqp_jms_SUITE.erl b/deps/rabbit/test/amqp_jms_SUITE.erl index baad72b01465..7a5462eda3b0 100644 --- a/deps/rabbit/test/amqp_jms_SUITE.erl +++ b/deps/rabbit/test/amqp_jms_SUITE.erl @@ -129,48 +129,8 @@ message_types_jms_to_jms(Config) -> %% Send different message types from JMS client to Erlang AMQP 1.0 client. message_types_jms_to_amqp(Config) -> - TestName = QName = atom_to_binary(?FUNCTION_NAME), - ok = declare_queue(QName, <<"quorum">>, Config), - Address = rabbitmq_amqp_address:queue(QName), - - %% The JMS client sends messaegs. - ok = run_jms_test(TestName, [{"-Dqueue=~ts", [Address]}], Config), - - %% The Erlang AMQP 1.0 client receives messages. - OpnConf = connection_config(Config), - {ok, Connection} = amqp10_client:open_connection(OpnConf), - {ok, Session} = amqp10_client:begin_session_sync(Connection), - {ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"receiver">>, Address, settled), - {ok, Msg1} = amqp10_client:get_msg(Receiver), - - ?assertEqual( - #'v1_0.amqp_value'{content = {utf8, <<"msg1🥕"/utf8>>}}, - amqp10_msg:body(Msg1)), - {ok, Msg2} = amqp10_client:get_msg(Receiver), - ?assertEqual( - #'v1_0.amqp_value'{ - content = {map, [ - {{utf8, <<"key1">>}, {utf8, <<"value">>}}, - {{utf8, <<"key2">>}, true}, - {{utf8, <<"key3">>}, {double, -1.1}}, - {{utf8, <<"key4">>}, {long, -1}} - ]}}, - amqp10_msg:body(Msg2)), - {ok, Msg3} = amqp10_client:get_msg(Receiver), - ?assertEqual( - [ - #'v1_0.amqp_sequence'{ - content = [{utf8, <<"value">>}, - true, - {double, -1.1}, - {long, -1}]} - ], - amqp10_msg:body(Msg3)), - - ok = detach_link_sync(Receiver), - ok = end_session_sync(Session), - ok = close_connection_sync(Connection), - ok = delete_queue(QName, Config). + TestName = atom_to_binary(?FUNCTION_NAME), + ok = run_jms_test(TestName, [], Config). temporary_queue_rpc(Config) -> TestName = QName = atom_to_binary(?FUNCTION_NAME), diff --git a/deps/rabbit/test/amqp_jms_SUITE_data/pom.xml b/deps/rabbit/test/amqp_jms_SUITE_data/pom.xml index ff312c90a8dc..8b06c85521b0 100644 --- a/deps/rabbit/test/amqp_jms_SUITE_data/pom.xml +++ b/deps/rabbit/test/amqp_jms_SUITE_data/pom.xml @@ -9,6 +9,7 @@ https://www.rabbitmq.com 5.10.2 + 3.27.3 2.6.1 [0.5.0-SNAPSHOT,) 1.2.13 @@ -43,6 +44,12 @@ ${amqp-client.version} test + + org.assertj + assertj-core + ${assertj.version} + test + diff --git a/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/JmsTest.java b/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/JmsTest.java index 23b66512fa3a..71e736a4e016 100644 --- a/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/JmsTest.java +++ b/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/JmsTest.java @@ -1,12 +1,22 @@ package com.rabbitmq.amqp.tests.jms; +import static com.rabbitmq.amqp.tests.jms.TestUtils.protonClient; +import static com.rabbitmq.amqp.tests.jms.TestUtils.protonConnection; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.*; import jakarta.jms.*; import java.util.*; +import java.util.concurrent.TimeUnit; import javax.naming.Context; + +import com.rabbitmq.qpid.protonj2.client.Client; +import com.rabbitmq.qpid.protonj2.client.Delivery; +import com.rabbitmq.qpid.protonj2.client.Receiver; +import jakarta.jms.Queue; import org.junit.jupiter.api.Test; +@JmsTestInfrastructure public class JmsTest { private javax.naming.Context getContext() throws Exception{ @@ -94,18 +104,20 @@ public void message_types_jms_to_jms() throws Exception { } } + String destination; + @Test public void message_types_jms_to_amqp() throws Exception { Context context = getContext(); ConnectionFactory factory = (ConnectionFactory) context.lookup("myConnection"); + Queue queue = TestUtils.queue(destination); + String msg1 = "msg1🥕"; try (Connection connection = factory.createConnection()) { Session session = connection.createSession(); - Destination queue = (Destination) context.lookup("myQueue"); MessageProducer producer = session.createProducer(queue); // TextMessage - String msg1 = "msg1🥕"; TextMessage textMessage = session.createTextMessage(msg1); producer.send(textMessage); @@ -126,12 +138,32 @@ public void message_types_jms_to_amqp() throws Exception { producer.send(streamMessage); } + try (Client client = protonClient(); + com.rabbitmq.qpid.protonj2.client.Connection amqpConnection = protonConnection(client)) { + Receiver receiver = amqpConnection.openReceiver(queue.getQueueName()); + Delivery delivery = receiver.receive(10, TimeUnit.SECONDS); + assertNotNull(delivery); + assertEquals(msg1, delivery.message().body()); + + delivery = receiver.receive(10, TimeUnit.SECONDS); + assertNotNull(delivery); + com.rabbitmq.qpid.protonj2.client.Message> mapMessage = delivery.message(); + assertThat(mapMessage.body()).containsEntry("key1", "value") + .containsEntry("key2", true) + .containsEntry("key3", -1.1) + .containsEntry("key4", -1L); + + delivery = receiver.receive(10, TimeUnit.SECONDS); + assertNotNull(delivery); + com.rabbitmq.qpid.protonj2.client.Message> listMessage = delivery.message(); + assertThat(listMessage.body()).containsExactly("value", true, -1.1, -1L); } + } - // Test that Request/reply pattern using a TemporaryQueue works. - // https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#requestreply-pattern-using-a-temporaryqueue-jakarta-ee - @Test - public void temporary_queue_rpc() throws Exception { + // Test that Request/reply pattern using a TemporaryQueue works. + // https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#requestreply-pattern-using-a-temporaryqueue-jakarta-ee + @Test + public void temporary_queue_rpc() throws Exception { Context context = getContext(); ConnectionFactory factory = (ConnectionFactory) context.lookup("myConnection"); diff --git a/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/TestUtils.java b/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/TestUtils.java index 192babb84ddf..8cb972cbbbe2 100644 --- a/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/TestUtils.java +++ b/deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/TestUtils.java @@ -19,11 +19,16 @@ import static java.lang.String.format; +import com.rabbitmq.qpid.protonj2.client.Client; +import com.rabbitmq.qpid.protonj2.client.ConnectionOptions; +import com.rabbitmq.qpid.protonj2.client.exceptions.ClientException; import jakarta.jms.Connection; import jakarta.jms.ConnectionFactory; import jakarta.jms.JMSException; import jakarta.jms.Queue; import java.lang.reflect.Method; +import java.net.URI; +import java.net.URISyntaxException; import java.util.UUID; import org.apache.qpid.jms.JmsConnectionFactory; import org.apache.qpid.jms.JmsQueue; @@ -41,6 +46,24 @@ static String brokerUri() { return uri == null || uri.isEmpty() ? DEFAULT_BROKER_URI : uri; } + static String brokerHost() { + try { + URI uri = new URI(brokerUri()); + return uri.getHost(); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + } + + static int brokerPort() { + try { + URI uri = new URI(brokerUri()); + return uri.getPort(); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + } + static String adminUsername() { return "guest"; } @@ -62,6 +85,20 @@ static Queue queue(String name) { return new JmsQueue("/queues/" + name); } + static Client protonClient() { + return Client.create(); + } + + static com.rabbitmq.qpid.protonj2.client.Connection protonConnection(Client client) { + ConnectionOptions connectionOptions = new ConnectionOptions().virtualHost("vhost:/"); + connectionOptions.saslOptions().addAllowedMechanism("ANONYMOUS"); + try { + return client.connect(brokerHost(), brokerPort(), connectionOptions); + } catch (ClientException e) { + throw new RuntimeException(e); + } + } + static String name(TestInfo info) { return name(info.getTestClass().get(), info.getTestMethod().get()); } From 07aa0db6c8f5773ae625369708b195fed76245f3 Mon Sep 17 00:00:00 2001 From: David Ansari Date: Tue, 11 Feb 2025 16:15:35 +0100 Subject: [PATCH 5/5] Add 4.1.0 release notes (cherry picked from commit c5867a7bd373d01587547973105e855ca80d4912) --- deps/rabbit/src/rabbit_reader.erl | 2 +- release-notes/4.1.0.md | 20 +++++++++++++------- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/deps/rabbit/src/rabbit_reader.erl b/deps/rabbit/src/rabbit_reader.erl index 498e333bc8c0..723ca4b5df58 100644 --- a/deps/rabbit/src/rabbit_reader.erl +++ b/deps/rabbit/src/rabbit_reader.erl @@ -202,7 +202,7 @@ conserve_resources(Pid, Source, {_, Conserve, _}) -> server_properties(Protocol) -> {ok, Product} = application:get_key(rabbit, description), - {ok, Version} = application:get_key(rabbit, vsn), + Version = rabbit_misc:version(), %% Get any configuration-specified server properties {ok, RawConfigServerProps} = application:get_env(rabbit, diff --git a/release-notes/4.1.0.md b/release-notes/4.1.0.md index d61c8d9ee48f..3a82c3bed0cf 100644 --- a/release-notes/4.1.0.md +++ b/release-notes/4.1.0.md @@ -124,13 +124,6 @@ This section can be incomplete and will be expanded as 4.1 approaches its releas GitHub issue: [#12599](https://github.com/rabbitmq/rabbitmq-server/pull/12599) - * Nodes will now fall back to system CA certificate list (if available) when no CA certificate - is explicitly configured. - - Contributed by @LoisSotoLopez. - - GitHub issue: [#10519](https://github.com/rabbitmq/rabbitmq-server/issues/10519), [#12564](https://github.com/rabbitmq/rabbitmq-server/pull/12564) - * AMQP 1.0 filters now have capped complexity: filtering on more than 16 properties won't be possible. This is a protection mechanism recommended in the AMQP 1.0 spec. @@ -145,6 +138,19 @@ This section can be incomplete and will be expanded as 4.1 approaches its releas GitHub issue: [#12559](https://github.com/rabbitmq/rabbitmq-server/pull/12559) + * Support field `dynamic` of AMQP 1.0 [source](https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-source) and [target](https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-target). + + This allows AMQP clients to dynamically create [exclusive queues](https://www.rabbitmq.com/docs/queues#exclusive-queues), which can be useful for RPC workloads. + + GitHub issue: [#13231](https://github.com/rabbitmq/rabbitmq-server/pull/13231) + + * Nodes will now fall back to system CA certificate list (if available) when no CA certificate + is explicitly configured. + + Contributed by @LoisSotoLopez. + + GitHub issue: [#10519](https://github.com/rabbitmq/rabbitmq-server/issues/10519), [#12564](https://github.com/rabbitmq/rabbitmq-server/pull/12564) + * Peer discovery resilience improvements. GitHub issues: [#12801](https://github.com/rabbitmq/rabbitmq-server/pull/12801), [#12809](https://github.com/rabbitmq/rabbitmq-server/pull/12809)