Skip to content

Commit

Permalink
Tweaked supervision tree for merle project, so that if process pool mgr dies, all outstanding merle watchers will also die
Browse files Browse the repository at this point in the history
  • Loading branch information
jesse-lauro committed Aug 23, 2012
1 parent ba88778 commit 5a708f4
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 150 deletions.
2 changes: 1 addition & 1 deletion src/merle.erl
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,7 @@ do_recv_stats(Timeout) ->
inet:setopts(Socket, ?TCP_OPTS_ACTIVE),
[{Field, Value} | do_recv_stats(Timeout)]
after Timeout ->
timeout
timeout
end.
%% @doc receive function for simple responses (not containing VALUEs)
recv_simple_reply(Timeout) ->
Expand Down
79 changes: 5 additions & 74 deletions src/merle_cluster.erl
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,12 @@ configure(MemcachedHosts, ConnectionsPerHost) ->
{M, B} = dynamic_compile:from_string(ModuleString),
code:load_binary(M, "", B),

% start all merle watchers
lists:foreach(
fun([Host, Port]) ->
lists:foreach(
fun(_) ->
supervisor:start_child(merle_sup, [[Host, Port]])
supervisor:start_child(merle_watcher_sup, [[Host, Port]])
end,
lists:seq(1, ConnectionsPerHost)
)
Expand All @@ -42,7 +43,7 @@ configure(MemcachedHosts, ConnectionsPerHost) ->
exec(Key, Fun, FullDefault, _ConnectionTimeout) ->
S = merle_cluster_dynamic:get_server(Key),

case local_pg2:get_closest_pid(round_robin, S) of
case merle_pool:get_closest_pid(round_robin, S) of
in_use ->
FullDefault;

Expand All @@ -55,78 +56,8 @@ exec(Key, Fun, FullDefault, _ConnectionTimeout) ->

merle_watcher:demonitor(P),

local_pg2:checkin_pid(P),
merle_pool:checkin_pid(P),

Value

end.

%exec(Key, Fun, FullDefault, ConnectionTimeout) ->
% S = merle_cluster_dynamic:get_server(Key),
%
% FromPid = self(),
%
% ConnFetchPid = spawn(
% fun() ->
%
% MerleConn = local_pg2:get_closest_pid(round_robin, S),
% MonitorRef = erlang:monitor(process, FromPid),
%
% FromPid ! {merle_watcher, MerleConn},
%
% receive
% {'DOWN', MonitorRef, _, _, _} ->
%
% log4erl:info("Merle connection fetch process received 'DOWN' message"),
%
% ok;
%
% done ->
%
% ok;
%
% Other ->
%
% log4erl:error("Merle connection unexpected message ~p", [Other])
%
% after 1000 ->
%
% log4erl:error("Merle connection fetch process timed out")
%
% end,
%
% local_pg2:checkin_pid(MerleConn),
%
% true = erlang:demonitor(MonitorRef)
%
% end
% ),
%
% ReturnValue = receive
% {merle_watcher, in_use} ->
% log4erl:info("Merle pool is full!"),
%
% ConnFetchPid ! done,
%
% FullDefault;
%
% {merle_watcher, P} ->
% log4erl:info("Merle found connection!"),
%
% MC = merle_watcher:merle_connection(P),
%
% Value = Fun(MC, Key),
%
% ConnFetchPid ! done,
%
% Value
%
% after ConnectionTimeout ->
% log4erl:error("Merle timed out while trying to retrieve connection!"),
%
% ConnFetchPid ! done,
%
% FullDefault
% end,
%
% ReturnValue.
end.
91 changes: 34 additions & 57 deletions src/local_pg2.erl → src/merle_pool.erl
Original file line number Diff line number Diff line change
@@ -1,64 +1,53 @@
-module(local_pg2).
-module(merle_pool).

%% Basically the same functionality than pg2, but process groups are local rather than global.
-export([create/1, delete/1, join/2, leave/2, get_members/1, get_closest_pid/2, checkout_pid/1, checkin_pid/1, which_groups/0]).

-export([start/0, start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
-export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).

-define(TABLE, local_pg2_table).
-define(INDEXES_TABLE, local_pg2_indexes_table).
-define(PIDS_TABLE, merle_pool_pids).
-define(LOCKS_TABLE, merle_pool_counts).

start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

start() ->
ensure_started().

create(Name) ->
ensure_started(),
case ets:lookup(?TABLE, Name) of
case ets:lookup(?PIDS_TABLE, Name) of
[] ->
gen_server:call(?MODULE, {create, Name});
_ ->
ok
end.

delete(Name) ->
ensure_started(),
gen_server:call(?MODULE, {delete, Name}).

join(Name, Pid) when is_pid(Pid) ->
ensure_started(),
case ets:lookup(?TABLE, Name) of
case ets:lookup(?PIDS_TABLE, Name) of
[] ->
{error, {no_such_group, Name}};
_ ->
gen_server:call(?MODULE, {join, Name, Pid})
end.

leave(Name, Pid) when is_pid(Pid) ->
ensure_started(),
case ets:lookup(?TABLE, Name) of
case ets:lookup(?PIDS_TABLE, Name) of
[] ->
{error, {no_such_group, Name}};
_ ->
gen_server:call(?MODULE, {leave, Name, Pid})
end.

get_members(Name) ->
ensure_started(),
case ets:lookup(?TABLE, Name) of
case ets:lookup(?PIDS_TABLE, Name) of
[] -> {error, {no_such_group, Name}};
[{Name, Members}] -> Members
end.
which_groups() ->
ensure_started(),
[K || {K, _Members} <- ets:tab2list(?TABLE)].

get_closest_pid(random, Name) ->
ensure_started(),
[K || {K, _Members} <- ets:tab2list(?PIDS_TABLE)].

case ets:lookup(?TABLE, Name) of
get_closest_pid(random, Name) ->
case ets:lookup(?PIDS_TABLE, Name) of
[] ->
{error, {no_process, Name}};
[{Name, Members}] ->
Expand All @@ -73,9 +62,9 @@ get_closest_pid(random, Name) ->

get_closest_pid(round_robin, Name) ->
% Get the round robin index
RoundRobinIndex = ets:update_counter(?INDEXES_TABLE, {Name, rr_index}, 1),
RoundRobinIndex = ets:update_counter(?LOCKS_TABLE, {Name, rr_index}, 1),

case ets:lookup(?TABLE, Name) of
case ets:lookup(?PIDS_TABLE, Name) of
[] ->
{error, {no_process, Name}};
[{Name, Members}] ->
Expand All @@ -88,12 +77,11 @@ checkout_pid(Pid) ->
checkout_pid(Pid, false).

checkout_pid(Pid, CheckBackIn) ->
UseCount = ets:update_counter(?INDEXES_TABLE, {Pid, use_count}, 1),
UseCount = ets:update_counter(?LOCKS_TABLE, {Pid, use_count}, 1),

case UseCount =< 1 of
true -> Pid;
false ->

if
CheckBackIn -> checkin_pid(Pid);
true -> ok
Expand All @@ -106,41 +94,41 @@ checkin_pid(in_use) -> ok;
checkin_pid(Pid) ->
case is_process_alive(Pid) of
true ->
ets:update_counter(?INDEXES_TABLE, {Pid, use_count}, -1);
ets:update_counter(?LOCKS_TABLE, {Pid, use_count}, -1);
false ->
no_proc
end.

init([]) ->
process_flag(trap_exit, true),
ets:new(?TABLE, [set, public, named_table, {read_concurrency, true}]),
ets:new(?INDEXES_TABLE, [set, public, named_table, {write_concurrency, true}]),
ets:new(?PIDS_TABLE, [set, public, named_table, {read_concurrency, true}]),
ets:new(?LOCKS_TABLE, [set, public, named_table, {write_concurrency, true}]),
{ok, []}.

handle_call({create, Name}, _From, S) ->
case ets:lookup(?TABLE, Name) of
case ets:lookup(?PIDS_TABLE, Name) of
[] ->
ets:insert(?INDEXES_TABLE, {{Name, rr_index}, 0}),
ets:insert(?TABLE, {Name, []});
ets:insert(?LOCKS_TABLE, {{Name, rr_index}, 0}),
ets:insert(?PIDS_TABLE, {Name, []});
_ ->
ok
end,
{reply, ok, S};

handle_call({join, Name, Pid}, _From, S) ->
case ets:lookup(?TABLE, Name) of
case ets:lookup(?PIDS_TABLE, Name) of
[] ->
{reply, no_such_group, S};
[{Name, Members}] ->

% NOTE: skip one index since we are about to grow the list, this prevents collisions
ets:update_counter(?INDEXES_TABLE, {Name, rr_index}, 1),
ets:update_counter(?LOCKS_TABLE, {Name, rr_index}, 1),

% create an entry that will represent a lock for this pid
ets:insert(?INDEXES_TABLE, {{Pid, use_count}, 0}),
ets:insert(?LOCKS_TABLE, {{Pid, use_count}, 0}),

% insert new pid into the table
ets:insert(?TABLE, {Name, [Pid | Members]}),
ets:insert(?PIDS_TABLE, {Name, [Pid | Members]}),

link(Pid),

Expand All @@ -149,22 +137,22 @@ handle_call({join, Name, Pid}, _From, S) ->
end;

handle_call({leave, Name, Pid}, _From, S) ->
case ets:lookup(?TABLE, Name) of
case ets:lookup(?PIDS_TABLE, Name) of
[] ->
{reply, no_such_group, S};
[{Name, Members}] ->
case lists:delete(Pid, Members) of
[] ->
ets:delete(?TABLE, Name);
ets:delete(?PIDS_TABLE, Name);
NewMembers ->
ets:insert(?TABLE, {Name, NewMembers})
ets:insert(?PIDS_TABLE, {Name, NewMembers})
end,
unlink(Pid),
{reply, ok, S}
end;

handle_call({delete, Name}, _From, S) ->
ets:delete(?TABLE, Name),
ets:delete(?PIDS_TABLE, Name),
{reply, ok, S}.

handle_cast(_Cast, S) ->
Expand All @@ -179,38 +167,27 @@ handle_info(_Info, S) ->
{noreply, S}.

terminate(_Reason, _S) ->
ets:delete(?TABLE),
ets:delete(?INDEXES_TABLE),
ets:delete(?PIDS_TABLE),
ets:delete(?LOCKS_TABLE),
%%do not unlink, if this fails, dangling processes should be killed
ok.

%%%-----------------------------------------------------------------
%%% Internal functions
%%%-----------------------------------------------------------------
del_member(Pid) ->
L = ets:tab2list(?TABLE),
L = ets:tab2list(?PIDS_TABLE),
lists:foreach(fun(Elem) -> del_member_func(Elem, Pid) end, L).

del_member_func({Name, Members}, Pid) ->
case lists:member(Pid, Members) of
true ->
case lists:delete(Pid, Members) of
[] ->
ets:delete(?TABLE, Name);
ets:delete(?PIDS_TABLE, Name);
NewMembers ->
ets:insert(?TABLE, {Name, NewMembers})
ets:insert(?PIDS_TABLE, {Name, NewMembers})
end;
false ->
ok
end.

ensure_started() ->
case whereis(?MODULE) of
undefined ->
C = {local_pg2, {?MODULE, start_link, []}, permanent,
1000, worker, [?MODULE]},
supervisor:start_child(kernel_safe_sup, C);
Pg2Pid ->
{ok, Pg2Pid}
end.

end.
26 changes: 14 additions & 12 deletions src/merle_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,22 @@

-export([start_link/2, init/1]).

-export([start_child/1]).

-behaviour(supervisor).

start_link(Instances, ConnectionsPerInstance) ->
{ok, Pid} = supervisor:start_link({local, ?MODULE}, ?MODULE, []),
local_pg2:start(),
merle_cluster:configure(Instances, ConnectionsPerInstance),
{ok, Pid}.
supervisor:start_link({local, ?MODULE}, ?MODULE, [Instances, ConnectionsPerInstance]).

init([Instances, ConnectionsPerInstance]) ->
MerlePool =
{merle_pool,
{merle_pool, start_link, []},
permanent, 5000, worker, dynamic
},

start_child(N) ->
supervisor:start_child(?MODULE, [N]).
MerleWatcherSup =
{merle_watcher_sup,
{merle_watcher_sup, start_link, [Instances, ConnectionsPerInstance]},
permanent, 5000, supervisor, dynamic
},

init([]) ->
MCDSpec = {mcd, {merle_watcher, start_link, []},
permanent, 5000, worker, dynamic},
{ok, {{simple_one_for_one, 10, 10}, [MCDSpec]}}.
{ok, {{one_for_all, 10, 10}, [MerlePool, MerleWatcherSup]}}.
Loading

0 comments on commit 5a708f4

Please sign in to comment.