diff --git a/Makefile b/Makefile index b989450..84df7f0 100644 --- a/Makefile +++ b/Makefile @@ -22,10 +22,10 @@ examples: compile rebar compile test: - rebar clean compile eunit + rebar clean compile eunit ct test_debug: - rebar clean compile eunit eunit_compile_opts="\[\{d,'DEBUG'\}\]" + rebar clean compile eunit ct eunit_compile_opts="\[\{d,'DEBUG'\}\]" doc: rebar get-deps compile doc diff --git a/examples/src/test_cb.erl b/examples/src/test_cb.erl index e55c4f2..0d3fda2 100644 --- a/examples/src/test_cb.erl +++ b/examples/src/test_cb.erl @@ -51,10 +51,18 @@ terminate/2, code_change/4]). --record(st, {am_leader = false, +-export([record_fields/1]). + +-record(cb, {am_leader = false, dict}). --define(event(E), event(?LINE, E)). +-define(event(E), event(?LINE, E, none)). +-define(event(E, S), event(?LINE, E, S)). + +record_fields(cb) -> record_info(fields, cb); +record_fields(st) -> locks_leader:record_fields(st); +record_fields(_) -> + no. %% @spec init(Arg::term()) -> {ok, State} %% @@ -64,7 +72,7 @@ %% init(Dict) -> ?event({init, Dict}), - {ok, #st{dict = Dict}}. + {ok, #cb{dict = Dict}}. %% @spec elected(State::state(), I::info(), Cand::pid() | undefined) -> %% {ok, Broadcast, NState} @@ -114,27 +122,26 @@ init(Dict) -> %% Example: %% %%
-%%   elected(#st{dict = Dict} = St, _I, undefined) ->
+%%   elected(#cb{dict = Dict} = St, _I, undefined) ->
 %%       {ok, Dict, St};
-%%   elected(#st{dict = Dict} = St, _I, Pid) when is_pid(Pid) ->
+%%   elected(#cb{dict = Dict} = St, _I, Pid) when is_pid(Pid) ->
 %%       %% reply only to Pid
 %%       {reply, Dict, St}.
 %% 
%% @end %% -elected(#st{dict = Dict} = S, I, undefined) -> - ?event(elected_leader), +elected(#cb{dict = Dict} = S, I, _) -> + ?event({elected_leader, I}), case locks_leader:new_candidates(I) of [] -> - ?event({elected, Dict}), - {ok, {sync, Dict}, S#st{am_leader = true}}; + ?event({elected, Dict}, S), + {ok, {sync, Dict}, S#cb{am_leader = true}}; Cands -> ?event({new_candidates, Cands}), NewDict = merge_dicts(Dict, I), - {ok, {sync, NewDict}, S#st{am_leader = true, dict = NewDict}} - end; -elected(#st{dict = Dict} = S, _E, Pid) when is_pid(Pid) -> - {reply, {sync, Dict}, S#st{am_leader = true}}. + ?event({merge_result, NewDict}), + {ok, {sync, NewDict}, S#cb{am_leader = true, dict = NewDict}} + end. %% This is sub-optimal, but it's only an example! merge_dicts(D, I) -> @@ -167,9 +174,9 @@ merge_dicts(D, I) -> %% {ok, LeaderDict}. %% %% @end -surrendered(#st{dict = _OurDict} = S, {sync, LeaderDict}, _I) -> +surrendered(#cb{dict = _OurDict} = S, {sync, LeaderDict}, _I) -> ?event({surrendered, LeaderDict}), - {ok, S#st{dict = LeaderDict, am_leader = false}}. + {ok, S#cb{dict = LeaderDict, am_leader = false}}. %% @spec handle_DOWN(Candidate::pid(), State::state(), I::info()) -> %% {ok, NState} | {ok, Broadcast, NState} @@ -206,10 +213,10 @@ handle_DOWN(_Pid, S, _I) -> %% Example: %% %%
-%%   handle_leader_call({store,F}, From, #st{dict = Dict} = S, E) ->
+%%   handle_leader_call({store,F}, From, #cb{dict = Dict} = S, E) ->
 %%       NewDict = F(Dict),
-%%       {reply, ok, {store, F}, S#st{dict = NewDict}};
-%%   handle_leader_call({leader_lookup,F}, From, #st{dict = Dict} = S, E) ->
+%%       {reply, ok, {store, F}, S#cb{dict = NewDict}};
+%%   handle_leader_call({leader_lookup,F}, From, #cb{dict = Dict} = S, E) ->
 %%       Reply = F(Dict),
 %%       {reply, Reply, S}.
 %% 
@@ -221,14 +228,16 @@ handle_DOWN(_Pid, S, _I) -> %% leader; normally, lookups are served locally and updates by the leader, %% which can lead to race conditions. %% @end -handle_leader_call({store,F} = Op, _From, #st{dict = Dict} = S, _I) -> +handle_leader_call({store,F} = Op, _From, #cb{dict = Dict} = S, _I) -> ?event({handle_leader_call, Op}), NewDict = F(Dict), - {reply, ok, {store, F}, S#st{dict = NewDict}}; -handle_leader_call({leader_lookup,F} = Op, _From, #st{dict = Dict} = S, _I) -> + ?event({new_dict, NewDict}), + {reply, ok, {store, F}, S#cb{dict = NewDict}}; +handle_leader_call({leader_lookup,F} = Op, _From, #cb{dict = Dict} = S, _I) -> ?event({handle_leader_call, Op}), Reply = F(Dict), - {reply, Reply, S#st{dict = Dict}}. + ?event({reply, Reply}), + {reply, Reply, S#cb{dict = Dict}}. %% @spec handle_leader_cast(Msg::term(), State::term(), I::info()) -> @@ -251,12 +260,14 @@ handle_leader_cast(_Msg, S, _I) -> %% In this particular module, the leader passes an update function to be %% applied to the candidate's state. %% @end -from_leader({sync, D}, #st{} = S, _I) -> - {ok, S#st{dict = D}}; -from_leader({store,F} = Op, #st{dict = Dict} = S, _I) -> - ?event({from_leader, Op}), +from_leader({sync, D} = Msg, #cb{} = S, _I) -> + ?event({from_leader, Msg}, S), + {ok, S#cb{dict = D}}; +from_leader({store,F} = Op, #cb{dict = Dict} = S, _I) -> + ?event({from_leader, Op}, S), NewDict = F(Dict), - {ok, S#st{dict = NewDict}}. + ?event({new_dict, NewDict}), + {ok, S#cb{dict = NewDict}}. %% @spec handle_call(Request::term(), From::callerRef(), State::state(), %% I::info()) -> @@ -275,15 +286,18 @@ from_leader({store,F} = Op, #st{dict = Dict} = S, _I) -> %% used to it from gen_server. %% @end %% -handle_call(merge, _From, #st{am_leader = AmLeader, +handle_call(merge, _From, #cb{am_leader = AmLeader, dict = Dict} = S, _I) -> + ?event({handle_call, merge}, S), if AmLeader -> {reply, {true, Dict}, S}; true -> {reply, false, S} end; -handle_call({lookup, F}, _From, #st{dict = Dict} = S, _I) -> +handle_call({lookup, F}, _From, #cb{dict = Dict} = S, _I) -> + ?event({handle_call, lookup}, S), Reply = F(Dict), + ?event({reply, Reply}), {reply, Reply, S}. %% @spec handle_cast(Msg::term(), State::state(), I::info()) -> @@ -331,5 +345,5 @@ terminate(_Reason, _S) -> ok. -event(_Line, _Event) -> +event(_Line, _Event, _State) -> ok. diff --git a/src/locks_agent.erl b/src/locks_agent.erl index 59d0c17..2024593 100755 --- a/src/locks_agent.erl +++ b/src/locks_agent.erl @@ -48,6 +48,7 @@ lock_objects/2, surrender_nowait/4, await_all_locks/1, + monitor_nodes/2, change_flag/3, lock_info/1, transaction_status/1, @@ -104,6 +105,7 @@ down = [] :: [node()], monitored = [] :: [{node(), reference()}], await_nodes = false :: boolean(), + monitor_nodes = false :: boolean(), pending :: ets:tab(), sync = [] :: [#lock{}], client :: pid(), @@ -183,6 +185,12 @@ start() -> %% the agent will wait for the nodes to reappear, and reclaim the lock(s) when %% they do. %% +%% * `{monitor_nodes, boolean()}' - default: `false'. Works like +%% {@link net_kernel:monitor_nodes/1}, but `nodedown' and `nodeup' messages are +%% sent when the `locks' server on a given node either appears or disappears. +%% The messages (`{nodeup,Node}' and `{nodedown,Node}') are sent only to +%% the client. Can also be toggled using {@link monitor_nodes/2}. +%% %% * `{notify, boolean()}' - default: `false'. If `{notify, true}', the client %% will receive `{locks_agent, Agent, Info}' messages, where `Info' is either %% a `#locks_info{}' record or `{have_all_locks, Deadlocks}'. @@ -318,6 +326,17 @@ begin_transaction(Objects, Opts) -> await_all_locks(Agent) -> call(Agent, await_all_locks,infinity). +-spec monitor_nodes(agent(), boolean()) -> boolean(). +%% @doc Toggles monitoring of nodes, like net_kernel:monitor_nodes/1. +%% +%% Works like {@link net_kernel:monitor_nodes/1}, but `nodedown' and `nodeup' +%% messages are sent when the `locks' server on a given node either appears +%% or disappears. In this sense, the monitoring will signal when a viable +%% `locks' node becomes operational (or inoperational). +%% The messages (`{nodeup,Node}' and `{nodedown,Node}') are sent only to +%% the client. +monitor_nodes(Agent, Bool) when is_boolean(Bool) -> + call(Agent, {monitor_nodes, Bool}). -spec end_transaction(agent()) -> ok. %% @doc Ends the transaction, terminating the agent. @@ -462,6 +481,7 @@ init(Opts) -> false -> [] end, AwaitNodes = proplists:get_value(await_nodes, Opts, false), + MonNodes = proplists:get_value(monitor_nodes, Opts, false), net_kernel:monitor_nodes(true), {ok,#state{ locks = ets_new(locks, [ordered_set, {keypos, #lock.object}]), @@ -470,6 +490,7 @@ init(Opts) -> down = [], monitored = orddict:new(), await_nodes = AwaitNodes, + monitor_nodes = MonNodes, pending = ets_new(pending, [bag, {keypos, #req.object}]), sync = [], client = Client, @@ -486,6 +507,8 @@ handle_call(transaction_status, _From, #state{status = Status} = State) -> {reply, Status, State}; handle_call(await_all_locks, {Client, Tag}, State) -> {noreply, check_if_done(add_waiter(wait, Client, Tag, State))}; +handle_call({monitor_nodes, Bool}, _From, St) when is_boolean(Bool) -> + {reply, St#state.monitor_nodes, St#state{monitor_nodes = Bool}}; handle_call(stop, {Client, _}, State) when Client==?myclient -> {stop, normal, ok, State}; handle_call(R, _, State) -> @@ -677,7 +700,7 @@ handle_info({'DOWN',_,_,_,_}, S) -> {noreply, S}; handle_info({nodeup, N} = _Msg, #state{down = Down} = S) -> ?event(_Msg), - case lists:member(N, Down) of + case S#state.monitor_nodes orelse lists:member(N, Down) of true -> watch_node(N); false -> ignore end, @@ -685,12 +708,23 @@ handle_info({nodeup, N} = _Msg, #state{down = Down} = S) -> handle_info({nodedown,_}, S) -> %% We react on 'DOWN' messages above instead {noreply, S}; -handle_info({locks_running,N} = Msg, #state{down=Down, pending=Pending} = S) -> +handle_info({locks_running,N} = Msg, #state{down=Down, pending=Pending, + requests = Reqs, + monitor_nodes = MonNodes, + client = C} = S) -> ?event(Msg), + if MonNodes -> + C ! {nodeup, N}; + true -> + ignore + end, case lists:member(N, Down) of true -> S1 = S#state{down = Down -- [N]}, - case requests_with_node(N, Pending) of + {R, P} = Res = {requests_with_node(N, Reqs), + requests_with_node(N, Pending)}, + ?event({{reqs, pending}, Res}), + case R ++ P of [] -> {noreply, S1}; Reissue -> @@ -777,16 +811,21 @@ handle_nodedown(Node, #state{down = Down, requests = Reqs, pending = Pending, monitored = Mon, locks = Locks, interesting = I, - agents = Agents} = S) -> + agents = Agents, + monitor_nodes = MonNodes} = S) -> ?event({handle_nodedown, Node}, S), + handle_monitor_on_down(Node, S), ets_match_delete(Locks, #lock{object = {'_',Node}, _ = '_'}), ets_match_delete(Agents, {{'_',{'_',Node}}}), Down1 = [Node|Down -- [Node]], - S1 = S#state{down = Down1, interesting = prune_interesting(I, node, Node)}, - case {requests_with_node(Node, Reqs), - requests_with_node(Node, Pending)} of + S1 = S#state{down = Down1, interesting = prune_interesting(I, node, Node), + monitored = lists:keydelete(Node, 1, Mon)}, + Res = {requests_with_node(Node, Reqs), + requests_with_node(Node, Pending)}, + ?event({{reqs,pending}, Res}), + case Res of {[], []} -> - {noreply, S#state{down = lists:keydelete(Node, 1, Mon)}}; + {noreply, S1}; {Rs, PRs} -> move_requests(Rs, Reqs, Pending), case S1#state.await_nodes of @@ -799,7 +838,8 @@ handle_nodedown(Node, #state{down = Down, requests = Reqs, {stop, {cannot_lock_objects, Lost}, S1} end; true -> - case lists:member(Node, nodes()) of + case MonNodes == false + andalso lists:member(Node, nodes()) of true -> watch_node(Node); false -> ignore end, @@ -811,6 +851,20 @@ handle_nodedown(Node, #state{down = Down, requests = Reqs, end end. +handle_monitor_on_down(_, #state{monitor_nodes = false}) -> + ok; +handle_monitor_on_down(Node, #state{monitor_nodes = true, + client = C}) -> + C ! {nodedown, Node}, + case lists:member(Node, nodes()) of + true -> + watch_node(Node); + false -> + ignore + end, + ok. + + prune_interesting(I, node, Node) -> [OID || {_, N} = OID <- I, N =/= Node]; prune_interesting(I, object, Object) -> @@ -859,7 +913,8 @@ ensure_monitor_(Locker, Node, Mon) -> orddict:store(Node, Ref, Mon) end. -request_lock({OID, Node}, Mode, #state{client = Client} = State) -> +request_lock({OID, Node} = _LockID, Mode, #state{client = Client} = State) -> + ?event({request_lock, _LockID}), P = {?LOCKER, Node}, erlang:monitor(process, P), locks_server:lock(OID, [Node], Client, Mode), @@ -946,7 +1001,7 @@ do_surrender(ShouldSurrender, ToObject, InvolvedAgents, NewDeadlocks = [{ShouldSurrender, ToObject} | Deadlocks], ?event({do_surrender, [{should_surrender, ShouldSurrender}, {to_object, ToObject}, - {involved_agents, InvolvedAgents}]}), + {involved_agents, lists:sort(InvolvedAgents)}]}), if ShouldSurrender == self() -> request_surrender(ToObject, State1), send_surrender_info(InvolvedAgents, OldLock), diff --git a/src/locks_leader.erl b/src/locks_leader.erl index c4bc7e2..021e091 100644 --- a/src/locks_leader.erl +++ b/src/locks_leader.erl @@ -178,7 +178,8 @@ candidates(#st{candidates = C}) -> %% new candidates to see whether one of them was a leader, which could %% be the case if the candidates appeared after a healed netsplit. %% @end -new_candidates(#st{candidates = C, synced = S}) -> +new_candidates(#st{candidates = C, synced = S} = St) -> + ?event({new_candidates, St}), C -- S. -spec workers(election()) -> [pid()]. @@ -456,9 +457,9 @@ init_(Module, ModSt0, Options, Parent, Reg) -> Agent = case Role of candidate -> - net_kernel:monitor_nodes(true), {ok, A} = locks_agent:start([{notify,true}, - {await_nodes, true}]), + {await_nodes, true}, + {monitor_nodes, true}]), locks_agent:lock_nowait( A, Lock, write, AllNodes, all_alive), A; @@ -756,6 +757,7 @@ monitor_cand(Client) -> maybe_announce_leader(Pid, Type, #st{leader = L, mod = M, mod_state = MSt} = S) -> + ?event({maybe_announce_leader, Pid, Type}, S), IsSynced = is_synced(Pid, Type, S), if L == self(), IsSynced == false -> case M:elected(MSt, opaque(S), Pid) of diff --git a/src/locks_ttb.erl b/src/locks_ttb.erl index 0298a5f..b76d98f 100644 --- a/src/locks_ttb.erl +++ b/src/locks_ttb.erl @@ -52,10 +52,7 @@ handler(Fd, Trace, _, {Tp,Diff} = Acc) -> case Trace of {trace_ts,{_, _, Node}, call, - {Mod, event, [Line, Evt, State]}, TS} when Mod==locks_agent; - Mod==locks_server; - Mod==locks_leader; - Mod==?MODULE -> + {Mod, event, [Line, Evt, State]}, TS} when is_integer(Line) -> Tdiff = tdiff(TS, Tp), Diff1 = Diff + Tdiff, print(Fd, Node, Mod, Line, Evt, State, Diff1), @@ -87,11 +84,27 @@ print_tail(St, Mod, Col) -> [{put_chars, unicode, [lists:duplicate(Col,$\s), Cs]}, nl]. pp(Term, Col, Mod) -> - io_lib_pretty:print(Term, [{column, Col}, - {line_length, 80}, - {depth, -1}, - {max_chars, ?CHAR_MAX}, - {record_print_fun, record_print_fun(Mod)}]). + io_lib_pretty:print(pp_term(Term), + [{column, Col}, + {line_length, 80}, + {depth, -1}, + {max_chars, ?CHAR_MAX}, + {record_print_fun, record_print_fun(Mod)}]). + +pp_term(D) when element(1,D) == dict -> + try {'$dict', dict:to_list(D)} + catch + error:_ -> + list_to_tuple([pp_term(T) || T <- tuple_to_list(D)]) + end; +pp_term(T) when is_tuple(T) -> + list_to_tuple([pp_term(Trm) || Trm <- tuple_to_list(T)]); +pp_term(L) when is_list(L) -> + [pp_term(T) || T <- L]; +pp_term(T) -> + T. + + tdiff(_, 0) -> 0; tdiff(TS, T0) -> diff --git a/test/locks_leader_SUITE.erl b/test/locks_leader_SUITE.erl new file mode 100644 index 0000000..64d3a76 --- /dev/null +++ b/test/locks_leader_SUITE.erl @@ -0,0 +1,416 @@ +-module(locks_leader_SUITE). + +%% common_test exports +-export( + [ + all/0, groups/0, suite/0, + init_per_suite/1, end_per_suite/1, + init_per_group/2, end_per_group/2, + init_per_testcase/2, end_per_testcase/2 + ]). + +%% test case exports +-export( + [ + local_dict/1, + gdict_all_nodes/1, + gdict_netsplit/1, + start_incremental/1 + ]). + +-export([patch_net_kernel/0, + proxy/0, + connect_nodes/1, + disconnect_nodes/1, + unbar_nodes/0, + leader_nodes/1]). + +-include_lib("common_test/include/ct.hrl"). +-define(retry_not(Res, Expr), retry(fun() -> + __E = Expr, + {false, _} = {Res == __E, __E}, + __E + end, 10)). +-define(retry(Res, Expr), retry(fun() -> + __E = Expr, + {ok, Res} = {ok, __E}, + __E + end, 10)). +-define(NOT(Expr), {'$not', Expr}). + +all() -> + [ + {group, g_local}, + {group, g_all}, + {group, g_incr} + ]. + +groups() -> + [ + {g_local, [], [local_dict]}, + {g_all, [], [gdict_all_nodes, + gdict_netsplit]}, + {g_incr, [], [start_incremental]} + ]. + +suite() -> + []. + +init_per_suite(Config) -> + compile_dict(), + application:start(sasl), + Config. + +end_per_suite(_Config) -> + application:stop(sasl), + ok. + +init_per_group(g_local, Config) -> + application:start(locks), + Config; +init_per_group(_Group, Config) -> + application:start(locks), + Ns = start_slaves(node_list(5)), + [{slaves, Ns}|Config]. + +end_per_group(g_local, _Config) -> + application:stop(locks); +end_per_group(_Group, Config) -> + stop_slaves(?config(slaves, Config)), + ok. + +init_per_testcase(_Case, Config) -> + Config. + +end_per_testcase(Case, Config) when Case==gdict_all_nodes; + Case==gdict_netsplit -> + proxy_multicall(get_slave_nodes(Config), + application, stop, [locks]), + ok; +end_per_testcase(_Case, _Config) -> + ok. + + +%% ============================================================ +%% Test cases +%% ============================================================ + +local_dict(_Config) -> + Name = {gdict, ?LINE}, + Dicts = lists:map( + fun(_) -> + {ok,D} = gdict:new_opt([{resource, Name}]), + D + end, [1,2,3]), + lists:foreach(fun(D) -> + ok = gdict:store(a, 17, D), + {ok,17} = gdict:find(a, D) + end, Dicts), + _ = [begin unlink(D), exit(D,kill) end || D <- Dicts], + ok. + +gdict_all_nodes(Config) -> + with_trace(fun gdict_all_nodes_/1, Config, "leader_tests_all_nodes"). + +gdict_all_nodes_(Config) -> + [H|T] = Ns = get_slave_nodes(Config), + Name = [?MODULE,?LINE], + ok = call_proxy(H, ?MODULE, connect_nodes, [T]), + T = call_proxy(H, erlang, nodes, []), + ok = lists:foreach( + fun(ok) -> ok end, + proxy_multicall(Ns, application, start, [locks])), + Results = proxy_multicall(Ns, gdict, new_opt, [[{resource, Name}]]), + Dicts = lists:map( + fun({ok,D}) -> D end, Results), + ok = gdict:store(a,1,hd(Dicts)), + [] = lists:filter( + fun({_Node,{ok,1}}) -> false; + (_) -> true + end, + lists:zip(Ns, [?retry({ok,1}, gdict:find(a,D)) || D <- Dicts])), + [exit(D, kill) || D <- Dicts], + proxy_multicall(Ns, application, stop, [locks]), + ok. + +gdict_netsplit(Config) -> + with_trace(fun gdict_netsplit_/1, Config, "leader_tests_netsplit"). + +gdict_netsplit_(Config) -> + Name = [?MODULE, ?LINE], + [A,B,C,D,E] = Ns = get_slave_nodes(Config), + proxy_multicall([A,B], ?MODULE, disconnect_nodes, [[C,D,E]]), + [B] = call_proxy(A, erlang, nodes, []), + [A] = call_proxy(B, erlang, nodes, []), + locks_ttb:event(netsplit_ready), + ok = lists:foreach( + fun(ok) -> ok end, + proxy_multicall(Ns, application, start, [locks])), + Results = proxy_multicall(Ns, gdict, new_opt, [[{resource, Name}]]), + [Da,Db,Dc,Dd,De] = Dicts = lists:map(fun({ok,Dx}) -> Dx end, Results), + locks_ttb:event({dicts_created, lists:zip(Ns, Dicts)}), + ok = ?retry(ok, gdict:store(a, 1, Da)), + ok = gdict:store(b, 2, Dc), + {ok, 1} = ?retry({ok,1}, gdict:find(a, Db)), + error = gdict:find(a, Dc), + [X,X] = [locks_leader:info(Dx, leader) || Dx <- [Da,Db]], + locks_ttb:event({leader_consensus, [Da,Db], X}), + [Y,Y,Y] = [locks_leader:info(Dx, leader) || Dx <- [Dc,Dd,De]], + locks_ttb:event({leader_consensus, [Dc,Dd,De], Y}), + true = (X =/= Y), + {ok, 2} = ?retry({ok,2}, gdict:find(b, Dc)), + {ok, 2} = ?retry({ok,2}, gdict:find(b, Dd)), + {ok, 2} = ?retry({ok,2}, gdict:find(b, De)), + error = gdict:find(b, Da), + locks_ttb:event(reconnecting), + proxy_multicall(Ns, ?MODULE, unbar_nodes, []), + proxy_multicall(Ns, ?MODULE, connect_nodes, [Ns]), + [B,C,D,E] = lists:sort(call_proxy(A, erlang, nodes, [])), + [Z,Z,Z,Z,Z] = ?retry([Z,Z,Z,Z,Z], + call_proxy(A, ?MODULE, leader_nodes, [Dicts])), + locks_ttb:event({leader_consensus, Ns, Z}), + {ok, 1} = ?retry({ok,1}, gdict:find(a, Dc)), + {ok, 2} = ?retry({ok,2}, gdict:find(b, Da)), + [exit(Dx, kill) || Dx <- Dicts], + proxy_multicall(Ns, application, stop, [locks]), + ok. + +start_incremental(Config) -> + with_trace(fun start_incremental_/1, Config, "leader_tests_incr"). + +start_incremental_(Config) -> + Name = [?MODULE, ?LINE], + Ns = get_slave_nodes(Config), + start_incremental(Ns, [], Name). + +start_incremental([], _, _) -> + ok; +start_incremental([N|Ns], Alive, Name) -> + start_incremental(N, Alive, Ns, Name). + +start_incremental(N, Alive, Rest, Name) -> + maybe_connect(N, Alive), + ok = rpc:call(N, application, start, [locks]), + {ok, D} = call_proxy(N, gdict, new_opt, [[{resource, Name}]]), + ct:log("Dict created on ~p: ~p~n", [N, D]), + insert_initial(D, Alive), + NewAlive = [{N, D}|Alive], + Vals = [{D, ?retry({ok,1}, gdict:find(a, D1))} + || {_,D1} <- NewAlive], + ct:log("Values = ~p~n", [Vals]), + Leaders = [{D1, ?retry_not(undefined, locks_leader:info(D1, leader))} + || {_, D1} <- NewAlive], + ct:log("Leaders = ~p~n", [Leaders]), + start_incremental(Rest, NewAlive, Name). + +%% ============================================================ +%% Support code +%% ============================================================ + +with_trace(F, Config, Name) -> + Ns = get_slave_nodes(Config), + Pats = [{test_cb, event, 3, []}|locks_ttb:default_patterns()], + Flags = locks_ttb:default_flags(), + locks_ttb:trace_nodes([node()|Ns], Pats, Flags, [{file, Name}]), + try F(Config) + catch + error:R -> + ttb_stop(), + Stack = erlang:get_stacktrace(), + ct:log("Error ~p; Stack = ~p~n", [R, Stack]), + erlang:error(R); + exit:R -> + ttb_stop(), + exit(R) + end, + locks_ttb:stop_nofetch(), + ok. + +ttb_stop() -> + Dir = locks_ttb:stop(), + Out = filename:join(filename:dirname(Dir), + filename:basename(Dir) ++ ".txt"), + locks_ttb:format(Dir, Out), + ct:log("Formatted trace log in ~s~n", [Out]). + + +compile_dict() -> + Lib = filename:absname(code:lib_dir(locks)), + Examples = filename:join(Lib, "examples"), + _ = os:cmd(["cd ", Examples, " && rebar clean compile"]), + _ = code:add_path(filename:join(Examples, "ebin")), + ok. + +maybe_connect(_, []) -> + ok; +maybe_connect(N, [{N1,_}|_]) -> + call_proxy(N, net_kernel, connect, [N1]). + +insert_initial(D, []) -> + gdict:store(a, 1, D); +insert_initial(_, _) -> + ok. + +node_list(5) -> + [locks_1, locks_2, locks_3, locks_4, locks_5]. + +retry(F, N) -> + retry(F, N, undefined). + +retry(F, N, _) when N > 0 -> + try F() + catch + error:{badmatch, {_, Other}} -> + timer:sleep(100), + retry(F, N-1, Other) + end; +retry(_, _, Last) -> + Last. + +disconnect_nodes(Ns) -> + [{true,_} = {erlang:disconnect_node(N), N} || N <- Ns, N =/= node()], + ok. + +unbar_nodes() -> + gen_server:call(net_kernel, unbar_all). + +connect_nodes(Ns) -> + [{true,_} = {net_kernel:connect_node(N), N} || N <- Ns, N =/= node()], + ok. + +leader_nodes(Ds) -> + [node(locks_leader:info(D, leader)) || D <- Ds]. + +-define(PROXY, locks_leader_test_proxy). + +proxy() -> + register(?PROXY, self()), + process_flag(trap_exit, true), + proxy_loop(). + +proxy_loop() -> + receive + {From, Ref, apply, M, F, A} -> + From ! {Ref, (catch apply(M,F,A))}; + _ -> + ok + end, + proxy_loop(). + +proxy_multicall(Ns, M, F, A) -> + [call_proxy(N, M, F, A) || N <- Ns]. + +call_proxy(N, M, F, A) -> + Ref = erlang:monitor(process, {?PROXY, N}), + {?PROXY, N} ! {self(), Ref, apply, M, F, A}, + receive + {'DOWN', Ref, _, _, Reason} -> + error({proxy_died, N, Reason}); + {Ref, Result} -> + Result + after 1000 -> + error(proxy_call_timeout) + end. + +get_slave_nodes(Config) -> + [N || {N,_} <- ?config(slaves, Config)]. + +start_slaves(Ns) -> + Nodes = [start_slave(N) || N <- Ns], + ct:log("start_slaves() -> ~p~n", [Nodes]), + Nodes. + +start_slave(Name) -> + case node() of + nonode@nohost -> + os:cmd("epmd -daemon"), + {ok, _} = net_kernel:start([locks_master, shortnames]); + _ -> + ok + end, + {Pa, Pz} = paths(), + Paths = "-pa ./ -pz ../ebin" ++ + lists:flatten([[" -pa " ++ Path || Path <- Pa], + [" -pz " ++ Path || Path <- Pz]]), + Arg = " -kernel dist_auto_connect once", + {ok, Node} = ct_slave:start(host(), Name, [{erl_flags, Paths ++ Arg}]), + {module,net_kernel} = rpc:call(Node, ?MODULE, patch_net_kernel, []), + disconnect_node(Node), + true = net_kernel:hidden_connect(Node), + spawn(Node, ?MODULE, proxy, []), + {Node, rpc:call(Node, os, getpid, [])}. + +stop_slaves(Ns) -> + [ok = stop_slave(N) || N <- Ns], + ok. + +stop_slave({N, Pid}) -> + try erlang:monitor_node(N, true) of + true -> + rpc:call(N, erlang, halt, []), + receive + {nodedown, N} -> ok + after 10000 -> + os:cmd("kill -9 " ++ Pid), + ok + end + catch + error:badarg -> + ok + end. + +paths() -> + Path = code:get_path(), + {ok, [[Root]]} = init:get_argument(root), + {Pas, Rest} = lists:splitwith(fun(P) -> + not lists:prefix(Root, P) + end, Path), + Pzs = lists:filter(fun(P) -> + not lists:prefix(Root, P) + end, Rest), + {Pas, Pzs}. + + +host() -> + [_Name, Host] = re:split(atom_to_list(node()), "@", [{return, list}]), + list_to_atom(Host). + + +patch_net_kernel() -> + NetKernel = code:which(net_kernel), + {ok, {_,[{abstract_code, + {raw_abstract_v1, + [{attribute,1,file,_}|Forms]}}]}} = + beam_lib:chunks(NetKernel, [abstract_code]), + NewForms = xform_net_kernel(Forms), + try + {ok,net_kernel,Bin} = compile:forms(NewForms, [binary]), + code:unstick_dir(filename:dirname(NetKernel)), + {module, _Module} = Res = code:load_binary(net_kernel, NetKernel, Bin), + locks_ttb:event({net_kernel, NewForms}), + Res + catch + error:What -> + io:fwrite(user, "~p: ERROR:~p~n", [?LINE, What]), + error({What, erlang:get_stacktrace()}) + end. + +xform_net_kernel({function,L,handle_call,3,Clauses}) -> + {function,L,handle_call,3, + [{clause,L,[{atom,L,unbar_all},{var,L,'From'},{var,L,'State'}], [], + [{call,L,{remote,L,{atom,L,ets},{atom,L,match_delete}}, + [ + {atom,L,sys_dist}, + {record,L,barred_connection, + [{record_field,L,{var,L,'_'},{atom,L,'_'}}]} + ]}, + {call,L,{atom,L,async_reply}, + [{tuple,L,[{atom,L,reply},{atom,L,true},{var,L,'State'}]}, + {var,L,'From'}]} + ]} | Clauses]}; +xform_net_kernel(T) when is_tuple(T) -> + list_to_tuple(xform_net_kernel(tuple_to_list(T))); +xform_net_kernel([H|T]) -> + [xform_net_kernel(H) | xform_net_kernel(T)]; +xform_net_kernel(Other) -> + Other. diff --git a/test/locks_leader_tests.erl b/test/locks_leader_tests.erl deleted file mode 100644 index a2999f8..0000000 --- a/test/locks_leader_tests.erl +++ /dev/null @@ -1,273 +0,0 @@ --module(locks_leader_tests). - --include_lib("eunit/include/eunit.hrl"). - --compile(export_all). - --define(retry(Res, Expr), retry(fun() -> Res = Expr end, 10)). - -run_test_() -> - {timeout, 60, - {setup, - fun() -> - compile_dict(), - application:start(sasl), - ok = application:start(locks), - Ns = start_slaves([locks_1, locks_2, - locks_3, locks_4, locks_5]), - rpc:multicall(Ns, application, start, [locks]), - Ns - end, - fun(Ns) -> - stop_slaves(Ns), - ok = application:stop(locks) - end, - fun(Ns) -> - {inorder, - [ - ?_test(local_dict()), - ?_test(gdict_all_nodes(Ns)), - ?_test(gdict_netsplit(Ns)) - ]} - end - }}. - - -compile_dict() -> - Lib = filename:absname(code:lib_dir(locks)), - Examples = filename:join(Lib, "examples"), - _ = os:cmd(["cd ", Examples, " && rebar clean compile"]), - _ = code:add_path(filename:join(Examples, "ebin")), - ok. - -local_dict() -> - Name = {gdict, ?LINE}, - Dicts = lists:map( - fun(_) -> - {ok,D} = gdict:new_opt([{resource, Name}]), - D - end, [1,2,3]), - lists:foreach(fun(D) -> - ok = gdict:store(a, 17, D), - {ok,17} = gdict:find(a, D) - end, Dicts), - _ = [begin unlink(D), exit(D,kill) end || D <- Dicts], - ok. - -gdict_all_nodes([H|T] = Ns) -> - Name = [?MODULE,?LINE], - ok = call_proxy(H, ?MODULE, connect_nodes, [T]), - T = call_proxy(H, erlang, nodes, []), - Results = proxy_multicall(Ns, gdict, new_opt, [[{resource, Name}]]), - Dicts = lists:map( - fun({ok,D}) -> D end, Results), - ok = gdict:store(a,1,hd(Dicts)), - [] = lists:filter( - fun({_Node,{ok,1}}) -> false; - (_) -> true - end, - lists:zip(Ns, [gdict:find(a,D) || D <- Dicts])), - [exit(D, kill) || D <- Dicts], - ok. - -gdict_netsplit([A,B,C,D,E] = Ns) -> - Name = [?MODULE, ?LINE], - locks_ttb:trace_nodes([node()|Ns], "leader_tests_netsplit"), - try - proxy_multicall([A,B], ?MODULE, disconnect_nodes, [[C,D,E]]), - [B] = call_proxy(A, erlang, nodes, []), - [A] = call_proxy(B, erlang, nodes, []), - locks_ttb:event(netsplit_ready), - Results = proxy_multicall(Ns, gdict, new_opt, [[{resource, Name}]]), - [Da,Db,Dc,Dd,De] = Dicts = lists:map(fun({ok,Dx}) -> Dx end, Results), - locks_ttb:event({dicts_created, lists:zip(Ns, Dicts)}), - ok = ?retry(ok, gdict:store(a, 1, Da)), - ok = gdict:store(b, 2, Dc), - {ok, 1} = gdict:find(a, Db), - error = gdict:find(a, Dc), - [X,X] = [locks_leader:info(Dx, leader) || Dx <- [Da,Db]], - locks_ttb:event({leader_consensus, [Da,Db], X}), - [Y,Y,Y] = [locks_leader:info(Dx, leader) || Dx <- [Dc,Dd,De]], - locks_ttb:event({leader_consensus, [Dc,Dd,De], Y}), - true = (X =/= Y), - {ok, 2} = ?retry({ok,2}, gdict:find(b, Dc)), - {ok, 2} = gdict:find(b, Dd), - {ok, 2} = gdict:find(b, De), - error = gdict:find(b, Da), - locks_ttb:event(reconnecting), - proxy_multicall(Ns, ?MODULE, unbar_nodes, []), - proxy_multicall(Ns, ?MODULE, connect_nodes, [Ns]), - [B,C,D,E] = lists:sort(call_proxy(A, erlang, nodes, [])), - [Z,Z,Z,Z,Z] = ?retry([Z,Z,Z,Z,Z], - call_proxy(A, ?MODULE, leader_nodes, [Dicts])), - locks_ttb:event({leader_consensus, Ns, Z}), - {ok, 1} = ?retry({ok,1}, gdict:find(a, Dc)), - {ok, 2} = gdict:find(b, Da), - [exit(Dx, kill) || Dx <- Dicts] - catch - error:R -> - locks_ttb:stop(), - erlang:error(R); - exit:R -> - locks_ttb:stop(), - exit(R) - end, - locks_ttb:stop_nofetch(), - ok. - -retry(F, N) -> - retry(F, N, undefined). - -retry(F, N, _) when N > 0 -> - try F() - catch - error:{badmatch, Other} -> - timer:sleep(100), - retry(F, N-1, Other) - end; -retry(_, _, Last) -> - Last. - - -disconnect_nodes(Ns) -> - [{true,_} = {erlang:disconnect_node(N), N} || N <- Ns, N =/= node()], - ok. - -unbar_nodes() -> - gen_server:call(net_kernel, unbar_all). - -connect_nodes(Ns) -> - [{true,_} = {net_kernel:connect_node(N), N} || N <- Ns, N =/= node()], - ok. - -leader_nodes(Ds) -> - [node(locks_leader:info(D, leader)) || D <- Ds]. - --define(PROXY, locks_leader_test_proxy). - -proxy() -> - register(?PROXY, self()), - process_flag(trap_exit, true), - proxy_loop(). - -proxy_loop() -> - receive - {From, Ref, apply, M, F, A} -> - From ! {Ref, (catch apply(M,F,A))}; - _ -> - ok - end, - proxy_loop(). - -proxy_multicall(Ns, M, F, A) -> - [call_proxy(N, M, F, A) || N <- Ns]. - -call_proxy(N, M, F, A) -> - Ref = erlang:monitor(process, {?PROXY, N}), - {?PROXY, N} ! {self(), Ref, apply, M, F, A}, - receive - {'DOWN', Ref, _, _, Reason} -> - error({proxy_died, N, Reason}); - {Ref, Result} -> - Result - after 1000 -> - error(proxy_call_timeout) - end. - -start_slaves(Ns) -> - Nodes = [start_slave(N) || N <- Ns], - Nodes. - -start_slave(Name) -> - case node() of - nonode@nohost -> - os:cmd("epmd -daemon"), - {ok, _} = net_kernel:start([locks_master, shortnames]); - _ -> - ok - end, - {Pa, Pz} = paths(), - Paths = "-pa ./ -pz ../ebin" ++ - lists:flatten([[" -pa " ++ Path || Path <- Pa], - [" -pz " ++ Path || Path <- Pz]]), - Arg = " -kernel dist_auto_connect once", - {ok, Node} = ct_slave:start(host(), Name, [{erl_flags, Paths ++ Arg}]), - {module,net_kernel} = rpc:call(Node, ?MODULE, patch_net_kernel, []), - disconnect_node(Node), - true = net_kernel:hidden_connect(Node), - spawn(Node, ?MODULE, proxy, []), - Node. - -stop_slaves(Ns) -> - [ok = stop_slave(N) || N <- Ns], - ok. - -stop_slave(N) -> - try erlang:monitor_node(N, true) of - true -> - rpc:call(N, erlang, halt, []), - receive - {nodedown, N} -> ok - after 10000 -> - erlang:error(slave_stop_timeout) - end - catch - error:badarg -> - ok - end. - -paths() -> - Path = code:get_path(), - {ok, [[Root]]} = init:get_argument(root), - {Pas, Rest} = lists:splitwith(fun(P) -> - not lists:prefix(Root, P) - end, Path), - Pzs = lists:filter(fun(P) -> - not lists:prefix(Root, P) - end, Rest), - {Pas, Pzs}. - - -host() -> - [_Name, Host] = re:split(atom_to_list(node()), "@", [{return, list}]), - list_to_atom(Host). - - -patch_net_kernel() -> - NetKernel = code:which(net_kernel), - {ok, {_,[{abstract_code, - {raw_abstract_v1, - [{attribute,1,file,_}|Forms]}}]}} = - beam_lib:chunks(NetKernel, [abstract_code]), - NewForms = xform_net_kernel(Forms), - try - {ok,net_kernel,Bin} = compile:forms(NewForms, [binary]), - code:unstick_dir(filename:dirname(NetKernel)), - {module, _Module} = Res = code:load_binary(net_kernel, NetKernel, Bin), - locks_ttb:event({net_kernel, NewForms}), - Res - catch - error:What -> - io:fwrite(user, "~p: ERROR:~p~n", [?LINE, What]), - error({What, erlang:get_stacktrace()}) - end. - -xform_net_kernel({function,L,handle_call,3,Clauses}) -> - {function,L,handle_call,3, - [{clause,L,[{atom,L,unbar_all},{var,L,'From'},{var,L,'State'}], [], - [{call,L,{remote,L,{atom,L,ets},{atom,L,match_delete}}, - [ - {atom,L,sys_dist}, - {record,L,barred_connection, - [{record_field,L,{var,L,'_'},{atom,L,'_'}}]} - ]}, - {call,L,{atom,L,async_reply}, - [{tuple,L,[{atom,L,reply},{atom,L,true},{var,L,'State'}]}, - {var,L,'From'}]} - ]} | Clauses]}; -xform_net_kernel(T) when is_tuple(T) -> - list_to_tuple(xform_net_kernel(tuple_to_list(T))); -xform_net_kernel([H|T]) -> - [xform_net_kernel(H) | xform_net_kernel(T)]; -xform_net_kernel(Other) -> - Other.