CBD-816 Recovery mode support.

When a membase (couchbase) bucket has some vbuckets missing, it can
be put into recovery mode using the startRecovery REST call:

 curl -sX POST -u Administrator:asdasd \
      http://lh:9000/pools/default/buckets/default/controller/startRecovery

In case of success, the response looks as follows:

 {
     "code": "ok",
     "recoveryMap": [
         {
             "node": "n_1@10.17.40.207",
             "vbuckets": [
                 33,
                 34,
                 35,
                 36,
                 37,
                 38,
                 39,
                 40,
                 41,
                 42,
                 54,
                 55,
                 56,
                 57,
                 58,
                 59,
                 60,
                 61,
                 62,
                 63
             ]
         }
     ],
     "uuid": "8e02b3a84e0bbf58cbbb58919f1a6563"
 }

So in this case replica vbuckets 33-42 and 54-63 were created on
node n_1@10.17.40.207. Now the client can start pushing data to
these vbuckets.

All the important recovery URIs are advertised via tasks:

 curl -sX GET -u 'Administrator:asdasd' http://lh:9000/pools/default/tasks

 [
     {
         "bucket": "default",
         "commitVbucketURI": "/pools/default/buckets/default/controller/commitVBucket?recovery_uuid=8e02b3a84e0bbf58cbbb58919f1a6563",
         "recommendedRefreshPeriod": 10.0,
         "recoveryStatusURI": "/pools/default/buckets/default/recoveryStatus?recovery_uuid=8e02b3a84e0bbf58cbbb58919f1a6563",
         "stopURI": "/pools/default/buckets/default/controller/stopRecovery?recovery_uuid=8e02b3a84e0bbf58cbbb58919f1a6563",
         "type": "recovery",
         "uuid": "8e02b3a84e0bbf58cbbb58919f1a6563"
     },
     {
         "status": "notRunning",
         "type": "rebalance"
     }
 ]

 - stopURI can be used to abort the recovery
 - recoveryStatusURI will return information about the recovery in the
   same format as startRecovery
 - commitVBucketURI will activate certain vbucket

   This call should be used after the client is done with pushing
   the data to it. VBucket is passed as a POST parameter:

    curl -sX POST -u 'Administrator:asdasd' \
         http://lh:9000/pools/default/buckets/default/controller/commitVBucket?recovery_uuid=8e02b3a84e0bbf58cbbb58919f1a6563 \
         -d vbucket=33

    {
        "code": "ok"
    }

All the recovery related REST calls return a JSON object having a
"code" field. This (together with HTTP status code) indicates if the
call was successful.

Here's a complete list of possible REST calls replies.

 - startRecovery

   +-------------+-------------------+------------------------------------+
   | HTTP Status |       Code        |              Comment               |
   |             |                   |                                    |
   +-------------+-------------------+------------------------------------+
   |          200|        ok         |Recovery started. Recovery map is   |
   |             |                   |returned in recoveryMap field.      |
   +-------------+-------------------+------------------------------------+
   |          400|    unsupported    |Not all nodes in the cluster support|
   |             |                   |recovery.                           |
   +-------------+-------------------+------------------------------------+
   |          400|    not_needed     |Recovery is not needed.             |
   +-------------+-------------------+------------------------------------+
   |          404|    not_present    |Specified bucket not found.         |
   +-------------+-------------------+------------------------------------+
   |          500|   failed_nodes    |Could not start recovery because    |
   |             |                   |some nodes failed. A list of failed |
   |             |                   |nodes can be found in the           |
   |             |                   |"failedNodes" field of the reply.   |
   +-------------+-------------------+------------------------------------+
   |          503| rebalance_running |Could not start recovery because    |
   |             |                   |rebalance is running.               |
   +-------------+-------------------+------------------------------------+

 - stopRecovery

   +-------------+---------------+------------------------------------+
   | HTTP Status |     Code      |              Comment               |
   |             |               |                                    |
   +-------------+---------------+------------------------------------+
   |          200|      ok       |Recovery stopped successfully.      |
   +-------------+---------------+------------------------------------+
   |          400| uuid_missing  |recovery_uuid query parameter has   |
   |             |               |not been specified.                 |
   +-------------+---------------+------------------------------------+
   |          404| bad_recovery  |Either no recovery is in progress or|
   |             |               |provided uuid does not match the    |
   |             |               |uuid of running recovery.           |
   +-------------+---------------+------------------------------------+

 - commitVBucket

   +-------------+------------------------+------------------------------------+
   | HTTP Status |          Code          |              Comment               |
   |             |                        |                                    |
   +-------------+------------------------+------------------------------------+
   |          200|           ok           |VBucket commited successfully.      |
   +-------------+------------------------+------------------------------------+
   |          200|   recovery_completed   |VBucket commited successfully. No   |
   |             |                        |more vbuckets to recover. So the    |
   |             |                        |cluster is not in recovery mode     |
   |             |                        |anymore.                            |
   +-------------+------------------------+------------------------------------+
   |          400|      uuid_missing      |recovery_uuid query parameter has   |
   |             |                        |not been specified.                 |
   +-------------+------------------------+------------------------------------+
   |          400| bad_or_missing_vbucket |VBucket is either unspecified or    |
   |             |                        |couldn't be converted to integer.   |
   +-------------+------------------------+------------------------------------+
   |          404|   vbucket_not_found    |Specified VBucket is not part of the|
   |             |                        |recovery map.                       |
   +-------------+------------------------+------------------------------------+
   |          404|      bad_recovery      |Either no recovery is in progress or|
   |             |                        |provided uuid does not match the    |
   |             |                        |uuid of running recovery.           |
   +-------------+------------------------+------------------------------------+
   |          500|      failed_nodes      |Could not commit vbucket because    |
   |             |                        |some nodes faileed. A list of failed|
   |             |                        |nodes can be found in the           |
   |             |                        |"failedNodes" field of the reply.   |
   +-------------+------------------------+------------------------------------+

 - recoveryStatus

   +-------------+---------------+------------------------------------+
   | HTTP Status |     Code      |              Comment               |
   |             |               |                                    |
   +-------------+---------------+------------------------------------+
   |          200|      ok       |Success. Recovery information is    |
   |             |               |returned in the same format as for  |
   |             |               |startRecovery.                      |
   +-------------+---------------+------------------------------------+
   |          400| uuid_missing  |recovery_uuid query parameter has   |
   |             |               |not been specified.                 |
   +-------------+---------------+------------------------------------+
   |          404| bad_recovery  |Either no recovery is in progress or|
   |             |               |provided uuid does not match the    |
   |             |               |uuid of running recovery.           |
   +-------------+---------------+------------------------------------+

Recovery map generation is very simplistic. It just distributes
missing vbuckets to the available nodes and tries to ensure that
nodes get about the same number of vbuckets. It's not always
possible though, because after failover we often have quite
unbalanced map. The resulting map is likely very unbalanced too. And
recovered vbuckets are not even replicated. So in a nutshell,
recovery is not a means of avoiding rebalance. It's suitable only
for recovering data. And rebalance will be needed anyway.

Change-Id: I84a80b3d5643133fea3f84add06dcc3c8eb81d7b
Reviewed-on: http://review.couchbase.org/25241
Reviewed-by: Aliaksey Kandratsenka <alkondratenko@gmail.com>
Tested-by: Aliaksey Kandratsenka <alkondratenko@gmail.com>
commit d43a372eeb17252312ff3f27c0f7b818c272e291 (parent b74ed9f)
Authored by @aartamonau, committed by alk
CHANGES (193 lines changed)
@@ -2,7 +2,7 @@ This file documents user-visible changes in Couchbase clustering & UI.
======================================================================
-----------------------------------------
-Between versions 2.0.1 and 2.*
+Between versions 2.0.1 and 2.0.2
-----------------------------------------
* (MB-7574) Support for REST call /pools/default/stats is discontinued.
@@ -19,6 +19,197 @@ Between versions 2.0.1 and 2.*
Instead they are collected in ETS tables and saved to plain files
from time to time.
+* (CBD-816) Recovery mode support
+
+  When a membase (couchbase) bucket has some vbuckets missing, it can
+  be put into recovery mode using the startRecovery REST call:
+
+   curl -sX POST -u Administrator:asdasd \
+        http://lh:9000/pools/default/buckets/default/controller/startRecovery
+
+  In case of success, the response looks as follows:
+
+   {
+       "code": "ok",
+       "recoveryMap": [
+           {
+               "node": "n_1@10.17.40.207",
+               "vbuckets": [
+                   33,
+                   34,
+                   35,
+                   36,
+                   37,
+                   38,
+                   39,
+                   40,
+                   41,
+                   42,
+                   54,
+                   55,
+                   56,
+                   57,
+                   58,
+                   59,
+                   60,
+                   61,
+                   62,
+                   63
+               ]
+           }
+       ],
+       "uuid": "8e02b3a84e0bbf58cbbb58919f1a6563"
+   }
+
+  So in this case replica vbuckets 33-42 and 54-63 were created on
+  node n_1@10.17.40.207. Now the client can start pushing data to
+  these vbuckets.
+
+  All the important recovery URIs are advertised via tasks:
+
+   curl -sX GET -u 'Administrator:asdasd' http://lh:9000/pools/default/tasks
+
+   [
+       {
+           "bucket": "default",
+           "commitVBucketURI": "/pools/default/buckets/default/controller/commitVBucket?recovery_uuid=8e02b3a84e0bbf58cbbb58919f1a6563",
+           "recommendedRefreshPeriod": 10.0,
+           "recoveryStatusURI": "/pools/default/buckets/default/recoveryStatus?recovery_uuid=8e02b3a84e0bbf58cbbb58919f1a6563",
+           "stopURI": "/pools/default/buckets/default/controller/stopRecovery?recovery_uuid=8e02b3a84e0bbf58cbbb58919f1a6563",
+           "type": "recovery",
+           "uuid": "8e02b3a84e0bbf58cbbb58919f1a6563"
+       },
+       {
+           "status": "notRunning",
+           "type": "rebalance"
+       }
+   ]
+
+   - stopURI can be used to abort the recovery
+   - recoveryStatusURI returns information about the recovery in the
+     same format as startRecovery
+   - commitVBucketURI activates the given vbucket
+
+     This call should be used after the client is done pushing
+     data to the vbucket. The vbucket is passed as a POST parameter:
+
+      curl -sX POST -u 'Administrator:asdasd' \
+           http://lh:9000/pools/default/buckets/default/controller/commitVBucket?recovery_uuid=8e02b3a84e0bbf58cbbb58919f1a6563 \
+           -d vbucket=33
+
+      {
+          "code": "ok"
+      }
+
+
+  All the recovery-related REST calls return a JSON object with a
+  "code" field. This (together with the HTTP status code) indicates
+  whether the call was successful.
+
+  Here's a complete list of possible REST call replies.
+
+   - startRecovery
+
+     +-------------+-------------------+------------------------------------+
+     | HTTP Status |       Code        |              Comment               |
+     |             |                   |                                    |
+     +-------------+-------------------+------------------------------------+
+     |          200|        ok         |Recovery started. Recovery map is   |
+     |             |                   |returned in recoveryMap field.      |
+     +-------------+-------------------+------------------------------------+
+     |          400|    unsupported    |Not all nodes in the cluster support|
+     |             |                   |recovery.                           |
+     +-------------+-------------------+------------------------------------+
+     |          400|    not_needed     |Recovery is not needed.             |
+     +-------------+-------------------+------------------------------------+
+     |          404|    not_present    |Specified bucket not found.         |
+     +-------------+-------------------+------------------------------------+
+     |          500|   failed_nodes    |Could not start recovery because    |
+     |             |                   |some nodes failed. A list of failed |
+     |             |                   |nodes can be found in the           |
+     |             |                   |"failedNodes" field of the reply.   |
+     +-------------+-------------------+------------------------------------+
+     |          503| rebalance_running |Could not start recovery because    |
+     |             |                   |rebalance is running.               |
+     +-------------+-------------------+------------------------------------+
+
+   - stopRecovery
+
+     +-------------+---------------+------------------------------------+
+     | HTTP Status |     Code      |              Comment               |
+     |             |               |                                    |
+     +-------------+---------------+------------------------------------+
+     |          200|      ok       |Recovery stopped successfully.      |
+     +-------------+---------------+------------------------------------+
+     |          400| uuid_missing  |recovery_uuid query parameter has   |
+     |             |               |not been specified.                 |
+     +-------------+---------------+------------------------------------+
+     |          404| bad_recovery  |Either no recovery is in progress or|
+     |             |               |provided uuid does not match the    |
+     |             |               |uuid of running recovery.           |
+     +-------------+---------------+------------------------------------+
+
+   - commitVBucket
+
+     +-------------+------------------------+------------------------------------+
+     | HTTP Status |          Code          |              Comment               |
+     |             |                        |                                    |
+     +-------------+------------------------+------------------------------------+
+     |          200|           ok           |VBucket committed successfully.     |
+     +-------------+------------------------+------------------------------------+
+     |          200|   recovery_completed   |VBucket committed successfully. No  |
+     |             |                        |more vbuckets to recover. So the    |
+     |             |                        |cluster is not in recovery mode     |
+     |             |                        |anymore.                            |
+     +-------------+------------------------+------------------------------------+
+     |          400|      uuid_missing      |recovery_uuid query parameter has   |
+     |             |                        |not been specified.                 |
+     +-------------+------------------------+------------------------------------+
+     |          400| bad_or_missing_vbucket |VBucket is either unspecified or    |
+     |             |                        |couldn't be converted to integer.   |
+     +-------------+------------------------+------------------------------------+
+     |          404|   vbucket_not_found    |Specified VBucket is not part of the|
+     |             |                        |recovery map.                       |
+     +-------------+------------------------+------------------------------------+
+     |          404|      bad_recovery      |Either no recovery is in progress or|
+     |             |                        |provided uuid does not match the    |
+     |             |                        |uuid of running recovery.           |
+     +-------------+------------------------+------------------------------------+
+     |          500|      failed_nodes      |Could not commit vbucket because    |
+     |             |                        |some nodes failed. A list of failed |
+     |             |                        |nodes can be found in the           |
+     |             |                        |"failedNodes" field of the reply.   |
+     +-------------+------------------------+------------------------------------+
+
+   - recoveryStatus
+
+     +-------------+---------------+------------------------------------+
+     | HTTP Status |     Code      |              Comment               |
+     |             |               |                                    |
+     +-------------+---------------+------------------------------------+
+     |          200|      ok       |Success. Recovery information is    |
+     |             |               |returned in the same format as for  |
+     |             |               |startRecovery.                      |
+     +-------------+---------------+------------------------------------+
+     |          400| uuid_missing  |recovery_uuid query parameter has   |
+     |             |               |not been specified.                 |
+     +-------------+---------------+------------------------------------+
+     |          404| bad_recovery  |Either no recovery is in progress or|
+     |             |               |provided uuid does not match the    |
+     |             |               |uuid of running recovery.           |
+     +-------------+---------------+------------------------------------+
+
+
+  Recovery map generation is very simplistic. It just distributes the
+  missing vbuckets across the available nodes, trying to ensure that
+  nodes end up with about the same number of vbuckets. That is not
+  always possible, because after a failover the map is often quite
+  unbalanced to begin with, so the resulting map is likely to be very
+  unbalanced too. Moreover, recovered vbuckets are not replicated. In a
+  nutshell, recovery is not a means of avoiding rebalance; it is only
+  suitable for recovering data, and a rebalance will still be needed.
+
+
-----------------------------------------
Between versions 2.0.0 and 2.0.1
-----------------------------------------
src/auto_failover.erl (18 lines changed)
@@ -71,7 +71,10 @@
reported_max_reached=false :: boolean(),
%% Whether we reported that we could not auto failover because of
%% rebalance
- reported_rebalance_running=false :: boolean()
+ reported_rebalance_running=false :: boolean(),
+ %% Whether we reported that we could not auto failover because of
+ %% recovery mode
+ reported_in_recovery=false :: boolean()
}).
%%
@@ -270,6 +273,16 @@ handle_info(tick, State0) ->
note_reported(#state.reported_rebalance_running, S);
false ->
S
+ end;
+ in_recovery ->
+ case should_report(#state.reported_in_recovery, S) of
+ true ->
+ ?user_log(?EVENT_NODE_AUTO_FAILOVERED,
+ "Could not automatically fail over node (~p)."
+ "Cluster is in recovery mode.", [Node]),
+ note_reported(#state.reported_in_recovery, S);
+ false ->
+ S
end
end
end, State#state{auto_failover_logic_state = LogicState}, Actions),
@@ -349,7 +362,8 @@ should_report(Flag, State) ->
init_reported(State) ->
State#state{reported_autofailover_unsafe=false,
reported_max_reached=false,
- reported_rebalance_running=false}.
+ reported_rebalance_running=false,
+ reported_in_recovery=false}.
update_reported_flags_by_actions(Actions, State) ->
case lists:keymember(failover, 1, Actions) of
src/menelaus_event.erl (4 lines changed)
@@ -118,6 +118,10 @@ handle_event({rebalance_status, _}, State) ->
ok = notify_watchers(rebalance_status, State),
{ok, State};
+handle_event({recovery_status, _}, State) ->
+ ok = notify_watchers(recovery_status, State),
+ {ok, State};
+
handle_event({buckets, _}, State) ->
ok = notify_watchers(buckets, State),
{ok, State};
src/menelaus_web.erl (16 lines changed)
@@ -176,6 +176,9 @@ loop(Req, AppRoot, DocRoot) ->
["pools", PoolId, "buckets", Id, "stats", StatName] ->
{auth_bucket, fun menelaus_stats:handle_specific_stat_for_buckets/4,
[PoolId, Id, StatName]};
+ ["pools", PoolId, "buckets", Id, "recoveryStatus"] ->
+ {auth, fun menelaus_web_recovery:handle_recovery_status/3,
+ [PoolId, Id]};
["pools", "default", "remoteClusters"] ->
{auth, fun menelaus_web_remote_clusters:handle_remote_clusters/1};
["nodeStatuses"] ->
@@ -320,6 +323,12 @@ loop(Req, AppRoot, DocRoot) ->
["pools", PoolId, "buckets", Id, "controller", "cancelDatabasesCompaction"] ->
{auth_check_bucket_uuid,
fun menelaus_web_buckets:handle_cancel_databases_compaction/3, [PoolId, Id]};
+ ["pools", PoolId, "buckets", Id, "controller", "startRecovery"] ->
+ {auth, fun menelaus_web_recovery:handle_start_recovery/3, [PoolId, Id]};
+ ["pools", PoolId, "buckets", Id, "controller", "stopRecovery"] ->
+ {auth, fun menelaus_web_recovery:handle_stop_recovery/3, [PoolId, Id]};
+ ["pools", PoolId, "buckets", Id, "controller", "commitVBucket"] ->
+ {auth, fun menelaus_web_recovery:handle_commit_vbucket/3, [PoolId, Id]};
["pools", PoolId, "buckets", Id,
"ddocs", DDocId, "controller", "compactView"] ->
{auth_check_bucket_uuid,
@@ -1944,7 +1953,10 @@ handle_failover(Req) ->
ok ->
Req:respond({200, [], []});
rebalance_running ->
- Req:respond({503, add_header(), "Rebalance running."})
+ Req:respond({503, add_header(), "Rebalance running."});
+ in_recovery ->
+ Req:respond({503, add_header(),
+ "Cluster is in recovery mode."})
end
end.
@@ -1984,6 +1996,8 @@ do_handle_rebalance(Req, KnownNodesS, EjectedNodesS) ->
reply_json(Req, {struct, [{mismatch, 1}]}, 400);
no_active_nodes_left ->
Req:respond({400, [], []});
+ in_recovery ->
+ Req:respond({503, [], "Cluster is in recovery mode."});
ok ->
Req:respond({200, [], []})
end.
src/menelaus_web_buckets.erl (12 lines changed)
@@ -243,7 +243,9 @@ build_bucket_info(PoolId, Id, BucketConfig, InfoLevel, LocalAddr) ->
{compactDB, bin_concat_path(["pools", PoolId,
"buckets", Id, "controller", "compactDatabases"])},
{purgeDeletes, bin_concat_path(["pools", PoolId,
- "buckets", Id, "controller", "unsafePurgeBucket"])}]},
+ "buckets", Id, "controller", "unsafePurgeBucket"])},
+ {startRecovery, bin_concat_path(["pools", PoolId,
+ "buckets", Id, "controller", "startRecovery"])}]},
{nodes, Nodes},
{stats, {struct, [{uri, StatsUri},
{directoryURI, StatsDirectoryUri},
@@ -344,6 +346,8 @@ handle_bucket_delete(_PoolId, BucketId, Req) ->
Req:respond({200, server_header(), []});
rebalance_running ->
reply_json(Req, {struct, [{'_', <<"Cannot delete buckets during rebalance.\r\n">>}]}, 503);
+ in_recovery ->
+ reply_json(Req, {struct, [{'_', <<"Cannot delete buckets when cluster is in recovery mode.\r\n">>}]}, 503);
{shutdown_failed, _} ->
reply_json(Req, {struct, [{'_', <<"Bucket deletion not yet complete, but will continue.\r\n">>}]}, 500);
{exit, {not_found, _}, _} ->
@@ -427,7 +431,9 @@ do_bucket_create(Name, ParsedProps) ->
{error, {invalid_name, _}} ->
{errors, [{name, <<"Name is invalid.">>}]};
rebalance_running ->
- {errors_500, [{'_', <<"Cannot create buckets during rebalance">>}]}
+ {errors_500, [{'_', <<"Cannot create buckets during rebalance">>}]};
+ in_recovery ->
+ {errors_500, [{'_', <<"Cannot create buckets when cluster is in recovery mode">>}]}
end.
handle_bucket_create(PoolId, Req) ->
@@ -501,6 +507,8 @@ do_handle_bucket_flush(Id, Req) ->
Req:respond({200, server_header(), []});
rebalance_running ->
reply_json(Req, {struct, [{'_', <<"Cannot flush buckets during rebalance">>}]}, 503);
+ in_recovery ->
+ reply_json(Req, {struct, [{'_', <<"Cannot flush buckets when cluster is in recovery mode">>}]}, 503);
bucket_not_found ->
Req:respond({404, server_header(), []});
flush_disabled ->
src/menelaus_web_recovery.erl (139 lines changed)
@@ -0,0 +1,139 @@
+%% @author Couchbase <info@couchbase.com>
+%% @copyright 2013 Couchbase, Inc.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+
+-module(menelaus_web_recovery).
+
+-import(menelaus_util,
+ [reply_json/2,
+ reply_json/3]).
+
+-export([handle_start_recovery/3,
+ handle_recovery_status/3,
+ handle_stop_recovery/3,
+ handle_commit_vbucket/3]).
+
+handle_start_recovery(_PooldId, Bucket, Req) ->
+ case ns_orchestrator:start_recovery(Bucket) of
+ {ok, UUID, RecoveryMap} ->
+ reply_json(Req, build_start_recovery_reply(UUID, RecoveryMap));
+ Error ->
+ reply_common(Req, Error)
+ end.
+
+handle_recovery_status(_PoolId, Bucket, Req) ->
+ UUID = proplists:get_value("recovery_uuid", Req:parse_qs()),
+
+ case UUID of
+ undefined ->
+ reply_common(Req, uuid_missing);
+ _ ->
+ UUIDBinary = list_to_binary(UUID),
+ case ns_orchestrator:recovery_map(Bucket, UUIDBinary) of
+ {ok, Map} ->
+ reply_json(Req, build_start_recovery_reply(UUIDBinary, Map));
+ Error ->
+ reply_common(Req, Error)
+ end
+ end.
+
+handle_stop_recovery(_PoolId, Bucket, Req) ->
+ UUID = proplists:get_value("recovery_uuid", Req:parse_qs()),
+
+ Reply =
+ case UUID of
+ undefined ->
+ uuid_missing;
+ _ ->
+ UUIDBinary = list_to_binary(UUID),
+ ns_orchestrator:stop_recovery(Bucket, UUIDBinary)
+ end,
+
+ reply_common(Req, Reply).
+
+handle_commit_vbucket(_PoolId, Bucket, Req) ->
+ UUID = proplists:get_value("recovery_uuid", Req:parse_qs()),
+ VBucket = proplists:get_value("vbucket", Req:parse_post()),
+
+ Reply =
+ case UUID of
+ undefined ->
+ uuid_missing;
+ _ ->
+ UUIDBinary = list_to_binary(UUID),
+ try list_to_integer(VBucket) of
+ V when is_integer(V) ->
+ ns_orchestrator:commit_vbucket(Bucket, UUIDBinary, V)
+ catch
+ error:badarg ->
+ bad_or_missing_vbucket
+ end
+ end,
+
+ reply_common(Req, Reply).
+
+%% internal
+build_start_recovery_reply(UUID, RecoveryMap) ->
+ {struct, [{uuid, UUID},
+ {code, ok},
+ {recoveryMap, build_recovery_map_json(RecoveryMap)}]}.
+
+build_recovery_map_json(RecoveryMap) ->
+ dict:fold(
+ fun (Node, VBuckets, Acc) ->
+ JSON = {struct, [{node, Node},
+ {vbuckets, VBuckets}]},
+ [JSON | Acc]
+ end, [], RecoveryMap).
+
+reply_common(Req, Reply0) ->
+ Reply = extract_reply(Reply0),
+ Status = reply_status_code(Reply),
+ Code = reply_code(Reply),
+ Extra = build_common_reply_extra(Reply),
+
+ JSON = {struct, [{code, Code} | Extra]},
+ reply_json(Req, JSON, Status).
+
+build_common_reply_extra({failed_nodes, Nodes}) ->
+ [{failedNodes, Nodes}];
+build_common_reply_extra(Reply) when is_atom(Reply) ->
+ [].
+
+extract_reply({error, Error}) ->
+ Error;
+extract_reply(Reply) ->
+ Reply.
+
+reply_code({failed_nodes, _}) ->
+ failed_nodes;
+reply_code(Reply) when is_atom(Reply) ->
+ Reply.
+
+reply_status_code(ok) ->
+ 200;
+reply_status_code(recovery_completed) ->
+ 200;
+reply_status_code(not_present) ->
+ 404;
+reply_status_code(bad_recovery) ->
+ 404;
+reply_status_code(vbucket_not_found) ->
+ 404;
+reply_status_code({failed_nodes, _}) ->
+ 500;
+reply_status_code(rebalance_running) ->
+ 503;
+reply_status_code(Error) when is_atom(Error) ->
+ 400.
src/ns_doctor.erl (82 lines changed)
@@ -73,14 +73,38 @@ handle_rebalance_status_change(_, running) ->
handle_rebalance_status_change(_, State) ->
{State, false}.
-handle_config_event({rebalance_status, NewValue}, State) ->
- {NewState, Changed} = handle_rebalance_status_change(NewValue, State),
+handle_recovery_status_change(not_running, {running, _Bucket, _UUID}) ->
+ {not_running, true};
+handle_recovery_status_change({running, _NewBucket, NewUUID} = New,
+ {running, _OldBucket, OldUUID}) ->
+ case OldUUID =:= NewUUID of
+ true ->
+ {New, false};
+ false ->
+ {New, true}
+ end;
+handle_recovery_status_change({running, _NewBucket, _NewUUID} = New, not_running) ->
+ {New, true};
+handle_recovery_status_change(not_running, not_running) ->
+ {not_running, false}.
+
+handle_config_event({rebalance_status, NewValue}, {RebalanceState, RecoveryState}) ->
+ {NewState, Changed} = handle_rebalance_status_change(NewValue, RebalanceState),
case Changed of
true ->
- ns_doctor ! rebalance_status_changed;
+ ns_doctor ! significant_change;
_ -> ok
end,
- NewState;
+ {NewState, RecoveryState};
+handle_config_event({recovery_status, NewValue}, {RebalanceState, RecoveryState}) ->
+ {NewState, Changed} = handle_recovery_status_change(NewValue, RecoveryState),
+ case Changed of
+ true ->
+ ns_doctor ! significant_change;
+ false ->
+ ok
+ end,
+ {RebalanceState, NewState};
handle_config_event(_, State) ->
State.
@@ -126,7 +150,7 @@ handle_cast(Msg, State) ->
{noreply, State}.
-handle_info(rebalance_status_changed, State) ->
+handle_info(significant_change, State) ->
%% force hash recomputation next time maybe_refresh_tasks_version is called
{noreply, State#state{tasks_hash_nodes = undefined}};
handle_info(acquire_initial_status, #state{nodes=NodeDict} = State) ->
@@ -273,17 +297,18 @@ maybe_refresh_tasks_version(State) ->
end
end, Set, proplists:get_value(local_tasks, NodeInfo, []))
end, sets:new(), Nodes),
- TasksAndRebalanceHash = erlang:phash2({erlang:phash2(TasksHashesSet),
- ns_orchestrator:is_rebalance_running()}),
- case TasksAndRebalanceHash =:= State#state.tasks_hash of
+ TasksRebalanceAndRecoveryHash = erlang:phash2({erlang:phash2(TasksHashesSet),
+ ns_orchestrator:is_rebalance_running(),
+ ns_orchestrator:is_recovery_running()}),
+ case TasksRebalanceAndRecoveryHash =:= State#state.tasks_hash of
true ->
%% hash did not change, only nodes. Cool
State#state{tasks_hash_nodes = Nodes};
_ ->
%% hash changed. Generate new version
State#state{tasks_hash_nodes = Nodes,
- tasks_hash = TasksAndRebalanceHash,
- tasks_version = integer_to_list(TasksAndRebalanceHash)}
+ tasks_hash = TasksRebalanceAndRecoveryHash,
+ tasks_version = integer_to_list(TasksRebalanceAndRecoveryHash)}
end.
task_operation(extract, Indexer, RawTask)
@@ -548,7 +573,9 @@ do_build_tasks_list(NodesDict, NeedNodeP, PoolId, AllRepDocs) ->
end]
end,
RebalanceTask = {struct, RebalanceTask0},
- [RebalanceTask | PreRebalanceTasks].
+ MaybeRecoveryTask = build_recovery_task(PoolId),
+
+ MaybeRecoveryTask ++ [RebalanceTask | PreRebalanceTasks].
task_priority(Task) ->
Type = proplists:get_value(type, Task),
@@ -583,3 +610,36 @@ get_node(Node, NodeStatuses) ->
{ok, Info} -> Info;
error -> [down]
end.
+
+build_recovery_task(PoolId) ->
+ case ns_orchestrator:recovery_status() of
+ not_in_recovery ->
+ [];
+ {ok, Status} ->
+ Bucket = proplists:get_value(bucket, Status),
+ RecoveryUUID = proplists:get_value(uuid, Status),
+ true = (Bucket =/= undefined),
+ true = (RecoveryUUID =/= undefined),
+
+ StopURI = menelaus_util:bin_concat_path(
+ ["pools", PoolId, "buckets", Bucket,
+ "controller", "stopRecovery"],
+ [{recovery_uuid, RecoveryUUID}]),
+ CommitURI = menelaus_util:bin_concat_path(
+ ["pools", PoolId, "buckets", Bucket,
+ "controller", "commitVBucket"],
+ [{recovery_uuid, RecoveryUUID}]),
+ RecoveryStatusURI =
+ menelaus_util:bin_concat_path(
+ ["pools", PoolId, "buckets", Bucket, "recoveryStatus"],
+ [{recovery_uuid, RecoveryUUID}]),
+
+ [[{type, recovery},
+ {bucket, list_to_binary(Bucket)},
+ {uuid, RecoveryUUID},
+ {recommendedRefreshPeriod, 10.0},
+
+ {stopURI, StopURI},
+ {commitVBucketURI, CommitURI},
+ {recoveryStatusURI, RecoveryStatusURI}]]
+ end.
src/ns_orchestrator.erl (304 lines changed)
@@ -28,6 +28,9 @@
-record(janitor_state, {remaining_buckets, pid}).
-record(rebalancing_state, {rebalancer, progress,
keep_nodes, eject_nodes, failed_nodes}).
+-record(recovery_state, {uuid :: binary(),
+ bucket :: bucket_name(),
+ recoverer_state :: any()}).
%% API
@@ -45,8 +48,13 @@
start_rebalance/2,
stop_rebalance/0,
update_progress/1,
- is_rebalance_running/0
- ]).
+ is_rebalance_running/0,
+ start_recovery/1,
+ stop_recovery/2,
+ commit_vbucket/3,
+ recovery_status/0,
+ recovery_map/2,
+ is_recovery_running/0]).
-define(SERVER, {global, ?MODULE}).
@@ -73,7 +81,8 @@
%% States
-export([idle/2, idle/3,
janitor_running/2, janitor_running/3,
- rebalancing/2, rebalancing/3]).
+ rebalancing/2, rebalancing/3,
+ recovery/2, recovery/3]).
%%
@@ -92,7 +101,7 @@ wait_for_orchestrator() ->
{error, {still_exists, nonempty_string()}} |
{error, {port_conflict, integer()}} |
{error, {invalid_name, nonempty_string()}} |
- rebalance_running.
+ rebalance_running | in_recovery.
create_bucket(BucketType, BucketName, NewConfig) ->
wait_for_orchestrator(),
gen_fsm:sync_send_event(?SERVER, {create_bucket, BucketType, BucketName,
@@ -108,7 +117,8 @@ create_bucket(BucketType, BucketName, NewConfig) ->
%% rebalance_running if delete bucket request came while rebalancing;
%% and {exit, ...} if bucket does not really exists
-spec delete_bucket(bucket_name()) ->
- ok | rebalance_running | {shutdown_failed, [node()]} | {exit, {not_found, bucket_name()}, _}.
+ ok | rebalance_running | in_recovery |
+ {shutdown_failed, [node()]} | {exit, {not_found, bucket_name()}, _}.
delete_bucket(BucketName) ->
wait_for_orchestrator(),
gen_fsm:sync_send_event(?SERVER, {delete_bucket, BucketName}, infinity).
@@ -116,6 +126,7 @@ delete_bucket(BucketName) ->
-spec flush_bucket(bucket_name()) ->
ok |
rebalance_running |
+ in_recovery |
bucket_not_found |
flush_disabled |
not_supported | % if we're in 1.8.x compat mode and trying to flush couchbase bucket
@@ -129,13 +140,13 @@ flush_bucket(BucketName) ->
gen_fsm:sync_send_event(?SERVER, {flush_bucket, BucketName}, infinity).
--spec failover(atom()) -> ok | rebalance_running.
+-spec failover(atom()) -> ok | rebalance_running | in_recovery.
failover(Node) ->
wait_for_orchestrator(),
gen_fsm:sync_send_event(?SERVER, {failover, Node}, infinity).
--spec try_autofailover(atom()) -> ok | rebalance_running |
+-spec try_autofailover(atom()) -> ok | rebalance_running | in_recovery |
{autofailover_unsafe, [bucket_name()]}.
try_autofailover(Node) ->
wait_for_orchestrator(),
@@ -178,7 +189,8 @@ start_rebalance_old_style(KeepNodes, EjectedNodes, FailedNodes) ->
-spec start_rebalance([node()], [node()]) ->
- ok | in_progress | already_balanced | nodes_mismatch | no_active_nodes_left.
+ ok | in_progress | already_balanced |
+ nodes_mismatch | no_active_nodes_left | in_recovery.
start_rebalance(KnownNodes, EjectNodes) ->
wait_for_orchestrator(),
gen_fsm:sync_send_all_state_event(?SERVER, {maybe_start_rebalance, KnownNodes, EjectNodes}).
@@ -190,6 +202,59 @@ stop_rebalance() ->
gen_fsm:sync_send_event(?SERVER, stop_rebalance).
+-spec start_recovery(bucket_name()) ->
+ {ok, UUID, RecoveryMap} |
+ unsupported |
+ rebalance_running |
+ not_present |
+ not_needed |
+ {error, {failed_nodes, [node()]}}
+ when UUID :: binary(),
+ RecoveryMap :: dict().
+start_recovery(Bucket) ->
+ wait_for_orchestrator(),
+ gen_fsm:sync_send_event(?SERVER, {start_recovery, Bucket}).
+
+-spec recovery_status() -> not_in_recovery | {ok, Status}
+ when Status :: [{bucket, bucket_name()} |
+ {uuid, binary()} |
+ {recovery_map, RecoveryMap}],
+ RecoveryMap :: dict().
+recovery_status() ->
+ wait_for_orchestrator(),
+ gen_fsm:sync_send_all_state_event(?SERVER, recovery_status).
+
+-spec recovery_map(bucket_name(), UUID) -> bad_recovery | {ok, RecoveryMap}
+ when RecoveryMap :: dict(),
+ UUID :: binary().
+recovery_map(Bucket, UUID) ->
+ wait_for_orchestrator(),
+ gen_fsm:sync_send_all_state_event(?SERVER, {recovery_map, Bucket, UUID}).
+
+-spec commit_vbucket(bucket_name(), UUID, vbucket_id()) ->
+ ok | recovery_completed |
+ vbucket_not_found | bad_recovery |
+ {error, {failed_nodes, [node()]}}
+ when UUID :: binary().
+commit_vbucket(Bucket, UUID, VBucket) ->
+ wait_for_orchestrator(),
+ gen_fsm:sync_send_all_state_event(?SERVER, {commit_vbucket, Bucket, UUID, VBucket}).
+
+-spec stop_recovery(bucket_name(), UUID) -> ok | bad_recovery
+ when UUID :: binary().
+stop_recovery(Bucket, UUID) ->
+ wait_for_orchestrator(),
+ gen_fsm:sync_send_all_state_event(?SERVER, {stop_recovery, Bucket, UUID}).
+
+-spec is_recovery_running() -> boolean().
+is_recovery_running() ->
+ case ns_config:search(recovery_status) of
+ {value, {running, _Bucket, _UUID}} ->
+ true;
+ _ ->
+ false
+ end.
+
%%
%% gen_fsm callbacks
%%
@@ -212,6 +277,7 @@ init([]) ->
%% There's no need to restart us here. So if we've changed compat mode in init suppress exit
ok
end,
+
{ok, idle, #idle_state{}}.
@@ -246,6 +312,35 @@ handle_sync_event({maybe_start_rebalance, KnownNodes, EjectedNodes},
{reply, nodes_mismatch, StateName, State}
end;
+handle_sync_event(recovery_status, From, StateName, State) ->
+ case StateName of
+ recovery ->
+ ?MODULE:recovery(recovery_status, From, State);
+ _ ->
+ {reply, not_in_recovery, StateName, State}
+ end;
+handle_sync_event(Msg, From, StateName, State)
+ when element(1, Msg) =:= recovery_map;
+ element(1, Msg) =:= commit_vbucket;
+ element(1, Msg) =:= stop_recovery ->
+ case StateName of
+ recovery ->
+ Bucket = element(2, Msg),
+ UUID = element(3, Msg),
+
+ #recovery_state{bucket=BucketInRecovery,
+ uuid=RecoveryUUID} = State,
+
+ case Bucket =:= BucketInRecovery andalso UUID =:= RecoveryUUID of
+ true ->
+ ?MODULE:recovery(Msg, From, State);
+ false ->
+ {reply, bad_recovery, recovery, State}
+ end;
+ _ ->
+ {reply, bad_recovery, StateName, State}
+ end;
+
handle_sync_event(Event, _From, StateName, State) ->
{stop, {unhandled, Event, StateName}, State}.
@@ -260,6 +355,7 @@ handle_info(janitor, idle, #idle_state{remaining_buckets=[]} = State) ->
end;
handle_info(janitor, idle, #idle_state{remaining_buckets=Buckets}) ->
misc:verify_name(?MODULE), % MB-3180: Make sure we're still registered
+ maybe_drop_recovery_status(),
Bucket = hd(Buckets),
Pid = proc_lib:spawn_link(ns_janitor, cleanup, [Bucket, [consider_stopping_rebalance_status]]),
%% NOTE: Bucket will be popped from Buckets when janitor run will
@@ -519,7 +615,91 @@ idle(stop_rebalance, _From, State) ->
"requested but rebalance isn't orchestrated on our node"),
none
end),
- {reply, not_rebalancing, idle, State}.
+ {reply, not_rebalancing, idle, State};
+idle({start_recovery, Bucket}, _From, State) ->
+ try
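+ %% Each failed precondition below throws its error reply; the catch
+ %% at the end turns it into the gen_fsm reply and stays in `idle'.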
+
+ case cluster_compat_mode:is_cluster_20() of
+ true ->
+ ok;
+ false ->
+ throw(unsupported)
+ end,
+
+ BucketConfig0 = case ns_bucket:get_bucket(Bucket) of
+ {ok, V} ->
+ V;
+ Error0 ->
+ throw(Error0)
+ end,
+
+ case ns_bucket:bucket_type(BucketConfig0) of
+ membase ->
+ ok;
+ _ ->
+ throw(not_needed)
+ end,
+
+ Servers = ns_node_disco:nodes_wanted(),
+ BucketConfig = misc:update_proplist(BucketConfig0, [{servers, Servers}]),
+ ns_cluster_membership:activate(Servers),
+ ns_config:sync_announcements(),
+ case ns_config_rep:synchronize_remote(Servers) of
+ ok ->
+ ok;
+ {error, BadNodes} ->
+ ?log_error("Failed to syncrhonize config to some nodes: ~p", [BadNodes]),
+ throw({error, {failed_nodes, BadNodes}})
+ end,
+
+ case ns_rebalancer:maybe_cleanup_old_buckets(Servers) of
+ ok ->
+ ok;
+ {buckets_cleanup_failed, FailedNodes0} ->
+ throw({error, {failed_nodes, FailedNodes0}})
+ end,
+
+ ns_bucket:set_servers(Bucket, Servers),
+
+ case ns_janitor:cleanup(Bucket, [{timeout, 10}]) of
+ ok ->
+ ok;
+ {error, wait_for_memcached_failed, FailedNodes1} ->
+ throw({error, {failed_nodes, FailedNodes1}})
+ end,
+
+ {ok, RecoveryMap, {NewServers, NewBucketConfig}, RecovererState} =
+ case recoverer:start_recovery(BucketConfig) of
+ {ok, _, _, _} = R ->
+ R;
+ Error1 ->
+ throw(Error1)
+ end,
+
+ true = (Servers =:= NewServers),
+
+ RV = apply_recoverer_bucket_config(Bucket, NewBucketConfig, NewServers),
+ case RV of
+ ok ->
+ RecoveryUUID = couch_uuids:random(),
+ NewState =
+ #recovery_state{bucket=Bucket,
+ uuid=RecoveryUUID,
+ recoverer_state=RecovererState},
+
+ ensure_recovery_status(Bucket, RecoveryUUID),
+
+ ale:info(?USER_LOGGER, "Put bucket `~s` into recovery mode", [Bucket]),
+
+ {reply, {ok, RecoveryUUID, RecoveryMap}, recovery, NewState};
+ Error2 ->
+ throw(Error2)
+ end
+
+ catch
+ throw:E ->
+ {reply, E, idle, State}
+ end.
janitor_running(rebalance_progress, _From, State) ->
@@ -560,6 +740,85 @@ rebalancing(Event, _From, State) ->
?log_warning("Got event ~p while rebalancing.", [Event]),
{reply, rebalance_running, rebalancing, State}.
+recovery(Event, State) ->
+ ?log_warning("Got unexpected event: ~p", [Event]),
+ {next_state, recovery, State}.
+
+recovery({start_recovery, Bucket}, _From,
+ #recovery_state{bucket=BucketInRecovery,
+ uuid=RecoveryUUID,
+ recoverer_state=RState} = State) ->
+ case Bucket =:= BucketInRecovery of
+ true ->
+ RecoveryMap = recoverer:get_recovery_map(RState),
+ {reply, {ok, RecoveryUUID, RecoveryMap}, recovery, State};
+ false ->
+ {reply, recovery_running, recovery, State}
+ end;
+
+recovery({commit_vbucket, Bucket, UUID, VBucket}, _From,
+ #recovery_state{recoverer_state=RState} = State) ->
+ Bucket = State#recovery_state.bucket,
+ UUID = State#recovery_state.uuid,
+
+ case recoverer:commit_vbucket(VBucket, RState) of
+ {ok, {Servers, NewBucketConfig}, RState1} ->
+ RV = apply_recoverer_bucket_config(Bucket, NewBucketConfig, Servers),
+ case RV of
+ ok ->
+ {ok, Map, RState2} = recoverer:note_commit_vbucket_done(VBucket, RState1),
+ ns_bucket:set_map(Bucket, Map),
+ case recoverer:is_recovery_complete(RState2) of
+ true ->
+ ale:info(?USER_LOGGER, "Recovery of bucket `~s` completed", [Bucket]),
+ {reply, recovery_completed, idle, #idle_state{}};
+ false ->
+ ?log_debug("Committed vbucket ~b (recovery of `~s`)", [VBucket, Bucket]),
+ {reply, ok, recovery,
+ State#recovery_state{recoverer_state=RState2}}
+ end;
+ Error ->
+ {reply, Error, recovery,
+ State#recovery_state{recoverer_state=RState1}}
+ end;
+ Error ->
+ {reply, Error, recovery, State}
+ end;
+
+recovery({stop_recovery, Bucket, UUID}, _From, State) ->
+ Bucket = State#recovery_state.bucket,
+ UUID = State#recovery_state.uuid,
+
+ ns_config:set(recovery_status, not_running),
+
+ ale:info(?USER_LOGGER, "Recovery of bucket `~s` aborted", [Bucket]),
+
+ {reply, ok, idle, #idle_state{}};
+
+recovery(recovery_status, _From,
+ #recovery_state{uuid=RecoveryUUID,
+ bucket=Bucket,
+ recoverer_state=RState} = State) ->
+ RecoveryMap = recoverer:get_recovery_map(RState),
+
+ Status = [{bucket, Bucket},
+ {uuid, RecoveryUUID},
+ {recovery_map, RecoveryMap}],
+
+ {reply, {ok, Status}, recovery, State};
+recovery({recovery_map, Bucket, RecoveryUUID}, _From,
+ #recovery_state{uuid=RecoveryUUID,
+ bucket=Bucket,
+ recoverer_state=RState} = State) ->
+ RecoveryMap = recoverer:get_recovery_map(RState),
+ {reply, {ok, RecoveryMap}, recovery, State};
+
+recovery(rebalance_progress, _From, State) ->
+ {reply, not_running, recovery, State};
+recovery(stop_rebalance, _From, State) ->
+ {reply, not_rebalancing, recovery, State};
+recovery(_Event, _From, State) ->
+ {reply, in_recovery, recovery, State}.
%%
@@ -768,3 +1027,30 @@ do_flush_old_style(BucketName, BucketConfig) ->
{old_style_flush_failed, Results, BadNodes}
end.
+apply_recoverer_bucket_config(Bucket, BucketConfig, Servers) ->
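+ %% Apply the new bucket config only when every server answered the state
+ %% query; unresponsive nodes ("zombies") are reported as failed_nodes.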
+ {ok, _, Zombies} = janitor_agent:query_states(Bucket, Servers, 1),
+ case Zombies of
+ [] ->
+ janitor_agent:apply_new_bucket_config_new_style(
+ Bucket, Servers, [], BucketConfig, []);
+ _ ->
+ ?log_error("Failed to query states from some of the nodes: ~p", [Zombies]),
+ {error, {failed_nodes, Zombies}}
+ end.
+
+maybe_drop_recovery_status() ->
+ ns_config:update(
+ fun ({recovery_status, Value} = P) ->
+ case Value of
+ not_running ->
+ P;
+ {running, _Bucket, _UUID} ->
+ ?log_warning("Dropping stale recovery status ~p", [P]),
+ {recovery_status, not_running}
+ end;
+ (Other) ->
+ Other
+ end, make_ref()).
+
+ensure_recovery_status(Bucket, UUID) ->
+ ns_config:set(recovery_status, {running, Bucket, UUID}).
src/recoverer.erl (212 lines changed)
@@ -0,0 +1,212 @@
+%% @author Couchbase <info@couchbase.com>
+%% @copyright 2013 Couchbase, Inc.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+
+-module(recoverer).
+
+-include("ns_common.hrl").
+
+-export([start_recovery/1,
+ get_recovery_map/1, commit_vbucket/2, note_commit_vbucket_done/2,
+ is_recovery_complete/1]).
+
+-record(state, {bucket_config :: list(),
+ recovery_map :: dict(),
+ post_recovery_chains :: dict(),
+ apply_map :: array(),
+ effective_map :: array()}).
+
+-spec start_recovery(BucketConfig) ->
+ {ok, RecoveryMap, {Servers, BucketConfig}, #state{}}
+ | not_needed
+ when BucketConfig :: list(),
+ RecoveryMap :: dict(),
+ Servers :: [node()].
+start_recovery(BucketConfig) ->
+ NumVBuckets = proplists:get_value(num_vbuckets, BucketConfig),
+ true = is_integer(NumVBuckets),
+
+ NumReplicas = ns_bucket:num_replicas(BucketConfig),
+
+ OldMap =
+ case proplists:get_value(map, BucketConfig) of
+ undefined ->
+ lists:duplicate(NumVBuckets, lists:duplicate(NumReplicas + 1, undefined));
+ V ->
+ V
+ end,
+ Servers = proplists:get_value(servers, BucketConfig),
+ true = (Servers =/= undefined),
+
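+ %% A vbucket counts as missing when its chain has no active master,
+ %% i.e. the chain starts with `undefined'.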
+ MissingVBuckets =
+ lists:foldr(
+ fun ({V, Chain}, Acc) ->
+ case Chain of
+ [undefined | _] ->
+ [V | Acc];
+ _ ->
+ Acc
+ end
+ end, [], misc:enumerate(OldMap, 0)),
+
+ case MissingVBuckets of
+ [] ->
+ not_needed;
+ _ ->
+ RecoveryMap0 = compute_recovery_map(OldMap, Servers, NumVBuckets, MissingVBuckets),
+ RecoveryMap = dict:from_list(RecoveryMap0),
+
+ PostRecoveryChains =
+ lists:foldl(
+ fun ({Node, VBuckets}, Acc) ->
+ lists:foldl(
+ fun (VBucket, Acc1) ->
+ Chain = [Node |
+ lists:duplicate(NumReplicas, undefined)],
+ dict:store(VBucket, Chain, Acc1)
+ end, Acc, VBuckets)
+ end, dict:new(), RecoveryMap0),
+
+ ApplyMap =
+ lists:foldr(
+ fun ({V, [undefined | _]}, Acc) ->
+ [Owner | _] = dict:fetch(V, PostRecoveryChains),
+ %% this is fake chain just to create replica
+ Chain = [undefined, Owner],
+ [Chain | Acc];
+ ({_V, Chain}, Acc) ->
+ [Chain | Acc]
+ end, [], misc:enumerate(OldMap, 0)),
+
+ NewBucketConfig = misc:update_proplist(BucketConfig,
+ [{map, ApplyMap}]),
+
+ {ok, RecoveryMap, {Servers, NewBucketConfig},
+ #state{bucket_config=BucketConfig,
+ recovery_map=RecoveryMap,
+ post_recovery_chains=PostRecoveryChains,
+ apply_map=array:from_list(ApplyMap),
+ effective_map=array:from_list(OldMap)}}
+ end.
+
+-spec get_recovery_map(#state{}) -> dict().
+get_recovery_map(#state{recovery_map=RecoveryMap}) ->
+ true = (RecoveryMap =/= undefined),
+ RecoveryMap.
+
+-spec commit_vbucket(vbucket_id(), #state{}) ->
+ {ok, {Servers, BucketConfig}, #state{}}
+ | vbucket_not_found
+ when Servers :: [node()],
+ BucketConfig :: list().
+commit_vbucket(VBucket,
+ #state{bucket_config=BucketConfig,
+ post_recovery_chains=Chains,
+ apply_map=ApplyMap} = State) ->
+ case dict:find(VBucket, Chains) of
+ {ok, Chain} ->
+ ApplyMap1 = array:to_list(array:set(VBucket, Chain, ApplyMap)),
+ NewBucketConfig = misc:update_proplist(BucketConfig,
+ [{map, ApplyMap1}]),
+ Servers = [S || S <- Chain, S =/= undefined],
+ {ok, {Servers, NewBucketConfig}, State};
+ error ->
+ vbucket_not_found
+ end.
+
+-spec note_commit_vbucket_done(vbucket_id(), #state{}) ->
+ {ok, VBucketMap, #state{}}
+ when VBucketMap :: list().
+note_commit_vbucket_done(VBucket,
+ #state{post_recovery_chains=Chains,
+ recovery_map=RecoveryMap,
+ apply_map=ApplyMap,
+ effective_map=EffectiveMap} = State) ->
+ [Node | _] = Chain = dict:fetch(VBucket, Chains),
+ VBuckets = dict:fetch(Node, RecoveryMap),
+ VBuckets1 = lists:delete(VBucket, VBuckets),
+
+ RecoveryMap1 =
+ case VBuckets1 of
+ [] ->
+ dict:erase(Node, RecoveryMap);
+ _ ->
+ dict:store(Node, VBuckets1, RecoveryMap)
+ end,
+ Chains1 = dict:erase(VBucket, Chains),
+ ApplyMap1 = array:set(VBucket, Chain, ApplyMap),
+ EffectiveMap1 = array:set(VBucket, Chain, EffectiveMap),
+
+ {ok, array:to_list(EffectiveMap1),
+ State#state{recovery_map=RecoveryMap1,
+ post_recovery_chains=Chains1,
+ apply_map=ApplyMap1,
+ effective_map=EffectiveMap1}}.
+
+-spec is_recovery_complete(#state{}) -> boolean().
+is_recovery_complete(#state{post_recovery_chains=Chains}) ->
+ true = (Chains =/= undefined),
+ dict:size(Chains) =:= 0.
+
+
+%% internal
+compute_recovery_map(OldMap, Servers, NumVBuckets, MissingVBuckets) ->
+ {PresentVBucketsCount, NodeToVBucketCountsDict} =
+ lists:foldl(
+ fun ([undefined | _], Acc) ->
+ Acc;
+ ([Node | _], {AccTotal, AccDict}) ->
+ {AccTotal + 1, dict:update(Node, fun (C) -> C + 1 end, AccDict)}
+ end,
+ {0, dict:from_list([{N, 0} || N <- Servers])}, OldMap),
+ NodeToVBucketCounts = lists:keysort(2, dict:to_list(NodeToVBucketCountsDict)),
+
+ true = (NumVBuckets == (length(MissingVBuckets) + PresentVBucketsCount)),
+
+ NodesCount = length(Servers),
+
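+ %% A node is "overloaded" if it already holds at least its fair share
+ %% (NumVBuckets / NodesCount) of vbuckets; overloaded nodes receive
+ %% none of the missing vbuckets.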
+ {OverloadedNodesVBucketsCount, UnderloadedNodeToVBucketCounts} =
+ lists:foldr(
+ fun ({_Node, VBucketsCount} = Pair, {AccCount, Acc}) ->
+ case NodesCount * VBucketsCount >= NumVBuckets of
+ true ->
+ {AccCount + VBucketsCount, Acc};
+ false ->
+ {AccCount, [Pair | Acc]}
+ end
+ end, {0, []}, NodeToVBucketCounts),
+
+ UnderloadedNodesVBucketsCount = NumVBuckets - OverloadedNodesVBucketsCount,
+ UnderloadedNodesCount = length(UnderloadedNodeToVBucketCounts),
+
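+ %% Even out the underloaded nodes: each is brought up to Q vbuckets,
+ %% and the first R of them get one extra.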
+ Q = UnderloadedNodesVBucketsCount div UnderloadedNodesCount,
+ R = UnderloadedNodesVBucketsCount - UnderloadedNodesCount * Q,
+
+ do_compute_recovery_map(UnderloadedNodeToVBucketCounts, MissingVBuckets, Q, R).
+
+do_compute_recovery_map([], _, _, _) ->
+ [];
+do_compute_recovery_map(_, [], _, _) ->
+ [];
+do_compute_recovery_map([{Node, Count} | Rest], MissingVBuckets, Q, R) ->
+ {TargetCount, R1} = case R of
+ 0 ->
+ {Q, 0};
+ _ ->
+ {Q + 1, R - 1}
+ end,
+
+ true = (TargetCount > Count),
+ {NodeVBuckets, MissingVBuckets1} = lists:split(TargetCount - Count, MissingVBuckets),
+ [{Node, NodeVBuckets} | do_compute_recovery_map(Rest, MissingVBuckets1, Q, R1)].