Skip to content

Commit

Permalink
Support dinamically created queues
Browse files Browse the repository at this point in the history
introduce sv:new/2, new/1 methods that allow dynamic
creation of named or anomymous queues, each with a
separate configuration
  • Loading branch information
lrascao committed Apr 8, 2015
1 parent d303edb commit 32c1b65
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 34 deletions.
19 changes: 9 additions & 10 deletions src/safetyvalve_app.erl
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,15 @@
start(_StartType, _StartArgs) ->
{ok, MainPid} = safetyvalve_sup:start_link(),

{ok, Queues} = application:get_env(safetyvalve, queues),

[ok = start_queue(Q) || Q <- Queues],
%% empty static queue definition is allowed
Queues = case application:get_env(safetyvalve, queues, undefined) of
undefined -> [];
Qs -> Qs
end,

%% launch statically configured queues
[{ok, _QueuePid} = sv:new(Name, Conf) || {Name, Conf} <- Queues],
{ok, MainPid}.

stop(_State) ->
ok.

%% ----------------------------------------------------------------------
start_queue({QName, Conf}) ->
lager:info("Safetyvalve starting up queue: ~p", [QName]),
{ok, _Pid} = safetyvalve_sup:start_queue(QName, Conf),
stop(_State) ->
ok.
38 changes: 28 additions & 10 deletions src/safetyvalve_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -4,31 +4,49 @@
-behaviour(supervisor).

%% API
-export([start_link/0, start_queue/2]).
-export([start_link/0, start_queue/2, stop_queue/1]).

%% Supervisor callbacks
-export([init/1]).

%% Helper macro for declaring children of supervisor
-define(QCHILD(I, C, Type), {I, {sv_queue, start_link, [I, C]},
permanent, 5000, Type, [I]}).

%% ===================================================================
%% API functions
%% ===================================================================

start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).

start_queue(Name, Conf) ->
-spec start_queue(Queue, Conf) -> {ok, pid()} | {error, invalid_configuration}
when
Queue :: undefined | atom(),
Conf :: proplists:proplist().
start_queue(Queue, Conf) ->
C = sv_queue:parse_configuration(Conf),
supervisor:start_child(?MODULE,
?QCHILD(Name, C, worker)).
case supervisor:start_child(?MODULE, [Queue, C]) of
{ok, Pid} -> {ok, Pid};
{error, {already_started, Pid}} ->
%% a process is already running registered with the
%% provided name, we must now obtain it's current
%% configuration and check that it is indeed the same
case sv_queue:q(Pid, configuration) of
C -> {ok, Pid};
_ -> {error, invalid_configuration}
end
end.

-spec stop_queue(Queue) -> ok | {error, not_found | simple_one_for_one}
when
Queue :: undefined | atom().
stop_queue(Queue) when is_pid(Queue) ->
supervisor:terminate_child(?MODULE, Queue);
stop_queue(Queue) when is_atom(Queue) ->
supervisor:terminate_child(?MODULE, whereis(Queue)).

%% ===================================================================
%% Supervisor callbacks
%% ===================================================================

init([]) ->
{ok, { {one_for_one, 3, 600}, []} }.

{ok, { {simple_one_for_one, 3, 600},
[{queue, {sv_queue, start_link, []},
transient, 5000, worker, [sv_queue]}] } }.
35 changes: 29 additions & 6 deletions src/sv.erl
Original file line number Diff line number Diff line change
@@ -1,10 +1,34 @@
-module(sv).

-export([timestamp/0, ask/2, done/3]).
-export([timestamp/0, new/1, new/2, destroy/1, ask/2, done/3]).
-export([run/2]).
%% Internal API
-export([report/2]).

%% @doc Creates a new queue
%% @end
-spec new(Conf) -> {ok, pid()}
when
Conf :: proplists:proplist().
new(Conf) ->
new(undefined, Conf).

-spec new(Queue, Conf) -> {ok, pid()}
when
Queue :: undefined | atom(),
Conf :: proplists:proplist().
new(Queue, Conf) ->
{ok, Pid} = safetyvalve_sup:start_queue(Queue, Conf),
{ok, Pid}.

