@@ -21,11 +21,10 @@
-define (MAX_VBUCKETS , 512 ). % % Maximum # of vbuckets for a vbucketmigrator
--record (child_id , {bucket :: nonempty_string (),
- vbuckets :: [non_neg_integer (), ...],
+-record (child_id , {vbuckets :: [non_neg_integer (), ...],
dest_node :: atom ()}).
--export ([start_link /0 ,
+-export ([start_link /1 ,
kill_children /3 ,
kill_dst_children /3 ,
replicators /2 ,
@@ -35,19 +34,18 @@
-export ([init /1 ]).
% % API
-start_link () ->
- supervisor :start_link ({local , ? MODULE }, ? MODULE , []).
+start_link (Bucket ) ->
+ supervisor :start_link ({local , server ( Bucket ) }, ? MODULE , []).
replicators (Nodes , Bucket ) ->
lists :flatmap (
fun (Node ) ->
- try children (Node ) of
+ try children (Node , Bucket ) of
Children ->
[{Node , Dst , VBucket } ||
- # child_id {bucket = B , vbuckets = VBuckets , dest_node = Dst }
+ # child_id {vbuckets = VBuckets , dest_node = Dst }
<- Children ,
- VBucket <- VBuckets ,
- B == Bucket ]
+ VBucket <- VBuckets ]
catch
_ :_ -> []
end
@@ -114,49 +112,55 @@ split_vbuckets(VBuckets, L) ->
split_vbuckets (T , [H |L ])
end .
-kill_child (Node , Child ) ->
- case supervisor :terminate_child ({? MODULE , Node }, Child ) of
+- spec kill_child (node (), nonempty_string (), # child_id {}) ->
+ ok .
+kill_child (Node , Bucket , Child ) ->
+ case supervisor :terminate_child ({server (Bucket ), Node }, Child ) of
ok ->
- supervisor :delete_child ({? MODULE , Node }, Child );
+ supervisor :delete_child ({server ( Bucket ) , Node }, Child );
{error , not_found } ->
ok
end .
+- spec kill_children (node (), nonempty_string (), [non_neg_integer ()]) ->
+ [# child_id {}].
kill_children (Node , Bucket , VBuckets ) ->
% % Kill any existing children for these VBuckets
- Children = [Id || Id = # child_id {bucket = B , vbuckets = Vs } <- children ( Node ),
- B == Bucket ,
+ Children = [Id || Id = # child_id {vbuckets = Vs } <-
+ children ( Node , Bucket ) ,
Vs -- VBuckets /= Vs ],
lists :foreach (fun (Child ) ->
- kill_child (Node , Child )
+ kill_child (Node , Bucket , Child )
end , Children ),
Children .
+- spec kill_dst_children (node (), nonempty_string (), node ()) ->
+ ok .
kill_dst_children (Node , Bucket , Dst ) ->
- Children = [Id || Id = # child_id {bucket = B , dest_node = D } <- children (Node ),
- B == Bucket ,
+ Children = [Id || Id = # child_id {dest_node = D } <- children (Node , Bucket ),
D == Dst ],
lists :foreach (fun (Child ) ->
- kill_child (Node , Child )
+ kill_child (Node , Bucket , Child )
end , Children ).
+- spec kill_runaway_children (node (), nonempty_string (),
+ [{non_neg_integer (), node ()}]) ->
+ [# child_id {}].
kill_runaway_children (Node , Bucket , Replicas ) ->
% % Kill any children not in Replicas
- Children = [Child || Child = # child_id {bucket = B } <- children (Node ),
- B == Bucket ],
{GoodChildren , Runaways } =
lists :partition (
fun (# child_id {vbuckets = VBuckets , dest_node = DstNode }) ->
NodeReplicas = [{V , DstNode } || V <- VBuckets ],
lists :all (fun (NR ) -> lists :member (NR , Replicas ) end ,
NodeReplicas )
- end , Children ),
+ end , children ( Node , Bucket ) ),
lists :foreach (
fun (Runaway ) ->
? log_info (
" Killing replicator ~p on node ~p " ,
[Runaway , Node ]),
- kill_child (Node , Runaway )
+ kill_child (Node , Bucket , Runaway )
end , Runaways ),
GoodChildren .
@@ -191,17 +195,24 @@ args(Node, Bucket, VBuckets, DstNode, TakeOver) ->
[use_stdio , stderr_to_stdout ,
{write_data , [Pass , " \n " ]}]].
-- spec children (atom ()) -> [# child_id {}].
-children (Node ) ->
- [Id || {Id , _ , _ , _ } <- supervisor :which_children ({? MODULE , Node })].
+- spec children (node (), nonempty_string ()) -> [# child_id {}].
+children (Node , Bucket ) ->
+ [Id || {Id , _ , _ , _ } <- supervisor :which_children ({server (Bucket ), Node })].
+
+
+- spec server (nonempty_string ()) ->
+ atom ().
+server (Bucket ) ->
+ list_to_atom (? MODULE_STRING " -" ++ Bucket ).
+
- spec start_child (atom (), nonempty_string (), [non_neg_integer (),...], atom ()) ->
{ok , pid ()}.
start_child (Node , Bucket , VBuckets , DstNode ) ->
PortServerArgs = args (Node , Bucket , VBuckets , DstNode , false ),
? log_info (" Args =~n~p " ,
[PortServerArgs ]),
- ChildSpec = {# child_id {bucket = Bucket , vbuckets = VBuckets , dest_node = DstNode },
+ ChildSpec = {# child_id {vbuckets = VBuckets , dest_node = DstNode },
{ns_port_server , start_link , PortServerArgs },
permanent , 10 , worker , [ns_port_server ]},
- supervisor :start_child ({? MODULE , Node }, ChildSpec ).
+ supervisor :start_child ({server ( Bucket ) , Node }, ChildSpec ).
0 comments on commit
d09bd46