Permalink
Browse files

Report URIs not params in status.

  • Loading branch information...
1 parent eb1e5f5 commit dd9ce3761b42eb24f4edd7eb2a23a7639411d25e Simon MacMullen committed Nov 29, 2013
Showing with 25 additions and 24 deletions.
  1. +1 −1 include/rabbit_shovel.hrl
  2. +5 −5 src/rabbit_shovel_config.erl
  3. +6 −6 src/rabbit_shovel_parameters.erl
  4. +13 −12 src/rabbit_shovel_worker.erl
@@ -15,7 +15,7 @@
%%
-record(endpoint,
- {amqp_params,
+ {uris,
resource_declaration
}).
@@ -113,7 +113,7 @@ parse_endpoint({Endpoint, Pos}) when is_list(Endpoint) ->
end,
{[], Brokers1} = run_state_monad(
lists:duplicate(length(Brokers),
- fun parse_uri/1),
+ fun check_uri/1),
{Brokers, []}),
ResourceDecls =
@@ -132,16 +132,16 @@ parse_endpoint({Endpoint, Pos}) when is_list(Endpoint) ->
fun (_Conn, Ch) ->
[amqp_channel:call(Ch, M) || M <- lists:reverse(ResourceDecls1)]
end,
- return({#endpoint{amqp_params = Brokers1,
+ return({#endpoint{uris = Brokers1,
resource_declaration = DeclareFun},
Pos});
parse_endpoint({Endpoint, _Pos}) ->
fail({require_list, Endpoint}).
-parse_uri({[Uri | Uris], Acc}) ->
+check_uri({[Uri | Uris], Acc}) ->
case amqp_uri:parse(Uri) of
- {ok, Params} ->
- return({Uris, [Params | Acc]});
+ {ok, _Params} ->
+ return({Uris, [Uri | Acc]});
{error, _} = Err ->
throw(Err)
end.
@@ -102,8 +102,8 @@ validate_uri(Name, Term) ->
%%----------------------------------------------------------------------------
parse(Def) ->
- SrcParams = parse_uri(<<"src-uri">>, Def),
- DestParams = parse_uri(<<"dest-uri">>, Def),
+ SrcURIs = get_uris(<<"src-uri">>, Def),
+ DestURIs = get_uris(<<"dest-uri">>, Def),
SrcExch = pget(<<"src-exchange">>, Def, none),
SrcExchKey = pget(<<"src-exchange-key">>, Def, <<>>), %% [1]
SrcQueue = pget(<<"src-queue">>, Def, none),
@@ -143,9 +143,9 @@ parse(Def) ->
end
end,
{ok, #shovel{
- sources = #endpoint{amqp_params = SrcParams,
+ sources = #endpoint{uris = SrcURIs,
resource_declaration = SrcFun},
- destinations = #endpoint{amqp_params = DestParams,
+ destinations = #endpoint{uris = DestURIs,
resource_declaration = DestFun},
prefetch_count = pget(<<"prefetch-count">>, Def, 1000),
ack_mode = translate_ack_mode(
@@ -155,12 +155,12 @@ parse(Def) ->
queue = Queue,
reconnect_delay = pget(<<"reconnect-delay">>, Def, 1)}}.
-parse_uri(Key, Def) ->
+get_uris(Key, Def) ->
URIs = case pget(Key, Def) of
B when is_binary(B) -> [B];
L when is_list(L) -> L
end,
- [P || URI <- URIs, {ok, P} <- [amqp_uri:parse(binary_to_list(URI))]].
+ [binary_to_list(URI) || URI <- URIs].
translate_ack_mode(<<"on-confirm">>) -> on_confirm;
translate_ack_mode(<<"on-publish">>) -> on_publish;
@@ -27,7 +27,7 @@
-define(MAX_CONNECTION_CLOSE_TIMEOUT, 10000).
-record(state, {inbound_conn, inbound_ch, outbound_conn, outbound_ch,
- name, type, config, inbound_params, outbound_params, unacked}).
+ name, type, config, inbound_uri, outbound_uri, unacked}).
start_link(Type, Name, Config) ->
ok = rabbit_shovel_status:report(Name, Type, starting),
@@ -52,10 +52,10 @@ handle_cast(init, State = #state{config = Config}) ->
process_flag(trap_exit, true),
random:seed(now()),
#shovel{sources = Sources, destinations = Destinations} = Config,
- {InboundConn, InboundChan, InboundParams} =
- make_conn_and_chan(Sources#endpoint.amqp_params),
- {OutboundConn, OutboundChan, OutboundParams} =
- make_conn_and_chan(Destinations#endpoint.amqp_params),
+ {InboundConn, InboundChan, InboundURI} =
+ make_conn_and_chan(Sources#endpoint.uris),
+ {OutboundConn, OutboundChan, OutboundURI} =
+ make_conn_and_chan(Destinations#endpoint.uris),
(Sources#endpoint.resource_declaration)(InboundConn, InboundChan),
(Destinations#endpoint.resource_declaration)(OutboundConn, OutboundChan),
@@ -84,8 +84,8 @@ handle_cast(init, State = #state{config = Config}) ->
State1 =
State#state{inbound_conn = InboundConn, inbound_ch = InboundChan,
outbound_conn = OutboundConn, outbound_ch = OutboundChan,
- inbound_params = InboundParams,
- outbound_params = OutboundParams,
+ inbound_uri = InboundURI,
+ outbound_uri = OutboundURI,
unacked = gb_trees:empty()},
ok = report_status(running, State1),
{noreply, State1}.
@@ -172,8 +172,8 @@ remove_delivery_tags(Seq, true, Unacked) ->
report_status(Verb, State) ->
rabbit_shovel_status:report(
State#state.name, State#state.type,
- {Verb, {source, State#state.inbound_params},
- {destination, State#state.outbound_params}}).
+ {Verb, {source, State#state.inbound_uri},
+ {destination, State#state.outbound_uri}}).
publish(Tag, Method, Msg,
State = #state{inbound_ch = InboundChan, outbound_ch = OutboundChan,
@@ -191,9 +191,10 @@ publish(Tag, Method, Msg,
State
end.
-make_conn_and_chan(AmqpParams) ->
- AmqpParam = lists:nth(random:uniform(length(AmqpParams)), AmqpParams),
+make_conn_and_chan(URIs) ->
+ URI = lists:nth(random:uniform(length(URIs)), URIs),
+ {ok, AmqpParam} = amqp_uri:parse(URI),
{ok, Conn} = amqp_connection:start(AmqpParam),
link(Conn),
{ok, Chan} = amqp_connection:open_channel(Conn),
- {Conn, Chan, AmqpParam}.
+ {Conn, Chan, amqp_uri:remove_credentials(URI)}.

0 comments on commit dd9ce37

Please sign in to comment.