Permalink
Browse files

Merge branch 'counters'

  • Loading branch information...
2 parents 4e4eeae + 627d4bb commit 38be26dfb4190aac25e453b57876b97f420d102d @etrepum etrepum committed Nov 9, 2011
Showing with 361 additions and 13 deletions.
  1. +4 −10 Makefile
  2. +3 −1 rebar.config
  3. +9 −0 src/statebox.erl
  4. +6 −2 src/statebox_clock.erl
  5. +100 −0 src/statebox_counter.erl
  6. +17 −0 src/statebox_identity.erl
  7. +207 −0 test/statebox_counter_tests.erl
  8. +15 −0 test/statebox_identity_tests.erl
View
@@ -1,21 +1,15 @@
-REBAR=./rebar
+REBAR ?= $(shell which rebar 2>/dev/null || which ./rebar)
+
+.PHONY: all edoc test clean
all:
@$(REBAR) get-deps compile
edoc:
- @$(REBAR) doc
+ @$(REBAR) doc skip_deps=true
test:
- @rm -rf .eunit
- @mkdir -p .eunit
@$(REBAR) skip_deps=true eunit
clean:
@$(REBAR) clean
-
-build_plt:
- @$(REBAR) build-plt
-
-dialyzer:
- @$(REBAR) dialyze
View
@@ -3,7 +3,9 @@
debug_info]}.
{deps,
[{meck, ".*",
- {git, "https://github.com/esl/meck.git", ""}}]}.
+ {git, "git://github.com/esl/meck.git", "master"}},
+ {proper, ".*",
+ {git, "git://github.com/manopapad/proper", "master"}}]}.
{cover_enabled, true}.
{clean_files, ["*.eunit", "ebin/*.beam"]}.
{eunit_opts, [verbose, {report,{eunit_surefire,[{dir,"."}]}}]}.
View
@@ -127,6 +127,8 @@ apply_op({F, [A]}, Data) when is_function(F, 2) ->
F(A, Data);
apply_op({F, [A, B]}, Data) when is_function(F, 3) ->
F(A, B, Data);
+apply_op({F, [A, B, C]}, Data) when is_function(F, 4) ->
+ F(A, B, C, Data);
apply_op({F, A}, Data) when is_function(F) ->
apply(F, A ++ [Data]);
apply_op({M, F, [A]}, Data) ->
@@ -226,6 +228,13 @@ batch_apply_op_test() ->
value(S10)),
ok.
+apply_op_5_test() ->
+ ?assertEqual(
+ [a, b, c, d, e],
+ statebox:apply_op(
+ {fun (A, B, C, D, E) -> [A, B, C, D, E] end, [a, b, c, d]},
+ e)).
+
alt_apply_op_test() ->
L = [fun (N=1) -> {ordsets, add_element, [N]} end,
fun (N=2) ->
View
@@ -2,7 +2,7 @@
%% <code>statebox:modify/2</code>.
-module(statebox_clock).
--export([timestamp/0, now_to_msec/1]).
+-export([timestamp/0, now_to_msec/1, now/0]).
-define(KILO, 1000).
-define(MEGA, 1000000).
@@ -15,6 +15,10 @@ timestamp() ->
%% @doc Converts given time of now() format to UNIX epoch timestamp in
%% integer milliseconds.
--spec now_to_msec({integer(), integer(), integer()}) -> integer().
+-spec now_to_msec(calendar:t_now()) -> integer().
now_to_msec({MegaSecs, Secs, MicroSecs}) ->
trunc(((MegaSecs * ?MEGA) + Secs + (MicroSecs / ?MEGA)) * ?KILO).
+
+-spec now() -> calendar:t_now().
+now() ->
+ erlang:now().
View
@@ -0,0 +1,100 @@
+%% @doc Integer counter based on an ordered list of counter events.
+%%
+%% A counter is stored as an orddict of counter events. Each counter
+%% event has a unique key based on the timestamp and some entropy, and it
+%% stores the delta from the inc operation. The value of a counter is the
+%% sum of all these deltas.
+%%
+%% As an optimization, counter events older than a given age are coalesced
+%% to a single counter event with a key in the form of
+%% <code>{timestamp(), 'acc'}</code>.
+
+-module(statebox_counter).
+-export([value/1, merge/1, accumulate/2, inc/3]).
+-export([f_inc_acc/2, f_inc_acc/3, op_inc_acc/4]).
+
+-type op() :: statebox:op().
+-type timestamp() :: statebox_clock:timestamp().
+-type timedelta() :: statebox:timedelta().
+-type counter_id() :: statebox_identity:entropy() | acc.
+-type counter_key() :: {timestamp(), counter_id()}.
+-type counter_op() :: {counter_key(), integer()}.
+-type counter() :: [counter_op()].
+
+%% @doc Return the value of the counter (the sum of all counter event deltas).
+-spec value(counter()) -> integer().
+value([]) ->
+ 0;
+value([{_Key, Value} | Rest]) ->
+ Value + value(Rest).
+
+%% @doc Merge the given list of counters and return a new counter
+%% with the union of that history.
+-spec merge([counter()]) -> counter().
+merge([Counter]) ->
+ Counter;
+merge(Counters) ->
+ orddict:from_list(merge_prune(Counters)).
+
+%% @doc Accumulate all counter events older than <code>Timestamp</code> to
+%% the key <code>{Timestamp, acc}</code>. If there is already an
+%% <code>acc</code> at or before <code>Timestamp</code> this is a no-op.
+-spec accumulate(timestamp(), counter()) -> counter().
+accumulate(Timestamp, Counter=[{{T0, acc}, _} | _]) when Timestamp =< T0 ->
+ Counter;
+accumulate(Timestamp, Counter) ->
+ accumulate(Timestamp, Counter, 0).
+
+%% @doc Return a new counter with the given counter event. If there is
+%% an <code>acc</code> at or before the timestamp of the given key then
+%% this is a no-op.
+-spec inc(counter_key(), integer(), counter()) -> counter().
+inc({T1, _Id1}, _Value, Counter=[{{T0, acc}, _} | _Rest]) when T1 =< T0 ->
+ Counter;
+inc(Key, Value, Counter) ->
+ orddict:store(Key, Value, Counter).
+
+%% @equiv f_inc_acc(Value, Age, {statebox_clock:timestamp(),
+%% statebox_identity:entropy()})
+-spec f_inc_acc(integer(), timedelta()) -> op().
+f_inc_acc(Value, Age) ->
+ Key = {statebox_clock:timestamp(), statebox_identity:entropy()},
+ f_inc_acc(Value, Age, Key).
+
+%% @doc Return a statebox event to increment and accumulate the counter.
+%% <code>Value</code> is the delta,
+%% <code>Age</code> is the maximum age of counter events in milliseconds
+%% (this should be longer than the amount of time you expect your cluster to
+%% reach a consistent state),
+%% <code>Key</code> is the counter event key.
+-spec f_inc_acc(integer(), timedelta(), counter_key()) -> op().
+f_inc_acc(Value, Age, Key={Timestamp, _Id}) ->
+ {fun ?MODULE:op_inc_acc/4, [Timestamp - Age, Key, Value]}.
+
+%% @private
+op_inc_acc(Timestamp, Key, Value, Counter) ->
+ accumulate(Timestamp, inc(Key, Value, Counter)).
+
+%% Internal API
+
+merge_prune(Counters) ->
+ %% Merge of all of the counters and prune all entries older than the
+ %% newest {_, acc}.
+ prune(lists:umerge(Counters)).
+
+prune(All) ->
+ prune(All, All).
+
+prune(Here=[{{_Ts, acc}, _V} | Rest], _Last) ->
+ prune(Rest, Here);
+prune([_ | Rest], Last) ->
+ prune(Rest, Last);
+prune([], Last) ->
+ Last.
+
+accumulate(Timestamp, [{{T1, _Id}, Value} | Rest], Sum) when T1 =< Timestamp ->
+ %% Roll up old counter events
+ accumulate(Timestamp, Rest, Value + Sum);
+accumulate(Timestamp, Counter, Sum) ->
+ %% Return the new counter
+ inc({Timestamp, acc}, Sum, Counter).
View
@@ -0,0 +1,17 @@
+%% @doc Functions for uniquely identifying events.
+-module(statebox_identity).
+
+-export([entropy/2, entropy/0]).
+
+-type entropy() :: 1..4294967296.
+
+%% @equiv entropy(node(), statebox_clock:now())
+-spec entropy() -> entropy().
+entropy() ->
+ entropy(node(), statebox_clock:now()).
+
+%% @doc Return an integer that can be expected to be reasonably unique
+%% at a given msec timestamp.
+-spec entropy(node(), calendar:t_now()) -> entropy().
+entropy(Node, Now) ->
+ erlang:phash2({Node, Now}).
Oops, something went wrong.

0 comments on commit 38be26d

Please sign in to comment.