Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .github/workflows/test-make-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ jobs:
- rabbitmq_cli
- rabbitmq_consistent_hash_exchange
- rabbitmq_event_exchange
- rabbitmq_federation
- rabbitmq_exchange_federation
- rabbitmq_federation_common
- rabbitmq_federation_management
- rabbitmq_federation_prometheus
- rabbitmq_jms_topic_exchange
Expand All @@ -98,6 +99,7 @@ jobs:
- rabbitmq_peer_discovery_k8s
- rabbitmq_prelaunch
- rabbitmq_prometheus
- rabbitmq_queue_federation
- rabbitmq_recent_history_exchange
- rabbitmq_sharding
- rabbitmq_shovel
Expand Down
4 changes: 3 additions & 1 deletion .github/workflows/test-make-type-check.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ jobs:
- rabbitmq_aws
- rabbitmq_consistent_hash_exchange
- rabbitmq_event_exchange
- rabbitmq_federation
- rabbitmq_exchange_federation
- rabbitmq_federation_common
- rabbitmq_federation_management
- rabbitmq_federation_prometheus
- rabbitmq_jms_topic_exchange
Expand All @@ -50,6 +51,7 @@ jobs:
- rabbitmq_peer_discovery_k8s
- rabbitmq_prelaunch
- rabbitmq_prometheus
- rabbitmq_queue_federation
- rabbitmq_recent_history_exchange
- rabbitmq_sharding
- rabbitmq_shovel
Expand Down
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ elvis
!/deps/rabbitmq_ct_helpers/
!/deps/rabbitmq_ct_client_helpers/
!/deps/rabbitmq_event_exchange/
!/deps/rabbitmq_exchange_federation/
!/deps/rabbitmq_federation/
!/deps/rabbitmq_federation_common/
!/deps/rabbitmq_federation_management/
!/deps/rabbitmq_federation_prometheus/
!/deps/rabbitmq_jms_topic_exchange/
Expand All @@ -62,6 +64,7 @@ elvis
!/deps/rabbitmq_peer_discovery_k8s/
!/deps/rabbitmq_prelaunch/
!/deps/rabbitmq_prometheus/
!/deps/rabbitmq_queue_federation/
!/deps/rabbitmq_random_exchange/
!/deps/rabbitmq_recent_history_exchange/
!/deps/rabbitmq_sharding/
Expand Down
28 changes: 14 additions & 14 deletions deps/rabbitmq_cli/test/plugins/disable_plugins_command_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -105,14 +105,14 @@ defmodule DisablePluginsCommandTest do
@command.run(["rabbitmq_stomp"], Map.merge(context[:opts], %{node: :nonode}))

assert [
[:rabbitmq_federation],
%{mode: :offline, disabled: [:rabbitmq_stomp], set: [:rabbitmq_federation]}
[:rabbitmq_exchange_federation, :rabbitmq_federation, :rabbitmq_federation_common, :rabbitmq_queue_federation],
%{mode: :offline, disabled: [:rabbitmq_stomp], set: [:rabbitmq_exchange_federation, :rabbitmq_federation, :rabbitmq_federation_common, :rabbitmq_queue_federation]}
] ==
Enum.to_list(test_stream)

assert {:ok, [[:rabbitmq_federation]]} == :file.consult(context[:opts][:enabled_plugins_file])

assert [:amqp_client, :rabbitmq_federation, :rabbitmq_stomp] ==
assert [:amqp_client, :rabbitmq_exchange_federation, :rabbitmq_federation, :rabbitmq_federation_common, :rabbitmq_queue_federation, :rabbitmq_stomp] ==
Enum.sort(:rabbit_misc.rpc_call(context[:opts][:node], :rabbit_plugins, :active, []))
end

Expand All @@ -125,13 +125,13 @@ defmodule DisablePluginsCommandTest do
)

assert [
[:rabbitmq_federation],
%{mode: :offline, disabled: [:rabbitmq_stomp], set: [:rabbitmq_federation]}
[:rabbitmq_exchange_federation, :rabbitmq_federation, :rabbitmq_federation_common, :rabbitmq_queue_federation],
%{mode: :offline, disabled: [:rabbitmq_stomp], set: [:rabbitmq_exchange_federation, :rabbitmq_federation, :rabbitmq_federation_common, :rabbitmq_queue_federation]}
] == Enum.to_list(test_stream)

