diff --git a/README b/README new file mode 100644 index 0000000..36fc78b --- /dev/null +++ b/README @@ -0,0 +1,12 @@ +unsplit - Resolve conflicts in Mnesia tables after network split + +Author: Ulf Wiger, Erlang Solutions Ltd + + +NOTE: This application is currently a fairly crude prototype, + and is not intended for mission-critical tasks (yet). + +Documentation will follow, as soon as the approach has been +verified to work. So far, no Mnesia patches are used, but +for it to work really well, some added functionality in +Mnesia may well be needed. diff --git a/commands.txt b/commands.txt new file mode 100644 index 0000000..f790920 --- /dev/null +++ b/commands.txt @@ -0,0 +1,26 @@ +erl -boot start_sasl -kernel dist_auto_connect once -sname n1 -pa ~/src/unsplit-0.1/ebin -mnesia debug trace +erl -boot start_sasl -kernel dist_auto_connect once -sname n2 -pa ~/src/unsplit-0.1/ebin -mnesia debug trace + +mnesia:start(). +application:start(unsplit). +rd(test,{key,modified=erlang:now(),value}). + +On n2@debian: + +mnesia:create_schema([n1@debian,n2@debian]). + +mnesia:delete_table(test). +mnesia:create_table(test,[{ram_copies,[n1@debian,n2@debian]},{attributes,[key,modified,value]},{user_properties,[{unsplit_method,{unsplit_lib,last_modified,[]}}]}]). + +mnesia:transaction(fun() -> mnesia:write(#test{key=1,value=a}) end). +mnesia:transaction(fun() -> mnesia:write(#test{key=2,value=a}) end). + +ets:tab2list(test). + + +On n1@debian: + +disconnect_node(n2@debian). +mnesia:transaction(fun() -> mnesia:write(#test{key=2,value=b}) end). +timer:sleep(3000). +net_kernel:connect_node(n2@debian). diff --git a/ebin/unsplit.app b/ebin/unsplit.app new file mode 100644 index 0000000..e5aceb2 --- /dev/null +++ b/ebin/unsplit.app @@ -0,0 +1,7 @@ +{application, unsplit, + [{vsn, "0.1"}, + {description, "Merges mnesia tables after net split"}, + {applications, [mnesia]}, + {mod, {unsplit, []}}, + {env, []} + ]}. diff --git a/src/unsplit.erl b/src/unsplit.erl new file mode 100644 index 0000000..72a1597 --- /dev/null +++ b/src/unsplit.erl @@ -0,0 +1,22 @@ +-module(unsplit). + +-behaviour(application). +-behaviour(supervisor). + +-export([start/2, stop/1]). + + +-export([init/1]). + +start(_, _) -> + supervisor:start_link({local,?MODULE}, ?MODULE, []). + +stop(_) -> + ok. + +%% Supervisor callback: + +init([]) -> + Children = [{unsplit_server, {unsplit_server, start_link, []}, + permanent, 3000, worker, [unsplit_server]}], + {ok, {{one_for_one, 3, 10}, Children}}. diff --git a/src/unsplit_lib.erl b/src/unsplit_lib.erl new file mode 100644 index 0000000..f37788e --- /dev/null +++ b/src/unsplit_lib.erl @@ -0,0 +1,69 @@ +-module(unsplit_lib). +-export([no_action/2, + last_modified/2, + last_version/2]). + + + +no_action(start, [Tab|_]) -> + error_logger:format("Will not merge table ~p~n", [Tab]), + stop. + + +last_modified(init, S0) -> + last_version(init, S0 ++ [modified]); +last_modified(Other, S) -> + last_version(Other, S). + +last_version(init, [Tab, Attrs, Attr]) -> + case lists:member(Attr, Attrs) of + false -> + error_logger:format("Cannot merge table ~p." + "Missing ~p attribute~n", [Tab, Attr]), + stop; + true -> + io:fwrite("Starting merge of ~p (~p)~n", [Tab, Attrs]), + {ok, {Tab, pos(Attr, Tab, Attrs)}} + end; +last_version(done, _) -> + ok; +last_version(Objs, {T, P} = S) when is_list(Objs) -> + Actions = lists:map(fun(Obj) -> + last_version_entry(Obj, T, P) + end, Objs), + {ok, Actions, same, S}. + + +last_version_entry(Obj, T, P) -> + io:fwrite("last_version_entry(~p)~n", [Obj]), + case Obj of + {A, []} -> {write, A}; + {[], B} -> {write, B}; + {[A], [B]} -> + ModA = element(P, A), + ModB = element(P, B), + io:fwrite("ModA = ~p, ModB = ~p~n", [ModA, ModB]), + if ModA < ModB -> + {write, B}; + ModA > ModB -> + {write, A}; + ModA == ModB -> + if A =/= B -> + mnesia:abort({undecided,T,A,B}); + true -> + {write, A} + end + end + end. + + + +pos(A, T, L) -> + pos(A, T, L, 2). % record tag is the 1st element in the tuple + +pos(H, _, [H|_], P) -> + P; +pos(H, Tab, [_|T], P) -> + pos(H, Tab, T, P+1); +pos(A, Tab, [], _) -> + mnesia:abort({missing_attribute, Tab, A}). diff --git a/src/unsplit_server.erl b/src/unsplit_server.erl new file mode 100644 index 0000000..3c0a99c --- /dev/null +++ b/src/unsplit_server.erl @@ -0,0 +1,288 @@ +%%%------------------------------------------------------------------- +%%% File : unsplit_server.erl +%%% Author : Ulf Wiger +%%% Description : Coordinator for merging mnesia tables after netsplit +%%% +%%% Created : 1 Feb 2010 by Ulf Wiger +%%%------------------------------------------------------------------- +-module(unsplit_server). + +-behaviour(gen_server). + +%% API +-export([start_link/0]). +-export([remote_handle_query/1]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-record(state, {}). +-record(st, {module, function, extra_args = [], + modstate, + table, attributes, + remote, + chunk, + strategy = default_strategy(), + progress}). + + +-define(SERVER, ?MODULE). +-define(DEFAULT_METHOD, {unsplit_lib, no_action, []}). +-define(DEFAULT_STRATEGY, all_keys). +-define(TIMEOUT, 10000). + +-define(DONE, {?MODULE,done}). + +-define(LOCK, {?MODULE, stitch}). + +%%==================================================================== +%% API +%%==================================================================== +%%-------------------------------------------------------------------- +%% Function: start_link() -> {ok,Pid} | ignore | {error,Error} +%% Description: Starts the server +%%-------------------------------------------------------------------- +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +%%==================================================================== +%% gen_server callbacks +%%==================================================================== + +%%-------------------------------------------------------------------- +%% Function: init(Args) -> {ok, State} | +%% {ok, State, Timeout} | +%% ignore | +%% {stop, Reason} +%% Description: Initiates the server +%%-------------------------------------------------------------------- +init([]) -> + mnesia:subscribe(system), + {ok, #state{}}. + +%%-------------------------------------------------------------------- +%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} | +%% {reply, Reply, State, Timeout} | +%% {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, Reply, State} | +%% {stop, Reason, State} +%% Description: Handling call messages +%%-------------------------------------------------------------------- +handle_call(_Request, _From, State) -> + Reply = ok, + {reply, Reply, State}. + +%%-------------------------------------------------------------------- +%% Function: handle_cast(Msg, State) -> {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} +%% Description: Handling cast messages +%%-------------------------------------------------------------------- +handle_cast(_Msg, State) -> + {noreply, State}. + +%%-------------------------------------------------------------------- +%% Function: handle_info(Info, State) -> {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} +%% Description: Handling all non call/cast messages +%%-------------------------------------------------------------------- +handle_info({mnesia_system_event, + {inconsistent_database, Context, Node}}, State) -> + io:fwrite("inconsistency. Context = ~p; Node = ~p~n", [Context, Node]), + Res = global:trans( + {?LOCK, self()}, + fun() -> + io:fwrite("have lock...~n", []), + stitch_together(node(), Node) + end, [node()|nodes()]), + io:fwrite("Res = ~p~n", [Res]), + {noreply, State}; +handle_info(_Info, State) -> + io:fwrite("Got event: ~p~n", [_Info]), + {noreply, State}. + +%%-------------------------------------------------------------------- +%% Function: terminate(Reason, State) -> void() +%% Description: This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any necessary +%% cleaning up. When it returns, the gen_server terminates with Reason. +%% The return value is ignored. +%%-------------------------------------------------------------------- +terminate(_Reason, _State) -> + ok. + +%%-------------------------------------------------------------------- +%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState} +%% Description: Convert process state when code is changed +%%-------------------------------------------------------------------- +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%-------------------------------------------------------------------- +%%% Internal functions +%%-------------------------------------------------------------------- + + +stitch_together(NodeA, NodeB) -> + case lists:member(NodeB, mnesia:system_info(running_db_nodes)) of + true -> + io:fwrite("~p already stitched, it seems. All is well.~n", [NodeB]), + ok; + false -> + do_stitch_together(NodeA, NodeB) + end. + +do_stitch_together(NodeA, NodeB) -> + Tabs = affected_tables(NodeA, NodeB), + io:fwrite("Affected tabs = ~p~n", [Tabs]), + DefaultMethod = default_method(), + TabMethods = [{T, get_method(T, DefaultMethod)} || T <- Tabs], + io:fwrite("Methods = ~p~n", [TabMethods]), + mnesia:activity(transaction, + fun() -> + mnesia_controller:connect_nodes([NodeB]), + io:fwrite("stitching: ~p~n", [TabMethods]), + stitch_tabs(TabMethods, NodeB) + end). + +stitch_tabs(TabMethods, NodeB) -> + Tabs = [Tab || {Tab,_} <- TabMethods], + [mnesia:write_lock_table(T) || T <- Tabs], + [do_stitch(TM, NodeB) || TM <- TabMethods]. + + + + +do_stitch({Tab, {M, F, XArgs}} = TM, Remote) -> + io:fwrite("do_stitch(~p, ~p).~n", [TM,Remote]), + Attrs = mnesia:table_info(Tab, attributes), + S0 = #st{module = M, function = F, extra_args = XArgs, + table = Tab, attributes = Attrs, + remote = Remote, + chunk = get_table_chunk_factor(Tab), + strategy = default_strategy()}, + io:fwrite("Calling ~p:~p(init, ~p)", [M,F,[Tab,Attrs|XArgs]]), + try + run_stitch(check_return(M:F(init, [Tab, Attrs | XArgs]), S0)) + catch + throw:?DONE -> + ok + end. + +check_return(Ret, S) -> + io:fwrite(" -> ~p~n", [Ret]), + case Ret of + stop -> throw(?DONE); + {ok, St} -> + S#st{modstate = St}; + {ok, Actions, St} -> + S1 = S#st{modstate = St}, + perform_actions(Actions, S1); + {ok, Actions, Strategy, St} -> + perform_actions(Actions, new_strategy(Strategy, + S#st{modstate = St})) + end. + +new_strategy(same, S) -> + S; +new_strategy(Strategy, S) -> + S#st{strategy = Strategy}. + +perform_actions(Actions, #st{table = Tab, remote = Remote} = S) -> + local_perform_actions(Actions, Tab), + ask_remote(Remote, {actions, Tab, Actions}), + S. + + +run_stitch(#st{table = Tab, + module = M, function = F, modstate = MSt, + strategy = all_keys, remote = Remote} = St) -> + Keys = mnesia:dirty_all_keys(Tab), + lists:foldl( + fun(K, Sx) -> + [_] = A = mnesia:read({Tab,K}), % assert that A is non-empty + B = get_remote_obj(Remote, Tab, K), + io:fwrite("Calling ~p:~p(~p, ~p, ~p)~n", [M,F,A,B,MSt]), + check_return(M:F([{A, B}], MSt), Sx) + end, St, Keys). + +get_remote_obj(Remote, Tab, Key) -> + ask_remote(Remote, {get_obj, Tab, Key}). + + +write_result(Data, Tab) when is_list(Data) -> + [mnesia:dirty_write(Tab, D) || D <- Data]; +write_result(Data, Tab) -> + mnesia:dirty_write(Tab, Data). + + +ask_remote(Remote, Q) -> + rpc:call(Remote, ?MODULE, remote_handle_query, [Q]). + + +remote_handle_query(Q) -> + case Q of + {get_obj, Tab, Key} -> + mnesia:dirty_read({Tab, Key}); + {write, Tab, Data} -> + write_result(Data, Tab); + {actions, Tab, Actions} -> + local_perform_actions(Actions, Tab) + end. + + +local_perform_actions(Actions, Tab) -> + lists:foreach( + fun({write, Data}) -> + write_result(Data, Tab); + ({delete, Data}) when is_list(Data) -> + [mnesia:dirty_delete({Tab,D}) || D <- Data] + end, Actions). + + + +affected_tables(NodeA, NodeB) -> + Both = [NodeA, NodeB], + Tabs = mnesia:system_info(tables) -- [schema], + lists:filter( + fun(T) -> + Nodes = lists:concat( + [mnesia:table_info(T, C) || + C <- [ram_copies, disc_copies, + disc_only_copies]]), + (Both -- Nodes) == [] + end, Tabs). + + + +default_method() -> + get_env(default_method, ?DEFAULT_METHOD). + +default_strategy() -> + get_env(default_strategy, ?DEFAULT_STRATEGY). + +get_env(K, Default) -> + case application:get_env(K) of + undefined -> + Default; + {ok, undefined} -> + Default; + {ok, {_,_} = Meth} -> + Meth + end. + +get_method(T, Def) -> + try mnesia:read_table_property(T, unsplit_method) of + {unsplit_method,Method} -> Method + catch + exit:_ -> + Def + end. + +get_table_chunk_factor(_) -> + %% initially use 1 for testing. 100 might be a better default, + %% and it should be made configurable per-table. + 1.