Permalink
Browse files

stable to default

  • Loading branch information...
2 parents 0ff2370 + 3bb95e5 commit 39f1572deb512061483286a05288e79356926c29 Simon MacMullen committed Jan 14, 2014
@@ -15,8 +15,8 @@
%%
-record(endpoint,
- {amqp_params,
- resource_declarations
+ {uris,
+ resource_declaration
}).
-record(shovel,
View
@@ -1,3 +1,3 @@
RELEASABLE:=true
DEPS:=rabbitmq-erlang-client
-WITH_BROKER_TEST_COMMANDS:=rabbit_shovel_test:test()
+WITH_BROKER_TEST_COMMANDS:=rabbit_shovel_test_all:all_tests()
@@ -113,7 +113,7 @@ parse_endpoint({Endpoint, Pos}) when is_list(Endpoint) ->
end,
{[], Brokers1} = run_state_monad(
lists:duplicate(length(Brokers),
- fun parse_uri/1),
+ fun check_uri/1),
{Brokers, []}),
ResourceDecls =
@@ -128,16 +128,20 @@ parse_endpoint({Endpoint, Pos}) when is_list(Endpoint) ->
lists:duplicate(length(ResourceDecls), fun parse_declaration/1),
{ResourceDecls, []}),
- return({#endpoint{amqp_params = Brokers1,
- resource_declarations = lists:reverse(ResourceDecls1)},
+ DeclareFun =
+ fun (_Conn, Ch) ->
+ [amqp_channel:call(Ch, M) || M <- lists:reverse(ResourceDecls1)]
+ end,
+ return({#endpoint{uris = Brokers1,
+ resource_declaration = DeclareFun},
Pos});
parse_endpoint({Endpoint, _Pos}) ->
fail({require_list, Endpoint}).
-parse_uri({[Uri | Uris], Acc}) ->
+check_uri({[Uri | Uris], Acc}) ->
case amqp_uri:parse(Uri) of
- {ok, Params} ->
- return({Uris, [Params | Acc]});
+ {ok, _Params} ->
+ return({Uris, [Uri | Acc]});
{error, _} = Err ->
throw(Err)
end.
@@ -205,7 +209,7 @@ make_publish_fun(Fields, Pos, ValidFields) ->
case SuppliedFields -- ValidFields of
[] ->
FieldIndices = make_field_indices(ValidFields, Fields),
- Fun = fun (Publish) ->
+ Fun = fun (_SrcUri, _DestUri, Publish) ->
lists:foldl(fun ({Pos1, Value}, Pub) ->
setelement(Pos1, Pub, Value)
end, Publish, FieldIndices)
@@ -0,0 +1,40 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2013 GoPivotal, Inc. All rights reserved.
+%%
+
+-module(rabbit_shovel_dyn_worker_sup).
+-behaviour(supervisor2).
+
+-export([start_link/2, init/1]).
+
+-import(rabbit_misc, [pget/3]).
+
+-include_lib("rabbit_common/include/rabbit.hrl").
+-define(SUPERVISOR, ?MODULE).
+
+start_link(Name, Config) ->
+ supervisor2:start_link(?MODULE, [Name, Config]).
+
+%%----------------------------------------------------------------------------
+
+init([Name, Config]) ->
+ {ok, {{one_for_one, 3, 10},
+ [{Name,
+ {rabbit_shovel_worker, start_link, [dynamic, Name, Config]},
+ case pget(<<"reconnect-delay">>, Config, 1) of
+ N when is_integer(N) andalso N > 0 -> {permanent, N};
+ _ -> temporary
+ end,
+ 16#ffffffff, worker, [rabbit_shovel_worker]}]}}.
@@ -0,0 +1,59 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2013 GoPivotal, Inc. All rights reserved.
+%%
+
+-module(rabbit_shovel_dyn_worker_sup_sup).
+-behaviour(mirrored_supervisor).
+
+-export([start_link/0, init/1, adjust_or_start_child/2, stop_child/1]).
+
+-import(rabbit_misc, [pget/2]).
+
+-include_lib("rabbit_common/include/rabbit.hrl").
+-define(SUPERVISOR, ?MODULE).
+
+start_link() ->
+ {ok, Pid} = mirrored_supervisor:start_link({local, ?SUPERVISOR},
+ ?SUPERVISOR, ?MODULE, []),
+ Shovels = rabbit_runtime_parameters:list_component(<<"shovel">>),
+ [adjust_or_start_child({pget(vhost, Shovel), pget(name, Shovel)},
+ pget(value, Shovel)) || Shovel <- Shovels],
+ {ok, Pid}.
+
+adjust_or_start_child(Name, Def) ->
+ case child_exists(Name) of
+ true -> stop_child(Name);
+ false -> ok
+ end,
+ {ok, _Pid} =
+ mirrored_supervisor:start_child(
+ ?SUPERVISOR,
+ {Name, {rabbit_shovel_dyn_worker_sup, start_link, [Name, Def]},
+ transient, ?MAX_WAIT, worker, [rabbit_shovel_dyn_worker_sup]}),
+ ok.
+
+child_exists(Name) ->
+ lists:any(fun ({N, _, _, _}) -> N =:= Name end,
+ mirrored_supervisor:which_children(?SUPERVISOR)).
+
+stop_child(Name) ->
+ ok = mirrored_supervisor:terminate_child(?SUPERVISOR, Name),
+ ok = mirrored_supervisor:delete_child(?SUPERVISOR, Name),
+ rabbit_shovel_status:remove(Name).
+
+%%----------------------------------------------------------------------------
+
+init([]) ->
+ {ok, {{one_for_one, 3, 10}, []}}.
@@ -0,0 +1,210 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2013 GoPivotal, Inc. All rights reserved.
+%%
+
+-module(rabbit_shovel_parameters).
+-behaviour(rabbit_runtime_parameter).
+
+-include_lib("amqp_client/include/amqp_client.hrl").
+-include("rabbit_shovel.hrl").
+
+-export([validate/4, notify/4, notify_clear/3]).
+-export([register/0, parse/2]).
+
+-import(rabbit_misc, [pget/2, pget/3]).
+
+-define(ROUTING_HEADER, <<"x-shovelled">>).
+
+-rabbit_boot_step({?MODULE,
+ [{description, "shovel parameters"},
+ {mfa, {rabbit_shovel_parameters, register, []}},
+ {requires, rabbit_registry},
+ {enables, recovery}]}).
+
+register() ->
+ rabbit_registry:register(runtime_parameter, <<"shovel">>, ?MODULE).
+
+validate(_VHost, <<"shovel">>, Name, Def) ->
+ [case pget2(<<"src-exchange">>, <<"src-queue">>, Def) of
+ zero -> {error, "Must specify 'src-exchange' or 'src-queue'", []};
+ one -> ok;
+ both -> {error, "Cannot specify 'src-exchange' and 'src-queue'", []}
+ end,
+ case pget2(<<"dest-exchange">>, <<"dest-queue">>, Def) of
+ zero -> ok;
+ one -> ok;
+ both -> {error, "Cannot specify 'dest-exchange' and 'dest-queue'", []}
+ end | rabbit_parameter_validation:proplist(Name, validation(), Def)];
+
+validate(_VHost, _Component, Name, _Term) ->
+ {error, "name not recognised: ~p", [Name]}.
+
+pget2(K1, K2, Defs) -> case {pget(K1, Defs), pget(K2, Defs)} of
+ {undefined, undefined} -> zero;
+ {undefined, _} -> one;
+ {_, undefined} -> one;
+ {_, _} -> both
+ end.
+
+notify(VHost, <<"shovel">>, Name, Definition) ->
+ rabbit_shovel_dyn_worker_sup_sup:adjust_or_start_child(
+ {VHost, Name}, Definition).
+
+notify_clear(VHost, <<"shovel">>, Name) ->
+ rabbit_shovel_dyn_worker_sup_sup:stop_child({VHost, Name}).
+
+%%----------------------------------------------------------------------------
+
+validation() ->
+ [{<<"src-uri">>, fun validate_uri/2, mandatory},
+ {<<"dest-uri">>, fun validate_uri/2, mandatory},
+ {<<"src-exchange">>, fun rabbit_parameter_validation:binary/2,optional},
+ {<<"src-exchange-key">>,fun rabbit_parameter_validation:binary/2,optional},
+ {<<"src-queue">>, fun rabbit_parameter_validation:binary/2,optional},
+ {<<"dest-exchange">>, fun rabbit_parameter_validation:binary/2,optional},
+ {<<"dest-exchange-key">>,fun rabbit_parameter_validation:binary/2,optional},
+ {<<"dest-queue">>, fun rabbit_parameter_validation:binary/2,optional},
+ {<<"prefetch-count">>, fun rabbit_parameter_validation:number/2,optional},
+ {<<"reconnect-delay">>, fun rabbit_parameter_validation:number/2,optional},
+ {<<"add-forward-headers">>, fun rabbit_parameter_validation:boolean/2,optional},
+ {<<"ack-mode">>, rabbit_parameter_validation:enum(
+ ['no-ack', 'on-publish', 'on-confirm']), optional}
+ ].
+
+%% TODO this function is duplicated from federation. Move to amqp_uri module?
+validate_uri(Name, Term) when is_binary(Term) ->
+ case rabbit_parameter_validation:binary(Name, Term) of
+ ok -> case amqp_uri:parse(binary_to_list(Term)) of
+ {ok, _} -> ok;
+ {error, E} -> {error, "\"~s\" not a valid URI: ~p", [Term, E]}
+ end;
+ E -> E
+ end;
+validate_uri(Name, Term) ->
+ case rabbit_parameter_validation:list(Name, Term) of
+ ok -> case [V || U <- Term,
+ V <- [validate_uri(Name, U)],
+ element(1, V) =:= error] of
+ [] -> ok;
+ [E | _] -> E
+ end;
+ E -> E
+ end.
+
+%%----------------------------------------------------------------------------
+
+parse({VHost, Name}, Def) ->
+ SrcURIs = get_uris(<<"src-uri">>, Def),
+ DestURIs = get_uris(<<"dest-uri">>, Def),
+ SrcX = pget(<<"src-exchange">>, Def, none),
+ SrcXKey = pget(<<"src-exchange-key">>, Def, <<>>), %% [1]
+ SrcQ = pget(<<"src-queue">>, Def, none),
+ DestX = pget(<<"dest-exchange">>, Def, none),
+ DestXKey = pget(<<"dest-exchange-key">>, Def, none),
+ DestQ = pget(<<"dest-queue">>, Def, none),
+ %% [1] src-exchange-key is never ignored if src-exchange is set
+ {SrcFun, Queue, Table1} =
+ case SrcQ of
+ none -> {fun (_Conn, Ch) ->
+ Ms = [#'queue.declare'{exclusive = true},
+ #'queue.bind'{routing_key = SrcXKey,
+ exchange = SrcX}],
+ [amqp_channel:call(Ch, M) || M <- Ms]
+ end, <<>>, [{<<"src-exchange">>, SrcX},
+ {<<"src-exchange-key">>, SrcXKey}]};
+ _ -> {fun (Conn, _Ch) ->
+ ensure_queue(Conn, SrcQ)
+ end, SrcQ, [{<<"src-queue">>, SrcQ}]}
+ end,
+ DestFun = fun (Conn, _Ch) ->
+ case DestQ of
+ none -> ok;
+ _ -> ensure_queue(Conn, DestQ)
+ end
+ end,
+ {X, Key, Table2}
+ = case DestQ of
+ none -> {DestX, DestXKey, [{<<"dest-exchange">>, DestX},
+ {<<"dest-exchange-key">>, DestXKey}]};
+ _ -> {<<>>, DestQ, [{<<"dest-queue">>, DestQ}]}
+ end,
+ PubFun = fun (_SrcURI, _DestURI, P0) ->
+ P1 = case X of
+ none -> P0;
+ _ -> P0#'basic.publish'{exchange = X}
+ end,
+ case Key of
+ none -> P1;
+ _ -> P1#'basic.publish'{routing_key = Key}
+ end
+ end,
+ AddHeaders = pget(<<"add-forward-headers">>, Def, false),
+ Table0 = [{<<"shovel-host">>, rabbit_nodes:fqdn_nodename()},
+ {<<"shovel-name">>, Name},
+ {<<"shovel-vhost">>, VHost}],
+ PubPropsFun = fun (SrcURI, DestURI, P = #'P_basic'{headers = H}) ->
+ case AddHeaders of
+ true -> H1 = update_headers(
+ Table0, Table1 ++ Table2,
+ SrcURI, DestURI, H),
+ P#'P_basic'{headers = H1};
+ false -> P
+ end
+ end,
+ {ok, #shovel{
+ sources = #endpoint{uris = SrcURIs,
+ resource_declaration = SrcFun},
+ destinations = #endpoint{uris = DestURIs,
+ resource_declaration = DestFun},
+ prefetch_count = pget(<<"prefetch-count">>, Def, 1000),
+ ack_mode = translate_ack_mode(
+ pget(<<"ack-mode">>, Def, <<"on-confirm">>)),
+ publish_fields = PubFun,
+ publish_properties = PubPropsFun,
+ queue = Queue,
+ reconnect_delay = pget(<<"reconnect-delay">>, Def, 1)}}.
+
+get_uris(Key, Def) ->
+ URIs = case pget(Key, Def) of
+ B when is_binary(B) -> [B];
+ L when is_list(L) -> L
+ end,
+ [binary_to_list(URI) || URI <- URIs].
+
+translate_ack_mode(<<"on-confirm">>) -> on_confirm;
+translate_ack_mode(<<"on-publish">>) -> on_publish;
+translate_ack_mode(<<"no-ack">>) -> no_ack.
+
+ensure_queue(Conn, Queue) ->
+ {ok, Ch} = amqp_connection:open_channel(Conn),
+ try
+ amqp_channel:call(Ch, #'queue.declare'{queue = Queue,
+ passive = true})
+ catch exit:{{shutdown, {server_initiated_close, ?NOT_FOUND, _Text}}, _} ->
+ {ok, Ch2} = amqp_connection:open_channel(Conn),
+ amqp_channel:call(Ch2, #'queue.declare'{queue = Queue,
+ durable = true}),
+ catch amqp_channel:close(Ch2)
+
+ after
+ catch amqp_channel:close(Ch)
+ end.
+
+update_headers(Table0, Table1, SrcURI, DestURI, Headers) ->
+ Table = Table0 ++ [{<<"src-uri">>, list_to_binary(SrcURI)},
+ {<<"dest-uri">>, list_to_binary(DestURI)}] ++ Table1,
+ rabbit_basic:prepend_table_header(
+ ?ROUTING_HEADER, [{K, longstr, V} || {K, V} <- Table],
+ Headers).
Oops, something went wrong.

0 comments on commit 39f1572

Please sign in to comment.