assert {:ok, [[:rabbitmq_federation]]} == :file.consult(context[:opts][:enabled_plugins_file])

assert [:amqp_client, :rabbitmq_federation, :rabbitmq_stomp] ==
assert [:amqp_client, :rabbitmq_exchange_federation, :rabbitmq_federation, :rabbitmq_federation_common, :rabbitmq_queue_federation, :rabbitmq_stomp] ==
Enum.sort(:rabbit_misc.rpc_call(context[:opts][:node], :rabbit_plugins, :active, []))
end

Expand All @@ -145,7 +145,7 @@ defmodule DisablePluginsCommandTest do

assert [
[:rabbitmq_stomp],
%{mode: :offline, disabled: [:rabbitmq_federation], set: [:rabbitmq_stomp]}
%{mode: :offline, disabled: [:rabbitmq_federation_common, :rabbitmq_queue_federation, :rabbitmq_exchange_federation, :rabbitmq_federation], set: [:rabbitmq_stomp]}
] == Enum.to_list(test_stream0)

assert {:ok, [[:rabbitmq_stomp]]} == :file.consult(context[:opts][:enabled_plugins_file])
Expand All @@ -166,20 +166,20 @@ defmodule DisablePluginsCommandTest do
assert {:stream, test_stream0} = @command.run(["rabbitmq_stomp"], context[:opts])

assert [
[:rabbitmq_federation],
[:rabbitmq_exchange_federation, :rabbitmq_federation, :rabbitmq_federation_common, :rabbitmq_queue_federation],
%{
mode: :online,
started: [],
stopped: [:rabbitmq_stomp],
disabled: [:rabbitmq_stomp],
set: [:rabbitmq_federation]
set: [:rabbitmq_exchange_federation, :rabbitmq_federation, :rabbitmq_federation_common, :rabbitmq_queue_federation]
}
] ==
Enum.to_list(test_stream0)

assert {:ok, [[:rabbitmq_federation]]} == :file.consult(context[:opts][:enabled_plugins_file])

assert [:amqp_client, :rabbitmq_federation] ==
assert [:amqp_client, :rabbitmq_exchange_federation, :rabbitmq_federation, :rabbitmq_federation_common, :rabbitmq_queue_federation] ==
Enum.sort(:rabbit_misc.rpc_call(context[:opts][:node], :rabbit_plugins, :active, []))

assert {:stream, test_stream1} = @command.run(["rabbitmq_federation"], context[:opts])
Expand All @@ -189,8 +189,8 @@ defmodule DisablePluginsCommandTest do
%{
mode: :online,
started: [],
stopped: [:rabbitmq_federation],
disabled: [:rabbitmq_federation],
stopped: [:rabbitmq_exchange_federation, :rabbitmq_federation, :rabbitmq_federation_common, :rabbitmq_queue_federation],
disabled: [:rabbitmq_federation_common, :rabbitmq_queue_federation, :rabbitmq_exchange_federation, :rabbitmq_federation],
set: []
}
] ==
Expand All @@ -214,7 +214,7 @@ defmodule DisablePluginsCommandTest do
|> Map.update!(:stopped, &Enum.sort/1)
|> Map.update!(:disabled, &Enum.sort/1)

expected_list = Enum.sort([:rabbitmq_federation, :rabbitmq_stomp])
expected_list = Enum.sort([:rabbitmq_exchange_federation, :rabbitmq_federation, :rabbitmq_federation_common, :rabbitmq_queue_federation, :rabbitmq_stomp])

