Permalink
Browse files

stable to default

  • Loading branch information...
2 parents ba1d142 + 852ade3 commit 6dd2f4cb44828e3e7cb91e581691c16152f514f2 Simon MacMullen committed Jan 6, 2014
@@ -15,8 +15,8 @@
%%
-record(endpoint,
- {amqp_params,
- resource_declarations
+ {uris,
+ resource_declaration
}).
-record(shovel,
View
@@ -1,3 +1,3 @@
RELEASABLE:=true
DEPS:=rabbitmq-erlang-client
-WITH_BROKER_TEST_COMMANDS:=rabbit_shovel_test:test()
+WITH_BROKER_TEST_COMMANDS:=rabbit_shovel_test_all:all_tests()
@@ -113,7 +113,7 @@ parse_endpoint({Endpoint, Pos}) when is_list(Endpoint) ->
end,
{[], Brokers1} = run_state_monad(
lists:duplicate(length(Brokers),
- fun parse_uri/1),
+ fun check_uri/1),
{Brokers, []}),
ResourceDecls =
@@ -128,16 +128,20 @@ parse_endpoint({Endpoint, Pos}) when is_list(Endpoint) ->
lists:duplicate(length(ResourceDecls), fun parse_declaration/1),
{ResourceDecls, []}),
- return({#endpoint{amqp_params = Brokers1,
- resource_declarations = lists:reverse(ResourceDecls1)},
+ DeclareFun =
+ fun (_Conn, Ch) ->
+ [amqp_channel:call(Ch, M) || M <- lists:reverse(ResourceDecls1)]
+ end,
+ return({#endpoint{uris = Brokers1,
+ resource_declaration = DeclareFun},
Pos});
parse_endpoint({Endpoint, _Pos}) ->
fail({require_list, Endpoint}).
-parse_uri({[Uri | Uris], Acc}) ->
+check_uri({[Uri | Uris], Acc}) ->
case amqp_uri:parse(Uri) of
- {ok, Params} ->
- return({Uris, [Params | Acc]});
+ {ok, _Params} ->
+ return({Uris, [Uri | Acc]});
{error, _} = Err ->
throw(Err)
end.
@@ -0,0 +1,40 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2013 GoPivotal, Inc. All rights reserved.
+%%
+
+-module(rabbit_shovel_dyn_worker_sup).
+-behaviour(supervisor2).
+
+-export([start_link/2, init/1]).
+
+-import(rabbit_misc, [pget/3]).
+
+-include_lib("rabbit_common/include/rabbit.hrl").
+-define(SUPERVISOR, ?MODULE).
+
+start_link(Name, Config) ->
+ supervisor2:start_link(?MODULE, [Name, Config]).
+
+%%----------------------------------------------------------------------------
+
+init([Name, Config]) ->
+ {ok, {{one_for_one, 3, 10},
+ [{Name,
+ {rabbit_shovel_worker, start_link, [dynamic, Name, Config]},
+ case pget(<<"reconnect-delay">>, Config, 1) of
+ N when is_integer(N) andalso N > 0 -> {permanent, N};
+ _ -> temporary
+ end,
+ 16#ffffffff, worker, [rabbit_shovel_worker]}]}}.
@@ -0,0 +1,59 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2013 GoPivotal, Inc. All rights reserved.
+%%
+
+-module(rabbit_shovel_dyn_worker_sup_sup).
+-behaviour(mirrored_supervisor).
+
+-export([start_link/0, init/1, adjust_or_start_child/2, stop_child/1]).
+
+-import(rabbit_misc, [pget/2]).
+
+-include_lib("rabbit_common/include/rabbit.hrl").
+-define(SUPERVISOR, ?MODULE).
+
+start_link() ->
+ {ok, Pid} = mirrored_supervisor:start_link({local, ?SUPERVISOR},
+ ?SUPERVISOR, ?MODULE, []),
+ Shovels = rabbit_runtime_parameters:list_component(<<"shovel">>),
+ [adjust_or_start_child({pget(vhost, Shovel), pget(name, Shovel)},
+ pget(value, Shovel)) || Shovel <- Shovels],
+ {ok, Pid}.
+
+adjust_or_start_child(Name, Def) ->
+ case child_exists(Name) of
+ true -> stop_child(Name);
+ false -> ok
+ end,
+ {ok, _Pid} =
+ mirrored_supervisor:start_child(
+ ?SUPERVISOR,
+ {Name, {rabbit_shovel_dyn_worker_sup, start_link, [Name, Def]},
+ transient, ?MAX_WAIT, worker, [rabbit_shovel_dyn_worker_sup]}),
+ ok.
+
+child_exists(Name) ->
+ lists:any(fun ({N, _, _, _}) -> N =:= Name end,
+ mirrored_supervisor:which_children(?SUPERVISOR)).
+
+stop_child(Name) ->
+ ok = mirrored_supervisor:terminate_child(?SUPERVISOR, Name),
+ ok = mirrored_supervisor:delete_child(?SUPERVISOR, Name),
+ rabbit_shovel_status:remove(Name).
+
+%%----------------------------------------------------------------------------
+
+init([]) ->
+ {ok, {{one_for_one, 3, 10}, []}}.
@@ -0,0 +1,183 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2013 GoPivotal, Inc. All rights reserved.
+%%
+
+-module(rabbit_shovel_parameters).
+-behaviour(rabbit_runtime_parameter).
+
+-include_lib("amqp_client/include/amqp_client.hrl").
+-include("rabbit_shovel.hrl").
+
+-export([validate/4, notify/4, notify_clear/3]).
+-export([register/0, parse/1]).
+
+-import(rabbit_misc, [pget/2, pget/3]).
+
+-rabbit_boot_step({?MODULE,
+ [{description, "shovel parameters"},
+ {mfa, {rabbit_shovel_parameters, register, []}},
+ {requires, rabbit_registry},
+ {enables, recovery}]}).
+
+register() ->
+ rabbit_registry:register(runtime_parameter, <<"shovel">>, ?MODULE).
+
+validate(_VHost, <<"shovel">>, Name, Def) ->
+ [case pget2(<<"src-exchange">>, <<"src-queue">>, Def) of
+ zero -> {error, "Must specify 'src-exchange' or 'src-queue'", []};
+ one -> ok;
+ both -> {error, "Cannot specify 'src-exchange' and 'src-queue'", []}
+ end,
+ case pget2(<<"dest-exchange">>, <<"dest-queue">>, Def) of
+ zero -> ok;
+ one -> ok;
+ both -> {error, "Cannot specify 'dest-exchange' and 'dest-queue'", []}
+ end | rabbit_parameter_validation:proplist(Name, validation(), Def)];
+
+validate(_VHost, _Component, Name, _Term) ->
+ {error, "name not recognised: ~p", [Name]}.
+
+pget2(K1, K2, Defs) -> case {pget(K1, Defs), pget(K2, Defs)} of
+ {undefined, undefined} -> zero;
+ {undefined, _} -> one;
+ {_, undefined} -> one;
+ {_, _} -> both
+ end.
+
+notify(VHost, <<"shovel">>, Name, Definition) ->
+ rabbit_shovel_dyn_worker_sup_sup:adjust_or_start_child(
+ {VHost, Name}, Definition).
+
+notify_clear(VHost, <<"shovel">>, Name) ->
+ rabbit_shovel_dyn_worker_sup_sup:stop_child({VHost, Name}).
+
+%%----------------------------------------------------------------------------
+
+validation() ->
+ [{<<"src-uri">>, fun validate_uri/2, mandatory},
+ {<<"dest-uri">>, fun validate_uri/2, mandatory},
+ {<<"src-exchange">>, fun rabbit_parameter_validation:binary/2,optional},
+ {<<"src-exchange-key">>,fun rabbit_parameter_validation:binary/2,optional},
+ {<<"src-queue">>, fun rabbit_parameter_validation:binary/2,optional},
+ {<<"dest-exchange">>, fun rabbit_parameter_validation:binary/2,optional},
+ {<<"dest-exchange-key">>,fun rabbit_parameter_validation:binary/2,optional},
+ {<<"dest-queue">>, fun rabbit_parameter_validation:binary/2,optional},
+ {<<"prefetch-count">>, fun rabbit_parameter_validation:number/2,optional},
+ {<<"reconnect-delay">>, fun rabbit_parameter_validation:number/2,optional},
+ {<<"ack-mode">>, rabbit_parameter_validation:enum(
+ ['no-ack', 'on-publish', 'on-confirm']), optional}
+ ].
+
+%% TODO this function is duplicated from federation. Move to amqp_uri module?
+validate_uri(Name, Term) when is_binary(Term) ->
+ case rabbit_parameter_validation:binary(Name, Term) of
+ ok -> case amqp_uri:parse(binary_to_list(Term)) of
+ {ok, _} -> ok;
+ {error, E} -> {error, "\"~s\" not a valid URI: ~p", [Term, E]}
+ end;
+ E -> E
+ end;
+validate_uri(Name, Term) ->
+ case rabbit_parameter_validation:list(Name, Term) of
+ ok -> case [V || U <- Term,
+ V <- [validate_uri(Name, U)],
+ element(1, V) =:= error] of
+ [] -> ok;
+ [E | _] -> E
+ end;
+ E -> E
+ end.
+
+%%----------------------------------------------------------------------------
+
+parse(Def) ->
+ SrcURIs = get_uris(<<"src-uri">>, Def),
+ DestURIs = get_uris(<<"dest-uri">>, Def),
+ SrcExch = pget(<<"src-exchange">>, Def, none),
+ SrcExchKey = pget(<<"src-exchange-key">>, Def, <<>>), %% [1]
+ SrcQueue = pget(<<"src-queue">>, Def, none),
+ DestExch = pget(<<"dest-exchange">>, Def, none),
+ DestExchKey = pget(<<"dest-exchange-key">>, Def, none),
+ DestQueue = pget(<<"dest-queue">>, Def, none),
+ %% [1] src-exchange-key is never ignored if src-exchange is set
+ {SrcFun, Queue} =
+ case SrcQueue of
+ none -> {fun (_Conn, Ch) ->
+ Ms = [#'queue.declare'{exclusive = true},
+ #'queue.bind'{routing_key = SrcExchKey,
+ exchange = SrcExch}],
+ [amqp_channel:call(Ch, M) || M <- Ms]
+ end, <<>>};
+ _ -> {fun (Conn, _Ch) ->
+ ensure_queue(Conn, SrcQueue)
+ end, SrcQueue}
+ end,
+ DestFun = fun (Conn, _Ch) ->
+ case DestQueue of
+ none -> ok;
+ _ -> ensure_queue(Conn, DestQueue)
+ end
+ end,
+ {Exch, Key} = case DestQueue of
+ none -> {DestExch, DestExchKey};
+ _ -> {<<>>, DestQueue}
+ end,
+ PubFun = fun (P0) -> P1 = case Exch of
+ none -> P0;
+ _ -> P0#'basic.publish'{exchange = Exch}
+ end,
+ case Key of
+ none -> P1;
+ _ -> P1#'basic.publish'{routing_key = Key}
+ end
+ end,
+ {ok, #shovel{
+ sources = #endpoint{uris = SrcURIs,
+ resource_declaration = SrcFun},
+ destinations = #endpoint{uris = DestURIs,
+ resource_declaration = DestFun},
+ prefetch_count = pget(<<"prefetch-count">>, Def, 1000),
+ ack_mode = translate_ack_mode(
+ pget(<<"ack-mode">>, Def, <<"on-confirm">>)),
+ publish_fields = PubFun,
+ publish_properties = fun (P) -> P end,
+ queue = Queue,
+ reconnect_delay = pget(<<"reconnect-delay">>, Def, 1)}}.
+
+get_uris(Key, Def) ->
+ URIs = case pget(Key, Def) of
+ B when is_binary(B) -> [B];
+ L when is_list(L) -> L
+ end,
+ [binary_to_list(URI) || URI <- URIs].
+
+translate_ack_mode(<<"on-confirm">>) -> on_confirm;
+translate_ack_mode(<<"on-publish">>) -> on_publish;
+translate_ack_mode(<<"no-ack">>) -> no_ack.
+
+ensure_queue(Conn, Queue) ->
+ {ok, Ch} = amqp_connection:open_channel(Conn),
+ try
+ amqp_channel:call(Ch, #'queue.declare'{queue = Queue,
+ passive = true})
+ catch exit:{{shutdown, {server_initiated_close, ?NOT_FOUND, _Text}}, _} ->
+ {ok, Ch2} = amqp_connection:open_channel(Conn),
+ amqp_channel:call(Ch2, #'queue.declare'{queue = Queue,
+ durable = true}),
+ catch amqp_channel:close(Ch2)
+
+ after
+ catch amqp_channel:close(Ch)
+ end.
@@ -19,7 +19,7 @@
-export([start_link/0]).
--export([report/2, status/0]).
+-export([report/3, remove/1, status/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -28,13 +28,16 @@
-define(ETS_NAME, ?MODULE).
-record(state, {}).
--record(entry, {name, info, timestamp}).
+-record(entry, {name, type, info, timestamp}).
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
-report(Name, Info) ->
- gen_server:cast(?SERVER, {report, Name, Info, calendar:local_time()}).
+report(Name, Type, Info) ->
+ gen_server:cast(?SERVER, {report, Name, Type, Info, calendar:local_time()}).
+
+remove(Name) ->
+ gen_server:cast(?SERVER, {remove, Name}).
status() ->
gen_server:call(?SERVER, status, infinity).
@@ -46,12 +49,17 @@ init([]) ->
handle_call(status, _From, State) ->
Entries = ets:tab2list(?ETS_NAME),
- {reply, [{Entry#entry.name, Entry#entry.info, Entry#entry.timestamp}
+ {reply, [{Entry#entry.name, Entry#entry.type, Entry#entry.info,
+ Entry#entry.timestamp}
|| Entry <- Entries], State}.
-handle_cast({report, Name, Info, Timestamp}, State) ->
- true = ets:insert(?ETS_NAME, #entry{name = Name, info = Info,
+handle_cast({report, Name, Type, Info, Timestamp}, State) ->
+ true = ets:insert(?ETS_NAME, #entry{name = Name, type = Type, info = Info,
timestamp = Timestamp}),
+ {noreply, State};
+
+handle_cast({remove, Name}, State) ->
+ true = ets:delete(?ETS_NAME, Name),
{noreply, State}.
handle_info(_Info, State) ->
Oops, something went wrong.

0 comments on commit 6dd2f4c

Please sign in to comment.