Permalink
Browse files

Merge branch 'master' into gh197-delete-bucket-properties

  • Loading branch information...
2 parents a8db994 + 86d5148 commit af318b5224c2271bd8c691e371b6925640404865 @jerith committed Aug 28, 2012
View
1 .travis.yml
@@ -1,5 +1,6 @@
language: erlang
notifications:
+ webhooks: http://basho-engbot.herokuapp.com/travis?key=72ce513a4a26166521f60f72511bfb905329db87
email: eng@basho.com
otp_release:
- R15B01
View
14 ebin/riak_core.app
@@ -3,16 +3,16 @@
{application, riak_core,
[
{description, "Riak Core"},
- {vsn, "1.1.2"},
+ {vsn, "1.2.0"},
{modules, [
app_helper,
bloom,
chash,
gen_nb_server,
- gen_server2,
+ riak_core_gen_server,
json_pp,
merkerl,
- priority_queue,
+ riak_core_priority_queue,
process_proxy,
riak_core_gossip_legacy,
riak_core,
@@ -55,6 +55,8 @@
riak_core_ring_util,
riak_core_stat,
riak_core_stat_cache,
+ riak_core_stat_sup,
+ riak_core_stats_sup,
riak_core_status,
riak_core_sup,
riak_core_sysmon_handler,
@@ -72,12 +74,11 @@
riak_core_vnode_worker_pool,
riak_core_web,
riak_core_wm_urlmap,
- slide,
- spiraltime,
supervisor_pre_r14b04,
vclock
]},
{registered, []},
+ {included_applications, [folsom]},
{applications, [
kernel,
stdlib,
@@ -86,8 +87,7 @@
crypto,
riak_sysmon,
webmachine,
- os_mon,
- folsom
+ os_mon
]},
{mod, { riak_core_app, []}},
{env, [
View
BIN rebar
Binary file not shown.
View
6 rebar.config
@@ -1,4 +1,4 @@
-{erl_first_files, ["src/gen_nb_server.erl", "src/gen_server2.erl"]}.
+{erl_first_files, ["src/gen_nb_server.erl", "src/riak_core_gen_server.erl"]}.
{cover_enabled, true}.
{erl_opts, [{parse_transform, lager_transform}]}.
{edoc_opts, [{preprocess, true}]}.
@@ -7,8 +7,8 @@
{deps, [
{lager, ".*", {git, "git://github.com/basho/lager", {branch, "master"}}},
{poolboy, ".*", {git, "git://github.com/basho/poolboy", {branch, "master"}}},
- {protobuffs, "0.6.0", {git, "git://github.com/basho/erlang_protobuffs",
- {tag, "protobuffs-0.6.0"}}},
+ {protobuffs, "0.7.*", {git, "git://github.com/basho/erlang_protobuffs",
+ {branch, "master"}}},
{basho_stats, ".*", {git, "git://github.com/basho/basho_stats", "HEAD"}},
{riak_sysmon, ".*", {git, "git://github.com/basho/riak_sysmon", {branch, "master"}}},
{webmachine, ".*", {git, "git://github.com/basho/webmachine",
View
44 src/bloom.erl
@@ -187,3 +187,47 @@ bitarray_get(I, A) ->
V = array:get(AI, A),
V band (1 bsl (I rem ?W)) =/= 0.
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+
+simple_shuffle(L, N) ->
+ lists:sublist(simple_shuffle(L), 1, N).
+simple_shuffle(L) ->
+ N = 1000 * length(L),
+ L2 = [{random:uniform(N), E} || E <- L],
+ {_, L3} = lists:unzip(lists:keysort(1, L2)),
+ L3.
+
+fixed_case(Bloom, Size, FalseRate) ->
+ ?assert(bloom:capacity(Bloom) > Size),
+ ?assertEqual(0, bloom:size(Bloom)),
+ RandomList = simple_shuffle(lists:seq(1,100*Size), Size),
+ [?assertEqual(false, bloom:is_element(E, Bloom)) || E <- RandomList],
+ Bloom2 =
+ lists:foldl(fun(E, Bloom0) ->
+ bloom:add_element(E, Bloom0)
+ end, Bloom, RandomList),
+ [?assertEqual(true, bloom:is_element(E, Bloom2)) || E <- RandomList],
+
+ ?assert(bloom:size(Bloom2) > ((1-FalseRate)*Size)),
+ ok.
+
+scalable_case(Bloom, Size, FalseRate) ->
+ ?assertEqual(infinity, bloom:capacity(Bloom)),
+ ?assertEqual(0, bloom:size(Bloom)),
+ RandomList = simple_shuffle(lists:seq(1,100*Size), 10*Size),
+ [?assertEqual(false, bloom:is_element(E, Bloom)) || E <- RandomList],
+ Bloom2 =
+ lists:foldl(fun(E, Bloom0) ->
+ bloom:add_element(E, Bloom0)
+ end, Bloom, RandomList),
+ [?assertEqual(true, bloom:is_element(E, Bloom2)) || E <- RandomList],
+ ?assert(bloom:size(Bloom2) > ((1-FalseRate)*Size)),
+ ok.
+
+bloom_test() ->
+ fixed_case(bloom(5000), 5000, 0.001),
+ scalable_case(sbf(1000, 0.2), 1000, 0.2),
+ ok.
+
+-endif.
View
25 src/json_pp.erl
@@ -65,5 +65,26 @@ listify(IoList) -> binary_to_list(list_to_binary(IoList)).
test() ->
J1 = listify(mochijson:encode(test_data())),
io:format("~s~n", [listify(print(J1))]).
-
-
+
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+
+basic_test() ->
+ J1 = listify(mochijson:encode(test_data())),
+ L1 =
+ "{\n"
+ " \"foo\": true,\n"
+ " \"bar\": false,\n"
+ " \"baz\": [\n"
+ " 1,\n"
+ " 2,\n"
+ " 3,\n"
+ " 4\n"
+ " ],\n"
+ " \"fiz:f\": null,\n"
+ " \"fozzer\\\"\": 5\n"
+ "}",
+ ?assertEqual(L1, listify(print(J1))),
+ ok.
+
+-endif.
View
7 src/merkerl.erl
@@ -50,8 +50,6 @@
-module(merkerl).
-export([insert/2,delete/2,build_tree/1,diff/2,allkeys/1]).
--include_lib("eunit/include/eunit.hrl").
-
% TODO: fix doc, userdata is the ONLY user-exposed key
-record(merk, {nodetype, % atom: expected values are 'leaf' or 'inner'
key=undefined, % if nodetype=leaf, then this is binary/160
@@ -328,6 +326,9 @@ getkids(Tree) ->
sha(X) ->
crypto:sha(term_to_binary(X)).
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+
% @spec merkle_test() -> bool()
% @doc A test function and example code.
%
@@ -365,4 +366,4 @@ merkle_test() ->
I2 = build_tree(I),
?assertEqual(2, length(allkeys(I2))).
-
+-endif.
View
41 src/riak_core.erl
@@ -24,6 +24,7 @@
leave/0, remove_from_cluster/1]).
-export([vnode_modules/0]).
-export([register/1, register/2, bucket_fixups/0, bucket_validators/0]).
+-export([stat_mods/0]).
-export([add_guarded_event_handler/3, add_guarded_event_handler/4]).
-export([delete_guarded_event_handler/3]).
@@ -115,7 +116,7 @@ standard_join(Node, Rejoin, Auto) when is_atom(Node) ->
false ->
standard_join(Node, Ring, Rejoin, Auto)
end;
- _ ->
+ _ ->
{error, unable_to_get_join_ring}
end;
pang ->
@@ -160,11 +161,11 @@ legacy_join(Node) when is_atom(Node) ->
pong ->
case rpc:call(Node,
application,
- get_env,
+ get_env,
[riak_core, ring_creation_size]) of
{ok, OurRingSize} ->
riak_core_gossip:send_ring(Node, node());
- _ ->
+ _ ->
{error, different_ring_sizes}
end;
pang ->
@@ -190,7 +191,7 @@ remove(Node) ->
standard_remove(Node) ->
riak_core_ring_manager:ring_trans(
- fun(Ring2, _) ->
+ fun(Ring2, _) ->
Ring3 = riak_core_ring:remove_member(node(), Ring2, Node),
Ring4 = riak_core_ring:ring_changed(node(), Ring3),
{new_ring, Ring4}
@@ -215,7 +216,7 @@ down(false, Node) ->
{error, only_member};
_ ->
riak_core_ring_manager:ring_trans(
- fun(Ring2, _) ->
+ fun(Ring2, _) ->
Ring3 = riak_core_ring:down_member(node(), Ring2, Node),
Ring4 = riak_core_ring:ring_changed(node(), Ring3),
{new_ring, Ring4}
@@ -246,7 +247,7 @@ leave() ->
standard_leave(Node) ->
riak_core_ring_manager:ring_trans(
- fun(Ring2, _) ->
+ fun(Ring2, _) ->
Ring3 = riak_core_ring:leave_member(Node, Ring2, Node),
{new_ring, Ring3}
end, []),
@@ -289,6 +290,12 @@ bucket_validators() ->
{ok, Mods} -> Mods
end.
+stat_mods() ->
+ case application:get_env(riak_core, stat_mods) of
+ undefined -> [];
+ {ok, Mods} -> Mods
+ end.
+
%% Get the application name if not supplied, first by get_application
%% then by searching by module name
get_app(undefined, Module) ->
@@ -324,12 +331,18 @@ register(App, [{vnode_module, VNodeMod}|T]) ->
register(App, T);
register(App, [{bucket_validator, ValidationMod}|T]) ->
register_mod(get_app(App, ValidationMod), ValidationMod, bucket_validators),
+ register(App, T);
+register(App, [{stat_mod, StatMod}|T]) ->
+ register_mod(App, StatMod, stat_mods),
register(App, T).
+
register_mod(App, Module, Type) when is_atom(Module), is_atom(Type) ->
case Type of
vnode_modules ->
riak_core_vnode_proxy_sup:start_proxies(Module);
+ stat_mods ->
+ riak_core_stats_sup:start_server(Module);
_ ->
ok
end,
@@ -356,9 +369,9 @@ add_guarded_event_handler(HandlerMod, Handler, Args) ->
%% ExitFun = fun(Handler, Reason::term())
%% AddResult = ok | {error, Reason::term()}
%%
-%% @doc Add a "guarded" event handler to a gen_event instance.
-%% A guarded handler is implemented as a supervised gen_server
-%% (riak_core_eventhandler_guard) that adds a supervised handler in its
+%% @doc Add a "guarded" event handler to a gen_event instance.
+%% A guarded handler is implemented as a supervised gen_server
+%% (riak_core_eventhandler_guard) that adds a supervised handler in its
%% init() callback and exits when the handler crashes so it can be
%% restarted by the supervisor.
add_guarded_event_handler(HandlerMod, Handler, Args, ExitFun) ->
@@ -372,13 +385,13 @@ add_guarded_event_handler(HandlerMod, Handler, Args, ExitFun) ->
%% Reason = term()
%%
%% @doc Delete a guarded event handler from a gen_event instance.
-%%
-%% Args is an arbitrary term which is passed as one of the arguments to
+%%
+%% Args is an arbitrary term which is passed as one of the arguments to
%% Module:terminate/2.
%%
-%% The return value is the return value of Module:terminate/2. If the
-%% specified event handler is not installed, the function returns
-%% {error,module_not_found}. If the callback function fails with Reason,
+%% The return value is the return value of Module:terminate/2. If the
+%% specified event handler is not installed, the function returns
+%% {error,module_not_found}. If the callback function fails with Reason,
%% the function returns {'EXIT',Reason}.
delete_guarded_event_handler(HandlerMod, Handler, Args) ->
riak_core_eventhandler_sup:stop_guarded_handler(HandlerMod, Handler, Args).
View
24 src/riak_core_app.erl
@@ -75,30 +75,8 @@ start(_StartType, _StartArgs) ->
%% Spin up the supervisor; prune ring files as necessary
case riak_core_sup:start_link() of
{ok, Pid} ->
- riak_core_stat:register_stats(),
+ riak_core:register(riak_core, [{stat_mod, riak_core_stat}]),
ok = riak_core_ring_events:add_guarded_handler(riak_core_ring_handler, []),
- %% App is running; search for latest ring file and initialize with it
- riak_core_ring_manager:prune_ringfiles(),
- case riak_core_ring_manager:find_latest_ringfile() of
- {ok, RingFile} ->
- case riak_core_ring_manager:read_ringfile(RingFile) of
- {error, Reason} ->
- lager:critical("Failed to read ring file: ~p",
- [lager:posix_error(Reason)]),
- throw({error, Reason});
- Ring0 ->
- %% Upgrade the ring data structure if necessary.
- Ring = riak_core_ring:upgrade(Ring0),
- riak_core_ring_manager:set_my_ring(Ring)
- end;
- {error, not_found} ->
- riak_core_ring_manager:write_ringfile(),
- lager:warning("No ring file available.");
- {error, Reason} ->
- lager:critical("Failed to load ring file: ~p",
- [lager:posix_error(Reason)]),
- throw({error, Reason})
- end,
%% Register capabilities
riak_core_capability:register({riak_core, vnode_routing},
View
52 src/riak_core_capability.erl
@@ -205,11 +205,12 @@ init_state(Registered) ->
handle_call({register, Capability, Info}, _From, State) ->
State2 = register_capability(node(), Capability, Info, State),
- State3 = renegotiate_capabilities(State2),
- publish_supported(State3),
- update_local_cache(State3),
- save_registered(State3#state.registered),
- {reply, ok, State3};
+ State3 = update_supported(State2),
+ State4 = renegotiate_capabilities(State3),
+ publish_supported(State4),
+ update_local_cache(State4),
+ save_registered(State4#state.registered),
+ {reply, ok, State4};
handle_call({ring_changed, Ring}, _From, State) ->
State2 = update_supported(Ring, State),
@@ -265,6 +266,10 @@ reload(State) ->
save_registered(State3#state.registered),
State3.
+update_supported(State) ->
+ {ok, Ring} = riak_core_ring_manager:get_raw_ring(),
+ update_supported(Ring, State).
+
%% Update this node's view of cluster capabilities based on a received ring
update_supported(Ring, State) ->
AllSupported = get_supported_from_ring(Ring),
@@ -278,11 +283,7 @@ update_supported(Ring, State) ->
{[], []} ->
add_node(Node, Supported, StateAcc);
{[], _} ->
- %% While the ring has no knowledge of Node's
- %% supported modes, the local view has prior
- %% knowledge. Do nothing and continue to use
- %% the existing modes.
- StateAcc;
+ add_node(Node, Supported, StateAcc);
{Same, Same} ->
StateAcc;
{_, _} ->
@@ -395,13 +396,17 @@ add_supported_to_ring(Node, Supported, Ring) ->
%% in the cluster.
negotiate_capabilities(Node, Override, State=#state{registered=Registered,
supported=Capabilities}) ->
- MyCaps = orddict:fetch(Node, Capabilities),
- N1 = reformat_capabilities(Registered, Capabilities),
- N2 = intersect_capabilities(N1),
- N3 = order_by_preference(MyCaps, N2),
- N4 = override_capabilities(N3, Override),
- N5 = [{Cap, hd(Common)} || {Cap, Common} <- N4],
- State#state{negotiated=N5}.
+ case orddict:find(Node, Capabilities) of
+ error ->
+ State;
+ {ok, MyCaps} ->
+ N1 = reformat_capabilities(Registered, Capabilities),
+ N2 = intersect_capabilities(N1),
+ N3 = order_by_preference(MyCaps, N2),
+ N4 = override_capabilities(N3, Override),
+ N5 = [{Cap, hd(Common)} || {Cap, Common} <- N4],
+ State#state{negotiated=N5}
+ end.
renegotiate_capabilities(State=#state{supported=[]}) ->
State;
@@ -548,15 +553,26 @@ get_supported_from_ring(Ring) ->
%% Determine capabilities of legacy nodes based on app.config settings and
%% the provided app-var -> mode mapping associated with capabilities when
%% registered.
-query_capabilities(Node, #state{registered=Registered}) ->
+query_capabilities(Node, State=#state{registered=Registered}) ->
+ %% Only query results we do not already have local knowledge of
+ Known = dict:from_list(get_supported(Node, State)),
lists:mapfoldl(fun({Capability, Info}, ResolvedAcc) ->
{Resv, Cap} = query_capability(Node,
+ Known,
Capability,
Info#capability.default,
Info#capability.legacy),
{Cap, ResolvedAcc and Resv}
end, true, Registered).
+query_capability(Node, Known, Capability, DefaultSup, LegacyVar) ->
+ case dict:find(Capability, Known) of
+ {ok, Supported} ->
+ {true, {Capability, Supported}};
+ error ->
+ query_capability(Node, Capability, DefaultSup, LegacyVar)
+ end.
+
query_capability(_, Capability, DefaultSup, undefined) ->
Default = {Capability, [DefaultSup]},
{true, Default};
View
70 src/riak_core_claimant.erl
@@ -160,6 +160,7 @@ claimant() ->
%%%===================================================================
init([]) ->
+ schedule_tick(),
{ok, #state{changes=[], seed=erlang:now()}}.
handle_call(clear, _From, State) ->
@@ -193,6 +194,10 @@ handle_call(_Request, _From, State) ->
handle_cast(_Msg, State) ->
{noreply, State}.
+handle_info(tick, State) ->
+ State2 = tick(State),
+ {noreply, State2};
+
handle_info(_Info, State) ->
{noreply, State}.
@@ -453,6 +458,46 @@ same_plan(RingA, RingB) ->
(riak_core_ring:all_owners(RingA) == riak_core_ring:all_owners(RingB)) andalso
(riak_core_ring:pending_changes(RingA) == riak_core_ring:pending_changes(RingB)).
+schedule_tick() ->
+ Tick = app_helper:get_env(riak_core,
+ claimant_tick,
+ 10000),
+ erlang:send_after(Tick, ?MODULE, tick).
+
+tick(State) ->
+ maybe_force_ring_update(),
+ schedule_tick(),
+ State.
+
+maybe_force_ring_update() ->
+ {ok, Ring} = riak_core_ring_manager:get_raw_ring(),
+ IsClaimant = (riak_core_ring:claimant(Ring) == node()),
+ IsReady = riak_core_ring:ring_ready(Ring),
+ %% Do not force if we have any joining nodes unless any of them are
+ %% auto-joining nodes. Otherwise, we will force update continuously.
+ JoinBlock = (are_joining_nodes(Ring)
+ andalso (auto_joining_nodes(Ring) == [])),
+ case IsClaimant and IsReady and (not JoinBlock) of
+ true ->
+ maybe_force_ring_update(Ring);
+ false ->
+ ok
+ end.
+
+maybe_force_ring_update(Ring) ->
+ case compute_next_ring([], erlang:now(), Ring) of
+ {ok, NextRing} ->
+ case same_plan(Ring, NextRing) of
+ false ->
+ lager:warning("Forcing update of stalled ring"),
+ riak_core_ring_manager:force_update();
+ true ->
+ ok
+ end;
+ _ ->
+ ok
+ end.
+
%% =========================================================================
%% Claimant rebalance/reassign logic
%% =========================================================================
@@ -685,18 +730,21 @@ are_joining_nodes(CState) ->
Joining /= [].
%% @private
-maybe_handle_auto_joining(Node, CState) ->
+auto_joining_nodes(CState) ->
Joining = riak_core_ring:members(CState, [joining]),
- Auto =
- case riak_core_capability:get({riak_core, staged_joins}, false) of
- false ->
- Joining;
- true ->
- [Member || Member <- Joining,
- riak_core_ring:get_member_meta(CState,
- Member,
- '$autojoin') == true]
- end,
+ case riak_core_capability:get({riak_core, staged_joins}, false) of
+ false ->
+ Joining;
+ true ->
+ [Member || Member <- Joining,
+ riak_core_ring:get_member_meta(CState,
+ Member,
+ '$autojoin') == true]
+ end.
+
+%% @private
+maybe_handle_auto_joining(Node, CState) ->
+ Auto = auto_joining_nodes(CState),
maybe_handle_joining(Node, Auto, CState).
%% @private
View
3 src/riak_core_dtrace.erl
@@ -40,8 +40,6 @@
-module(riak_core_dtrace).
--include_lib("eunit/include/eunit.hrl").
-
-export([dtrace/1, dtrace/3, dtrace/4, dtrace/6]).
-export([enabled/0, put_tag/1]).
-export([timeit0/1, timeit_mg/1, timeit_best/1]). % debugging/testing only
@@ -187,6 +185,7 @@ timeit_best(ArgList) ->
dtrace(ArgList).
-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
timeit_naive_test() ->
test_common("timeit_naive",
View
26 src/gen_server2.erl → src/riak_core_gen_server.erl
@@ -1,22 +1,22 @@
%% This file is a copy of gen_server.erl from the R13B-1 Erlang/OTP
%% distribution, with the following modifications:
%%
-%% 1) the module name is gen_server2
+%% 1) the module name is riak_core_gen_server
%%
%% 2) more efficient handling of selective receives in callbacks
-%% gen_server2 processes drain their message queue into an internal
+%% riak_core_gen_server processes drain their message queue into an internal
%% buffer before invoking any callback module functions. Messages are
%% dequeued from the buffer for processing. Thus the effective message
-%% queue of a gen_server2 process is the concatenation of the internal
+%% queue of a riak_core_gen_server process is the concatenation of the internal
%% buffer and the real message queue.
%% As a result of the draining, any selective receive invoked inside a
%% callback is less likely to have to scan a large message queue.
%%
-%% 3) gen_server2:cast is guaranteed to be order-preserving
+%% 3) riak_core_gen_server:cast is guaranteed to be order-preserving
%% The original code could reorder messages when communicating with a
%% process on a remote node that was not currently connected.
%%
-%% 4) The new functions gen_server2:pcall/3, pcall/4, and pcast/3
+%% 4) The new functions riak_core_gen_server:pcall/3, pcall/4, and pcast/3
%% allow callers to attach priorities to requests. Requests with
%% higher priorities are processed before requests with lower
%% priorities. The default priority is 0.
@@ -57,7 +57,7 @@
%%
%% $Id$
%%
--module(gen_server2).
+-module(riak_core_gen_server).
%%% ---------------------------------------------------
%%%
@@ -334,7 +334,7 @@ enter_loop(Mod, Options, State, ServerName, Timeout) ->
Name = get_proc_name(ServerName),
Parent = get_parent(),
Debug = debug_options(Name, Options),
- Queue = priority_queue:new(),
+ Queue = riak_core_priority_queue:new(),
{Timeout1, TimeoutState} = build_timeout_state(Timeout),
loop(Parent, Name, State, Mod, Timeout1, TimeoutState, Queue, Debug).
@@ -354,7 +354,7 @@ init_it(Starter, self, Name, Mod, Args, Options) ->
init_it(Starter, Parent, Name0, Mod, Args, Options) ->
Name = name(Name0),
Debug = debug_options(Name, Options),
- Queue = priority_queue:new(),
+ Queue = riak_core_priority_queue:new(),
case catch Mod:init(Args) of
{ok, State} ->
proc_lib:init_ack(Starter, {ok, self()}),
@@ -431,7 +431,7 @@ loop(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug) ->
process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue,
Debug, Hib) ->
- case priority_queue:out(Queue) of
+ case riak_core_priority_queue:out(Queue) of
{{value, Msg}, Queue1} ->
process_msg(Parent, Name, State, Mod,
Time, TimeoutState, Queue1, Debug, Hib, Msg);
@@ -481,11 +481,11 @@ adjust_hibernate_after({Current, Min, HibernatedAt}) ->
end.
in({'$gen_pcast', {Priority, Msg}}, Queue) ->
- priority_queue:in({'$gen_cast', Msg}, Priority, Queue);
+ riak_core_priority_queue:in({'$gen_cast', Msg}, Priority, Queue);
in({'$gen_pcall', From, {Priority, Msg}}, Queue) ->
- priority_queue:in({'$gen_call', From, Msg}, Priority, Queue);
+ riak_core_priority_queue:in({'$gen_call', From, Msg}, Priority, Queue);
in(Input, Queue) ->
- priority_queue:in(Input, Queue).
+ riak_core_priority_queue:in(Input, Queue).
process_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue,
Debug, _Hib, Msg) ->
@@ -1022,5 +1022,5 @@ format_status(Opt, StatusData) ->
{data, [{"Status", SysState},
{"Parent", Parent},
{"Logged events", Log},
- {"Queued messages", priority_queue:to_list(Queue)}]} |
+ {"Queued messages", riak_core_priority_queue:to_list(Queue)}]} |
Specfic1].
View
4 src/riak_core_gossip.erl
@@ -44,10 +44,6 @@
-include("riak_core_ring.hrl").
--ifdef(TEST).
--include_lib("eunit/include/eunit.hrl").
--endif.
-
%% Default gossip rate: allow at most 45 gossip messages every 10 seconds
-define(DEFAULT_LIMIT, {45, 10000}).
View
5 src/riak_core_handoff_listener.erl
@@ -27,7 +27,7 @@
-export([start_link/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--export([sock_opts/0, new_connection/2]).
+-export([get_handoff_ip/0, sock_opts/0, new_connection/2]).
-record(state, {
ipaddr :: string(),
portnum :: integer(),
@@ -40,6 +40,9 @@ start_link() ->
SslOpts = riak_core_handoff_sender:get_handoff_ssl_options(),
gen_nb_server:start_link(?MODULE, IpAddr, PortNum, [IpAddr, PortNum, SslOpts]).
+get_handoff_ip() ->
+ riak_core_gen_server:call(?MODULE, handoff_ip, infinity).
+
init([IpAddr, PortNum, SslOpts]) ->
register(?MODULE, self()),
View
6 src/riak_core_handoff_receiver.erl
@@ -22,7 +22,7 @@
-module(riak_core_handoff_receiver).
-include("riak_core_handoff.hrl").
--behaviour(gen_server2).
+-behaviour(riak_core_gen_server).
-export([start_link/0, % Don't use SSL
start_link/1, % SSL options list, empty=no SSL
set_socket/2]).
@@ -42,10 +42,10 @@ start_link() ->
start_link([]).
start_link(SslOpts) ->
- gen_server2:start_link(?MODULE, [SslOpts], []).
+ riak_core_gen_server:start_link(?MODULE, [SslOpts], []).
set_socket(Pid, Socket) ->
- gen_server2:call(Pid, {set_socket, Socket}).
+ riak_core_gen_server:call(Pid, {set_socket, Socket}).
init([SslOpts]) ->
{ok, #state{ssl_opts = SslOpts,
View
15 src/riak_core_handoff_sender.erl
@@ -279,18 +279,19 @@ visit_item(K, V, Acc) ->
end.
get_handoff_ip(Node) when is_atom(Node) ->
- try
- gen_server2:call({riak_core_handoff_listener, Node}, handoff_ip, infinity)
- catch
- _:_ ->
- error
+ case rpc:call(Node, riak_core_handoff_listener, get_handoff_ip, [],
+ infinity) of
+ {badrpc, _} ->
+ error;
+ Res ->
+ Res
end.
get_handoff_port(Node) when is_atom(Node) ->
- case catch(gen_server2:call({riak_core_handoff_listener, Node}, handoff_port, infinity)) of
+ case catch(riak_core_gen_server:call({riak_core_handoff_listener, Node}, handoff_port, infinity)) of
{'EXIT', _} ->
%% Check old location from previous release
- gen_server2:call({riak_kv_handoff_listener, Node}, handoff_port, infinity);
+ riak_core_gen_server:call({riak_kv_handoff_listener, Node}, handoff_port, infinity);
Other -> Other
end.
View
80 src/priority_queue.erl → src/riak_core_priority_queue.erl
@@ -53,7 +53,7 @@
%% a base case.
--module(priority_queue).
+-module(riak_core_priority_queue).
-export([new/0, is_queue/1, is_empty/1, len/1, to_list/1, in/2, in/3,
out/1, out/2, pout/1, join/2]).
@@ -219,3 +219,81 @@ r2f([]) -> {queue, [], []};
r2f([_] = R) -> {queue, [], R};
r2f([X,Y]) -> {queue, [X], [Y]};
r2f([X,Y|R]) -> {queue, [X,Y], lists:reverse(R, [])}.
+
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+
+simple_case(Order) ->
+ Queue = ?MODULE:new(),
+ ?assertEqual(true, ?MODULE:is_queue(Queue)),
+ ?assertEqual(true, ?MODULE:is_empty(Queue)),
+ ?assertEqual(0, ?MODULE:len(Queue)),
+ ?assertEqual([], ?MODULE:to_list(Queue)),
+ case Order of
+ forward ->
+ Queue2 = ?MODULE:in(low, Queue),
+ Queue3 = ?MODULE:in(mid, 500, Queue2),
+ Queue4 = ?MODULE:in(high, 1000, Queue3);
+ reverse ->
+ Queue2 = ?MODULE:in(high, 1000, Queue),
+ Queue3 = ?MODULE:in(mid, 500, Queue2),
+ Queue4 = ?MODULE:in(low, Queue3);
+ mixed ->
+ Queue2 = ?MODULE:in(high, 1000, Queue),
+ Queue3 = ?MODULE:in(low, Queue2),
+ Queue4 = ?MODULE:in(mid, 500, Queue3)
+ end,
+ ?assertEqual(false, ?MODULE:is_empty(Queue4)),
+ ?assertEqual(3, ?MODULE:len(Queue4)),
+ ?assertMatch({{value, high}, _}, ?MODULE:out(Queue4)),
+ {{value, high}, Queue5} = ?MODULE:out(Queue4),
+ ?assertMatch({{value, mid}, _}, ?MODULE:out(Queue5)),
+ {{value, mid}, Queue6} = ?MODULE:out(Queue5),
+ ?assertMatch({{value, low}, _}, ?MODULE:out(Queue6)),
+ {{value, low}, Queue7} = ?MODULE:out(Queue6),
+ ?assertEqual(0, ?MODULE:len(Queue7)),
+
+ ?assertEqual(true, ?MODULE:is_queue(Queue2)),
+ ?assertEqual(true, ?MODULE:is_queue(Queue3)),
+ ?assertEqual(true, ?MODULE:is_queue(Queue4)),
+ ?assertEqual(false, ?MODULE:is_queue([])),
+ ok.
+
+merge_case() ->
+ QueueA1 = ?MODULE:new(),
+ QueueA2 = ?MODULE:in(1, QueueA1),
+ QueueA3 = ?MODULE:in(3, QueueA2),
+ QueueA4 = ?MODULE:in(5, QueueA3),
+
+ QueueB1 = ?MODULE:new(),
+ QueueB2 = ?MODULE:in(2, QueueB1),
+ QueueB3 = ?MODULE:in(4, QueueB2),
+ QueueB4 = ?MODULE:in(6, QueueB3),
+
+ Merged1 = ?MODULE:join(QueueA4, QueueB4),
+ ?assertEqual([{0,1},{0,3},{0,5},{0,2},{0,4},{0,6}],
+ ?MODULE:to_list(Merged1)),
+
+ QueueC1 = ?MODULE:new(),
+ QueueC2 = ?MODULE:in(1, 10, QueueC1),
+ QueueC3 = ?MODULE:in(3, 30, QueueC2),
+ QueueC4 = ?MODULE:in(5, 50, QueueC3),
+
+ QueueD1 = ?MODULE:new(),
+ QueueD2 = ?MODULE:in(2, 20, QueueD1),
+ QueueD3 = ?MODULE:in(4, 40, QueueD2),
+ QueueD4 = ?MODULE:in(6, 60, QueueD3),
+
+ Merged2 = ?MODULE:join(QueueC4, QueueD4),
+ ?assertEqual([{60,6},{50,5},{40,4},{30,3},{20,2},{10,1}],
+ ?MODULE:to_list(Merged2)),
+ ok.
+
+basic_test() ->
+ simple_case(forward),
+ simple_case(reverse),
+ simple_case(mixed),
+ merge_case(),
+ ok.
+
+-endif.
View
60 src/riak_core_ring_manager.erl
@@ -23,9 +23,8 @@
%% @doc the local view of the cluster's ring configuration
-module(riak_core_ring_manager).
--include_lib("eunit/include/eunit.hrl").
-define(RING_KEY, riak_ring).
--behaviour(gen_server2).
+-behaviour(riak_core_gen_server).
-export([start_link/0,
start_link/1,
@@ -63,12 +62,12 @@
%% ===================================================================
start_link() ->
- gen_server2:start_link({local, ?MODULE}, ?MODULE, [live], []).
+ riak_core_gen_server:start_link({local, ?MODULE}, ?MODULE, [live], []).
%% Testing entry point
start_link(test) ->
- gen_server2:start_link({local, ?MODULE}, ?MODULE, [test], []).
+ riak_core_gen_server:start_link({local, ?MODULE}, ?MODULE, [test], []).
%% @spec get_my_ring() -> {ok, riak_core_ring:riak_core_ring()} | {error, Reason}
@@ -79,29 +78,29 @@ get_my_ring() ->
end.
get_raw_ring() ->
- gen_server2:call(?MODULE, get_raw_ring, infinity).
+ riak_core_gen_server:call(?MODULE, get_raw_ring, infinity).
%% @spec refresh_my_ring() -> ok
refresh_my_ring() ->
- gen_server2:call(?MODULE, refresh_my_ring, infinity).
+ riak_core_gen_server:call(?MODULE, refresh_my_ring, infinity).
refresh_ring(Node, ClusterName) ->
- gen_server2:cast({?MODULE, Node}, {refresh_my_ring, ClusterName}).
+ riak_core_gen_server:cast({?MODULE, Node}, {refresh_my_ring, ClusterName}).
%% @spec set_my_ring(riak_core_ring:riak_core_ring()) -> ok
set_my_ring(Ring) ->
- gen_server2:call(?MODULE, {set_my_ring, Ring}, infinity).
+ riak_core_gen_server:call(?MODULE, {set_my_ring, Ring}, infinity).
%% @spec write_ringfile() -> ok
write_ringfile() ->
- gen_server2:cast(?MODULE, write_ringfile).
+ riak_core_gen_server:cast(?MODULE, write_ringfile).
ring_trans(Fun, Args) ->
- gen_server2:call(?MODULE, {ring_trans, Fun, Args}, infinity).
+ riak_core_gen_server:call(?MODULE, {ring_trans, Fun, Args}, infinity).
set_cluster_name(Name) ->
- gen_server2:call(?MODULE, {set_cluster_name, Name}, infinity).
+ riak_core_gen_server:call(?MODULE, {set_cluster_name, Name}, infinity).
%% @doc Exposed for support/debug purposes. Forces the node to change its
%% ring in a manner that will trigger reconciliation on gossip.
@@ -198,30 +197,43 @@ prune_ringfiles() ->
%% @private (only used for test instances)
stop() ->
- gen_server2:cast(?MODULE, stop).
+ riak_core_gen_server:cast(?MODULE, stop).
%% ===================================================================
%% gen_server callbacks
%% ===================================================================
init([Mode]) ->
- case Mode of
- live ->
- Ring = riak_core_ring:fresh();
- test ->
- Ring = riak_core_ring:fresh(16,node())
- end,
-
- %% Set the ring and send initial notification to local observers that
- %% ring has changed.
- %% Do *not* save the ring to disk here. On startup we deliberately come
- %% up with a ring where the local node owns all partitions so that any
- %% fallback vnodes will be started so they can hand off.
+ Ring = reload_ring(Mode),
set_ring_global(Ring),
riak_core_ring_events:ring_update(Ring),
{ok, #state{mode = Mode, raw_ring=Ring}}.
+reload_ring(test) ->
+ riak_core_ring:fresh(16,node());
+reload_ring(live) ->
+ case riak_core_ring_manager:find_latest_ringfile() of
+ {ok, RingFile} ->
+ case riak_core_ring_manager:read_ringfile(RingFile) of
+ {error, Reason} ->
+ lager:critical("Failed to read ring file: ~p",
+ [lager:posix_error(Reason)]),
+ throw({error, Reason});
+ Ring0 ->
+ %% Upgrade the ring data structure if necessary.
+ lager:info("Upgrading legacy ring"),
+ Ring = riak_core_ring:upgrade(Ring0),
+ Ring
+ end;
+ {error, not_found} ->
+ lager:warning("No ring file available."),
+ riak_core_ring:fresh();
+ {error, Reason} ->
+ lager:critical("Failed to load ring file: ~p",
+ [lager:posix_error(Reason)]),
+ throw({error, Reason})
+ end.
handle_call(get_raw_ring, _From, #state{raw_ring=Ring} = State) ->
{reply, {ok, Ring}, State};
View
32 src/riak_core_stat.erl
@@ -42,6 +42,7 @@ start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
register_stats() ->
+ [(catch folsom_metrics:delete_metric({?APP, Name})) || {Name, _Type} <- stats()],
[register_stat({?APP, Name}, Type) || {Name, Type} <- stats()],
riak_core_stat_cache:register_app(?APP, {?MODULE, produce_stats, []}).
@@ -57,9 +58,17 @@ get_stats() ->
update(Arg) ->
gen_server:cast(?SERVER, {update, Arg}).
+% @spec produce_stats(state(), integer()) -> proplist()
+%% @doc Produce a proplist-formatted view of the current aggregation
+%% of stats.
+produce_stats() ->
+ lists:append([gossip_stats(),
+ vnodeq_stats()]).
+
%% gen_server
init([]) ->
+ register_stats(),
{ok, ok}.
handle_call(_Req, _From, State) ->
@@ -92,10 +101,10 @@ update1(ignored_gossip) ->
folsom_metrics:notify_existing_metric({?APP, ignored_gossip_total}, {inc, 1}, counter);
update1(gossip_received) ->
- folsom_metrics:notify_existing_metric({?APP, gossip_received}, 1, meter);
+ folsom_metrics:notify_existing_metric({?APP, gossip_received}, 1, spiral);
update1(rings_reconciled) ->
- folsom_metrics:notify_existing_metric({?APP, rings_reconciled}, 1, meter);
+ folsom_metrics:notify_existing_metric({?APP, rings_reconciled}, 1, spiral);
update1(converge_timer_begin) ->
folsom_metrics:notify_existing_metric({?APP, converge_delay}, timer_start, duration);
@@ -110,35 +119,28 @@ update1(rebalance_timer_end) ->
%% private
stats() ->
[{ignored_gossip_total, counter},
- {rings_reconciled, meter},
- {gossip_received, meter},
+ {rings_reconciled, spiral},
+ {gossip_received, spiral},
{rejected_handoffs, counter},
{handoff_timeouts, counter},
{converge_delay, duration},
{rebalance_delay, duration}].
register_stat(Name, counter) ->
folsom_metrics:new_counter(Name);
-register_stat(Name, meter) ->
- folsom_metrics:new_meter(Name);
+register_stat(Name, spiral) ->
+ folsom_metrics:new_spiral(Name);
register_stat(Name, duration) ->
folsom_metrics:new_duration(Name).
-% @spec produce_stats(state(), integer()) -> proplist()
-%% @doc Produce a proplist-formatted view of the current aggregation
-%% of stats.
-produce_stats() ->
- lists:append([gossip_stats(),
- vnodeq_stats()]).
-
gossip_stats() ->
lists:flatten([backwards_compat(Stat, Type, folsom_metrics:get_metric_value({?APP, Stat})) ||
{Stat, Type} <- stats(), Stat /= riak_core_rejected_handoffs]).
-backwards_compat(rings_reconciled, meter, Stats) ->
+backwards_compat(rings_reconciled, spiral, Stats) ->
[{rings_reconciled_total, proplists:get_value(count, Stats)},
{rings_reconciled, trunc(proplists:get_value(one, Stats))}];
-backwards_compat(gossip_received, meter, Stats) ->
+backwards_compat(gossip_received, spiral, Stats) ->
{gossip_received, trunc(proplists:get_value(one, Stats))};
backwards_compat(Name, counter, Stats) ->
{Name, Stats};
View
62 src/riak_core_stat_cache.erl
@@ -66,15 +66,20 @@ stop() ->
%%% gen server
init([]) ->
+ process_flag(trap_exit, true),
Tab = ets:new(?MODULE, [protected, set, named_table]),
- {ok, #state{tab=Tab}}.
+ TTL = app_helper:get_env(riak_core, stat_cache_ttl, ?TTL),
+ %% re-register mods, if this is a restart after a crash
+ RegisteredMods = lists:foldl(fun({App, Mod}, Registerd) ->
+ register_mod(App, Mod, produce_stats, [], TTL, Registerd) end,
+ orddict:new(),
+ riak_core:stat_mods()),
+ {ok, #state{tab=Tab, apps=orddict:from_list(RegisteredMods)}}.
handle_call({register, App, {Mod, Fun, Args}, TTL}, _From, State0=#state{apps=Apps0}) ->
Apps = case registered(App, Apps0) of
false ->
- folsom_metrics:new_histogram({?MODULE, Mod}),
- folsom_metrics:new_meter({?MODULE, App}),
- orddict:store(App, {Mod, Fun, Args, TTL}, Apps0);
+ register_mod(App, Mod, Fun, Args, TTL, Apps0);
{true, _} ->
Apps0
end,
@@ -100,7 +105,7 @@ handle_call(_Request, _From, State) ->
handle_cast({stats, App, Stats, TS}, State0=#state{tab=Tab, active=Active}) ->
ets:insert(Tab, {App, TS, Stats}),
State = case orddict:find(App, Active) of
- {ok, Awaiting} ->
+ {ok, {_Pid, Awaiting}} ->
[gen_server:reply(From, {ok, Stats, TS}) || From <- Awaiting],
State0#state{active=orddict:erase(App, Active)};
error ->
@@ -112,6 +117,16 @@ handle_cast(stop, State) ->
handle_cast(_Msg, State) ->
{noreply, State}.
+%% don't let a crashing stat mod crash the cache
+handle_info({'EXIT', FromPid, Reason}, State0=#state{active=Active}) when Reason /= normal ->
+ Reply = case awaiting_for_pid(FromPid, Active) of
+ not_found ->
+ {stop, Reason, State0};
+ {ok, {App, Awaiting}} ->
+ [gen_server:reply(From, {error, Reason}) || From <- Awaiting],
+ {noreply, State0#state{active=orddict:erase(App, Active)}}
+ end,
+ Reply;
handle_info(_Info, State) ->
{noreply, State}.
@@ -122,6 +137,11 @@ code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%% internal
+register_mod(App, Mod, Fun, Args, TTL, Apps) ->
+ folsom_metrics:new_histogram({?MODULE, Mod}),
+ folsom_metrics:new_meter({?MODULE, App}),
+ orddict:store(App, {Mod, Fun, Args, TTL}, Apps).
+
registered(App, Apps) ->
registered(orddict:find(App, Apps)).
@@ -151,10 +171,10 @@ maybe_get_stats(App, From, Active, {M, F, A}) ->
%% if a get stats is not under way start one
Awaiting = case orddict:find(App, Active) of
error ->
- do_get_stats(App, {M, F, A}),
- [From];
- {ok, Froms} ->
- [From|Froms]
+ Pid = do_get_stats(App, {M, F, A}),
+ {Pid, [From]};
+ {ok, {Pid, Froms}} ->
+ {Pid, [From|Froms]}
end,
orddict:store(App, Awaiting, Active).
@@ -164,6 +184,14 @@ do_get_stats(App, {M, F, A}) ->
folsom_metrics:notify_existing_metric({?MODULE, App}, 1, meter),
gen_server:cast(?MODULE, {stats, App, Stats, folsom_utils:now_epoch()}) end).
+awaiting_for_pid(Pid, Active) ->
+ case [{App, Awaiting} || {App, {Proc, Awaiting}} <- orddict:to_list(Active),
+ Proc == Pid] of
+ [] ->
+ not_found;
+ L -> {ok, hd(L)}
+ end.
+
-ifdef(TEST).
-define(MOCKS, [folsom_utils, riak_core_stat, riak_kv_stat]).
@@ -189,8 +217,10 @@ cache_test_() ->
{"Expired cache, re-calculate",
fun get_expired/0},
{"Only a single process can calculate stats",
- fun serialize_calls/0}
- ]}.
+ fun serialize_calls/0},
+ {"Crash test",
+ fun crasher/0}
+ ]}.
register() ->
[meck:expect(M, produce_stats, fun() -> ?STATS end)
@@ -258,6 +288,16 @@ serialize_calls() ->
[?assertEqual({ok, ?STATS, Now}, Res) || Res <- Results],
?assertEqual(2, meck:num_calls(riak_kv_stat, produce_stats, [])).
+crasher() ->
+ Pid = whereis(riak_core_stat_cache),
+ Now = tick(10000, 0),
+ meck:expect(riak_core_stat, produce_stats, fun() ->
+ ?STATS end),
+ meck:expect(riak_kv_stat, produce_stats, fun() -> erlang:error(boom) end),
+ ?assertMatch({error, {boom, _Stack}}, riak_core_stat_cache:get_stats(riak_kv)),
+ ?assertEqual(Pid, whereis(riak_core_stat_cache)),
+ ?assertEqual({ok, ?STATS, Now}, riak_core_stat_cache:get_stats(riak_core)).
+
tick(Moment, IncrBy) ->
meck:expect(folsom_utils, now_epoch, fun() -> Moment + IncrBy end),
Moment+IncrBy.
View
54 src/riak_core_stat_sup.erl
@@ -0,0 +1,54 @@
+%% -------------------------------------------------------------------
+%%
+%% riak_core: Core Riak Application
+%%
+%% Copyright (c) 2007-2010 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+
+-module(riak_core_stat_sup).
+
+-behaviour(supervisor).
+
+%% API
+-export([start_link/0]).
+
+%% Supervisor callbacks
+-export([init/1]).
+
+%% Helper macro for declaring children of supervisor
+-define(CHILD(I, Type, Timeout), {I, {I, start_link, []}, permanent, Timeout, Type, [I]}).
+-define(CHILD(I, Type), ?CHILD(I, Type, 5000)).
+
+%% ===================================================================
+%% API functions
+%% ===================================================================
+
+start_link() ->
+ supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+%% ===================================================================
+%% Supervisor callbacks
+%% ===================================================================
+
+init([]) ->
+ Children = lists:flatten(
+ [?CHILD(folsom_sup, supervisor),
+ ?CHILD(riak_core_stats_sup, supervisor)
+ ]),
+
+ {ok, {{rest_for_one, 10, 10}, Children}}.
View
50 src/riak_core_stats_sup.erl
@@ -0,0 +1,50 @@
+%% -------------------------------------------------------------------
+%% Copyright (c) 2007-2011 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+-module(riak_core_stats_sup).
+-behaviour(supervisor).
+-export([start_link/0, init/1]).
+-export([start_server/1, stop_server/1]).
+
+%% Helper macro for declaring children of supervisor
+-define(CHILD(I, Type, Timeout), {I, {I, start_link, []}, permanent, Timeout, Type, [I]}).
+-define(CHILD(I, Type), ?CHILD(I, Type, 5000)).
+
+start_link() ->
+ supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+init([]) ->
+ Children = [stat_server(Mod) || {_App, Mod} <- riak_core:stat_mods()],
+ {ok, {{one_for_one, 5, 10}, [?CHILD(riak_core_stat_cache, worker)|Children]}}.
+
+start_server(Mod) ->
+ Ref = stat_server(Mod),
+ Pid = case supervisor:start_child(?MODULE, Ref) of
+ {ok, Child} -> Child;
+ {error, {already_started, Child}} -> Child
+ end,
+ Pid.
+
+stop_server(Mod) ->
+ supervisor:terminate_child(?MODULE, Mod),
+ supervisor:delete_child(?MODULE, Mod),
+ ok.
+
+%% @private
+stat_server(Mod) ->
+ ?CHILD(Mod, worker).
View
5 src/riak_core_sup.erl
@@ -67,14 +67,11 @@ init([]) ->
?CHILD(riak_core_node_watcher_events, worker),
?CHILD(riak_core_node_watcher, worker),
?CHILD(riak_core_vnode_manager, worker),
- ?CHILD(riak_core_stat_cache, worker),
- ?CHILD(riak_core_stat, worker),
?CHILD(riak_core_capability, worker),
?CHILD(riak_core_gossip, worker),
?CHILD(riak_core_claimant, worker),
+ ?CHILD(riak_core_stat_sup, supervisor),
RiakWebs
]),
{ok, {{one_for_one, 10, 10}, Children}}.
-
-
View
18 src/riak_core_vnode_worker_pool.erl
@@ -91,15 +91,14 @@ shutdown(_Event, State) ->
{next_state, shutdown, State}.
handle_event({checkin, Pid}, shutdown, #state{monitors=Monitors0} = State) ->
- Monitors = lists:keydelete(Pid, 1, Monitors0),
+ Monitors = demonitor_worker(Pid, Monitors0),
case Monitors of
[] -> %% work all done, time to exit!
{stop, shutdown, State};
_ ->
{next_state, shutdown, State#state{monitors=Monitors}}
end;
-handle_event({checkin, Worker}, _, #state{pool = Pool, queue=Q, monitors=Monitors0} = State) ->
- Monitors = lists:keydelete(Worker, 1, Monitors0),
+handle_event({checkin, Worker}, _, #state{pool = Pool, queue=Q, monitors=Monitors} = State) ->
case queue:out(Q) of
{{value, {work, Work, From}}, Rem} ->
case poolboy:checkout(Pool, false) of
@@ -113,7 +112,8 @@ handle_event({checkin, Worker}, _, #state{pool = Pool, queue=Q, monitors=Monitor
monitors=NewMonitors}}
end;
{empty, Empty} ->
- {next_state, ready, State#state{queue=Empty, monitors=Monitors}}
+ NewMonitors = demonitor_worker(Worker, Monitors),
+ {next_state, ready, State#state{queue=Empty, monitors=NewMonitors}}
end;
handle_event(_Event, StateName, State) ->
{next_state, StateName, State}.
@@ -179,6 +179,16 @@ monitor_worker(Worker, From, Work, Monitors) ->
[{Worker, Ref, From, Work} | Monitors]
end.
+demonitor_worker(Worker, Monitors) ->
+ case lists:keyfind(Worker, 1, Monitors) of
+ {Worker, Ref, _From, _Work} ->
+ erlang:demonitor(Ref),
+ lists:keydelete(Worker, 1, Monitors);
+ false ->
+ %% not monitored?
+ Monitors
+ end.
+
discard_queued_work(Q) ->
case queue:out(Q) of
{{value, {work, _Work, From}}, Rem} ->
View
484 src/slide.erl
@@ -1,484 +0,0 @@
-%% -------------------------------------------------------------------
-%%
-%% riak_core: Core Riak Application
-%%
-%% Copyright (c) 2007-2010 Basho Technologies, Inc. All Rights Reserved.
-%%
-%% This file is provided to you under the Apache License,
-%% Version 2.0 (the "License"); you may not use this file
-%% except in compliance with the License. You may obtain
-%% a copy of the License at
-%%
-%% http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing,
-%% software distributed under the License is distributed on an
-%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-%% KIND, either express or implied. See the License for the
-%% specific language governing permissions and limitations
-%% under the License.
-%%
-%% -------------------------------------------------------------------
-
-%% @doc Keep track of thing in a sliding time window. The idea here
-%% is that you have some reading to take several times.
-%% Occasionally, you want to compute some aggregation of those
-%% readings for the last N seconds.
-%%
-%% For example, you might read the weight of cars passing a point
-%% in the road. You want to compute some statistics every hour.
-%% You could:
-%%
-%% %% create a new slide, with an hour window
-%% T0 = slide:fresh(60*60)
-%%
-%% %% update it every time a car passes
-%% T1 = slide:update(T0, Weight, slide:moment())
-%%
-%% %% eventually ask for stats
-%% {NumberOfCars, TotalWeight} = slide:sum(TN, slide:moment())
-%% {NumberOfCars, AverageWeight} = slide:mean(TN, slide:moment())
-%% {NumberOfCars, {MedianWeight,
-%% NinetyFivePercentWeight,
-%% NinetyNinePercentWeight,
-%% HeaviestWeight} = slide:nines(TN, slide:moment())
-
--module(slide).
-
--export([fresh/0, fresh/1, fresh/2]).
--export([update/2, update/3, moment/0]).
--export([sum/1, sum/2, sum/3]).
--export([mean/1, mean/2, mean/3]).
--export([nines/1, nines/2, nines/3]).
--export([mean_and_nines/2, mean_and_nines/6]).
--export([private_dir/0, sync/1]).
-
--include_lib("kernel/include/file.hrl").
--include_lib("eunit/include/eunit.hrl").
-
--define(DIR, "/tmp/riak/slide-data"). % SLF TODO: need pkg-specific data dir handling
--define(REC_BYTES, 12). % 4 + (size(term_to_binary(4000000000)) = 8)
-
--record(slide, {
- oldest, %% oldest timestamp here
- window, %% window to which to trim
- trigger, %% age at which to trigger pruning
- dir, %% directory for data
- readings_fh, %% filehandle for current moment's readings
- readings_m %% moment associated with readings_fh
- }).
-
-%% @spec fresh() -> slide()
-%% @equiv fresh(60)
-fresh() -> fresh(60).
-
-%% @spec fresh(integer()) -> slide()
-%% @equiv fresh(Window, Window)
-fresh(Window) -> fresh(Window, Window).
-
-%% @spec fresh(integer(), integer()) -> slide()
-%% @doc Create an empty slide for tracking Window-seconds worth of
-%% readings, and pruning those readings after Trigger seconds.
-fresh(Window, Trigger) when Trigger >= Window ->
- {A,B,C} = now(),
- Dir = lists:flatten(io_lib:format("~s/~p.~p.~p", [private_dir(), A, B, C])),
- {ok, parent, Dir} = {filelib:ensure_dir(Dir), parent, Dir},
- {ok, Dir} = {file:make_dir(Dir), Dir},
- #slide{window=Window, trigger=Trigger, dir=Dir}.
-
-%% @spec moment() -> integer()
-%% @doc Get the current time in seconds.
-moment() ->
- calendar:datetime_to_gregorian_seconds(calendar:local_time()).
-
-%% @spec update(slide(), term()) -> slide()
-%% @equiv update(S, Reading, moment())
-update(S, Reading) -> update(S, Reading, moment()).
-
-%% @spec update(slide(), term(), integer()) -> slide()
-%% @doc Store a new reading. The current list of readings will be
-%% pruned if Moment is as new as or newer than the most recent
-%% reading stored, and more than Trigger seconds newer than the
-%% oldest reading stored.
-update(S0=#slide{oldest=Oldest,dir=Dir,readings_m=RdMoment,readings_fh=FH},
- Reading0, Moment) ->
- S1 = if Moment == RdMoment ->
- S0;
- true ->
- catch file:close(FH),
- File = integer_to_list(Moment rem S0#slide.window),
- {ok, FH2} = file:open(filename:join(Dir, File),
- file_write_options()),
- S0#slide{readings_m = Moment,
- readings_fh = FH2,
- oldest = if Oldest == undefined ->
- Moment;
- true ->
- Oldest
- end}
- end,
- Reading = if Reading0 < 4000000000 -> Reading0;
- true -> 4000000000
- end,
- %% 4 bytes len header + 8 bytes ...
- Bin = pad_bin(term_to_binary(Reading), 8),
- ok = file:write(S1#slide.readings_fh, [<<8:32>>, Bin]),
- S1.
-
-%% @spec sum(slide()) -> {Count::integer(), Sum::integer()}
-%% @doc Sum of readings from now through Window seconds ago. Return is
-%% number of readings in the range and the sum of those readings.
-sum(Slide) -> sum(Slide, moment()).
-
-%% @spec sum(slide(), integer()) -> {Count::integer(), Sum::integer()}
-%% @doc Sum of readings from Moment through Window seconds before Moment.
-%% Return is number of readings in the range and the sum of those
-%% readings.
-sum(Slide, Moment) -> sum(Slide, Moment, Slide#slide.window).
-
-%% @spec sum(slide(), integer(), integer()) ->
-%% {Count::integer(), Sum::integer()}
-%% @doc Sum of readings from Moment through Seconds seconds before
-%% Moment. Return is number of readings in the range and the sum
-%% of those readings.
-sum(#slide{dir=Dir}, Moment, Seconds) ->
- Cutoff = Moment-Seconds,
- Names = filelib:wildcard("*", Dir),
- ToScan = [Name || Name <- Names, list_to_integer(Name) >= 0],
- Blobs = [element(2, file:read_file(filename:join(Dir, Name))) ||
- Name <- ToScan],
- %% histo_experiment(Blobs),
- sum_blobs(Blobs, Moment, Cutoff).
-
-private_dir() ->
- case application:get_env(riak_core, slide_private_dir) of
- undefined ->
- Root = case application:get_env(riak_core, platform_data_dir) of
- undefined -> ?DIR;
- {ok, X} -> filename:join([X, "slide-data"])
- end,
- filename:join([Root, os:getpid()]);
- {ok, Dir} ->
- Dir
- end.
-
-sync(_S) ->
- todo.
-
-mean_and_nines(Slide, Moment) ->
- mean_and_nines(Slide, Moment, 0, 5000000, 20000, down).
-
-mean_and_nines(#slide{dir=Dir, window = Window}, _Moment, HistMin, HistMax, HistBins, RoundingMode) ->
- Now = moment(),
- Names = filelib:wildcard("*", Dir),
- ModTime = fun(Name) ->
- {ok, FI} = file:read_file_info(filename:join(Dir, Name)),
- calendar:datetime_to_gregorian_seconds(FI#file_info.mtime)
- end,
- ToScan = [Name || Name <- Names,
- Now - ModTime(Name) =< Window],
- Blobs = [element(2, file:read_file(filename:join(Dir, Name))) ||
- Name <- ToScan],
- compute_quantiles(Blobs, HistMin, HistMax, HistBins, RoundingMode).
-
-compute_quantiles(Blobs, HistMin, HistMax, HistBins, RoundingMode) ->
- {H, Count} = compute_quantiles(
- Blobs, basho_stats_histogram:new(HistMin, HistMax, HistBins), 0),
- {_Min, Mean, Max, _Var, _SDev} = basho_stats_histogram:summary_stats(H),
- P50 = basho_stats_histogram:quantile(0.50, H),
- P95 = basho_stats_histogram:quantile(0.95, H),
- P99 = basho_stats_histogram:quantile(0.99, H),
-
- %% RoundingMode allows the caller to decide whether to round up or
- %% down to the nearest integer. This is useful in cases where we
- %% measure very small, but non-zero integer values where rounding
- %% down would give a zero rather than a one.
-
- %% The calls to erlang:min/N exist because the histogram estimates
- %% percentiles. Depending on the sample size or distribution, it
- %% is possible that the estimated percentile is larger than the
- %% max, which is foolish. If that happens, then we ignore the
- %% estimate and use the value of max instead.
- case RoundingMode of
- up ->
- RMax = my_ceil(Max),
- {Count, my_ceil(Mean), {
- erlang:min(my_ceil(P50), RMax),
- erlang:min(my_ceil(P95), RMax),
- erlang:min(my_ceil(P99), RMax),
- erlang:min(my_ceil(Max), RMax)
- }};
- _ -> %% 'down'
- RMax = my_trunc(Max),
- {Count, my_trunc(Mean), {
- erlang:min(my_trunc(P50), RMax),
- erlang:min(my_trunc(P95), RMax),
- erlang:min(my_trunc(P99), RMax),
- erlang:min(my_trunc(Max), RMax)
- }}
- end.
-
-compute_quantiles([Blob|Blobs], H, Count) ->
- Ns = [binary_to_term(Bin) || <<_Hdr:32, Bin:8/binary>> <= Blob],
- H2 = basho_stats_histogram:update_all(Ns, H),
- compute_quantiles(Blobs, H2, Count + length(Ns));
-compute_quantiles([], H, Count) ->
- {H, Count}.
-
-my_trunc(X) when is_atom(X) ->
- 0;
-my_trunc(N) ->
- trunc(N).
-
-my_ceil(X) when is_atom(X) ->
- 0;
-my_ceil(X) ->
- T = erlang:trunc(X),
- case (X - T) of
- Neg when Neg < 0 -> T;
- Pos when Pos > 0 -> T + 1;
- _ -> T
- end.
-
-%% @spec mean(slide()) -> {Count::integer(), Mean::number()}
-%% @doc Mean of readings from now through Window seconds ago. Return is
-%% number of readings in the range and the mean of those readings.
-mean(Slide) -> mean(Slide, moment()).
-
-%% @spec mean(slide(), integer()) -> {Count::integer(), Mean::number()}
-%% @doc Mean of readings from Moment through Window seconds before Moment.
-%% Return is number of readings in the range and the mean of those
-%% readings.
-mean(Slide, Moment) -> mean(Slide, Moment, Slide#slide.window).
-
-%% @spec mean(slide(), integer(), integer()) ->
-%% {Count::integer(), Mean::number()}
-%% @doc Mean of readings from Moment through Seconds seconds before
-%% Moment. Return is number of readings in the range and the mean
-%% of those readings.
-mean(S, Moment, Seconds) ->
- case sum(S, Moment, Seconds) of
- {0, _} -> {0, undefined};
- {Count, Sum} -> {Count, Sum/Count}
- end.
-
-%% @spec nines(slide()) ->
-%% {Count::integer(), {Median::number(), NinetyFive::number(),
-%% NinetyNine::number(), Hundred::number()}}
-%% @doc Median, 95%, 99%, and 100% readings from now through Window
-%% seconds ago. Return is number of readings in the range and the
-%% nines of those readings.
-nines(Slide) -> nines(Slide, moment()).
-
-%% @spec nines(slide(), integer()) ->
-%% {Count::integer(), {Median::number(), NinetyFive::number(),
-%% NinetyNine::number(), Hundred::number()}}
-%% @doc Median, 95%, 99%, and 100% readings from Moment through Window
-%% seconds before Moment. Return is number of readings in the
-%% range and the nines of those readings.
-nines(Slide, Moment) -> nines(Slide, Moment, Slide#slide.window).
-
-%% @spec nines(slide(), integer(), integer()) ->
-%% {Count::integer(), {Median::number(), NinetyFive::number(),
-%% NinetyNine::number(), Hundred::number()}}
-%% @doc Median, 95%, 99%, and 100% readings from Moment through
-%% Seconds seconds before Moment. Return is number of readings
-%% in the range and the nines of those readings.
-nines(#slide{dir=Dir}, Moment, Seconds) ->
- _Cutoff = Moment-Seconds,
- Names = filelib:wildcard("*", Dir),
- ToScan = [Name || Name <- Names, list_to_integer(Name) >= 0],
- OutFile = filename:join(Dir, "-42"),
- Opts = [], %%[{no_files, 64}],
- ok = file_sorter:sort([filename:join(Dir, Name) || Name <- ToScan],
- OutFile, Opts),
- {ok, FI} = file:read_file_info(OutFile),
- case FI#file_info.size of
- 0 ->
- {0, {undefined, undefined, undefined, undefined}};
- Size ->
- Count = (Size div ?REC_BYTES) - 1,
- {Count,
- {read_word_at(mochinum:int_ceil(Count*0.50) * ?REC_BYTES, OutFile),
- read_word_at(mochinum:int_ceil(Count*0.95) * ?REC_BYTES, OutFile),
- read_word_at(mochinum:int_ceil(Count*0.99) * ?REC_BYTES, OutFile),
- read_word_at(Count * ?REC_BYTES, OutFile)}}
- end.
-
-read_word_at(Offset, File) ->
- {ok, FH} = file:open(File, [read, raw, binary]),
- {ok, Bin} = file:pread(FH, Offset + 4, ?REC_BYTES - 4), % 4 = header to skip
- binary_to_term(Bin).
-
-%% Using accumulator func args avoids the garbage creation by
-%% lists:foldl's need to create 2-tuples to manage accumulator.
-
-sum_blobs(Blobs, Moment, Cutoff) ->
- sum_blobs2(Blobs, Moment, Cutoff, 0, 0).
-
-sum_blobs2([], _Moment, _Cutoff, TCount, TSum) ->
- {TCount, TSum};
-sum_blobs2([Blob|Blobs], Moment, Cutoff, TCount, TSum) ->
- {Count, Sum} = sum_ints(
- [binary_to_term(Bin) || <<_Hdr:32, Bin:8/binary>> <= Blob],
- 0, 0),
- sum_blobs2(Blobs, Moment, Cutoff, TCount + Count, TSum + Sum).
-
-%% Dunno if this is any faster/slower than lists:sum/1 + erlang:length/1.
-
-sum_ints([I|Is], Count, Sum) ->
- sum_ints(Is, Count + 1, Sum + I);
-sum_ints([], Count, Sum) ->
- {Count, Sum}.
-
-pad_bin(Bin, Size) when size(Bin) == Size ->
- Bin;
-pad_bin(Bin, Size) ->
- Bits = (Size - size(Bin)) * 8,
- <<Bin/binary, 0:Bits>>.
-
-%%
-%% Test
-%%
-
-setup_eunit_proc_dict() ->
- erlang:put({?MODULE, eunit}, true).
-
-file_write_options() ->
- case erlang:get({?MODULE, eunit}) of
- true ->
- [write, raw, binary];
- _ ->
- [write, raw, binary, delayed_write]
- end.
-
--ifdef(TEST).
-
-auto_prune_test() ->
- S0 = slide:fresh(10),
- S1 = slide:update(S0, 5, 3),
- S1b = idle_time_passing(S1, 4, 13),
- S2 = slide:update(S1b, 6, 14),
- S2b = idle_time_passing(S2, 15, 15),
- ?assertEqual(6, element(2, slide:sum(S2b, 15, 10))).
-
-sum_test() ->
- setup_eunit_proc_dict(),
- S0 = slide:fresh(10),
- ?assertEqual({0, 0}, % no points, sum = 0
- slide:sum(S0, 9, 10)),
- S1 = slide:update(S0, 3, 1),
- ?assertEqual({1, 3}, % one point, sum = 3
- slide:sum(S1, 9, 10)),
- S2 = slide:update(S1, 5, 5),
- ?assertEqual({2, 8}, % two points, sum = 8
- slide:sum(S2, 9, 10)),
- S3 = slide:update(S2, 7, 5),
- ?assertEqual({3, 15}, % three points (two concurrent), sum = 15
- slide:sum(S3, 9, 10)),
- S3b = idle_time_passing(S3, 6, 13),
- S4 = slide:update(S3b, 11, 14),
- ?assertEqual(23, % ignoring first reading, sum = 23
- element(2, slide:sum(S4, 14, 10))),
- S4b = idle_time_passing(S4, 15, 18),
- ?assertEqual(11, % shifted window
- element(2, slide:sum(S4b, 18, 10))),
- S4c = idle_time_passing(S4b, 19, 21),
- S5 = slide:update(S4c, 13, 22),
- ?assertEqual(24, % shifted window
- element(2, slide:sum(S5, 22, 10))).
-
-idle_time_passing(Slide, StartMoment, EndMoment) ->
- lists:foldl(fun(Moment, S) -> slide:update(S, 0, Moment) end,
- Slide, lists:seq(StartMoment, EndMoment)).
-
-mean_test() ->
- setup_eunit_proc_dict(),
- S0 = slide:fresh(10),
- ?assertEqual({0, undefined}, % no points, no average
- slide:mean(S0)),
- S1 = slide:update(S0, 3, 1),
- ?assertEqual({1, 3.0}, % one point, avg = 3
- slide:mean(S1, 9, 10)),
- S2 = slide:update(S1, 5, 5),
- ?assertEqual({2, 4.0}, % two points, avg = 4
- slide:mean(S2, 9, 10)),
- S3 = slide:update(S2, 7, 5),
- ?assertEqual({3, 5.0}, % three points (two concurrent), avg = 5
- slide:mean(S3, 9, 10)),
- S3b = idle_time_passing(S3, 6, 13),
- S4 = slide:update(S3b, 11, 14),
- ?assertEqual(23/11, % ignoring first reading, avg =
- element(2, slide:mean(S4, 14, 10))),
- S4b = idle_time_passing(S4, 15, 18),
- ?assertEqual(11/10, % shifted window
- element(2, slide:mean(S4b, 18, 10))),
- S4c = idle_time_passing(S4b, 19, 21),
- S5 = slide:update(S4c, 13, 22),
- ?assertEqual(24/10, % shifted window
- element(2, slide:mean(S5, 22, 10))).
-
-mean_and_nines_test() ->
- setup_eunit_proc_dict(),
- PushReadings = fun(S, Readings) ->
- lists:foldl(
- fun({R,T}, A) ->
- slide:update(A, R, T)
- end,
- S, Readings)
- end,
- S0 = slide:fresh(10),
- ?assertEqual({0, {undefined, undefined, undefined, undefined}},
- slide:nines(S0)),
- S1 = PushReadings(S0, [ {R*100, 1} || R <- lists:seq(1, 10) ]),
- %% lists:sum([X*100 || X <- lists:seq(1,10)]) / 10 -> 550
- ?assertEqual({10, 550, {500, 958, 991, 1000}},
- slide:mean_and_nines(S1, 10)),
- S2 = PushReadings(S1, [ {R*100, 2} || R <- lists:seq(11, 20) ]),
- %% lists:sum([X*100 || X <- lists:seq(1,20)]) / 20 -> 1050
- ?assertEqual({20, 1050, {1000, 1916, 1983, 2000}},
- slide:mean_and_nines(S2, 10)),
- S3 = PushReadings(S2, [ {R*100, 3} || R <- lists:seq(21, 100) ]),
- %% lists:sum([X*100 || X <- lists:seq(1,100)]) / 100 -> 5050
- ?assertEqual({100, 5050, {5000, 9500, 9916, 10000}},
- slide:mean_and_nines(S3, 10)),
- S4 = idle_time_passing(S3, 4, 11), % 8 samples
- %% lists:sum([X*100 || X <- lists:seq(11,100)]) / (90+8) -> 5096.9
- ?assertEqual({98, 5096, {5125, 9512, 9918, 10000}},
- slide:mean_and_nines(S4, 11)).
-
-private_dir_test() ->
- %% Capture the initial state
- Pid = os:getpid(),
- OldSlide = application:get_env(riak_core, slide_private_dir),
- OldPlatform = application:get_env(riak_core, platform_data_dir),
-
- %% When slide_private_dir is set, use that.
- application:set_env(riak_core, slide_private_dir, "foo"),
- ?assertEqual("foo", private_dir()),
-
- %% When slide_private_dir is unset, but platform_data_dir is set,
- %% use a subdirectory of the platform_data_dir.
- application:unset_env(riak_core, slide_private_dir),
- application:set_env(riak_core, platform_data_dir, "./data"),
- ?assertEqual("./data/slide-data/" ++ Pid, private_dir()),
-
- %% When neither slide_private_dir nor platform_data_dir is set,
- %% use the hardcoded path.
- application:unset_env(riak_core, slide_private_dir),
- application:unset_env(riak_core, platform_data_dir),
- ?assertEqual(?DIR ++ "/" ++ Pid, private_dir()),
-
- %% Cleanup after ourselves
- case OldSlide of
- {ok, S} ->
- application:set_env(riak_core, slide_private_dir, S);
- _ -> ok
- end,
- case OldPlatform of
- {ok, P} ->
- application:set_env(riak_core, platform_data_dir, P);
- _ -> ok
- end.
-
--endif. %TEST
View
166 src/spiraltime.erl
@@ -1,166 +0,0 @@
-%% -------------------------------------------------------------------
-%%
-%% riak_core: Core Riak Application
-%%
-%% Copyright (c) 2007-2010 Basho Technologies, Inc. All Rights Reserved.
-%%
-%% This file is provided to you under the Apache License,
-%% Version 2.0 (the "License"); you may not use this file
-%% except in compliance with the License. You may obtain
-%% a copy of the License at
-%%
-%% http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing,
-%% software distributed under the License is distributed on an
-%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-%% KIND, either express or implied. See the License for the
-%% specific language governing permissions and limitations
-%% under the License.
-%%
-%% -------------------------------------------------------------------
-
-%% @doc A set of sliding windows for recording N-per-second running stats.
-%%
-%% This keeps stats per second for the last minute.
-%%
-%% See git commit history for versions of this module which keep stats
-%% for more than 1 minute.
-
--module(spiraltime).
--author('Justin Sheehy <justin@basho.com>').
-
--ifdef(TEST).
--ifdef(EQC).
--include_lib("eqc/include/eqc.hrl").
--endif.
--include_lib("eunit/include/eunit.hrl").
--endif.
-
--export([fresh/0,fresh/1,n/0,incr/2,incr/3,
- rep_second/1,rep_minute/1,
- test_spiraltime/0]).
-
--export_type([spiral/0]).
-
-%% @type moment() = integer().
-%% This is a number of seconds, as produced by
-%% calendar:datetime_to_gregorian_seconds(calendar:local_time())
-
-%% @type count() = integer().
-%% The number of entries recorded in some time period.
-
--record(spiral, {moment :: integer(),
- seconds :: [integer()]
- }).
-
--type spiral() :: #spiral{}.
-
-n() ->
- calendar:datetime_to_gregorian_seconds(calendar:local_time()).
-
-%% @doc Create an empty spiral with which to begin recording entries.
-%% @spec fresh() -> spiral()
-fresh() ->
- fresh(n()).
-
-%% @doc Create an empty spiral with which to begin recording entries.
-%% @spec fresh(moment()) -> spiral()
-fresh(Moment) ->
- #spiral{moment=Moment,
- seconds=[0 || _ <- lists:seq(1,60)]
- }.
-
-fieldlen(#spiral.seconds) -> 60.
-
-nextfield(#spiral.seconds) -> done.
-
-%% @doc Produce the number of entries recorded in the last second.
-%% @spec rep_second(spiral()) -> {moment(), count()}
-rep_second(Spiral) ->
- {Spiral#spiral.moment, hd(Spiral#spiral.seconds)}.
-
-%% @doc Produce the number of entries recorded in the last minute.
-%% @spec rep_minute(spiral()) -> {moment(), count()}
-rep_minute(Spiral) ->
- {Minute,_} = lists:split(60,Spiral#spiral.seconds),
- {Spiral#spiral.moment, lists:sum(Minute)}.
-
-%% @doc Add N to the counter of events, as recently as possible.
-%% @spec incr(count(), spiral()) -> spiral()
-incr(N, Spiral) -> incr(N,n(),Spiral).
-
-%% @doc Add N to the counter of events occurring at Moment.
-%% @spec incr(count(), moment(), spiral()) -> spiral()
-incr(N, Moment, Spiral) when Spiral#spiral.moment =:= Moment ->
- % common case -- updates for "now"
- Spiral#spiral{seconds=[hd(Spiral#spiral.seconds)+N|
- tl(Spiral#spiral.seconds)]};
-incr(_N, Moment, Spiral) when Spiral#spiral.moment - Moment > 59 ->
- Spiral; % updates more than a minute old are dropped! whee!
-incr(N, Moment, Spiral) ->
- S1 = update_moment(Moment, Spiral),
- {Front,Back} = lists:split(S1#spiral.moment - Moment,
- S1#spiral.seconds),
- S1#spiral{seconds=Front ++ [hd(Back)+N|tl(Back)]}.
-
-update_moment(Moment, Spiral) when Moment =< Spiral#spiral.moment ->
- Spiral;
-update_moment(Moment, Spiral) when Moment - Spiral#spiral.moment > 36288000 ->
- fresh(Moment);
-update_moment(Moment, Spiral) ->
- update_moment(Moment, push(0, Spiral#spiral{
- moment=Spiral#spiral.moment+1},
- #spiral.seconds)).
-
-getfield(Spiral,Field) -> element(Field, Spiral).
-setfield(Spiral,X,Field) -> setelement(Field, Spiral, X).
-
-push(_N, Spiral, done) ->
- Spiral;
-push(N, Spiral, Field) ->
- Full = [N|getfield