assert [
[],
Expand Down Expand Up @@ -243,7 +243,7 @@ defmodule DisablePluginsCommandTest do
|> Map.update!(:stopped, &Enum.sort/1)
|> Map.update!(:disabled, &Enum.sort/1)

expected_list = Enum.sort([:rabbitmq_federation, :rabbitmq_stomp])
expected_list = Enum.sort([:rabbitmq_exchange_federation, :rabbitmq_federation, :rabbitmq_federation_common, :rabbitmq_queue_federation, :rabbitmq_stomp])

assert [
[],
Expand Down
36 changes: 18 additions & 18 deletions deps/rabbitmq_cli/test/plugins/enable_plugins_command_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ defmodule EnablePluginsCommandTest do

check_plugins_enabled([:rabbitmq_stomp], context)

assert [:amqp_client, :rabbitmq_federation, :rabbitmq_stomp] ==
assert [:amqp_client, :rabbitmq_exchange_federation, :rabbitmq_federation, :rabbitmq_federation_common, :rabbitmq_queue_federation, :rabbitmq_stomp] ==
currently_active_plugins(context)
end

Expand All @@ -144,7 +144,7 @@ defmodule EnablePluginsCommandTest do
check_plugins_enabled([:rabbitmq_stomp], context)

assert_equal_sets(
[:amqp_client, :rabbitmq_federation, :rabbitmq_stomp],
[:amqp_client, :rabbitmq_exchange_federation, :rabbitmq_federation, :rabbitmq_federation_common, :rabbitmq_queue_federation, :rabbitmq_stomp],
currently_active_plugins(context)
)
end
Expand Down Expand Up @@ -174,11 +174,11 @@ defmodule EnablePluginsCommandTest do
)

assert [
[:rabbitmq_federation, :rabbitmq_stomp],
[:rabbitmq_exchange_federation, :rabbitmq_federation, :rabbitmq_federation_common, :rabbitmq_queue_federation, :rabbitmq_stomp],
%{
mode: :offline,
enabled: [:rabbitmq_federation],
set: [:rabbitmq_federation, :rabbitmq_stomp]
enabled: [:rabbitmq_exchange_federation, :rabbitmq_federation, :rabbitmq_federation_common, :rabbitmq_queue_federation],
set: [:rabbitmq_exchange_federation, :rabbitmq_federation, :rabbitmq_federation_common, :rabbitmq_queue_federation, :rabbitmq_stomp]
}
] ==
Enum.to_list(test_stream1)
Expand Down Expand Up @@ -210,21 +210,21 @@ defmodule EnablePluginsCommandTest do
{:stream, test_stream1} = @command.run(["rabbitmq_federation"], context[:opts])

assert [
[:rabbitmq_federation, :rabbitmq_stomp],
[:rabbitmq_exchange_federation, :rabbitmq_federation, :rabbitmq_federation_common, :rabbitmq_queue_federation, :rabbitmq_stomp],
%{
mode: :online,
started: [:rabbitmq_federation],
started: [:rabbitmq_exchange_federation, :rabbitmq_federation, :rabbitmq_federation_common, :rabbitmq_queue_federation],
stopped: [],
enabled: [:rabbitmq_federation],
set: [:rabbitmq_federation, :rabbitmq_stomp]
enabled: [:rabbitmq_exchange_federation, :rabbitmq_federation, :rabbitmq_federation_common, :rabbitmq_queue_federation],
set: [:rabbitmq_exchange_federation, :rabbitmq_federation, :rabbitmq_federation_common, :rabbitmq_queue_federation, :rabbitmq_stomp]
}
] ==
Enum.to_list(test_stream1)

check_plugins_enabled([:rabbitmq_stomp, :rabbitmq_federation], context)

assert_equal_sets(
[:amqp_client, :rabbitmq_federation, :rabbitmq_stomp],
[:amqp_client, :rabbitmq_exchange_federation, :rabbitmq_federation, :rabbitmq_federation_common, :rabbitmq_queue_federation, :rabbitmq_stomp],
currently_active_plugins(context)
)

Expand All @@ -239,21 +239,21 @@ defmodule EnablePluginsCommandTest do
@command.run(["rabbitmq_stomp", "rabbitmq_federation"], context[:opts])

assert [
[:rabbitmq_federation, :rabbitmq_stomp],
[:rabbitmq_exchange_federation, :rabbitmq_federation, :rabbitmq_federation_common, :rabbitmq_queue_federation, :rabbitmq_stomp],
%{
mode: :online,
started: [:rabbitmq_federation, :rabbitmq_stomp],
started: [:rabbitmq_exchange_federation, :rabbitmq_federation, :rabbitmq_federation_common, :rabbitmq_queue_federation, :rabbitmq_stomp],
stopped: [],
enabled: [:rabbitmq_federation, :rabbitmq_stomp],
set: [:rabbitmq_federation, :rabbitmq_stomp]
enabled: [:rabbitmq_exchange_federation, :rabbitmq_federation, :rabbitmq_federation_common, :rabbitmq_queue_federation, :rabbitmq_stomp],
set: [:rabbitmq_exchange_federation, :rabbitmq_federation, :rabbitmq_federation_common, :rabbitmq_queue_federation, :rabbitmq_stomp]
}
] ==
Enum.to_list(test_stream)

check_plugins_enabled([:rabbitmq_stomp, :rabbitmq_federation], context)

assert_equal_sets(
[:amqp_client, :rabbitmq_federation, :rabbitmq_stomp],
[:amqp_client, :rabbitmq_exchange_federation, :rabbitmq_federation, :rabbitmq_federation_common, :rabbitmq_queue_federation, :rabbitmq_stomp],
currently_active_plugins(context)
)

Expand All @@ -266,14 +266,14 @@ defmodule EnablePluginsCommandTest do
assert {:stream, test_stream} = @command.run(["amqp_client"], context[:opts])

assert [
[:rabbitmq_federation],
%{mode: :online, started: [], stopped: [], enabled: [], set: [:rabbitmq_federation]}
[:rabbitmq_exchange_federation, :rabbitmq_federation, :rabbitmq_federation_common, :rabbitmq_queue_federation],
%{mode: :online, started: [], stopped: [], enabled: [], set: [:rabbitmq_exchange_federation, :rabbitmq_federation, :rabbitmq_federation_common, :rabbitmq_queue_federation]}
] ==
Enum.to_list(test_stream)

check_plugins_enabled([:rabbitmq_federation], context)

assert [:amqp_client, :rabbitmq_federation] ==
assert [:amqp_client, :rabbitmq_exchange_federation, :rabbitmq_federation, :rabbitmq_federation_common, :rabbitmq_queue_federation] ==
currently_active_plugins(context)

reset_enabled_plugins_to_preconfigured_defaults(context)
Expand Down
24 changes: 12 additions & 12 deletions deps/rabbitmq_cli/test/plugins/set_plugins_command_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ defmodule SetPluginsCommandTest do

assert {:ok, [[:rabbitmq_stomp]]} = :file.consult(context[:opts][:enabled_plugins_file])

assert [:amqp_client, :rabbitmq_federation, :rabbitmq_stomp] =
assert [:amqp_client, :rabbitmq_exchange_federation, :rabbitmq_federation, :rabbitmq_federation_common, :rabbitmq_queue_federation, :rabbitmq_stomp] =
Enum.sort(:rabbit_misc.rpc_call(context[:opts][:node], :rabbit_plugins, :active, []))
end

Expand All @@ -108,7 +108,7 @@ defmodule SetPluginsCommandTest do

assert {:ok, [[:rabbitmq_stomp]]} = :file.consult(context[:opts][:enabled_plugins_file])

assert [:amqp_client, :rabbitmq_federation, :rabbitmq_stomp] =
assert [:amqp_client, :rabbitmq_exchange_federation, :rabbitmq_federation, :rabbitmq_federation_common, :rabbitmq_queue_federation, :rabbitmq_stomp] =
Enum.sort(:rabbit_misc.rpc_call(context[:opts][:node], :rabbit_plugins, :active, []))
end

Expand All @@ -120,7 +120,7 @@ defmodule SetPluginsCommandTest do
%{
mode: :online,
started: [],
stopped: [:rabbitmq_federation],
stopped: [:rabbitmq_exchange_federation, :rabbitmq_federation, :rabbitmq_federation_common, :rabbitmq_queue_federation],
set: [:rabbitmq_stomp]
}
] = Enum.to_list(test_stream0)
Expand All @@ -133,18 +133,18 @@ defmodule SetPluginsCommandTest do
assert {:stream, test_stream1} = @command.run(["rabbitmq_federation"], context[:opts])

