Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions deps/rabbitmq_peer_discovery_etcd/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ PROJECT_MOD = rabbitmq_peer_discovery_etcd_app
DEPS = rabbit_common rabbitmq_peer_discovery_common rabbit eetcd gun
TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers ct_helper meck
dep_ct_helper = git https://github.com/extend/ct_helper.git master
dep_gun = hex 1.3.3
dep_eetcd = hex 0.3.6
dep_gun = hex 2.1.0
dep_eetcd = hex 0.4.0

DEP_EARLY_PLUGINS = rabbit_common/mk/rabbitmq-early-plugin.mk
DEP_PLUGINS = rabbit_common/mk/rabbitmq-plugin.mk
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,9 +182,6 @@ end}.
{mapping, "cluster_formation.etcd.ssl_options.verify", "rabbit.cluster_formation.peer_discovery_etcd.ssl_options.verify", [
{datatype, {enum, [verify_peer, verify_none]}}]}.

{mapping, "cluster_formation.etcd.ssl_options.fail_if_no_peer_cert", "rabbit.cluster_formation.peer_discovery_etcd.ssl_options.fail_if_no_peer_cert", [
{datatype, {enum, [true, false]}}]}.

{mapping, "cluster_formation.etcd.ssl_options.cacertfile", "rabbit.cluster_formation.peer_discovery_etcd.ssl_options.cacertfile",
[{datatype, string}, {validators, ["file_accessible"]}]}.

Expand Down Expand Up @@ -214,17 +211,6 @@ end}.
{mapping, "cluster_formation.etcd.ssl_options.depth", "rabbit.cluster_formation.peer_discovery_etcd.ssl_options.depth",
[{datatype, integer}, {validators, ["byte"]}]}.

{mapping, "cluster_formation.etcd.ssl_options.dh", "rabbit.cluster_formation.peer_discovery_etcd.ssl_options.dh",
[{datatype, string}]}.

{translation, "rabbit.cluster_formation.peer_discovery_etcd.ssl_options.dh",
fun(Conf) ->
list_to_binary(cuttlefish:conf_get("cluster_formation.etcd.ssl_options.dh", Conf))
end}.

{mapping, "cluster_formation.etcd.ssl_options.dhfile", "rabbit.cluster_formation.peer_discovery_etcd.ssl_options.dhfile",
[{datatype, string}, {validators, ["file_accessible"]}]}.

