Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Wait for ns_memcached to be running to run the janitor
Change-Id: Ib479289fa84c6e054f14b0e2c724e1b69566e4de
Reviewed-on: http://review.northscale.com/3182
Reviewed-by: Aliaksey Kandratsenka <alkondratenko@gmail.com>
Tested-by: Aliaksey Kandratsenka <alkondratenko@gmail.com>
  • Loading branch information
Sean Lynch authored and alk committed Oct 11, 2010
1 parent 4c67cc8 commit 6288bff
Showing 1 changed file with 49 additions and 27 deletions.
76 changes: 49 additions & 27 deletions src/ns_janitor.erl
Expand Up @@ -39,33 +39,38 @@ cleanup(Bucket) ->
case Servers of
[] -> ok;
_ ->
case sanify(Bucket, Map, Servers) of
Map -> ok;
Map1 ->
ns_bucket:set_map(Bucket, Map1)
end,
Replicas = lists:keysort(1, map_to_replicas(Map)),
ReplicaGroups = lists:ukeymerge(1, misc:keygroup(1, Replicas),
[{N, []} || N <- lists:sort(Servers)]),
NodesReplicas = lists:map(fun ({Src, R}) -> % R is the replicas for this node
{Src, [{V, Dst} || {_, Dst, V} <- R]}
end, ReplicaGroups),
LiveNodes = [node()|nodes()],
lists:foreach(
fun ({Src, R}) ->
case lists:member(Src, LiveNodes) of
true ->
try ns_vbm_sup:set_replicas(
Src, Bucket, R)
catch
E:R ->
?log_error("Unable to start replicators on ~p for bucket ~p: ~p",
[Src, Bucket, {E, R}])
end;
false ->
ok
end
end, NodesReplicas)
case wait_for_memcached(Servers, Bucket, 5) of
[] ->
case sanify(Bucket, Map, Servers) of
Map -> ok;
Map1 ->
ns_bucket:set_map(Bucket, Map1)
end,
Replicas = lists:keysort(1, map_to_replicas(Map)),
ReplicaGroups = lists:ukeymerge(1, misc:keygroup(1, Replicas),
[{N, []} || N <- lists:sort(Servers)]),
NodesReplicas = lists:map(fun ({Src, R}) -> % R is the replicas for this node
{Src, [{V, Dst} || {_, Dst, V} <- R]}
end, ReplicaGroups),
LiveNodes = [node()|nodes()],
lists:foreach(
fun ({Src, R}) ->
case lists:member(Src, LiveNodes) of
true ->
try ns_vbm_sup:set_replicas(
Src, Bucket, R)
catch
E:R ->
?log_error("Unable to start replicators on ~p for bucket ~p: ~p",
[Src, Bucket, {E, R}])
end;
false ->
ok
end
end, NodesReplicas);
Down ->
?log_error("Bucket ~p not yet ready on ~p", [Bucket, Down])
end
end.


Expand Down Expand Up @@ -264,3 +269,20 @@ map_to_replicas([Chain|Rest], V, Replicas) ->
Pairs = [{Src, Dst, V}||{Src, Dst} <- misc:pairs(Chain),
Src /= undefined andalso Dst /= undefined],
map_to_replicas(Rest, V+1, [Pairs|Replicas]).


%%
%% Internal functions
%%

wait_for_memcached(Nodes, _Bucket, 0) ->
Nodes;
wait_for_memcached(Nodes, Bucket, Tries) ->
case [Node || Node <- Nodes, not ns_memcached:connected(Node, Bucket)] of
[] ->
[];
Down ->
timer:sleep(1000),
?log_info("Waiting for ~p on ~p", [Bucket, Down]),
wait_for_memcached(Down, Bucket, Tries-1)
end.

0 comments on commit 6288bff

Please sign in to comment.