Skip to content

Commit

Permalink
Towards partisan support
Browse files Browse the repository at this point in the history
Applied all the patches as in the riak_core with partisan variant at https://github.com/lasp-lang/riak_core/tree/partisan-support-r21

Some unit tests are still failing (worker_pool_test, riak_core_claim_sim), unclear why.
  • Loading branch information
bieniusa committed Nov 16, 2020
1 parent 0733f18 commit 14fd0f2
Show file tree
Hide file tree
Showing 14 changed files with 241 additions and 29 deletions.
4 changes: 3 additions & 1 deletion rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
{erl_opts, [debug_info]}.
{deps, [
% worker pool library
{poolboy, "~>1.5.2"}
{poolboy, "~>1.5.2"},
{partisan, ".*", {git, "https://github.com/lasp-lang/partisan", {branch, "master"}}},
{riak_core_partisan_utils, ".*", {git, "https://github.com/lasp-lang/riak_core_partisan_utils", {branch, "master"}}}
]}.
%% Defensive xref configuration
{xref_checks, [ undefined_function_calls, locals_not_used, deprecated_function_calls ]}.
Expand Down
24 changes: 22 additions & 2 deletions rebar.lock
Original file line number Diff line number Diff line change
@@ -1,6 +1,26 @@
{"1.1.0",
[{<<"poolboy">>,{pkg,<<"poolboy">>,<<"1.5.2">>},0}]}.
[{<<"acceptor_pool">>,{pkg,<<"acceptor_pool">>,<<"1.0.0">>},1},
{<<"goldrush">>,{pkg,<<"goldrush">>,<<"0.1.9">>},2},
{<<"lager">>,{pkg,<<"lager">>,<<"3.8.0">>},1},
{<<"partisan">>,
{git,"https://github.com/lasp-lang/partisan",
{ref,"b122d73d664ae7259438d3c9c4e93d34becec08a"}},
0},
{<<"poolboy">>,{pkg,<<"poolboy">>,<<"1.5.2">>},0},
{<<"quickrand">>,{pkg,<<"quickrand">>,<<"1.7.5">>},2},
{<<"riak_core_partisan_utils">>,
{git,"https://github.com/lasp-lang/riak_core_partisan_utils",
{ref,"c1e4941a60796dcf754c7eb9df61fb17b1c5498e"}},
0},
{<<"types">>,{pkg,<<"types">>,<<"0.1.8">>},1},
{<<"uuid">>,{pkg,<<"uuid_erl">>,<<"1.7.5">>},1}]}.
[
{pkg_hash,[
{<<"poolboy">>, <<"392B007A1693A64540CEAD79830443ABF5762F5D30CF50BC95CB2C1AAAFA006B">>}]}
{<<"acceptor_pool">>, <<"43C20D2ACAE35F0C2BCD64F9D2BDE267E459F0F3FD23DAB26485BF518C281B21">>},
{<<"goldrush">>, <<"F06E5D5F1277DA5C413E84D5A2924174182FB108DABB39D5EC548B27424CD106">>},
{<<"lager">>, <<"3402B9A7E473680CA179FC2F1D827CAB88DD37DD1E6113090C6F45EF05228A1C">>},
{<<"poolboy">>, <<"392B007A1693A64540CEAD79830443ABF5762F5D30CF50BC95CB2C1AAAFA006B">>},
{<<"quickrand">>, <<"E3086A153EB13A057FC19192D05E2D4C6BB2BDBB55746A699BEAE9847AC17CA8">>},
{<<"types">>, <<"5782B67231E8C174FE2835395E71E669FE0121076779D2A09F1C0D58EE0E2F13">>},
{<<"uuid">>, <<"3862FF9A21C42566DFD0376B97512FA202922897129E09A05E2AFA0D9CAFD97A">>}]}
].
2 changes: 1 addition & 1 deletion src/riak_core.app.src
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
[{description, "Riak Core Lite"},
{vsn,"0.10.1"},
{applications,
[kernel, stdlib, crypto, os_mon, poolboy]
[kernel, stdlib, crypto, os_mon, poolboy, partisan]
},
{mod, {riak_core_app, []}},
{env,
Expand Down
12 changes: 10 additions & 2 deletions src/riak_core.erl
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ get_other_ring(Node) ->
standard_join(Node, Rejoin, Auto) when is_atom(Node) ->
case net_adm:ping(Node) of
pong ->
%% Initiate the partisan connections.
riak_core_partisan_utils:join(Node),
case get_other_ring(Node) of
{ok, Ring} -> standard_join(Node, Ring, Rejoin, Auto);
_ -> {error, unable_to_get_join_ring}
Expand All @@ -116,13 +118,15 @@ standard_join(Node, Ring, Rejoin, Auto) ->
InitComplete = init_complete(init:get_status()),
SameSize = riak_core_ring:num_partitions(MyRing) =:=
riak_core_ring:num_partitions(Ring),
Singleton = [node()] =:=
riak_core_ring:all_members(MyRing),
RemoteMembers = riak_core_ring:all_members(MyRing),
Singleton = ([node()] =:= RemoteMembers),
case {InitComplete, Rejoin or Singleton, SameSize} of
{false, _, _} -> {error, node_still_starting};
{_, false, _} -> {error, not_single_node};
{_, _, false} -> {error, different_ring_sizes};
_ ->
%% Connect all incoming members via partisan.
riak_core_partisan_utils:join(RemoteMembers),
Ring2 = riak_core_ring:add_member(node(), Ring, node()),
Ring3 = riak_core_ring:set_owner(Ring2, node()),
Ring4 = riak_core_ring:update_member_meta(node(), Ring3,
Expand Down Expand Up @@ -201,6 +205,10 @@ leave() ->
end.

standard_leave(Node) ->
%% Force leave from partisan
riak_core_partisan_utils:leave(Node),

%% Perform ring transition
riak_core_ring_manager:ring_trans(fun (Ring2, _) ->
Ring3 =
riak_core_ring:leave_member(Node,
Expand Down
2 changes: 2 additions & 0 deletions src/riak_core_apl.erl
Original file line number Diff line number Diff line change
Expand Up @@ -545,12 +545,14 @@ chbin_test_() ->
{timeout, 180, fun chbin_test_scenario/0}.

chbin_test_scenario() ->
meck:new(riak_core_partisan_utils, [unstick]),
[chbin_test_scenario(Size, NumNodes)
|| Size <- [32, 64, 128],
NumNodes <- [1, 2, 3, 4, 5, 8, Size div 4]],
ok.

chbin_test_scenario(Size, NumNodes) ->
meck:expect(riak_core_partisan_utils, update, fun(_) -> ok end),
RingTop = 1 bsl 160,
Ring = riak_core_test_util:fake_ring(Size, NumNodes),
Nodes = riak_core_ring:all_members(Ring),
Expand Down
9 changes: 7 additions & 2 deletions src/riak_core_gossip.erl
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,12 @@ handle_cast({send_ring_to, Node}, State) ->
riak_core_ring:check_tainted(RingOut,
"Error: riak_core_gossip/send_ring_to "
":: Sending tainted ring over gossip"),
gen_server:cast({?MODULE, Node},
{reconcile_ring, RingOut}),

%% This will force partisan connections to initialize, bootstrapping
%% off of disterl -- this connection may be pruned by partisan but
%% once the member is valid in the ring -- it will persist.
%%
riak_core_partisan_utils:cast(gossip, {?MODULE, Node}, {reconcile_ring, RingOut}),
Tokens = State#state.gossip_tokens - 1,
{noreply, State#state{gossip_tokens = Tokens}};
handle_cast({distribute_ring, Ring}, State) ->
Expand Down Expand Up @@ -249,6 +253,7 @@ reconcile(Ring0, [OtherRing0]) ->
Ring3 = riak_core_ring:ring_changed(Node, Ring2),
%% STATS
% riak_core_stat:update(rings_reconciled),
riak_core_ring:notify_external_membership(Ring3),
log_membership_changes(Ring, Ring3),
{reconciled_ring, Ring3};
{_, _, _} -> ignore
Expand Down
99 changes: 99 additions & 0 deletions src/riak_core_partisan_proxy_service.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2017 Christopher S. Meiklejohn. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------

-module(riak_core_partisan_proxy_service).
-author("Christopher S. Meiklejohn <christopher.meiklejohn@gmail.com>").

-behaviour(gen_server).

%% API
-export([start_link/0,
start_link/1]).

%% gen_server callbacks
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3]).

%% State record.
-record(state, {}).

%%%===================================================================
%%% API
%%%===================================================================

%% @doc Same as start_link([]).
-spec start_link() -> {ok, pid()} | ignore | {error, term()}.
start_link() ->
start_link([]).

%% @doc Start and link to calling process.
-spec start_link(list())-> {ok, pid()} | ignore | {error, term()}.
start_link(Opts) ->
gen_server:start_link({local, ?MODULE}, ?MODULE, Opts, []).

%%%===================================================================
%%% gen_server callbacks
%%%===================================================================

%% @private
-spec init([]) -> {ok, #state{}}.
init([]) ->
{ok, #state{}}.

%% @private
-spec handle_call(term(), {pid(), term()}, #state{}) ->
{reply, term(), #state{}}.

%% @private
handle_call(Msg, _From, State) ->
_ = logger:warning("Unhandled messages: ~p", [Msg]),
{reply, ok, State}.

-spec handle_cast(term(), #state{}) -> {noreply, #state{}}.
%% @private
handle_cast(Msg, State) ->
_ = logger:warning("Unhandled messages: ~p", [Msg]),
{noreply, State}.

%% @private
handle_info({forward, Pid, Message}, State) ->
erlang:send(Pid, Message),
{noreply, State};
handle_info(Msg, State) ->
_ = logger:warning("Unhandled messages: ~p", [Msg]),
{noreply, State}.

%% @private
-spec terminate(term(), #state{}) -> term().
terminate(_Reason, _State) ->
ok.

%% @private
-spec code_change(term() | {down, term()}, #state{}, term()) ->
{ok, #state{}}.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.

%%%===================================================================
%%% Internal functions
%%%===================================================================
49 changes: 36 additions & 13 deletions src/riak_core_ring.erl
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
random_node/1, random_other_index/1,
random_other_index/2, random_other_node/1, reconcile/2,
rename_node/3, responsible_index/2, transfer_node/3,
update_meta/3, remove_meta/2]).
update_meta/3, remove_meta/2, notify_external_membership/1]).

-export([cluster_name/1, set_tainted/1, check_tainted/2,
nearly_equal/2, claimant/1, member_status/2,
Expand Down Expand Up @@ -688,17 +688,20 @@ update_member_meta(Node, State, Member, Key, Val) ->
update_member_meta(Node, State, Member, Key, Val,
same_vclock) ->
Members = State#chstate.members,
case orddict:is_key(Member, Members) of
Members2 = case orddict:is_key(Member, Members) of
true ->
Members2 = orddict:update(Member,
fun ({Status, VC, MD}) ->
{Status, vclock:increment(Node, VC),
orddict:store(Key, Val, MD)}
end,
Members),
State#chstate{members = Members2};
false -> State
end.
orddict:update(Member,
fun({Status, VC, MD}) ->
{Status,
vclock:increment(Node, VC),
orddict:store(Key, Val, MD)}
end,
Members);
false -> Members
end,
notify_external_membership(State#chstate{members=Members2}),

State#chstate{members=Members2}.

clear_member_meta(Node, State, Member) ->
Members = State#chstate.members,
Expand Down Expand Up @@ -744,6 +747,7 @@ set_member(Node, CState, Member, Status, same_vclock) ->
{Status, vclock:increment(Node, vclock:fresh()),
[]},
CState#chstate.members),
notify_external_membership(CState#chstate{members=Members2}),
CState#chstate{members = Members2}.

%% @doc Return a list of all members of the cluster that are eligible to
Expand Down Expand Up @@ -1594,7 +1598,7 @@ reconcile_ring(StateA = #chstate{claimant = Claimant1,
V2Newer = vclock:descends(VC2, VC1),
EqualVC = vclock:equal(VC1, VC2) and
(Claimant1 =:= Claimant2),
case {EqualVC, V1Newer, V2Newer} of
New = case {EqualVC, V1Newer, V2Newer} of
{true, _, _} ->
Next = reconcile_next(Next1, Next2),
StateA#chstate{next = Next};
Expand Down Expand Up @@ -1647,7 +1651,9 @@ reconcile_ring(StateA = #chstate{claimant = Claimant1,
StateB#chstate{next = Next}
end
end
end.
end,
notify_external_membership(New),
New.

%% @private
merge_status(invalid, _) -> invalid;
Expand Down Expand Up @@ -1760,6 +1766,23 @@ filtered_seen(State = #chstate{seen = Seen}) ->
Seen)
end.

notify_external_membership(_State=#chstate{members=Members}) ->
%% Update membership in partisan.
Valid = orddict:fold(fun(N, {S, _VC, _MD}, Acc) ->
case S of
valid ->
Acc ++ [N];
joining ->
Acc ++ [N];
_ ->
Acc
end
end, [], Members),

ok = riak_core_partisan_utils:update(Valid),

ok.

%% ===================================================================
%% EUnit tests
%% ===================================================================
Expand Down
17 changes: 16 additions & 1 deletion src/riak_core_ring_manager.erl
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@
prune_ringfiles/0, read_ringfile/1,
find_latest_ringfile/0, force_update/0,
do_write_ringfile/1, ring_trans/2, set_cluster_name/1,
is_stable_ring/0]).
is_stable_ring/0, bloat_ring/0]).

-export([init/1, handle_call/3, handle_cast/2,
handle_info/2, terminate/2, code_change/3]).
Expand Down Expand Up @@ -110,6 +110,19 @@ start_link(test) ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [test],
[]).

bloat_ring() ->
Fun = fun(Ring, _Args) ->
logger:warning("Bloating ring beginning!"),
Key = '$bloat',
Value = list_to_binary(lists:flatten(lists:map(fun(X) -> integer_to_list(X) end, lists:seq(1, 1000000)))),
Ring = lists:foldl(fun(X, AccRing) ->
riak_core_ring:update_meta({Key, X}, Value, AccRing)
end, Ring, lists:seq(1, 100)),
{new_ring, Ring}
end,
gen_server:call(?MODULE, {ring_trans, Fun, []}, infinity).


-spec get_my_ring() -> {ok,
riak_core_ring:riak_core_ring()} |
{error, any()}.
Expand Down Expand Up @@ -227,6 +240,8 @@ do_write_ringfile(Ring, FN) ->
%% @spec find_latest_ringfile() -> string()
find_latest_ringfile() ->
Dir = ring_dir(),
%% Append 'ring', because ensure_dir only checks parent directories.
ok = filelib:ensure_dir(Dir ++ "/ring"),
case file:list_dir(Dir) of
{ok, Filenames} ->
{ok, Cluster} = application:get_env(riak_core,
Expand Down
Loading

0 comments on commit 14fd0f2

Please sign in to comment.