{mapping, "cluster_formation.etcd.ssl_options.key.RSAPrivateKey", "rabbit.cluster_formation.peer_discovery_etcd.ssl_options.key",
[{datatype, string}]}.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,18 +140,12 @@ recover(internal, start, Data = #statem_data{endpoints = Endpoints, connection_m
rabbit_log:debug("etcd v3 API client will attempt to connect, endpoints: ~ts",
[string:join(Endpoints, ",")]),
maybe_demonitor(Ref),
{Transport, TransportOpts} = pick_transport(Data),
case Transport of
tcp -> rabbit_log:info("etcd v3 API client is configured to connect over plain TCP, without using TLS");
tls -> rabbit_log:info("etcd v3 API client is configured to use TLS")
end,
ConnName = ?ETCD_CONN_NAME,
case connect(ConnName, Endpoints, Transport, TransportOpts, Data) of
case connect(?ETCD_CONN_NAME, Endpoints, Data) of
{ok, Pid} ->
rabbit_log:debug("etcd v3 API client connection: ~tp", [Pid]),
rabbit_log:debug("etcd v3 API client: total number of connections to etcd is ~tp", [length(eetcd_conn_sup:info())]),
{next_state, connected, Data#statem_data{
connection_name = ConnName,
connection_name = ?ETCD_CONN_NAME,
connection_pid = Pid,
connection_monitor = monitor(process, Pid)
}};
Expand Down Expand Up @@ -213,8 +207,12 @@ connected({call, From}, {unlock, GeneratedKey}, Data = #statem_data{connection_n
connected({call, From}, register, Data = #statem_data{connection_name = Conn}) ->
Ctx = registration_context(Conn, Data),
Key = node_key(Data),
eetcd_kv:put(Ctx, Key, registration_value(Data)),
rabbit_log:debug("etcd peer discovery: put key ~tp, done with registration", [Key]),
case eetcd_kv:put(Ctx, Key, registration_value(Data)) of
{ok, _} ->
rabbit_log:debug("etcd peer discovery: put key ~tp, done with registration", [Key]);
{error, Reason} ->
rabbit_log:error("etcd peer discovery: put key ~tp failed: ~p", [Key, Reason])
end,
gen_statem:reply(From, ok),
keep_state_and_data;
connected({call, From}, unregister, Data = #statem_data{connection_name = Conn}) ->
Expand Down Expand Up @@ -320,20 +318,21 @@ error_is_already_started({_Endpoint, already_started}) ->
error_is_already_started({_Endpoint, _}) ->
false.

connect(Name, Endpoints, Transport, TransportOpts, Data) ->
connect(Name, Endpoints, Data) ->
case eetcd_conn:lookup(Name) of
{ok, Pid} when is_pid(Pid) ->
{ok, Pid};
{error, eetcd_conn_unavailable} ->
do_connect(Name, Endpoints, Transport, TransportOpts, Data)
do_connect(Name, Endpoints, Data)
end.

do_connect(Name, Endpoints, Transport, TransportOpts, Data = #statem_data{username = Username}) ->
do_connect(Name, Endpoints, Data = #statem_data{username = Username}) ->
Opts = connection_options(Data),
case Username of
undefined -> rabbit_log:info("etcd peer discovery: will connect to etcd without authentication (no credentials configured)");
_ -> rabbit_log:info("etcd peer discovery: will connect to etcd as user '~ts'", [Username])
end,
case eetcd:open(Name, Endpoints, connection_options(Data), Transport, TransportOpts) of
case eetcd:open(Name, Endpoints, Opts) of
{ok, Pid} -> {ok, Pid};
{error, Errors0} ->
Errors = case is_list(Errors0) of
Expand All @@ -354,16 +353,6 @@ do_connect(Name, Endpoints, Transport, TransportOpts, Data = #statem_data{userna
end
end.

connection_options(#statem_data{username = Username, obfuscated_password = Password}) ->
SharedOpts = [{mode, random}],
case {Username, Password} of
{undefined, _} -> SharedOpts;
{_, undefined} -> SharedOpts;
{UVal, PVal} ->
[{name, UVal}, {password, to_list(deobfuscate(PVal))}] ++ SharedOpts
end.


obfuscate(undefined) -> undefined;
obfuscate(Password) ->
credentials_obfuscation:encrypt(to_binary(Password)).
Expand All @@ -379,9 +368,9 @@ disconnect(ConnName, #statem_data{connection_monitor = Ref}) ->
unregister(Conn, Data = #statem_data{node_key_lease_id = LeaseID, node_lease_keepalive_pid = KAPid}) ->
Ctx = unregistration_context(Conn, Data),
Key = node_key(Data),
eetcd_kv:delete(Ctx, Key),
_ = eetcd_kv:delete(Ctx, Key),
rabbit_log:debug("etcd peer discovery: deleted key ~ts, done with unregistration", [Key]),
eetcd_lease:revoke(Ctx, LeaseID),
_ = eetcd_lease:revoke(Ctx, LeaseID),
exit(KAPid, normal),
rabbit_log:debug("etcd peer discovery: revoked a lease ~tp for node key ~ts", [LeaseID, Key]),
ok.
Expand Down Expand Up @@ -429,7 +418,24 @@ normalize_settings(Map) when is_map(Map) ->
maps:merge(maps:without([etcd_prefix, lock_wait_time], Map),
#{endpoints => AllEndpoints}).

pick_transport(#statem_data{tls_options = []}) ->
{tcp, []};
pick_transport(#statem_data{tls_options = Opts}) ->
{tls, Opts}.
connection_options(#statem_data{tls_options = TlsOpts,
username = Username,
obfuscated_password = Password}) ->
Opts0 = case TlsOpts of
[] ->
rabbit_log:info("etcd v3 API client is configured to use plain TCP (without TLS)"),
[{transport, tcp}];
_ ->
rabbit_log:info("etcd v3 API client is configured to use TLS"),
[{transport, tls},
{tls_opts, TlsOpts}]
end,
Opts = [{mode, random} | Opts0],
case Username =:= undefined orelse
Password =:= undefined of
true ->
Opts;
false ->
[{name, Username},
{password, to_list(deobfuscate(Password))}] ++ Opts
end.
6 changes: 6 additions & 0 deletions release-notes/4.1.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ for the complete list of related changes.
This default can be overridden by [configuring](https://www.rabbitmq.com/docs/configure#config-file) `mqtt.max_packet_size_authenticated`.
Note that this value must not be greater than `max_message_size` (which also defaults to 16 MiB).

### etcd Peer Discovery

The following `rabbitmq.conf` settings are unsupported:
* `cluster_formation.etcd.ssl_options.fail_if_no_peer_cert`
* `cluster_formation.etcd.ssl_options.dh`
* `cluster_formation.etcd.ssl_options.dhfile`

## Erlang/OTP Compatibility Notes

Expand Down
Loading