assert [
[:rabbitmq_federation],
[:rabbitmq_exchange_federation, :rabbitmq_federation, :rabbitmq_federation_common, :rabbitmq_queue_federation],
%{
mode: :online,
started: [:rabbitmq_federation],
started: [:rabbitmq_exchange_federation, :rabbitmq_federation, :rabbitmq_federation_common, :rabbitmq_queue_federation],
stopped: [:rabbitmq_stomp],
set: [:rabbitmq_federation]
set: [:rabbitmq_exchange_federation, :rabbitmq_federation, :rabbitmq_federation_common, :rabbitmq_queue_federation]
}
] = Enum.to_list(test_stream1)

assert {:ok, [[:rabbitmq_federation]]} = :file.consult(context[:opts][:enabled_plugins_file])

assert [:amqp_client, :rabbitmq_federation] =
assert [:amqp_client, :rabbitmq_exchange_federation, :rabbitmq_federation, :rabbitmq_federation_common, :rabbitmq_queue_federation] =
Enum.sort(:rabbit_misc.rpc_call(context[:opts][:node], :rabbit_plugins, :active, []))
end

Expand All @@ -156,7 +156,7 @@ defmodule SetPluginsCommandTest do
%{
mode: :online,
started: [],
stopped: [:rabbitmq_federation, :rabbitmq_stomp],
stopped: [:rabbitmq_exchange_federation, :rabbitmq_federation, :rabbitmq_federation_common, :rabbitmq_queue_federation, :rabbitmq_stomp],
set: []
}
] = Enum.to_list(test_stream)
Expand All @@ -174,19 +174,19 @@ defmodule SetPluginsCommandTest do
@command.run(["rabbitmq_federation", "rabbitmq_stomp"], context[:opts])

