Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

first commit

  • Loading branch information...
commit 649ad0b70736c78024816a1d78665bd65a744ded 0 parents
Ulf Wiger authored
12 README
@@ -0,0 +1,12 @@
+unsplit - Resolve conflicts in Mnesia tables after network split
+
+Author: Ulf Wiger, Erlang Solutions Ltd
+
+
+NOTE: This application is currently a fairly crude prototype,
+ and is not intended for mission-critical tasks (yet).
+
+Documentation will follow, as soon as the approach has been
+verified to work. So far, no Mnesia patches are used, but
+for it to work really well, some added functionality in
+Mnesia may well be needed.
26 commands.txt
@@ -0,0 +1,26 @@
+erl -boot start_sasl -kernel dist_auto_connect once -sname n1 -pa ~/src/unsplit-0.1/ebin -mnesia debug trace
+erl -boot start_sasl -kernel dist_auto_connect once -sname n2 -pa ~/src/unsplit-0.1/ebin -mnesia debug trace
+
+mnesia:start().
+application:start(unsplit).
+rd(test,{key,modified=erlang:now(),value}).
+
+On n2@debian:
+
+mnesia:create_schema([n1@debian,n2@debian]).
+
+mnesia:delete_table(test).
+mnesia:create_table(test,[{ram_copies,[n1@debian,n2@debian]},{attributes,[key,modified,value]},{user_properties,[{unsplit_method,{unsplit_lib,last_modified,[]}}]}]).
+
+mnesia:transaction(fun() -> mnesia:write(#test{key=1,value=a}) end).
+mnesia:transaction(fun() -> mnesia:write(#test{key=2,value=a}) end).
+
+ets:tab2list(test).
+
+
+On n1@debian:
+
+disconnect_node(n2@debian).
+mnesia:transaction(fun() -> mnesia:write(#test{key=2,value=b}) end).
+timer:sleep(3000).
+net_kernel:connect_node(n2@debian).
7 ebin/unsplit.app
@@ -0,0 +1,7 @@
+{application, unsplit,
+ [{vsn, "0.1"},
+ {description, "Merges mnesia tables after net split"},
+ {applications, [mnesia]},
+ {mod, {unsplit, []}},
+ {env, []}
+ ]}.
22 src/unsplit.erl
@@ -0,0 +1,22 @@
+-module(unsplit).
+
+-behaviour(application).
+-behaviour(supervisor).
+
+-export([start/2, stop/1]).
+
+
+-export([init/1]).
+
+start(_, _) ->
+ supervisor:start_link({local,?MODULE}, ?MODULE, []).
+
+stop(_) ->
+ ok.
+
+%% Supervisor callback:
+
+init([]) ->
+ Children = [{unsplit_server, {unsplit_server, start_link, []},
+ permanent, 3000, worker, [unsplit_server]}],
+ {ok, {{one_for_one, 3, 10}, Children}}.
69 src/unsplit_lib.erl
@@ -0,0 +1,69 @@
+-module(unsplit_lib).
+-export([no_action/2,
+ last_modified/2,
+ last_version/2]).
+
+
+
+no_action(start, [Tab|_]) ->
+ error_logger:format("Will not merge table ~p~n", [Tab]),
+ stop.
+
+
+last_modified(init, S0) ->
+ last_version(init, S0 ++ [modified]);
+last_modified(Other, S) ->
+ last_version(Other, S).
+
+last_version(init, [Tab, Attrs, Attr]) ->
+ case lists:member(Attr, Attrs) of
+ false ->
+ error_logger:format("Cannot merge table ~p."
+ "Missing ~p attribute~n", [Tab, Attr]),
+ stop;
+ true ->
+ io:fwrite("Starting merge of ~p (~p)~n", [Tab, Attrs]),
+ {ok, {Tab, pos(Attr, Tab, Attrs)}}
+ end;
+last_version(done, _) ->
+ ok;
+last_version(Objs, {T, P} = S) when is_list(Objs) ->
+ Actions = lists:map(fun(Obj) ->
+ last_version_entry(Obj, T, P)
+ end, Objs),
+ {ok, Actions, same, S}.
+
+
+last_version_entry(Obj, T, P) ->
+ io:fwrite("last_version_entry(~p)~n", [Obj]),
+ case Obj of
+ {A, []} -> {write, A};
+ {[], B} -> {write, B};
+ {[A], [B]} ->
+ ModA = element(P, A),
+ ModB = element(P, B),
+ io:fwrite("ModA = ~p, ModB = ~p~n", [ModA, ModB]),
+ if ModA < ModB ->
+ {write, B};
+ ModA > ModB ->
+ {write, A};
+ ModA == ModB ->
+ if A =/= B ->
+ mnesia:abort({undecided,T,A,B});
+ true ->
+ {write, A}
+ end
+ end
+ end.
+
+
+
+pos(A, T, L) ->
+ pos(A, T, L, 2). % record tag is the 1st element in the tuple
+
+pos(H, _, [H|_], P) ->
+ P;
+pos(H, Tab, [_|T], P) ->
+ pos(H, Tab, T, P+1);
+pos(A, Tab, [], _) ->
+ mnesia:abort({missing_attribute, Tab, A}).
288 src/unsplit_server.erl
@@ -0,0 +1,288 @@
+%%%-------------------------------------------------------------------
+%%% File : unsplit_server.erl
+%%% Author : Ulf Wiger <ulf.wiger@erlang-solutions.com>
+%%% Description : Coordinator for merging mnesia tables after netsplit
+%%%
+%%% Created : 1 Feb 2010 by Ulf Wiger <ulf.wiger@erlang-solutions.com>
+%%%-------------------------------------------------------------------
+-module(unsplit_server).
+
+-behaviour(gen_server).
+
+%% API
+-export([start_link/0]).
+-export([remote_handle_query/1]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-record(state, {}).
+-record(st, {module, function, extra_args = [],
+ modstate,
+ table, attributes,
+ remote,
+ chunk,
+ strategy = default_strategy(),
+ progress}).
+
+
+-define(SERVER, ?MODULE).
+-define(DEFAULT_METHOD, {unsplit_lib, no_action, []}).
+-define(DEFAULT_STRATEGY, all_keys).
+-define(TIMEOUT, 10000).
+
+-define(DONE, {?MODULE,done}).
+
+-define(LOCK, {?MODULE, stitch}).
+
+%%====================================================================
+%% API
+%%====================================================================
+%%--------------------------------------------------------------------
+%% Function: start_link() -> {ok,Pid} | ignore | {error,Error}
+%% Description: Starts the server
+%%--------------------------------------------------------------------
+start_link() ->
+ gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+
+%%====================================================================
+%% gen_server callbacks
+%%====================================================================
+
+%%--------------------------------------------------------------------
+%% Function: init(Args) -> {ok, State} |
+%% {ok, State, Timeout} |
+%% ignore |
+%% {stop, Reason}
+%% Description: Initiates the server
+%%--------------------------------------------------------------------
+init([]) ->
+ mnesia:subscribe(system),
+ {ok, #state{}}.
+
+%%--------------------------------------------------------------------
+%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
+%% {reply, Reply, State, Timeout} |
+%% {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, Reply, State} |
+%% {stop, Reason, State}
+%% Description: Handling call messages
+%%--------------------------------------------------------------------
+handle_call(_Request, _From, State) ->
+ Reply = ok,
+ {reply, Reply, State}.
+
+%%--------------------------------------------------------------------
+%% Function: handle_cast(Msg, State) -> {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, State}
+%% Description: Handling cast messages
+%%--------------------------------------------------------------------
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% Function: handle_info(Info, State) -> {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, State}
+%% Description: Handling all non call/cast messages
+%%--------------------------------------------------------------------
+handle_info({mnesia_system_event,
+ {inconsistent_database, Context, Node}}, State) ->
+ io:fwrite("inconsistency. Context = ~p; Node = ~p~n", [Context, Node]),
+ Res = global:trans(
+ {?LOCK, self()},
+ fun() ->
+ io:fwrite("have lock...~n", []),
+ stitch_together(node(), Node)
+ end, [node()|nodes()]),
+ io:fwrite("Res = ~p~n", [Res]),
+ {noreply, State};
+handle_info(_Info, State) ->
+ io:fwrite("Got event: ~p~n", [_Info]),
+ {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% Function: terminate(Reason, State) -> void()
+%% Description: This function is called by a gen_server when it is about to
+%% terminate. It should be the opposite of Module:init/1 and do any necessary
+%% cleaning up. When it returns, the gen_server terminates with Reason.
+%% The return value is ignored.
+%%--------------------------------------------------------------------
+terminate(_Reason, _State) ->
+ ok.
+
+%%--------------------------------------------------------------------
+%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
+%% Description: Convert process state when code is changed
+%%--------------------------------------------------------------------
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%--------------------------------------------------------------------
+%%% Internal functions
+%%--------------------------------------------------------------------
+
+
+stitch_together(NodeA, NodeB) ->
+ case lists:member(NodeB, mnesia:system_info(running_db_nodes)) of
+ true ->
+ io:fwrite("~p already stitched, it seems. All is well.~n", [NodeB]),
+ ok;
+ false ->
+ do_stitch_together(NodeA, NodeB)
+ end.
+
+do_stitch_together(NodeA, NodeB) ->
+ Tabs = affected_tables(NodeA, NodeB),
+ io:fwrite("Affected tabs = ~p~n", [Tabs]),
+ DefaultMethod = default_method(),
+ TabMethods = [{T, get_method(T, DefaultMethod)} || T <- Tabs],
+ io:fwrite("Methods = ~p~n", [TabMethods]),
+ mnesia:activity(transaction,
+ fun() ->
+ mnesia_controller:connect_nodes([NodeB]),
+ io:fwrite("stitching: ~p~n", [TabMethods]),
+ stitch_tabs(TabMethods, NodeB)
+ end).
+
+stitch_tabs(TabMethods, NodeB) ->
+ Tabs = [Tab || {Tab,_} <- TabMethods],
+ [mnesia:write_lock_table(T) || T <- Tabs],
+ [do_stitch(TM, NodeB) || TM <- TabMethods].
+
+
+
+
+do_stitch({Tab, {M, F, XArgs}} = TM, Remote) ->
+ io:fwrite("do_stitch(~p, ~p).~n", [TM,Remote]),
+ Attrs = mnesia:table_info(Tab, attributes),
+ S0 = #st{module = M, function = F, extra_args = XArgs,
+ table = Tab, attributes = Attrs,
+ remote = Remote,
+ chunk = get_table_chunk_factor(Tab),
+ strategy = default_strategy()},
+ io:fwrite("Calling ~p:~p(init, ~p)", [M,F,[Tab,Attrs|XArgs]]),
+ try
+ run_stitch(check_return(M:F(init, [Tab, Attrs | XArgs]), S0))
+ catch
+ throw:?DONE ->
+ ok
+ end.
+
+check_return(Ret, S) ->
+ io:fwrite(" -> ~p~n", [Ret]),
+ case Ret of
+ stop -> throw(?DONE);
+ {ok, St} ->
+ S#st{modstate = St};
+ {ok, Actions, St} ->
+ S1 = S#st{modstate = St},
+ perform_actions(Actions, S1);
+ {ok, Actions, Strategy, St} ->
+ perform_actions(Actions, new_strategy(Strategy,
+ S#st{modstate = St}))
+ end.
+
+new_strategy(same, S) ->
+ S;
+new_strategy(Strategy, S) ->
+ S#st{strategy = Strategy}.
+
+perform_actions(Actions, #st{table = Tab, remote = Remote} = S) ->
+ local_perform_actions(Actions, Tab),
+ ask_remote(Remote, {actions, Tab, Actions}),
+ S.
+
+
+run_stitch(#st{table = Tab,
+ module = M, function = F, modstate = MSt,
+ strategy = all_keys, remote = Remote} = St) ->
+ Keys = mnesia:dirty_all_keys(Tab),
+ lists:foldl(
+ fun(K, Sx) ->
+ [_] = A = mnesia:read({Tab,K}), % assert that A is non-empty
+ B = get_remote_obj(Remote, Tab, K),
+ io:fwrite("Calling ~p:~p(~p, ~p, ~p)~n", [M,F,A,B,MSt]),
+ check_return(M:F([{A, B}], MSt), Sx)
+ end, St, Keys).
+
+get_remote_obj(Remote, Tab, Key) ->
+ ask_remote(Remote, {get_obj, Tab, Key}).
+
+
+write_result(Data, Tab) when is_list(Data) ->
+ [mnesia:dirty_write(Tab, D) || D <- Data];
+write_result(Data, Tab) ->
+ mnesia:dirty_write(Tab, Data).
+
+
+ask_remote(Remote, Q) ->
+ rpc:call(Remote, ?MODULE, remote_handle_query, [Q]).
+
+
+remote_handle_query(Q) ->
+ case Q of
+ {get_obj, Tab, Key} ->
+ mnesia:dirty_read({Tab, Key});
+ {write, Tab, Data} ->
+ write_result(Data, Tab);
+ {actions, Tab, Actions} ->
+ local_perform_actions(Actions, Tab)
+ end.
+
+
+local_perform_actions(Actions, Tab) ->
+ lists:foreach(
+ fun({write, Data}) ->
+ write_result(Data, Tab);
+ ({delete, Data}) when is_list(Data) ->
+ [mnesia:dirty_delete({Tab,D}) || D <- Data]
+ end, Actions).
+
+
+
+affected_tables(NodeA, NodeB) ->
+ Both = [NodeA, NodeB],
+ Tabs = mnesia:system_info(tables) -- [schema],
+ lists:filter(
+ fun(T) ->
+ Nodes = lists:concat(
+ [mnesia:table_info(T, C) ||
+ C <- [ram_copies, disc_copies,
+ disc_only_copies]]),
+ (Both -- Nodes) == []
+ end, Tabs).
+
+
+
+default_method() ->
+ get_env(default_method, ?DEFAULT_METHOD).
+
+default_strategy() ->
+ get_env(default_strategy, ?DEFAULT_STRATEGY).
+
+get_env(K, Default) ->
+ case application:get_env(K) of
+ undefined ->
+ Default;
+ {ok, undefined} ->
+ Default;
+ {ok, {_,_} = Meth} ->
+ Meth
+ end.
+
+get_method(T, Def) ->
+ try mnesia:read_table_property(T, unsplit_method) of
+ {unsplit_method,Method} -> Method
+ catch
+ exit:_ ->
+ Def
+ end.
+
+get_table_chunk_factor(_) ->
+ %% initially use 1 for testing. 100 might be a better default,
+ %% and it should be made configurable per-table.
+ 1.
Please sign in to comment.
Something went wrong with that request. Please try again.