This is the public API of the JOBS framework.
Authors: Ulf Wiger (ulf@wiger.net).
add_counter/2 | Adds a named counter to the load regulator on the current node. |
add_group_rate/2 | Adds a group rate regulator to the load regulator on the current node. |
add_queue/2 | Installs a new queue in the load regulator on the current node. |
ask/1 | Asks permission to run a job of Type. |
ask_queue/2 | Sends a synchronous request to a specific queue. |
delete_counter/1 | Deletes a named counter from the load regulator on the current node. |
delete_group_rate/1 | |
delete_queue/1 | Deletes the named queue from the load regulator on the current node. |
dequeue/2 | Extracts up to N items from a passive queue. |
done/1 | Signals completion of an executed task. |
enqueue/2 | Inserts Item into a passive queue. |
info/1 | |
job_info/1 | Retrieves job-specific information from the Opaque data object. |
modify_counter/2 | |
modify_group_rate/2 | |
modify_queue/2 | Modifies queue parameters of existing queue. |
modify_regulator/4 | |
queue_info/1 | |
queue_info/2 | |
run/2 | Executes Function() when permission has been granted by job regulator. |
add_counter(Name, Options) -> ok
Adds a named counter to the load regulator on the current node.
Fails if there is already a counter with the name Name.
add_group_rate(Name, Options) -> ok
Adds a group rate regulator to the load regulator on the current node. Fails if there is already a group rate regulator of the same name.
add_queue(Name::any(), Options::[{Key, Value}]) -> ok
Installs a new queue in the load regulator on the current node.
Valid options are:
- {regulators, Rs}, where Rs is a list of rate- or counter-based regulators. Valid regulators listed below. Default: [].
- {type, Type} - type of queue. Valid types listed below. Default: fifo.
- {action, Action} - automatic action to perform for each request. Valid actions described below. Default: undefined.
- {check_interval, I} - If specified (in ms), this overrides the interval derived from any existing rate regulator. Note that regardless of how often the queue is checked, enough jobs will be dispatched at each interval to maintain the highest allowed rate possible, but the check interval may thus affect how many jobs are dispatched at the same time. Normally, this should not have to be specified.
- {max_time, T}, specifies how long (in ms) a job is allowed to wait in the queue before it is automatically rejected. If undefined, no limit is imposed.
- {max_size, S}, indicates how many items can be queued before requests are automatically rejected. Strictly speaking, size is whatever the queue behavior reports as the size; in the default queue behavior, it is the number of elements in the queue. If undefined, no limit is imposed.
- {mod, M}, indicates which queue behavior to use. Default is jobs_queue.
In addition, some 'abbreviated' options are supported:
- {standard_rate, R} - equivalent to [{regulators,[{rate,[{limit,R}, {modifiers,[{cpu,10},{memory,10}]}]}]}]
- {standard_counter, C} - equivalent to [{regulators,[{counter,[{limit,C}, {modifiers,[{cpu,10},{memory,10}]}]}]}]
- {producer, F} - equivalent to {type, {producer, F}}
- passive - equivalent to {type, {passive, fifo}}
- approve | reject - equivalent to {action, approve | reject}
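The expansions listed above can be spelled out as plain Erlang terms. The following is a minimal sketch, not the library's own code: the module name abbrev_sketch, the function names, and the queue name my_queue are hypothetical, and install/0 assumes the jobs application is already running on the node.

```erlang
-module(abbrev_sketch).
-export([expand/1, install/0]).

%% Expand one abbreviated option into the long form listed in this section.
%% Anything that is not an abbreviation is returned unchanged.
expand({standard_rate, R}) ->
    [{regulators, [{rate, [{limit, R},
                           {modifiers, [{cpu, 10}, {memory, 10}]}]}]}];
expand({standard_counter, C}) ->
    [{regulators, [{counter, [{limit, C},
                              {modifiers, [{cpu, 10}, {memory, 10}]}]}]}];
expand({producer, F}) ->
    [{type, {producer, F}}];
expand(passive) ->
    [{type, {passive, fifo}}];
expand(Action) when Action =:= approve; Action =:= reject ->
    [{action, Action}];
expand(Other) ->
    [Other].

%% Install a queue limited to 50 requests/sec using the short form.
%% my_queue is a made-up name; requires a running jobs application.
install() ->
    jobs:add_queue(my_queue, [{standard_rate, 50}]).
```

The expand/1 function exists only to make the equivalences concrete; in normal use the abbreviated form is passed directly to add_queue/2, as in install/0.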
Regulators
- {rate, Opts} - rate regulator. Valid options are
  - {limit, Limit} where Limit is the maximum rate (requests/sec)
  - {modifiers, Mods}, control feedback-based regulation. See below.
  - {name, Name}, optional. The default name for the regulator is {rate, QueueName, N}, where N is an index indicating which rate regulator in the list is referred. Currently, at most one rate regulator is allowed, so N will always be 1.
- {counter, Opts} - counter regulator. Valid options are
  - {limit, Limit}, where Limit is the number of concurrent jobs allowed.
  - {increment, Incr}, increment per job. Default is 1.
  - {modifiers, Mods}, control feedback-based regulation. See below.
- {named_counter, Name, Incr}, use an existing counter, incrementing it with Incr for each job. Name can either refer to a named top-level counter (see add_counter/2), or a queue-specific counter (these are named {counter,Qname,N}, where N is an index specifying their relative position in the regulators list - e.g. first or second counter).
- {group_rate, R}, refers to a top-level group rate R. See add_group_rate/2.
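To show how the regulator options fit together, here is a sketch of an option list that combines one rate regulator with one counter regulator. The module name regulator_sketch, the queue name limited_queue, and all numeric values are made up for illustration. Combining both regulator kinds on one queue is assumed to be allowed, based on the description of Rs as a list of rate- or counter-based regulators.

```erlang
-module(regulator_sketch).
-export([queue_opts/0, install/0]).

%% Queue options with one rate regulator and one counter regulator.
%% All limits are illustrative values, not recommendations.
queue_opts() ->
    [{regulators,
      [{rate,    [{limit, 100}]},                % at most 100 requests/sec
       {counter, [{limit, 5}, {increment, 1}]}   % at most 5 concurrent jobs
      ]},
     {type, fifo},
     {max_time, 5000},    % reject jobs that have waited longer than 5000 ms
     {max_size, 1000}].   % reject new requests once 1000 items are queued

%% Install the queue; requires a running jobs application.
install() ->
    jobs:add_queue(limited_queue, queue_opts()).
```

With the default naming described above, the two regulators would be referred to as {rate, limited_queue, 1} and {counter, limited_queue, 1}.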
Types
- fifo | lifo - these are the types supported by the default queue behavior. While lifo may sound like an odd choice, it may have benefits for stochastic traffic with time constraints: there is no point to 'fairness', since requests cannot control their place in the queue, and choosing the 'freshest' job may increase overall goodness criteria.
- {producer, F}, the queue is not for incoming requests, but rather generates jobs. Valid options for F are (for details, see jobs_prod_simple):
  - A fun of arity 0, indicating a stateless producer
  - A fun of arity 2, indicating a stateful producer
  - {M, F, A}, indicating a stateless producer
  - {Mod, Args} indicating a stateful producer
- {action, approve | reject}, specifies an automatic response to every request. This can be used to either block a queue (reject) or set it as a pass-through (approve).
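As a hedged illustration of the non-default types, the sketch below installs a producer queue driven by a stateless fun of arity 0, and a queue that rejects every request. The module name type_sketch and the queue names are hypothetical. That the producer fun is what gets executed as each generated job is an assumption here; see jobs_prod_simple for the actual contract.

```erlang
-module(type_sketch).
-export([install_producer/0, install_blocked/0]).

%% A producer queue: instead of waiting for requests, the queue generates
%% jobs itself, paced by its regulators (here roughly 3 jobs per second).
install_producer() ->
    Job = fun() -> io:format("job running in ~p~n", [self()]) end,
    jobs:add_queue(tick_producer, [{producer, Job}, {standard_rate, 3}]).

%% A blocked queue: every request is answered with an automatic reject.
install_blocked() ->
    jobs:add_queue(closed_queue, [reject]).
```

Both functions require a running jobs application on the current node.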
Modifiers
Jobs supports feedback-based modification of regulators.
The sampler framework sends feedback messages of type [{Modifier, Local, Remote::[{node(), Level}]}].
Each regulator can specify a list of modifier instructions:
- {Modifier, Local, Remote} - Modifier can be any label used by the samplers (see jobs_sampler). Local and Remote indicate increments in percent by which to reduce the limit of the given regulator. The Local increment is used for feedback info pertaining to the local node, and the Remote increment is used for remote indicators. Local is given as a percentage value (e.g. 10 for 10 %). The Remote increment is either {avg, Percent} or {max, Percent}, indicating whether to respond to the average load of other nodes or to the most loaded node. The correction from Local and the correction from Remote are summed before applying to the regulator limit.
- {Modifier, Local} - same as above, but responding only to local indications, ignoring the load on remote nodes.
- {Modifier, F::function((Local, Remote) -> integer())} - the function F(Local, Remote) is applied and expected to return a correction value, in percentage units.
- {Modifier, {Module, Function}} - Module:Function(Local, Remote) is applied and expected to return a correction value in percentage units.
For example, if a rate regulator has a limit of 100 and has a modifier, {cpu, 10}, then a feedback message of {cpu, 2, _Remote} will reduce the rate limit by 2*10 percent, i.e. down to 80.
Note that modifiers are always applied to the preset limit, not the current limit. Thus, the next round of feedback messages in our example will be applied to the preset limit of 100, not the 80 that resulted from the previous feedback messages. A correction value of 0 will reset the limit to the preset value.
If there is more than one modifier with the same name, the last one in the list will be the one used.
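The arithmetic in the example can be written out as a small, self-contained sketch. This is illustrative only and is not taken from the library: the module name modifier_sketch and both function names are hypothetical, only the {Modifier, Local} form is handled, and the use of integer division is an assumption.

```erlang
-module(modifier_sketch).
-export([correction/2, effective_limit/2]).

%% Correction (in percent) for one feedback item, given a regulator's
%% modifier list. Only {Modifier, Local} entries are considered; remote
%% load is ignored. If the same modifier name appears more than once,
%% the last entry in the list is the one used.
correction({Modifier, LocalLevel, _Remote}, Modifiers) ->
    case [Incr || {M, Incr} <- Modifiers, M =:= Modifier, is_integer(Incr)] of
        []    -> 0;
        Incrs -> LocalLevel * lists:last(Incrs)
    end.

%% Apply a correction to the preset limit (never to the current limit).
%% A correction of 0 therefore yields the preset value again.
effective_limit(Preset, CorrectionPct) ->
    Preset - (Preset * CorrectionPct) div 100.
```

With a preset limit of 100 and the modifier list [{cpu, 10}], the feedback item {cpu, 2, []} gives a correction of 20 percent and an effective limit of 80, matching the example in the text.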
ask(Type) -> {ok, Opaque} | {error, Reason}
Asks permission to run a job of Type. Returns when permission is granted.
The simplest way to have jobs regulated is to spawn a request per job. The process should immediately call this function, and when granted permission, execute the job, and then terminate.
If for some reason the process needs to remain, to execute more jobs, it should explicitly call jobs:done(Opaque).
This is not strictly needed when regulation is rate-based, but as the regulation strategy may change over time, it is the prudent thing to do.
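The two usage patterns described above might look roughly as follows. This is a sketch under stated assumptions: the module name ask_sketch and its function names are hypothetical, Work is any fun of arity 0 supplied by the caller, and the jobs application must be running with a queue matching Type.

```erlang
-module(ask_sketch).
-export([start_job/2, run_many/2]).

%% Pattern 1: one process per job. The process asks for permission,
%% runs the job and then terminates, so no call to jobs:done/1 is made.
start_job(Type, Work) when is_function(Work, 0) ->
    spawn(fun() ->
                  case jobs:ask(Type) of
                      {ok, _Opaque} ->
                          Work();
                      {error, Reason} ->
                          exit({rejected, Reason})
                  end
          end).

%% Pattern 2: a process that stays alive across several jobs. It calls
%% jobs:done/1 after each job, even if the job raises an exception.
run_many(_Type, []) ->
    ok;
run_many(Type, [Work | Rest]) when is_function(Work, 0) ->
    case jobs:ask(Type) of
        {ok, Opaque} ->
            try Work()
            after jobs:done(Opaque)
            end,
            run_many(Type, Rest);
        {error, Reason} ->
            {error, Reason}
    end.
```

The choice to stop at the first rejection in run_many/2 is arbitrary; a caller could equally retry or skip the rejected job.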
ask_queue(QueueName, Request) -> Reply
Sends a synchronous request to a specific queue.
This function is mainly intended to be used for back-end processes that act as custom extensions to the load regulator itself. It should not be used by regular clients. Sophisticated queue behaviours could export gen_server-like logic allowing them to respond to synchronous calls, either for special inspection, or for influencing the queue state.
delete_counter(Name) -> boolean()
Deletes a named counter from the load regulator on the current node.
Returns true if there was in fact such a counter; false otherwise.
delete_group_rate(Name) -> any()
delete_queue(Name) -> boolean()
Deletes the named queue from the load regulator on the current node.
Returns true if there was in fact such a queue; false otherwise.
dequeue(Queue, N) -> [{JobID, Item}]
Extracts up to N items from a passive queue.
Note that this function only works on passive queues. An exception will be raised if the queue doesn't exist, or if it isn't passive.
This function will block until at least one item can be extracted from the queue (see enqueue/2). No more than N items will be extracted.
The items returned are of the form {JobID, Item}, where JobID is in the form of a microsecond timestamp (see jobs_lib:timestamp_to_datetime/1), and Item is whatever was provided in enqueue/2.
done(Opaque) -> ok
Signals completion of an executed task.
This is used when the current process wants to submit more jobs to load regulation. It is mandatory when performing counter-based regulation (unless the process terminates after completing the task). It has no effect if the job type is purely rate-regulated.
enqueue(Queue, Item) -> ok | {error, Reason}
Inserts Item into a passive queue.
Note that this function only works on passive queues. An exception will be raised if the queue doesn't exist, or isn't passive.
Returns ok if Item was successfully entered into the queue, {error, Reason} otherwise (e.g. if the queue is full).
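A passive queue used with enqueue/2 and dequeue/2 could be exercised along these lines. This is a minimal sketch: the module name passive_sketch and the queue name inbox are made up, and it assumes a running jobs application and that dequeue/2 may be called from an ordinary client process.

```erlang
-module(passive_sketch).
-export([demo/0]).

%% Create a passive queue, insert two items, then extract up to ten.
%% dequeue/2 blocks until at least one item is available, so the items
%% are enqueued first.
demo() ->
    ok = jobs:add_queue(inbox, [passive]),
    ok = jobs:enqueue(inbox, first),
    ok = jobs:enqueue(inbox, second),
    Entries = jobs:dequeue(inbox, 10),
    %% Each entry has the form {JobID, Item}; keep only the items.
    [Item || {_JobID, Item} <- Entries].
```

Matching on ok turns a rejected enqueue (for example, a full queue) into a badmatch error, which is convenient in a demo but probably too strict for production code.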
info(Item) -> any()
job_info(X1::Opaque) -> undefined | Info
Retrieves job-specific information from the Opaque data object.
The queue could choose to return specific information that is passed to a granted job request. This could be used e.g. for load-balancing strategies.
modify_counter(CName, Opts) -> any()
modify_group_rate(GRName, Opts) -> any()
modify_queue(Name::any(), Options::[{Key, Value}]) -> ok | {error, Reason}
Modifies queue parameters of existing queue.
The queue parameters that can be modified are max_size and max_time.
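For instance, tightening the limits of an existing queue might look like the sketch below. The module name modify_sketch and the queue name my_queue are hypothetical, the values are illustrative, and a running jobs application with that queue installed is assumed.

```erlang
-module(modify_sketch).
-export([tighten/0]).

%% Lower the queue's capacity and waiting time. Only max_size and
%% max_time are documented as modifiable through modify_queue/2.
tighten() ->
    jobs:modify_queue(my_queue, [{max_size, 500},    % at most 500 queued items
                                 {max_time, 2000}]). % wait at most 2000 ms
```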
modify_regulator(Type, QName, RegName, Opts) -> any()
queue_info(Name) -> any()
queue_info(Name, Item) -> any()
run(Queue::Type, Function::function()) -> Result
Executes Function() when permission has been granted by job regulator.
This is equivalent to performing the following sequence:
    case jobs:ask(Type) of
        {ok, Opaque} ->
            try Function()
            after
                jobs:done(Opaque)
            end;
        {error, Reason} ->
            erlang:error(Reason)
    end.
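Because a rejected request surfaces as an exception rather than a return value, callers may want to wrap run/2. The following is one possible sketch; the module name run_sketch and the function regulated/2 are hypothetical, and a running jobs application is assumed.

```erlang
-module(run_sketch).
-export([regulated/2]).

%% Run Work under the regulation of Queue and turn a rejection into a
%% tagged return value. Note that this also catches error-class
%% exceptions raised by Work itself, which may or may not be desired.
regulated(Queue, Work) when is_function(Work, 0) ->
    try jobs:run(Queue, Work) of
        Result -> {ok, Result}
    catch
        error:Reason -> {error, Reason}
    end.
```

A call such as run_sketch:regulated(my_queue, fun() -> do_work() end) would then yield {ok, Result} or {error, Reason}, where my_queue and do_work/0 stand in for the caller's own names.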