assert [
[:rabbitmq_federation, :rabbitmq_stomp],
[:rabbitmq_exchange_federation, :rabbitmq_federation, :rabbitmq_federation_common, :rabbitmq_queue_federation, :rabbitmq_stomp],
%{
mode: :online,
started: [:rabbitmq_federation, :rabbitmq_stomp],
started: [:rabbitmq_exchange_federation, :rabbitmq_federation, :rabbitmq_federation_common, :rabbitmq_queue_federation, :rabbitmq_stomp],
stopped: [],
set: [:rabbitmq_federation, :rabbitmq_stomp]
set: [:rabbitmq_exchange_federation, :rabbitmq_federation, :rabbitmq_federation_common, :rabbitmq_queue_federation, :rabbitmq_stomp]
}
] = Enum.to_list(test_stream)

assert {:ok, [[:rabbitmq_federation, :rabbitmq_stomp]]} =
:file.consult(context[:opts][:enabled_plugins_file])

assert [:amqp_client, :rabbitmq_federation, :rabbitmq_stomp] =
assert [:amqp_client, :rabbitmq_exchange_federation, :rabbitmq_federation, :rabbitmq_federation_common, :rabbitmq_queue_federation, :rabbitmq_stomp] =
Enum.sort(:rabbit_misc.rpc_call(context[:opts][:node], :rabbit_plugins, :active, []))
end

Expand Down
25 changes: 25 additions & 0 deletions deps/rabbitmq_exchange_federation/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
PROJECT = rabbitmq_exchange_federation
PROJECT_DESCRIPTION = RabbitMQ Exchange Federation
PROJECT_MOD = rabbit_exchange_federation_app

define PROJECT_ENV
[
{pgroup_name_cluster_id, false},
{internal_exchange_check_interval, 90000}
]
endef

define PROJECT_APP_EXTRA_KEYS
{broker_version_requirements, []}
endef

DEPS = rabbit_common rabbit amqp_client rabbitmq_federation_common
TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers

PLT_APPS += rabbitmq_cli

DEP_EARLY_PLUGINS = rabbit_common/mk/rabbitmq-early-plugin.mk
DEP_PLUGINS = rabbit_common/mk/rabbitmq-plugin.mk

include ../../rabbitmq-components.mk
include ../../erlang.mk
23 changes: 23 additions & 0 deletions deps/rabbitmq_exchange_federation/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
## RabbitMQ Federation

RabbitMQ federation offers a group of features for loosely
coupled and WAN-friendly distributed RabbitMQ setups. Note that
this is not an alternative to queue mirroring.


## Supported RabbitMQ Versions

This plugin ships with RabbitMQ, there is no need to
install it separately.


## Documentation

See [RabbitMQ federation plugin](https://www.rabbitmq.com/federation.html) on rabbitmq.com.


## License and Copyright

Released under [the same license as RabbitMQ](https://www.rabbitmq.com/mpl.html).

2007-2015 (c) 2007-2024 Broadcom. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%

-define(FEDERATION_PG_SCOPE, rabbitmq_exchange_federation_pg_scope).
Loading