%% @doc Destroys a previously created queue
%% @end
-spec destroy(Queue) -> ok | {error, not_found | simple_one_for_one}
when
Queue :: undefined | atom().
destroy(Queue) ->
safetyvalve_sup:stop_queue(Queue).

%% @doc Enqueue a job on a queue
%% <p>Try to run `Fun' on queue `Name'. The `Fun' is run at time `TP'.
%% This means that either the
Expand All @@ -16,7 +40,7 @@

-spec run(Name, Fun) -> {ok, Result} | {error, Reason}
when
Name :: atom(),
Name :: atom() | pid(),
Fun :: fun (() -> term),
Result :: term(),
Reason :: term().
Expand All @@ -25,7 +49,7 @@ run(Name, Fun) ->
case sv_queue:ask(Name, StartPoint) of
{go, Ref} ->
Res = Fun(),
EndPoint = timestamp(),
EndPoint = timestamp(),
sv_queue:done(Name, Ref, EndPoint),
{ok, Res};
{error, Reason} ->
Expand All @@ -44,13 +68,13 @@ run(Name, Fun) ->
%% @end
-spec ask(Queue, T) -> {go, Ref} | {error, Reason}
when
Queue :: atom(),
Queue :: atom() | pid(),
T :: integer(),
Ref :: term(), % Opaque
Reason :: term().
ask(QN, T) ->
sv_queue:ask(QN, T).

%% @doc done/3 relinquishes a resource yet again to the queue
%% <p>Call this function when you are done with using a resource. @see ask/2 for the
%% documentation of how to invoke this function.</p>
Expand All @@ -74,4 +98,3 @@ timestamp() ->
%% {os:timestamp(), self()} or {os:timestamp(), ref()}.
{Mega, Secs, Micro} = erlang:now(),
(Mega * 1000000 + Secs) * 1000000 + Micro.

22 changes: 14 additions & 8 deletions src/sv_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -34,23 +34,25 @@
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).

-define(SERVER, ?MODULE).
-define(SERVER, ?MODULE).

-record(conf, { hz, rate, token_limit, size, concurrency, queue_type, queue_args }).
-type conf() :: #conf{}.

-record(state, {
%% Conf is the configuration object this queue is configured
%% with. It is a place to query about conf options
conf,

%% Queue reference to the queue we have of workers that are
%% waiting to be allowed to execute. Also maintains the
%% current queue size.
queue,

%% Tokens is a counter of how many tokens that are in the
%% Token Bucket Regulator right now.
tokens,

%% Tasks is a set which contains the monitor references on
%% the currently executing tasks. It is used to make sure
%% that we maintain the concurrency limit correctly if tasks
Expand All @@ -63,11 +65,14 @@
%%%===================================================================

%% @doc
%% Starts the server
%% Starts the server, can either be registered or not
%% @end
start_link(undefined, Conf) ->
gen_server:start_link(?MODULE, Conf, []);
start_link(Name, Conf) ->
gen_server:start_link({local, Name}, ?MODULE, [Conf], []).
gen_server:start_link({local, Name}, ?MODULE, Conf, []).

-spec parse_configuration(proplists:proplist()) -> conf().
parse_configuration(Conf) ->
{QueueType, QueueArgs} =
case proplists:get_value(queue_type, Conf, sv_queue_ets) of
Expand All @@ -90,7 +95,6 @@ ask(Name) ->
ask(Name, Timestamp) ->
gen_server:call(Name, {ask, Timestamp}, infinity).


done(Name, Ref, Timestamp) ->
gen_server:call(Name, {done, Timestamp, Ref}, infinity).

Expand All @@ -104,7 +108,7 @@ q(Name, Atom) ->
%%%===================================================================

%% @private
init([Conf]) ->
init(Conf) ->
set_timer(Conf),
QT = Conf#conf.queue_type,
QArgs = Conf#conf.queue_args,
Expand All @@ -117,6 +121,8 @@ init([Conf]) ->
%% @private
handle_call({q, tokens}, _, #state { tokens = K } = State) ->
{reply, K, State};
handle_call({q, configuration}, _, #state { conf = Conf } = State) ->
{reply, Conf, State};
handle_call({ask, Timestamp}, {Pid, _Tag} = From,
#state {
tokens = K,
Expand Down

0 comments on commit 32c1b65

Please sign in to comment.