Skip to content

Commit

Permalink
Adding periodic clean so that we never lose merle connections, even i…
Browse files Browse the repository at this point in the history
…n the event of a bug
  • Loading branch information
jesse-lauro committed Aug 24, 2012
1 parent 1eae31a commit 83191ce
Showing 1 changed file with 65 additions and 7 deletions.
72 changes: 65 additions & 7 deletions src/merle_pool.erl
Expand Up @@ -2,7 +2,7 @@

%% Basically the same functionality than pg2, but process groups are local rather than global.
-export([create/1, delete/1, join/2, leave/2,
get_members/1, count_available/1,
get_members/1, count_available/1, clean_locks/0,
get_closest_pid/2, checkout_pid/1,
checkin_pid/1, checkin_pid/2,
which_groups/0]).
Expand All @@ -12,6 +12,12 @@
-define(PIDS_TABLE, merle_pool_pids).
-define(LOCKS_TABLE, merle_pool_locks).

-define(CLEAN_LOCKS_INTERVAL, 10000). % every 10 seconds

-record(server_state, {
periodic_lock_clean
}).

start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

Expand Down Expand Up @@ -73,6 +79,42 @@ count_available(Name) ->
{length(Members), NumAvail}
end.

clean_locks() ->
L = ets:tab2list(?PIDS_TABLE),

TotalCleaned = lists:foldl(fun(Pids, Acc) -> Acc + clean_locks(Pids) end, 0, L),

log4erl:error("Cleaned ~p merle locks", [TotalCleaned]),

TotalCleaned.

clean_locks(PoolPids) ->
{_, NowSecs, _} = erlang:now(),

NumCleaned = lists:foldl(
fun(Pid, Acc) ->
case ets:lookup(?LOCKS_TABLE, {Pid, use_count}) of
[{{Pid, use_count}, 0}] ->
Acc;
_ ->
case ets:lookup(?LOCKS_TABLE, {Pid, last_unlocked}) of
[{{Pid, last_unlocked}, LastUnlocked}] ->
case (LastUnlocked + ?CLEAN_LOCKS_INTERVAL) < NowSecs of
true ->
reset_lock(Pid, NowSecs),
Acc + 1;
false -> Acc
end;
_ -> Acc
end
end
end,
0,
PoolPids
),

NumCleaned.

get_closest_pid(random, Name) ->
case ets:lookup(?PIDS_TABLE, Name) of
[] ->
Expand Down Expand Up @@ -145,12 +187,28 @@ mark_used(Pid) ->

mark_unused(Pid) ->
ets:update_counter(?LOCKS_TABLE, {Pid, use_count}, -1).


reset_lock(Pid, NowSecs) ->
% create an entry that will represent a lock for this pid
ets:insert(?LOCKS_TABLE, {{Pid, use_count}, 0}),

% create an entry that will represent the last unlock time for this pid
ets:insert(?LOCKS_TABLE, {{Pid, last_unlocked}, NowSecs}).


init([]) ->
process_flag(trap_exit, true),
ets:new(?PIDS_TABLE, [set, public, named_table, {read_concurrency, true}]),
ets:new(?LOCKS_TABLE, [set, public, named_table, {write_concurrency, true}]),
{ok, []}.

PLC = timer:apply_interval(?CLEAN_LOCKS_INTERVAL, merle_pool, clean_locks, []),

State = #server_state {
periodic_lock_clean = PLC
},

{ok, State}.

handle_call({create, Name}, _From, S) ->
case ets:lookup(?PIDS_TABLE, Name) of
Expand All @@ -172,11 +230,8 @@ handle_call({join, Name, Pid}, _From, S) ->
ets:update_counter(?LOCKS_TABLE, {Name, rr_index}, 1),

% create an entry that will represent a lock for this pid
ets:insert(?LOCKS_TABLE, {{Pid, use_count}, 0}),

% create an entry that will represent the last unlock time for this pid
{_, NowSecs, _} = erlang:now(),
ets:insert(?LOCKS_TABLE, {{Pid, last_unlocked}, NowSecs}),
reset_lock(Pid, NowSecs),

% insert new pid into the table
ets:insert(?PIDS_TABLE, {Name, [Pid | Members]}),
Expand Down Expand Up @@ -217,9 +272,12 @@ handle_info({'EXIT', Pid, _} , S) ->
handle_info(_Info, S) ->
{noreply, S}.

terminate(_Reason, _S) ->
terminate(_Reason, #server_state{ periodic_lock_clean=PLC }) ->
ets:delete(?PIDS_TABLE),
ets:delete(?LOCKS_TABLE),

timer:cancel(PLC),

%%do not unlink, if this fails, dangling processes should be killed
ok.

Expand Down

0 comments on commit 83191ce

Please sign in to comment.