Skip to content

Commit

Permalink
add ptable: parallel etable foreach & fold
Browse files Browse the repository at this point in the history
  • Loading branch information
japerk committed Oct 24, 2009
1 parent bcce1ae commit f13092e
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 7 deletions.
4 changes: 2 additions & 2 deletions ebin/elib.app
@@ -1,11 +1,11 @@
{application, elib, [
{description, "Extended Erlang library modules"},
{vsn, "1.5.15"},
{vsn, "1.5.16"},
{mod, {elib, []}},
{registered, [elib_sup, timeout_server]},
{applications, [kernel, stdlib]},
{modules, [
datetime, elib, elib_sup, elists, emath, emod, estring, etable,
datetime, elib, elib_sup, elists, emath, emod, estring, etable, ptable,
etest, gen_cache, plists, uri_server, reltools, timeout_server, emp2,
delegation_server, baktools, rrule
]},
Expand Down
10 changes: 9 additions & 1 deletion ebin/elib.appup
@@ -1,5 +1,9 @@
{"1.5.15", [
{"1.5.16", [
% upgrade instructions
{"1.5.15", [
{load_module, etable},
{add_module, ptable}
]},
{"1.5.14", [{load_module, elists}]},
{"1.5.13", [{update, delegation_server, {advanced, []}}]},
{"1.5.12", [
Expand All @@ -11,6 +15,10 @@
{"1.5.10", [{load_module, etable}]}
], [
% downgrade instructions
{"1.5.15", [
{load_module, etable},
{add_module, ptable}
]},
{"1.5.14", [{load_module, elists}]},
{"1.5.13", [{update, delegation_server, {advanced, []}}]},
{"1.5.12", [
Expand Down
10 changes: 9 additions & 1 deletion src/elists.erl
Expand Up @@ -20,7 +20,15 @@

-module(elists).

-export([prepend/2, mapfilter/2, mapfilter_chain/2, sort_chain_generator/1, splitmany/2]).
-export([first/2, prepend/2, mapfilter/2, mapfilter_chain/2,
sort_chain_generator/1, splitmany/2]).

%% @doc Return the first element where F returns true, or none.
first(F, List) ->
case lists:dropwhile(fun(Elem) -> not F(Elem) end, List) of
[First | _] -> First;
[] -> none
end.

%% @equiv [Elem | List]
prepend(Elem, List) -> [Elem | List].
Expand Down
6 changes: 3 additions & 3 deletions src/etable.erl
Expand Up @@ -236,18 +236,18 @@ foldl(Context, F, Acc0, Table, Spec, NObject, Lock) ->
T = fun() ->
case mnesia:select(Table, Spec, NObject, Lock) of
'$end_of_table' -> Acc0;
{Objs, Cont} -> foldl2(F, Acc0, Cont, Objs)
{Objs, Cont} -> foldl2(F, Acc0, Objs, Cont)
end
end,

mnesia:activity(Context, T).

foldl2(F, Acc, Cont, Objs) ->
foldl2(F, Acc, Objs, Cont) ->
Acc2 = lists:foldl(F, Acc, Objs),

case mnesia:select(Cont) of
'$end_of_table' -> Acc2;
{More, Cont2} -> foldl2(F, Acc2, Cont2, More)
{More, Cont2} -> foldl2(F, Acc2, More, Cont2)
end.

%%%%%%%%%%%%%
Expand Down
121 changes: 121 additions & 0 deletions src/ptable.erl
@@ -0,0 +1,121 @@
%% @doc Like `etable', but uses plists for table chunks.
-module(ptable).

-export([foreach/4, foreach/5, foreach/6, foreach/7]).
-export([ets_foreach/4, ets_foreach/5]).
-export([fold/5, fold/6, fold/7, fold/8, fold/9]).
-export([nobjects/1]).

%%%%%%%%%%%%%
%% foreach %%
%%%%%%%%%%%%%

foreach(F, Table, Spec, Malt) ->
foreach(async_dirty, F, Table, Spec, read, Malt).

foreach(F, Table, Spec, write, Malt) ->
foreach(sync_transaction, F, Table, Spec, write, Malt);
foreach(F, Table, Spec, Lock, Malt) ->
foreach(async_dirty, F, Table, Spec, Lock, Malt).

foreach(Context, F, Table, Spec, Lock, Malt) ->
foreach(Context, F, Table, Spec, nobjects(Malt), Lock, Malt).

foreach(Context, F, Table, Spec, N, Lock, Malt) ->
T = fun() ->
case mnesia:select(Table, Spec, N, Lock) of
'$end_of_table' -> ok;
{Objs, Cont} -> foreach2(F, Objs, Malt, Cont)
end
end,

mnesia:activity(Context, T).

foreach2(F, Objs, Malt, Cont) ->
plists:foreach(F, Objs, Malt),

case mnesia:select(Cont) of
'$end_of_table' -> ok;
{More, Cont2} -> foreach2(F, More, Malt, Cont2)
end.

%%%%%%%%%%%%%%%%%
%% ets_foreach %%
%%%%%%%%%%%%%%%%%

ets_foreach(F, Table, Spec, Malt) ->
ets_foreach(F, Table, Spec, nobjects(Malt), Malt).

ets_foreach(F, Table, Spec, N, Malt) ->
case ets:select(Table, Spec, N) of
'$end_of_table' -> ok;
{Objs, Cont} -> ets_foreach2(F, Objs, Cont, Malt)
end.

ets_foreach2(F, Objs, Cont, Malt) ->
plists:foreach(F, Objs, Malt),

case ets:select(Cont) of
'$end_of_table' -> ok;
{More, Cont2} -> ets_foreach2(F, More, Cont2, Malt)
end.

%%%%%%%%%%
%% fold %%
%%%%%%%%%%

fold(F, Acc0, Table, Spec, Malt) ->
fold(async_dirty, F, Acc0, Table, Spec, read, F, Malt).

fold(F, Acc0, Table, Spec, Fuse, Malt) ->
fold(async_dirty, F, Acc0, Table, Spec, read, Fuse, Malt).

fold(F, Acc0, Table, Spec, write, Fuse, Malt) ->
fold(sync_transaction, F, Acc0, Table, Spec, write, Fuse, Malt);
fold(F, Acc0, Table, Spec, Lock, Fuse, Malt) ->
fold(async_dirty, F, Acc0, Table, Spec, Lock, Fuse, Malt).

fold(Context, F, Acc0, Table, Spec, Lock, Fuse, Malt) ->
fold(Context, F, Acc0, Table, Spec, nobjects(Malt), Lock, Fuse, Malt).

fold(Context, F, Acc0, Table, Spec, N, Lock, Fuse, Malt) ->
T = fun() ->
case mnesia:select(Table, Spec, N, Lock) of
'$end_of_table' -> Acc0;
{Objs, Cont} -> fold2(F, Acc0, Objs, Cont, Fuse, Malt)
end
end,

mnesia:activity(Context, T).

fold2(F, Acc, Objs, Cont, Fuse, Malt) ->
Acc2 = plists:fold(F, Fuse, Acc, Objs, Malt),

case mnesia:select(Cont) of
'$end_of_table' -> Acc2;
{More, Cont2} -> fold2(F, Acc2, More, Cont2, Fuse, Malt)
end.

%%%%%%%%%%%
%% utils %%
%%%%%%%%%%%

nobjects(Malt) ->
S = fun({_, Procs}) -> Procs; (_) -> 1 end,
% get total number of available processes
case proplists:get_value(nodes, Malt) of
undefined ->
NProcs = proplists:get_value(processes, Malt, 1);
Nodes ->
NProcs = lists:sum(lists:map(S, Nodes))
end,
% get length of sublists for each plist process
case elists:first(fun is_integer/1, Malt) of
none -> SubLen = 1;
SubLen -> ok
end,
% the number of objects to select is enough so there's 1 sublist for each
% available process. if only have 1 processor and want to process 1 item
% at a time, the result is 1 element from table at a time. if have
% 4 processors and want to process 100 elements at a time, result is 400.
NProcs * SubLen.

0 comments on commit f13092e

Please sign in to comment.