Skip to content

Commit

Permalink
store aggregates in each node. use parallel query for fetch
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrey Stepachev committed May 5, 2012
1 parent 2e2ac4d commit 36b4ca4
Show file tree
Hide file tree
Showing 3 changed files with 223 additions and 16 deletions.
9 changes: 5 additions & 4 deletions apps/estockd/src/estockd.erl
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ add_row(#stock_row{name=Name} = Row) ->
%% @doc List aggregates for Name at the given Scale, starting at Start and
%% bounded by Limit.
%% The query is dispatched through estockd_server:exec_parallel/4, which
%% returns one result per responding node; those per-node lists are then
%% combined into a single list by estockd_worker:merge_aggs/1.
%% NOTE(review): the key type in this spec (timestamp()) differs from the
%% key type declared on estockd_worker:merge_aggs/1 - confirm which is right.
-spec list_aggs(string(), atom(), timestamp(), integer()) ->
    [ {timestamp(), #stock_agg{}} ].
list_aggs(Name, Scale, Start, Limit) ->
    PerNodeAggs = estockd_server:exec_parallel(Name,
                                               estockd_worker,
                                               list_aggs,
                                               [Name, Scale, Start, Limit]),
    estockd_worker:merge_aggs(PerNodeAggs).
167 changes: 165 additions & 2 deletions apps/estockd/src/estockd_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,21 @@
-behaviour(gen_server).
-include("estockd.hrl").
-include("log.hrl").
-include_lib("eunit/include/eunit.hrl").

%% API
-export([start_link/0]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-export([register_worker/1, find_workers/1, exec_parallel/4]).
-export([echo_callback/1]).

-define(SERVER, ?MODULE).
-define(PIDS_TABLE, estockd_workers).
-define(IDX_TABLE, estockd_idx).


-record(state, {}).

Expand All @@ -36,20 +42,69 @@
%%% @doc Start the server as a gen_server registered locally under ?SERVER
%%% and linked to the calling process.
start_link() ->
    ServerName = {local, ?SERVER},
    gen_server:start_link(ServerName, ?MODULE, [], []).

%%% @doc Register the calling process as a worker for Name.
%%% The request goes to the local server first and then to the server on
%%% every node returned by nodes(). The server takes the worker pid from the
%%% caller of gen_server:call/2, so this must be invoked from the worker
%%% process itself. Returns the list of replies from the remote nodes.
-spec register_worker(string())->any().
register_worker(Name) ->
    Request = {register_worker, Name},
    gen_server:call(?SERVER, Request),
    lists:map(fun(Node) -> gen_server:call({?SERVER, Node}, Request) end,
              nodes()).

%%% @doc Look up the worker pids registered for Name in ?PIDS_TABLE.
%%% Returns the pids in the order ets:lookup/2 yields them, or [] (after
%%% printing a notice) when nothing is registered under Name.
-spec find_workers(string())->[pid()].
find_workers(Name) ->
    case ets:lookup(?PIDS_TABLE, Name) of
        %% ets:lookup/2 signals a miss with the empty list, never with
        %% `undefined', so the empty list is what has to be matched here.
        [] ->
            io:format("No workers found for ~p~n", [Name]),
            [];
        Entries ->
            [ Pid || { _Name, Pid } <- Entries ]
    end.

%%% @doc Remove every name mapping recorded for WorkerPid.
%%% Returns true when at least one mapping was removed and false when the
%%% pid had no entries in ?IDX_TABLE.
-spec unregister_worker(pid()) -> boolean().
unregister_worker(WorkerPid) ->
    case ets:lookup(?IDX_TABLE, WorkerPid) of
        %% A miss is the empty list. This also covers a repeated 'DOWN' for
        %% a pid whose entries were already removed.
        [] ->
            false;
        %% ?IDX_TABLE is a bag, so one pid may be stored under several
        %% names; every entry has to be removed, not just a single one.
        Entries ->
            [ delete_worker(Name, Pid) || { Pid, Name } <- Entries ],
            true
    end.

%%% @doc Run Module:Fun(Args) once on every node that hosts a worker
%%% registered for Name and collect the replies.
%%% Returns [] when no worker is registered. Otherwise the request is sent
%%% with gen_server:multi_call/3 to the ?SERVER process on each distinct
%%% node, and the replies of the nodes that answered are returned; nodes
%%% that did not answer are ignored.
-spec exec_parallel(string(), module(), atom(), [term()]) -> [term()].
exec_parallel(Name, Module, Fun, Args) ->
    Pids = find_workers(Name),
    ?DBG("Exec ~p:~p(~p) for name ~p: pids=~p~n",
         [Module, Fun, Args, Name, Pids]),
    case Pids of
        [] ->
            [];
        [_ | _] ->
            %% Several workers may live on the same node; ask each node once.
            Nodes = lists:usort([ node(Pid) || Pid <- Pids ]),
            {Good, _Bad} = gen_server:multi_call(
                             Nodes, ?SERVER,
                             {exec_parallel, Module, Fun, Args}),
            [ Result || { _Node, Result } <- Good ]
    end.

%%%===================================================================
%%% gen_server callbacks
%%%===================================================================

%% gen_server init callback: make sure the worker tables exist, then start
%% with an empty #state{} record.
init([]) ->
    InitialState = #state{},
    init_tables(),
    {ok, InitialState}.

%% gen_server call handler.
%%   {register_worker, Name}           - record the calling process as a
%%                                       worker for Name and reply ok.
%%   {exec_parallel, Module, Fun, Args} - apply Module:Fun(Args) inside the
%%                                       server process and reply with the
%%                                       result.
handle_call({register_worker, Name}, {Pid, _Tag}, State) ->
    %% Monitor the caller so that a 'DOWN' message arrives when it exits.
    erlang:monitor(process, Pid),
    save_worker(Name, Pid),
    {reply, ok, State};
handle_call({exec_parallel, Module, Fun, Args}, _From, State) ->
    Result = erlang:apply(Module, Fun, Args),
    {reply, Result, State}.

%% gen_server cast handler: no casts are handled, every message is ignored
%% and the state is returned unchanged.
handle_cast(_Ignored, ServerState) ->
    {noreply, ServerState}.

%% gen_server info handler.
%%   'DOWN' for a monitored process - drop that pid's registrations.
%%   stop                           - terminate the server with reason normal.
%%   anything else                  - ignored.
handle_info({'DOWN', _Ref, process, DeadPid, _Reason}, State) ->
    unregister_worker(DeadPid),
    {noreply, State};
handle_info(stop, State) ->
    {stop, normal, State};
handle_info(_Other, State) ->
    {noreply, State}.

Expand All @@ -63,3 +118,111 @@ code_change(_OldVsn, State, _Extra) ->
%%% Internal functions
%%%===================================================================

%% Create the two named bag tables used by the server unless they already
%% exist: ?PIDS_TABLE (protected) and ?IDX_TABLE (private).
init_tables() ->
    ?DBG("Initializing tables", []),
    ensure_table(?PIDS_TABLE, protected),
    ensure_table(?IDX_TABLE, private).

%% Create the named bag table Table with the given access mode when
%% ets:info/1 reports that it does not exist; otherwise do nothing.
ensure_table(Table, Access) ->
    case ets:info(Table) of
        undefined -> ets:new(Table, [bag, named_table, Access]);
        _ -> ok %%% somehow already created
    end.

%% Remove the association between Name and Pid from both tables: the
%% {Name, Pid} entry in ?PIDS_TABLE first, then {Pid, Name} in ?IDX_TABLE.
delete_worker(Name, Pid) ->
    ByName = { Name, Pid },
    ByPid = { Pid, Name },
    ets:delete_object(?PIDS_TABLE, ByName),
    ets:delete_object(?IDX_TABLE, ByPid),
    ?DBG("Unregistered worker ~p for name ~p~n", [Pid, Name]).

%% Record the association between Name and Pid in both tables: {Name, Pid}
%% in ?PIDS_TABLE first, then {Pid, Name} in ?IDX_TABLE.
save_worker(Name, Pid) ->
    ByName = {Name, Pid},
    ByPid = {Pid, Name},
    ets:insert(?PIDS_TABLE, ByName),
    ets:insert(?IDX_TABLE, ByPid),
    ?DBG("Registered worker ~p for name ~p~n", [Pid, Name]).

%% Print the argument and return it unchanged. Exported so that the unit
%% test can use it as a target for exec_parallel/4.
echo_callback(P) ->
    Format = "Here we are, ~p~n",
    io:format(Format, [P]),
    P.


%% ===================================================================
%% Unit Tests
%% ===================================================================

-ifdef(EUNIT).


%% Exercises save_worker/2, find_workers/1 and delete_worker/2 directly
%% against tables created by the test process itself; no server is started.
dao_test() ->
init_tables(),
Self = self(),
%% Helper process that blocks until it is sent `ok' (done further down).
Pid2 = spawn(fun() -> receive ok -> ok end end),
save_worker("ABC", Self),
save_worker("CCC", Self),
save_worker("CCC", Pid2),
?assertEqual([Self], find_workers("ABC")),
%% NOTE(review): expects the pids back in insertion order - confirm that
%% this is guaranteed for bag tables.
?assertEqual([Self, Pid2], find_workers("CCC")),
%% Remove only this process; Pid2 must remain registered under "CCC".
delete_worker("ABC", self()),
delete_worker("CCC", self()),
[] = find_workers("ABC"),
[Pid2] = find_workers("CCC"),
%% Release the helper process and drop the tables.
Pid2 ! ok,
close_tables().


%% Starts a real server and two worker processes, then checks registration,
%% lookup, exec_parallel/4 and clean shutdown.
register_test() ->
    {ok, Srv} = start_link(),
    Self = self(),
    {Pid1, _} = spawn_worker(Self, "ABC"),
    wait_ok(),
    {Pid2, _} = spawn_worker(Self, "CBA"),
    wait_ok(),

    %% The tails are matched with `_': a named variable such as `_Rest' is
    %% bound by the first match and would turn the second match into an
    %% equality check against the first tail.
    [S1 | _] = find_workers("ABC"),
    ?assert(S1 =:= Pid1),

    [S2 | _] = find_workers("CBA"),
    ?assert(S2 =:= Pid2),

    [] = find_workers("FFF"),

    [R] = exec_parallel("ABC", estockd_server, echo_callback, ["Hello"]),
    %% Expected value first, actual value second.
    ?assertEqual("Hello", R),

    %% Stop both workers and wait for their monitor notifications before
    %% stopping the server.
    Pid1 ! stop,
    Pid2 ! stop,
    wait_down(),
    wait_down(),
    Srv ! stop.


%% Take the next message from the mailbox and return ok if it is the atom
%% `ok'. Any other message is printed and then handed to ?assert/1.
%% NOTE(review): there is no `after' clause, so this waits for as long as
%% the mailbox stays empty.
wait_ok() ->
receive ok -> ok;
F -> io:format("Unexpected: ~p~n", [F]),
?assert(F)
end.

%% Take the next message from the mailbox and return ok if it is a
%% five-element 'DOWN' tuple. Any other message is printed and then handed
%% to ?assert/1.
%% NOTE(review): there is no `after' clause, so this waits for as long as
%% the mailbox stays empty.
wait_down() ->
receive {'DOWN', _, _, _, _} -> ok;
F -> io:format("Unexpected: ~p~n", [F]),
?assert(F)
end.

%% Spawn a monitored test worker. The worker registers itself under Name,
%% sends `ok' to Self, and then exits after receiving any one message.
%% Returns the {Pid, MonitorRef} pair from spawn_monitor/1.
spawn_worker(Self, Name) ->
    WorkerBody =
        fun() ->
                register_worker(Name),
                Self ! ok,
                receive _ -> ok end
        end,
    spawn_monitor(WorkerBody).

%%% Drop both tables so that a later test can create them afresh.
%%% ets:delete/1 removes a table together with its contents, so there is no
%%% need to hand the tables over to a throwaway process and wait for it to
%%% die. That approach was also racy: the helper process exits on its first
%%% message, which is the transfer notification for the first table, so the
%%% second ets:give_away/3 could be aimed at a dead pid.
close_tables() ->
    true = ets:delete(?PIDS_TABLE),
    true = ets:delete(?IDX_TABLE),
    ok.

-endif.


63 changes: 53 additions & 10 deletions apps/estockd/src/estockd_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
truncate_datetime_millis/2]).
%% API
-export([start_link/1, start_worker/2, find_or_create/1, find/1,
add_row/2, add_row_async/2, list_aggs/4, init_table/0]).
add_row/2, add_row_async/2, list_aggs/4, init_table/0,
merge_aggs/1]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
Expand Down Expand Up @@ -59,8 +60,29 @@ add_row(Pid, Row) ->
%% @doc Send Row to the worker Pid as an asynchronous {add_row, Row} cast;
%% does not wait for the worker to process it.
add_row_async(Pid, Row) ->
    Request = {add_row, Row},
    gen_server:cast(Pid, Request).

%%% @doc List aggregates held by the worker registered for Name.
%%% The worker is located with find/1; when find/1 returns `undefined' the
%%% result is []. Otherwise the worker is asked with a synchronous
%%% {list_aggs, Scale, Start, Limit} call and its reply is returned.
-spec list_aggs(string(), atom(), timestamp(), integer())->
    [ {{string(), atom(), timestamp()}, #stock_agg {} } ].
list_aggs(Name, Scale, Start, Limit) ->
    case find(Name) of
        undefined -> [];
        Pid -> gen_server:call(Pid, {list_aggs, Scale, Start, Limit})
    end.

%%% @doc Merge several aggregate lists into a single list.
%%% Each input list is loaded into a dict keyed by the first element of its
%%% tuples; entries that share a key across lists are combined with
%%% merge_agg/2. The result is returned sorted by key.
-spec merge_aggs([[ {{string(), atom(), timestamp()}, #stock_agg {} } ]])->
    [ {{string(), atom(), timestamp()}, #stock_agg {} } ].
merge_aggs(AggsLists) ->
    Combine = fun(_Key, Left, Right) -> merge_agg(Left, Right) end,
    PerListDicts = [ dict:from_list(Aggs) || Aggs <- AggsLists ],
    Merged = lists:foldl(
               fun(AggsDict, Acc) -> dict:merge(Combine, Acc, AggsDict) end,
               dict:new(),
               PerListDicts),
    lists:keysort(1, dict:to_list(Merged)).

-spec find(Name :: string()) -> term().
find(Name) ->
Expand All @@ -85,7 +107,7 @@ find_or_create(Name) ->

%% gen_server init callback for a worker started with [Name, Tab].
%% Registers Name through gproc:add_local_name/1 (asserting that it returns
%% true), logs through ?DBG, registers this process with estockd_server, and
%% keeps Name and Tab in the worker state.
init([Name, Tab]) ->
true = gproc:add_local_name(Name),
?DBG("Worker for ~p~n", [Name]),
%% Called from the worker process itself, so the server sees this pid as
%% the caller.
estockd_server:register_worker(Name),
{ok, #state{name = Name, table = Tab}}.

handle_call({list_aggs, Scale, Start, Limit}, _From, State) ->
Expand Down Expand Up @@ -176,6 +198,23 @@ make_agg(Row) ->
amount = Row#stock_row.amount
}.

%% Combine two #stock_agg{} records into a new one.
%% Open and close price/timestamp pairs are chosen by the ?OPEN_PRICE and
%% ?CLOSE_PRICE macros, min and max price by ?MIN and ?MAX, and the amounts
%% are added together.
%% NOTE(review): the macro definitions are not visible here; presumably
%% ?OPEN_PRICE keeps the pair with the earlier timestamp and ?CLOSE_PRICE
%% the pair with the later one - confirm against their definitions.
merge_agg(Agg1, Agg2) ->
{NOP, NOPT} = ?OPEN_PRICE(Agg1#stock_agg.open_price, Agg1#stock_agg.open_price_ts,
Agg2#stock_agg.open_price, Agg2#stock_agg.open_price_ts),
{NCP, NCPT} = ?CLOSE_PRICE(Agg1#stock_agg.close_price, Agg1#stock_agg.close_price_ts,
Agg2#stock_agg.close_price, Agg2#stock_agg.close_price_ts),
MinPrice = ?MIN(Agg1#stock_agg.min_price, Agg2#stock_agg.min_price),
MaxPrice = ?MAX(Agg1#stock_agg.max_price, Agg2#stock_agg.max_price),
%% Only the fields listed below are set explicitly.
#stock_agg {
open_price = NOP,
open_price_ts = NOPT,
close_price = NCP,
close_price_ts = NCPT,
min_price = MinPrice,
max_price = MaxPrice,
amount = Agg1#stock_agg.amount + Agg2#stock_agg.amount }.


update_agg(Agg, Row) ->
{NOP, NOPT} = ?OPEN_PRICE(Row#stock_row.price, Row#stock_row.timestamp,
Agg#stock_agg.open_price, Agg#stock_agg.open_price_ts),
Expand Down Expand Up @@ -246,19 +285,23 @@ make_agg_test() ->
price = 25,
amount = 1230 },
Agg = make_agg(Row),
?assert(Agg#stock_agg.amount =:= 1230),
?assert(Agg#stock_agg.min_price =:= 25),
?assert(Agg#stock_agg.max_price =:= 25),
?assertEqual(Agg#stock_agg.amount, 1230),
?assertEqual(Agg#stock_agg.min_price, 25),
?assertEqual(Agg#stock_agg.max_price, 25),

Row2 = #stock_row { timestamp = datetime_to_millis({{2011,2,15},{22,15,01}}),
name = "MSFT",
price = 26,
amount = 123 },
Agg2 = update_agg(Agg, Row2),
io:format("~p~n~p~n", [Agg, Agg2]),
?assert(Agg2#stock_agg.amount =:= 1353),
?assert(Agg2#stock_agg.min_price =:= 25),
?assert(Agg2#stock_agg.max_price =:= 26),
?assertEqual(Agg2#stock_agg.amount, 1353),
?assertEqual(Agg2#stock_agg.min_price, 25),
?assertEqual(Agg2#stock_agg.max_price, 26),

Agg3 = merge_agg(Agg, Agg2),
?assertEqual(Agg3#stock_agg.amount, 2583),

ok.

compare_agg_test() ->
Expand Down

0 comments on commit 36b4ca4

Please sign in to comment.