From 8ce62226b5386a9ed4450ffca544aa8c19c2fa8a Mon Sep 17 00:00:00 2001 From: Ulf Wiger Date: Mon, 11 Dec 2017 17:16:05 +0100 Subject: [PATCH] Making locks_leader pass locks-test Ref. https://github.com/ten0s/locks-test Fixes include adding an election ref value, to sync with cands. This value is kept when the leader enters the safe loop. Also, an 'assert_leader' message is introduced, to ensure that candidates are kicked out of the safe loop when a leader acquires a new lock and remains the leader. Also, a locks_agent:async_await_all_locks/1 function was added to ensure that the leader gets told that it has all locks in a contested scenario. --- src/locks_agent.erl | 37 ++++++-- src/locks_leader.erl | 206 ++++++++++++++++++++++++++++++------------- 2 files changed, 176 insertions(+), 67 deletions(-) diff --git a/src/locks_agent.erl b/src/locks_agent.erl index b6972df..296d6da 100755 --- a/src/locks_agent.erl +++ b/src/locks_agent.erl @@ -48,6 +48,7 @@ lock_objects/2, surrender_nowait/4, await_all_locks/1, + async_await_all_locks/1, monitor_nodes/2, change_flag/3, lock_info/1, @@ -102,14 +103,17 @@ claim_no = 0, require = all}). +-type monitored_nodes() :: [{node(), reference()}]. +-type down_nodes() :: [node()]. + -record(state, { locks :: ets:tab(), agents :: ets:tab(), interesting = [] :: [lock_id()], claim_no = 0 :: integer(), requests :: ets:tab(), - down = [] :: [node()], - monitored = [] :: [{node(), reference()}], + down = [] :: down_nodes(), + monitored = [] :: monitored_nodes(), await_nodes = false :: boolean(), monitor_nodes = false :: boolean(), pending :: ets:tab(), @@ -332,6 +336,9 @@ begin_transaction(Objects, Opts) -> await_all_locks(Agent) -> call(Agent, await_all_locks,infinity). +async_await_all_locks(Agent) -> + cast(Agent, {await_all_locks, self()}). + -spec monitor_nodes(agent(), boolean()) -> boolean(). %% @doc Toggles monitoring of nodes, like net_kernel:monitor_nodes/1. %% @@ -391,6 +398,9 @@ change_flag(Agent, Option, Bool) is_boolean(Bool), Option == notify -> gen_server:cast(Agent, {option, Option, Bool}). +cast(Agent, Msg) -> + gen_server:cast(Agent, Msg). + cast(Agent, Msg, Ref) -> gen_server:cast(Agent, Msg), await_reply(Ref). @@ -541,6 +551,15 @@ handle_cast({lock, Object, Mode, Nodes, Require, Wait, Client, Tag} = _Req, {true, S1} -> {noreply, check_if_done(S1)} end; +handle_cast({await_all_locks, Pid}, + #state{status = Status, awaiting_all = Aw} = State) -> + case Status of + {have_all_locks, _} -> + {noreply, notify_have_all( + State#state{awaiting_all = [{Pid,async}|Aw]})}; + _ -> + {noreply, check_if_done(add_waiter(wait, Pid, async, State))} + end; handle_cast({surrender, O, ToAgent, Nodes} = _Req, S) -> ?event(_Req, S), case lists:all( @@ -926,9 +945,14 @@ have_all(#state{have_all = Prev, claim_no = Cl} = State) -> notify_have_all(State#state{have_all = true, claim_no = Cl1}). notify_have_all(#state{awaiting_all = Aw, status = Status} = S) -> - [gen_server:reply(W, Status) || W <- Aw], + [reply_await_(W, Status) || W <- Aw], S#state{awaiting_all = []}. +reply_await_({Pid, notify}, Status) -> + notify_(Pid, Status); +reply_await_(From, Status) -> + gen_server:reply(From, Status). + abort_on_deadlock(OID, State) -> notify({abort, Reason = {deadlock, OID}}, State), error(Reason). @@ -939,11 +963,14 @@ notify_msgs([M|Ms], S) -> notify_msgs([], S) -> S. - notify(Msg, #state{notify = Notify} = State) -> - [P ! {?MODULE, self(), Msg} || P <- Notify], + [notify_(P, Msg) || P <- Notify], State. +notify_(P, Msg) -> + P ! {?MODULE, self(), Msg}. + + handle_locks(#state{have_all = true} = State) -> %% If we have all locks we've asked for, no need to search for potential %% deadlocks - reasonably, we cannot be involved in one. diff --git a/src/locks_leader.erl b/src/locks_leader.erl index f6706d8..b7f3b34 100644 --- a/src/locks_leader.erl +++ b/src/locks_leader.erl @@ -63,7 +63,6 @@ %% @end -module(locks_leader). -behaviour(gen_server). --compile(export_all). -export([start_link/2, start_link/3, start_link/4, call/2, call/3, @@ -123,6 +122,7 @@ lock, agent, leader, + election_ref, nodes = ordsets:new(), candidates = [], workers = [], @@ -199,7 +199,9 @@ leader(#st{leader = L}) -> %% This function is mainly present for compatibility with `gen_leader'. %% @end leader_node(#st{leader = L}) when is_pid(L) -> - node(L). + node(L); +leader_node(#st{}) -> + undefined. -spec reply({pid(), any()}, any()) -> ok. %% @doc Corresponds to `gen_server:reply/2'. @@ -236,9 +238,10 @@ broadcast(_, _) -> %% since it may cause sequencing issues with the broadcast message that is %% (normally) sent once the `Mod:elected/3' function returns. %% @end -broadcast_to_candidates(Msg, #st{leader = L, synced = Cands}) +broadcast_to_candidates(Msg, #st{leader = L, synced = Cands, + election_ref = ERef}) when L == self() -> - do_broadcast_(Cands, msg(from_leader, Msg)); + do_broadcast_(Cands, msg(from_leader, ERef, Msg)); broadcast_to_candidates(_, _) -> error(not_leader). @@ -491,14 +494,17 @@ abort_init(Reason, Parent) -> exit(Reason). noreply(#st{leader = undefined} = S) -> + put('$locks_leader_state', S), safe_loop(S); noreply(#st{initial = false} = S) -> + put('$locks_leader_state', S), {noreply, S}; noreply(#st{initial = true, regname = R, gen_server_opts = Opts} = S) -> %% The very first time we're out of the safe_loop() we have to %% *become* a gen_server (since we started using only proc_lib). %% Set initial = false to ensure it only happens once. S1 = S#st{initial = false}, + put('$locks_leader_state', S1), if R == undefined -> gen_server:enter_loop(?MODULE, Opts, S1); true -> gen_server:enter_loop(?MODULE, Opts, S1, {local,R}) end; @@ -526,15 +532,26 @@ safe_loop(#st{agent = A} = S) -> #locks_info{} = I -> % if worker - direct from locks_server ?event(I, S), noreply(locks_info(I, S)); - {?MODULE, am_leader, L, LeaderMsg} = _Msg -> + {?MODULE, am_leader, L, ERef, LeaderMsg} = _Msg -> ?event(_Msg, S), - noreply(leader_announced(L, LeaderMsg, S)); - {?MODULE, from_leader, L, LeaderMsg} = _Msg -> + noreply(leader_announced(L, ERef, LeaderMsg, S)); + {?MODULE, from_leader, L, ERef, LeaderMsg} = _Msg -> ?event(_Msg, S), - noreply(from_leader(L, LeaderMsg, S)); + noreply(from_leader(L, ERef, LeaderMsg, S)); {?MODULE, am_worker, W} = _Msg -> ?event(_Msg, S), noreply(worker_announced(W, S)); + {?MODULE, leader_uncertain, _L, _Synced, _SyncedWs} = _Msg -> + ?event(_Msg, S), + noreply(S); + {?MODULE, affirm_leader, L, Ref} = _Msg -> + ?event({in_safe_loop, _Msg}, S), + noreply(leader_affirmed(L, Ref, S)); + {?MODULE, ensure_sync, _, _} = _Msg -> + ?event({in_safe_loop, _Msg}, S), + noreply(S); + {'$gen_call', From, '$locks_leader_debug'} -> + handle_call('$locks_leader_debug', From, S); {'$gen_call', From, '$info'} -> handle_call('$locks_leader_info', From, S); {'$gen_call', From, {'$locks_leader_info', Item}} -> @@ -570,27 +587,52 @@ handle_info({locks_agent, A, Info} = _Msg, #st{agent = A} = S) -> #locks_info{} -> noreply(locks_info(Info, S)); waiting when S#st.leader == self() -> ?event(clearing_leader), - send_all(S, {?MODULE, leader_uncertain, self(), nodes()}), - noreply(S#st{leader = undefined, synced = []}); + noreply(set_leader_uncertain(S)); _ -> noreply(S) end; -handle_info({?MODULE, leader_uncertain, L, Nodes}, #st{leader = L} = S) -> - %% Enter safe_loop, since our leader has done so. - ?event({leader_uncertain, L, Nodes}, S), - noreply(S#st{leader = undefined}); +handle_info({?MODULE, leader_uncertain, L, Synced, SyncedWs}, S) -> + ?event({leader_uncertain, {{L, Synced, SyncedWs}, S#st.leader}}), + case S#st.leader of + MyL when MyL == self() -> + lists:foldl( + fun({Pid, Type}, Sx) -> + maybe_announce_leader( + Pid, Type, remove_synced(Pid, Type, Sx)) + end, S, + [{P,candidate} || P <- [L|Synced]] + ++ [{P,worker} || P <- SyncedWs]); + L -> + locks_agent:change_flag(S#st.agent, notify, true), + noreply(S#st{leader = undefined, + synced = [], synced_workers = []}); + _OtherL -> + noreply(S) + end; +handle_info({?MODULE, affirm_leader, L, ERef} = _Msg, #st{} = S) -> + ?event(_Msg, S), + noreply(leader_affirmed(L, ERef, S)); +handle_info({?MODULE, ensure_sync, Pid, Type} = _Msg, #st{} = S) -> + ?event(_Msg, S), + S1 = case S#st.leader of + Me when Me == self() -> + maybe_announce_leader(Pid, Type, S); + _ -> + S + end, + noreply(S1); handle_info({?MODULE, am_worker, W} = _Msg, #st{} = S) -> ?event({handle_info, _Msg}, S), noreply(worker_announced(W, S)); handle_info(#locks_info{lock = #lock{object = Lock}} = I, #st{lock = Lock} = S) -> {noreply, locks_info(I, S)}; -handle_info({?MODULE, am_leader, L, LeaderMsg} = _M, S) -> +handle_info({?MODULE, am_leader, L, ERef, LeaderMsg} = _M, S) -> ?event({handle_info, _M}, S), - noreply(leader_announced(L, LeaderMsg, S)); -handle_info({?MODULE, from_leader, L, LeaderMsg} = _M, S) -> + noreply(leader_announced(L, ERef, LeaderMsg, S)); +handle_info({?MODULE, from_leader, L, ERef, LeaderMsg} = _M, S) -> ?event({handle_info, _M}, S), - noreply(from_leader(L, LeaderMsg, S)); + noreply(from_leader(L, ERef, LeaderMsg, S)); handle_info({Ref, {'$locks_leader_reply', Reply}} = _M, #st{buffered = Buf} = S) -> ?event({handle_info, _M}, S), @@ -627,6 +669,17 @@ handle_call(Req, {_, {?MODULE, _Ref}} = From, noreply( callback_reply(M:handle_call(Req, From, MSt, opaque(S)), From, fun unchanged/1, S)); +handle_call('$locks_leader_debug', From, S) -> + I = [{leader, leader(S)}, + {leader_node, leader_node(S)}, + {candidates, candidates(S)}, + {new_candidates, new_candidates(S)}, + {workers, workers(S)}, + {module, S#st.mod}, + {mod_state, S#st.mod_state}, + {process_info, process_info(self())}], + gen_server:reply(From, I), + noreply(S); handle_call('$locks_leader_info', From, S) -> I = [{leader, leader(S)}, {leader_node, leader_node(S)}, @@ -692,6 +745,10 @@ nodeup(N, #st{nodes = Nodes} = S) -> include_node(N, #st{agent = A, lock = Lock, nodes = Nodes} = S) -> ?event({include_node, N}), + case ordsets:is_element(N, nodes()) of + true -> ok; + false -> asynch_ping(N) + end, locks_agent:lock_nowait(A, Lock, write, [N], all_alive), S#st{nodes = ordsets:add_element(N, Nodes)}. @@ -727,7 +784,8 @@ down({'DOWN', Ref, _, Pid, _} = Msg, S1 = if Pid == LPid -> [gen_server:reply(From,'$leader_died') || {_, From} <- S#st.buffered], - S#st{leader = undefined, buffered = []}; + S#st{leader = undefined, buffered = [], + synced = [], synced_workers = []}; true -> S end, maybe_remove_cand(Type, Pid, S1) @@ -757,18 +815,19 @@ monitor_cand(Client) -> maybe_announce_leader(Pid, Type, #st{leader = L, mod = M, mod_state = MSt} = S) -> ?event({maybe_announce_leader, Pid, Type}, S), + ERef = S#st.election_ref, IsSynced = is_synced(Pid, Type, S), if L == self(), IsSynced == false -> case M:elected(MSt, opaque(S), Pid) of {reply, Msg, MSt1} -> - Pid ! msg(am_leader, Msg), + Pid ! msg(am_leader, ERef, Msg), mark_as_synced(Pid, Type, S#st{mod_state = MSt1}); {ok, Msg, MSt1} -> - Pid ! msg(am_leader, Msg), + Pid ! msg(am_leader, ERef, Msg), S1 = do_broadcast(S#st{mod_state = MSt1}, Msg), mark_as_synced(Pid, Type, S1); {ok, AmLdrMsg, FromLdrMsg, MSt1} -> - Pid ! msg(am_leader, AmLdrMsg), + Pid ! msg(am_leader, ERef, AmLdrMsg), S1 = do_broadcast(S#st{mod_state = MSt1}, FromLdrMsg), mark_as_synced(Pid, Type, S1); {surrender, Other, MSt1} -> @@ -776,7 +835,7 @@ maybe_announce_leader(Pid, Type, #st{leader = L, mod = M, true -> locks_agent:surrender_nowait( S#st.agent, S#st.lock, Other, S#st.nodes), - S#st{mod_state = MSt1, leader = undefined}; + set_leader_undefined(S#st{mod_state = MSt1}); false -> error({cannot_surrender, Other}) end @@ -785,6 +844,10 @@ maybe_announce_leader(Pid, Type, #st{leader = L, mod = M, S end. +set_leader_undefined(#st{} = S) -> + S#st{leader = undefined, + synced = [], synced_workers = []}. + is_synced(Pid, worker, #st{synced_workers = Synced}) -> lists:member(Pid, Synced); is_synced(Pid, candidate, #st{synced = Synced}) -> @@ -795,6 +858,11 @@ mark_as_synced(Pid, worker, #st{synced_workers = Synced} = S) -> mark_as_synced(Pid, candidate, #st{synced = Synced} = S) -> S#st{synced = [Pid|Synced]}. +remove_synced(Pid, worker, #st{synced_workers = Synced} = S) -> + S#st{synced_workers = Synced -- [Pid]}; +remove_synced(Pid, candidate, #st{synced = Synced} = S) -> + S#st{synced = Synced -- [Pid]}. + maybe_remove_cand(candidate, Pid, #st{candidates = Cs, synced = Synced, leader = L, mod = M, mod_state = MSt} = S) -> @@ -815,11 +883,13 @@ become_leader(#st{agent = A} = S) -> end, S, Locks), become_leader_(S1). -become_leader_(#st{leader = L, mod = M, mod_state = MSt, +become_leader_(#st{election_ref = {L,_}, mod = M, mod_state = MSt, candidates = Cands, synced = Synced, - workers = Ws, synced_workers = SyncedWs} = S) - when L == self() -> + workers = Ws, synced_workers = SyncedWs} = S0) + when L =:= self() -> + S = S0#st{leader = self()}, ?event(become_leader_again, S), + send_all(S, {?MODULE, affirm_leader, self(), S#st.election_ref}), case {Cands -- Synced, Ws -- SyncedWs} of {[], []} -> S; _ -> @@ -838,23 +908,22 @@ become_leader_(#st{leader = L, mod = M, mod_state = MSt, do_broadcast(S1, FromLeaderMsg), AmLeaderMsg) end end; -become_leader_(#st{mod = M, mod_state = MSt} = S) -> +become_leader_(#st{mod = M, mod_state = MSt} = S0) -> + S = S0#st{election_ref = {self(),erlang:monotonic_time(microsecond)}}, ?event(become_leader, S), case M:elected(MSt, opaque(S), undefined) of {ok, Msg, MSt1} -> do_broadcast_new( - do_broadcast(S#st{mod_state = MSt1, leader = self()}, Msg), Msg); + S#st{mod_state = MSt1, leader = self(), + synced = [], synced_workers = []}, Msg); {error, Reason} -> error(Reason) end. -union(A, B) -> - A ++ (B -- A). - -msg(from_leader, Msg) -> - {?MODULE, from_leader, self(), Msg}; -msg(am_leader, Msg) -> - {?MODULE, am_leader, self(), Msg}. +msg(from_leader, ERef, Msg) -> + {?MODULE, from_leader, self(), ERef, Msg}; +msg(am_leader, ERef, Msg) -> + {?MODULE, am_leader, self(), ERef, Msg}. %% opaque(#st{candidates = Cands, workers = Ws, leader = L}) -> %% fun(candidates) -> Cands; @@ -898,17 +967,17 @@ callback_reply({stop, Reason, Reply, MSt}, From, F, S) -> callback_reply({stop, Reason, MSt}, _, _, S) -> {stop, Reason, S#st{mod_state = MSt}}. -do_broadcast_new(#st{candidates = Cands, synced = Synced, - workers = Ws, synced_workers = SyncedWs} = S, Msg) -> +do_broadcast_new(#st{election_ref = ERef, candidates = Cands, workers = Ws, + synced = Synced, synced_workers = SyncedWs} = S, Msg) -> NewCands = Cands -- Synced, NewWs = Ws -- SyncedWs, - AmLeader = msg(am_leader, Msg), + AmLeader = msg(am_leader, ERef, Msg), do_broadcast_(NewCands, AmLeader), do_broadcast_(NewWs, AmLeader), S#st{synced = Cands, synced_workers = Ws}. do_broadcast(#st{synced = Synced, synced_workers = SyncedWs} = S, Msg) -> - FromLeader = msg(from_leader, Msg), + FromLeader = msg(from_leader, S#st.election_ref, Msg), do_broadcast_(Synced, FromLeader), do_broadcast_(SyncedWs, FromLeader), S. @@ -921,43 +990,56 @@ do_broadcast_(Pids, Msg) when is_list(Pids) -> [P ! Msg || P <- Pids], ok. -from_leader(L, Msg, #st{leader = L, mod = M, mod_state = MSt} = S) -> +from_leader(L, ERef, Msg, #st{leader = L, election_ref = ERef, + mod = M, mod_state = MSt} = S) -> callback(M:from_leader(Msg, MSt, opaque(S)), S); -from_leader(_OtherL, _Msg, S) -> +from_leader(_OtherL, _ERef, _Msg, S) -> ?event({ignoring_from_leader, _OtherL, _Msg}, S), S. -leader_announced(L, Msg, #st{leader = L, mod = M, mod_state = MSt} = S) -> +leader_announced(L, ERef, Msg, #st{leader = L, election_ref = ERef, + mod = M, mod_state = MSt} = S) -> callback(M:surrendered(MSt, Msg, opaque(S)), S#st{synced = [], synced_workers = []}); -leader_announced(L, Msg, #st{mod = M, mod_state = MSt} = S) -> +leader_announced(L, ERef, Msg, #st{mod = M, mod_state = MSt} = S) -> %% Ref = erlang:monitor(process, L), %% put({?MODULE,monitor,Ref}, candidate), - S1 = S#st{leader = L, synced = [], synced_workers = []}, + S1 = S#st{leader = L, election_ref = ERef, + synced = [], synced_workers = []}, callback(M:surrendered(MSt, Msg, opaque(S1)), S1). +leader_affirmed(L, ERef, #st{election_ref = ERef} = S) -> + S#st{leader = L}; +leader_affirmed(_L, _ERef, #st{leader = Me} = S) when Me == self() -> + set_leader_uncertain(S); +leader_affirmed(L, _ERef, #st{} = S) -> + %% don't set election_ref, since we are not yet synced + L ! {?MODULE, ensure_sync, self(), S#st.role}, + S#st{leader = L}. + +set_leader_uncertain(#st{agent = A} = S) -> + send_all(S, {?MODULE, leader_uncertain, self(), + S#st.synced, S#st.synced_workers}), + locks_agent:async_await_all_locks(A), + S#st{leader = undefined}. + worker_announced(W, #st{workers = Workers} = S) -> case lists:member(W, Workers) of - true -> - S; - false -> - Ref = erlang:monitor(process, W), - put({?MODULE,monitor,Ref}, worker), - maybe_announce_leader(W, worker, S#st{workers = [W|Workers]}) - end. - -get_opt(K, Opts) -> - case lists:keyfind(K, 1, Opts) of - {_, Val} -> - Val; - false -> - error({required, K}) + true -> + S; + false -> + Ref = erlang:monitor(process, W), + put({?MODULE,monitor,Ref}, worker), + maybe_announce_leader(W, worker, S#st{workers = [W|Workers]}) end. get_opt(K, Opts, Default) -> case lists:keyfind(K, 1, Opts) of - {_, V} -> - V; - false -> - Default + {_, V} -> + V; + false -> + Default end. + +asynch_ping(N) -> + rpc:cast(N, erlang, is_atom, [true]).