Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Merge branch 'codel-dropped-packets' into next

Conflicts:
	README.md
  • Loading branch information...
commit bf72198befa7e084e5c4c25dc93f592d2dda666e 2 parents 85a1691 + 6562433
@jlouis authored
View
2  README.md
@@ -14,6 +14,8 @@ This project uses semantic versioning. Tags are declared like `vX.Y.Z`
* Y changes: These are additions to the API, but existing functionality has not been broken.
* Z changes: Bug fixes.
+See the document USING.md for a usage example.
+
# Changes
### v0.1.0 -> v1.0.0
View
4 TODO
@@ -0,0 +1,4 @@
+GH/jlouis/safetyvalve
+
+· EQC CoDel test: Vary Target and Interval :)
+· EQC CoDel test: Think about what properties to add.
View
42 USING.md
@@ -0,0 +1,42 @@
+# An example of using the safetyvalve system
+
+Let us say we have a system like the epgsql driver,
+
+ https://github.com/wg/epgsql
+
+This driver has no connection pooling, so we can write a system to do this. Basically, we keep a pool of connections and then when we want a connection, we request it from the pool. If there is no connection in the pool, then we create a new one.
+
+This will work, but it has the problem that if we suddenly get too many processes that need to access the database, then we will overload it. PostgreSQL has a limit on how many connections it will accept. So we want to use safetyvalve to enforce this limit.
+
+First, we create a queue. This is done by configuring the safetyvalve application in `sys.config` or similar place.
+
+ {safetyvalve,
+ {queues, [{pg_q, [{hz, 500}, {rate, 20}, {token_limit, 30}, {size, 30}, {concurrency, 32}]}]}},
+
+This says the following:
+
+* The queue has a rate-limit that generates up to 20 new connections every half second. The token-limit says there will at most be 30 tokens waiting, if no workers are doing work.
+* The concurrency level is 32, which means there will at most be 32 active connections to the Postgres database. This is what makes sure we cannot overload the database system.
+* The queue size is 30. So if there are too many workers, we allow up to 30 processes to wait around for a connection. If a process is #31 in the queue, then the process is rejected and will be told that the Postgres system is overloaded. The process can then itself make a choice of what to do. It may decide to wait a bit or fail permanently.
+
+Let us say we have the following:
+
+ with_pg(QueryFun) ->
+ {ok, C} = pg_pool:obtain_connection(),
+ QueryFun(C),
+ pg_pool:putback(C).
+
+which executes QueryFun in the context of a Connection. We assume the pool has a monitor on this process so it knows if the `QueryFun` dies.
+
+The only thing you need to change in order to make safetyvalve used here, is that you need to start the safetyvalve application first. This can best be done in your application's `.app` file by having it as a dependency and then have a release which boots it. Safetyvalve will pick up the queue definition of `pg_q` and arrange to start up this queue for us. So we can use that queue:
+
+ run_query(QueryFun) ->
+ case sv:run(pg_q, fun() -> with_pg(QueryFun) end) of
+ {ok, Res} -> {ok, Res};
+ {error, Reason} -> {error, Reason}
+ end.
+
+If we get to run, we will return `{ok, Res}` where Res is the result from the query. If it fails, what will be returned is `{error, Reason}`. The usual `Reason` is the atom `queue_full`, which tells you that there are already 32 workers operating on the database and 30 more processes waiting in the queue. This is where we will reject the job. Note the mention of `pg_q` in there which designates what queue to use. We can have multiple such queues if we have more than a single class of processes.
+
+What to do when a job is rejected is up to you. Either you can reject the job upwards. If you are using this to limit certain in-bound HTTP requests, you can return a 503 to designate to the caller that we have overload. Or you can wait for some time and then retry. Whenever you retry, you can increase a backoff. For real-time queries, it is usually best to reject and fail the request quickly. A user is usually not going to wait. For a batch job, it is usually better to wait and try again.
+
View
99 eqc_test/sv_codel_eqc.erl
@@ -3,42 +3,93 @@
-compile([export_all]).
-include_lib("eqc/include/eqc.hrl").
--eqc_group_commands(true).
--record(state,
- { ttd = 10,
- t = 0,
- st
- }).
-
-g_time_advance(#state { ttd = N, t = T } = State) ->
- ?LET(K, choose(0, N),
- State#state { ttid = N - K, t = T + K }).
+-record(model,
+ { t = 30000, st }).
+
+g_sv_codel_args() ->
+ ?LET(T, choose(5, 50),
+ [T, choose(T, 200)]).
+
+g_cmd_advance_time(M) ->
+ {call, ?MODULE, advance_time, [M, g_time_advance()]}.
+
+g_time_advance() ->
+ choose(1, 1000).
g_model(0, todo) ->
- oneof([{call, ?MODULE, new, []}]);
+ oneof([{call, ?MODULE, new, g_sv_codel_args()}]);
g_model(N, todo) ->
frequency([
{1, g_model(0, todo)},
{N, ?LET(M, g_model(max(0, N-2), todo),
frequency(
- [{200, {call, ?MODULE, advance_time, M, g_time_advance(M)}} || not boundary(M)] ++
- [{200, {call, ?MODULE, enqueue, [M]}}] ++
- [{100, {call, ?MODULE, dequeue, [M]}} || boundary(M)]
- ])}]).
+ [{100, g_cmd_advance_time(M)}] ++
+ [{100, {call, ?MODULE, enqueue, [M]}}] ++
+ [{100, {call, ?MODULE, dequeue, [M]}}]))}]).
+
+
+g_model() ->
+ ?SIZED(Size, g_model(Size, todo)).
+
+%% Properties
+%% ----------------------------------------------
+
+%% Verify that the queue runs if we blindly execute it
+prop_termination() ->
+ ?FORALL(M, g_model(),
+ begin
+ _R = eval(M),
+ true
+ end).
+
+%% Various observations on a CoDel queue
+prop_observations() ->
+ ?FORALL(M, g_model(),
+ begin
+ #model { t = T, st = ST} = eval(M),
+ case sv_codel:dequeue(T+1, ST) of
+ {empty, _Dropped, EmptyState} ->
+ verify_empty(EmptyState);
+ {_Pkt, [_ | _], CoDelState} ->
+ verify_dropped(CoDelState);
+ {drop, [_Pkt], CoDelState} ->
+ classify(true, start_drop, true);
+ {_Pkt, _Dropped, _SomeState} ->
+ classify(true, dequeue, true)
+ end
+ end).
-boundary(#state { ttd = 0 }) -> true;
-boundary(_) -> false.
+
+verify_dropped(CoDelState) ->
+ %% We dropped packets, our state must be dropping
+ PL = sv_codel:qstate(CoDelState),
+ classify(true, dropped,
+ proplists:get_value(dropping, PL)).
+
+verify_empty(EmptyState) ->
+ %% Empty queues are never dropping and they reset first-above-time
+ PL = sv_codel:qstate(EmptyState),
+ classify(true, empty_queue,
+ (not proplists:get_value(dropping, PL))
+ andalso proplists:get_value(first_above_time, PL) == 0).
%% Operations
%% ----------------------------------------------
-new() ->
- #state { ttd = 10, t = 0, st = sv_codel:init() }.
+new(Target, Interval) ->
+ #model { t = 0, st = sv_codel:init(Target, Interval) }.
+
+advance_time(#model { t = T } = State, K) ->
+ State#model { t = T + K }.
-enqueue(#state { t = T, st = ST } = State) ->
- State#state { t = T+1, st = sv_codel:enqueue({pkt, T}, T, ST) }.
+enqueue(#model { t = T, st = ST } = State) ->
+ State#model { t = T+1, st = sv_codel:enqueue({pkt, T}, T, ST) }.
-dequeue(#state { t = T, st = ST }) ->
- {_, ST2} = sv_codel:dequeue(T, ST),
- State#state { t = T+1, ttd = 10, st = ST2 }.
+dequeue(#model { t = T, st = ST } = State) ->
+ ST2 =
+ case sv_codel:dequeue(T, ST) of
+ {ok, _, _, S} -> S;
+ {empty, _, S} -> S
+ end,
+ State#model { t = T+1, st = ST2 }.
View
228 src/sv_codel.erl
@@ -1,85 +1,195 @@
%%% @doc This is a loose translation of the following link from ACM:
%%% http://queue.acm.org/appendices/codel.html
+%%% http://pollere.net/CoDelnotes.html
+%%% http://pollere.net/CoDel.html
+%%%
%%% The document you want to read is
%%% "Controlling queue Delay" Kathleen Nichols, Van Jacobson, http://queue.acm.org/detail.cfm?id=2209336
+%%% But also note that some of the other papers are interesting. Especially Kathie Nichols notes are of
+%%% interest.
+%%%
%%% @end
-module(sv_codel).
--export([init/0, enqueue/3, dequeue/2]).
+%% Public API
+-export([init/2, enqueue/3, dequeue/2]).
--record(state,
- { queue = queue:new(),
- dropping = false,
- drop_next = 0,
- interval = 100, % ms
- target = 5, %ms
- first_above_time = 0,
- count = 0
+%% Scrutiny
+-export([qstate/1]).
+
+-type task() :: term().
+
+%% Internal state
+-record(state, {
+ %% The underlying queue to use. For now, since we are mainly in a test phase, we just use a standard
+ %% functional queue. But later the plan is to use a module here and then call the right kind of queue
+ %% functions for that module.
+ queue = queue:new(),
+
+ %% The `dropping' field tracks if the CoDel system is in a dropping state or not.
+ dropping = false,
+
+ %% If we are dropping, this value tracks the point in time where the next packet should
+ %% be dropped from the queue.
+ drop_next = 0,
+
+ %% First above time tracks when we first began seeing too much delay imposed by the queue.
+ %% This value may be 0 in which case it means we have not seen such a delay.
+ first_above_time = 0,
+
+ %% This variable tracks how many packets/jobs were recently dropped from the queue.
+ %% The value decays over time if no packets are dropped and is used to manipulate the control
+ %% law of the queue.
+ count = 0,
+
+ %% The `interval' and `target' are configurable parameters, described in @see init/2.
+ interval = 100, % ms
+ target = 5 %ms
}).
-init() -> #state{}.
+%% @doc Look at the queue state as a proplist
+%% @end
+-spec qstate(#state{}) -> [{atom(), term()}].
+qstate(#state {
+ queue = Q,
+ dropping = Drop,
+ drop_next = DN,
+ interval = I,
+ target = T,
+ first_above_time = FAT,
+ count = C
+ }) ->
+ [{queue, Q},
+ {dropping, Drop},
+ {drop_next, DN},
+ {interval, I},
+ {target, T},
+ {first_above_time, FAT},
+ {count, C}].
+
+%% @doc Initialize the CoDel state
+%% <p>The value `Target' defines the delay target in ms. If the queue has a sojourn-time through the queue
+%% which is above this value, then the queue begins to consider dropping packets.</p>
+%% <p>The value `Interval' is the window we have to be above `Target' before we consider that there may be
+%% problems. As such, it provides a hysteresis on the queue as well and small increases in latency does
+%% not affect the queue.</p>
+%% <p>Note that the interval makes sure we can use the queue as "good queue". If we get a sudden small
+%% spike in jobs, then the queue will make sure they get smoothed out and processed with no loss of jobs.
+%% But it also protects against "bad queue" where a standing queue won't dissipate due to consistent
+%% overload of the system</p>
+%% @end
+-spec init(pos_integer(), pos_integer()) -> #state{}.
+init(Target, Interval) when Target > Interval -> exit(misconfiguration);
+init(Target, Interval) -> #state{ target = Target, interval = Interval }.
+%% @doc Enqueue a packet
+%% <p>Enqueue packet `Pkt' at time `TS' into the queue.</p>
+%% @end
+-spec enqueue(task(), term(), #state{}) -> #state{}.
enqueue(Pkt, TS, #state { queue = Q } = State) ->
State#state { queue = queue:in({Pkt, TS}, Q) }.
+%% @doc Dequeue a packet from the CoDel system
+%% Given a point in time, `Now' and a CoDel `State', extract the next task from it.
+%% @end
+-spec dequeue(Now, InState) ->
+ {empty, [Pkt], OutState} | {drop, [Pkt], OutState} | {Pkt, [Pkt], OutState}
+ when
+ Now :: term(),
+ Pkt :: task(),
+ InState :: #state{},
+ OutState :: #state{}.
+dequeue(Now, State) ->
+ dequeue_(Now, dodequeue(Now, State)).
+
+%% Internal functions
+%% ---------------------------------------------------------
+%% The control law defines the packet drop rate. Given a time T we drop the next packet at T+I, where
+%% I is the interval. Now, if we need to drop yet another packet, we drop it at I/math:sqrt(C) where C
+%% is the number of packets we have dropped so far in this round.
control_law(T, I, C) ->
T + I / math:sqrt(C).
-dodequeue(Now, #state {
- queue = Q,
- target = Target,
- first_above_time = FirstAbove,
- interval = Interval } = State) ->
+%% This is a helper function. It dequeues from the underlying queue and then analyzes the Sojourn
+%% time together with the next function, dodequeue_.
+dodequeue(Now, #state { queue = Q } = State) ->
case queue:out(Q) of
{empty, NQ} ->
{nodrop, empty, State#state { first_above_time = 0, queue = NQ }};
{{value, {Pkt, InT}}, NQ} ->
- case Now - InT of
- Sojourn when Sojourn < Target ->
- {nodrop, Pkt, State#state { first_above_time = 0, queue = NQ }};
- _Sojourn when FirstAbove == 0 ->
- {nodrop, Pkt, State#state { first_above_time = Now + Interval, queue = NQ}};
- _Sojourn when Now >= FirstAbove ->
- {drop, Pkt, State#state { queue = NQ }};
- _Sojourn -> {nodrop, Pkt, State#state{ queue = NQ}}
- end
+ Sojourn = Now - InT,
+
+ dodequeue_(Now, Pkt, Sojourn, State#state { queue = NQ })
end.
-dequeue(Now, #state { dropping = Dropping } = State) ->
- case dodequeue(Now, State) of
- {nodrop, empty, NState} ->
- {empty, NState#state { dropping = false }};
- {nodrop, Pkt, #state {} = NState} when Dropping ->
- {Pkt, NState#state { dropping = false }};
- {drop, Pkt, #state { drop_next = DropNext } = NState} when Now >= DropNext ->
- dequeue_drop_next(Now, Pkt, NState);
- {nodrop, Pkt, NState} when not Dropping ->
- {Pkt, NState};
- {drop, _Pkt, #state { drop_next = DN, interval = I, first_above_time = FirstAbove } = NState}
- when not Dropping, Now - DN < I orelse Now - FirstAbove >= I ->
- dequeue_start_drop(Now, NState);
- {drop, Pkt, NState} when not Dropping ->
- {Pkt, NState}
- end.
+%% Case split:
+%% The sojourn time through the queue is less than our target value. Thus, we should not drop, and
+%% we reset when we were first above.
+dodequeue_(_Now, Pkt, Sojourn, #state { target = T } = State) when Sojourn < T ->
+ {nodrop, Pkt, State#state { first_above_time = 0 }};
+%% We are above target, but this is the first time we are above target. We set up the point in time when
+%% we went above the target to start tracking this.
+dodequeue_(Now, Pkt, _Sojourn, #state { first_above_time = FAT, interval = I } = State) when FAT == 0 ->
+ {nodrop, Pkt, State#state { first_above_time = Now + I }};
+%% We have been above target for more than one interval. This is when we need to start dropping.
+dodequeue_(Now, Pkt, _Sojourn, #state { first_above_time = FAT } = State) when Now >= FAT ->
+ {drop, Pkt, State};
+%% We are above target, but we have not yet been above target for a complete interval. Wait and see
+%% what happens, but don't begin dropping packets just yet.
+dodequeue_(_Now, Pkt, _Sojourn, State) ->
+ {nodrop, Pkt, State}.
+
+
+%% Dequeue worker. This drives the meat of the dequeue steps.
+%% Case split:
+%% We are in the dropping state, but are transitioning to not dropping.
+dequeue_(Now, {nodrop, Pkt, #state { dropping = true } = State}) ->
+ dequeue_drop_next(Now, Pkt, State#state { dropping = false }, []);
+%% We are in the dropping state and are to continue dropping.
+dequeue_(Now, {drop, Pkt, #state { dropping = true } = State}) ->
+ dequeue_drop_next(Now, Pkt, State, []);
+%% We are not in the dropping state, but should start dropping.
+dequeue_(Now, {drop, Pkt, #state { dropping = false } = State}) ->
+ dequeue_start_drop(Now, Pkt, State);
+%% Default case for normal operation.
+dequeue_(_Now, {nodrop, Pkt, #state { dropping = false } = State}) ->
+ {Pkt, [], State}.
-
-
-dequeue_drop_next(Now, _DroppedPkt, #state { drop_next = DN, dropping = true, count = C } = State) ->
- case dodequeue(Now, State#state { count = C+1 }) of
- {nodrop, Res, NState} ->
- {Res, NState#state { dropping = false }};
- {drop, Res, #state { count = NC, interval = I } = NState} ->
- dequeue_drop_next(Now, Res, NState#state { drop_next = control_law(DN, I, NC) })
- end;
-dequeue_drop_next(_Now, Pkt, #state { dropping = false } = State) ->
- {Pkt, State}.
-
-dequeue_start_drop(Now, #state { drop_next = DN, interval = Interval, count = Count } = State)
- when Now - DN < Interval ->
- State#state {
+%% Consider dropping the next packet from the queue. This function drives a loop until the next timepoint
+%% where we should drop is in the future. The helper dequeue_drop_next_/3 carries out the book-keeping
+dequeue_drop_next(Now, Pkt, #state { drop_next = DN, dropping = true } = State, Dropped)
+ when Now >= DN ->
+ dequeue_drop_next_(Now, dodequeue(Now, State), [Pkt | Dropped]);
+dequeue_drop_next(_Now, Pkt, State, Dropped) ->
+ {Pkt, Dropped, State}.
+
+%% If the Sojourn time improves, we leave the dropping state.
+dequeue_drop_next_(Now, {nodrop, Pkt, State}, Dropped) ->
+ dequeue_drop_next(Now, Pkt, State#state { dropping = false }, Dropped);
+%% We are still to drop packets, so update the count and the control law for the next loop round.
+dequeue_drop_next_(
+ Now,
+ {drop, Pkt, #state { count = C, interval = I, drop_next = DN } = State},
+ Dropped) ->
+ dequeue_drop_next(
+ Now,
+ Pkt,
+ State#state { count = C + 1, drop_next = control_law(DN, I, C + 1) },
+ Dropped).
+
+%% Function for setting up the dropping state. When we start dropping, we evaluate a bit on
+%% how long ago we last dropped. If we did this recently, we do not start off from the bottom of
+%% the control law, but rather pick a point a bit up the function. On the other hand, if it is a long time
+%% ago, we just pick the usual starting point of 1.
+dequeue_start_drop(Now, Pkt, #state { drop_next = DN, interval = Interval, count = Count } = State)
+ when Now - DN < Interval, Count > 2 ->
+ {drop, [Pkt], State#state {
+ dropping = true,
+ count = Count - 2,
+ drop_next = control_law(Now, Interval, Count - 2) }};
+dequeue_start_drop(Now, Pkt, #state { interval = I } = State) ->
+ {drop, [Pkt], State#state {
dropping = true,
- count = case Count > 2 of true -> Count - 2; false -> 1 end,
- drop_next = control_law(Now, Interval, Count) };
-dequeue_start_drop(Now, #state { interval = Interval, count = Count } = State) ->
- State#state { dropping = true, count = 1, drop_next = control_law(Now, Interval, Count) }.
+ count = 1,
+ drop_next = control_law(Now, I, 1) }}.
Please sign in to comment.
Something went wrong with that request. Please try again.