Comparing changes

  • 8 commits
  • 7 files changed
  • 0 commit comments
  • 3 contributors
Commits on Nov 25, 2013
Junyi Xie bp MB-9209: hibernate XMEM processes
This is a backport to 2.2 of Alk's fix merged into 2.5.
The original fix is at:
    http://review.couchbase.org/#/c/30359/2

Change-Id: I5b71695e9e6f9085e42d20f810de41d17718e92e
Reviewed-on: http://review.couchbase.org/30528
Tested-by: Junyi Xie <junyi.couchbase@gmail.com>
Reviewed-by: Aliaksey Kandratsenka <alkondratenko@gmail.com>
b3a681e
Junyi Xie bp:MB-9209: fix unbounded worker stats dictionary
We incorrectly used {<pid>, <unique ref>} from worker processes as
the key for storing worker stats in the vb replicator state. The
problem is that each report carries a different <unique ref>, so a
new entry is created every time a worker reports stats to the vb
replicator. Over time the stats table grows without bound, causing a
memory leak. This fix uses a unique worker id to identify each worker
process and uses that id as the key of the worker stats table. The
number of entries in the vb stats table is therefore bounded by the
number of worker processes (default is 4).
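
For illustration, a minimal sketch (hypothetical module and function
names, not part of this change) of why the key choice matters for the
size of the stats dict:

    -module(worker_stats_sketch).
    -export([report_by_ref/3, report_by_id/3]).

    %% Old behaviour: every report is stored under {Pid, fresh ref}, so
    %% dict:store/3 always adds a new key and the table grows with each
    %% report.
    report_by_ref(WorkerPid, Stat, StatsDict) ->
        dict:store({WorkerPid, erlang:make_ref()}, Stat, StatsDict).

    %% Fixed behaviour: the worker id (1..NumWorkers) is stable across
    %% reports, so repeated reports from the same worker overwrite a
    %% single entry and the table size is bounded by the number of
    %% workers (default 4).
    report_by_id(WorkerId, Stat, StatsDict) ->
        dict:store(WorkerId, Stat, StatsDict).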

Change-Id: I4c02fba251e234918c3d1b96931d0cc1acf6e4d5
Orig-Change-Id: Iac9461cb2ddad37e6ee0edce4ef7d87e0ceef1e3
Orig-Reviewed-on: http://review.couchbase.org/29907
Reviewed-on: http://review.couchbase.org/30549
Reviewed-by: Aliaksey Kandratsenka <alkondratenko@gmail.com>
Tested-by: Aliaksey Kandratsenka <alkondratenko@gmail.com>
7519de2
@alk bp:MB-9422 Fix recovery map generation.
Here's what the recovery map generation code is supposed to do:

 1. Figure out which nodes have more vbuckets than they would have
 if vbuckets were equally distributed.

 2. Distribute the missing vbuckets among the underutilized nodes so
 that these nodes end up with a more or less equal number of vbuckets.

Unfortunately, the code was flawed. It assumed that it's never
possible to "run out" of missing vbuckets and that, basically, every
node will get at least some subset of them, which is of course not true.

I removed the part that determined under/overutilized nodes and
instead assume that, since a rebalance will be needed anyway, it's
enough to simply not put more vbuckets on a node than it would have
if the vbucket map were balanced. I also replaced the call to
lists:split with the similar function misc:safe_split. The difference
is that the latter doesn't crash if the split index is larger than
the length of the list being split.

I also added a test verifying that, for different input parameters, a
recovery map can be generated and that it assigns every missing
vbucket to some node.
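
A minimal shell sketch of the new helper's behaviour next to
lists:split (hypothetical session, output abridged; values chosen for
illustration):

    1> lists:split(5, [a, b, c]).
    ** exception error: bad argument
         in function  lists:split/2
    2> misc:safe_split(5, [a, b, c]).
    {[a,b,c],[]}
    3> misc:safe_split(2, [a, b, c]).
    {[a,b],[c]}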

Orig-Reviewed-on: http://review.couchbase.org/29769

Conflicts:
	src/misc.erl

Change-Id: Iab844794aaf6299e5e2f6280c9e083264021e2ee
Reviewed-on: http://review.couchbase.org/30550
Reviewed-by: Aliaksey Kandratsenka <alkondratenko@gmail.com>
Tested-by: Aliaksey Kandratsenka <alkondratenko@gmail.com>
b69b2c7
Commits on Nov 26, 2013
@plabee CBSE-811: empty commit to force gerrit/github sync
Change-Id: I123155a5eea581590fb3b57b90e4b72bd0a033d2
bba3f64
@plabee Merge "CBSE-811: empty commit to force gerrit/github sync" into 2.2.0
c78dadb
@alk Merge remote-tracking branch 'gerrit/2.2.0' into CBSE-811
* gerrit/2.2.0:
  CBSE-811:  empty commit to force gerrit/github sync
  bp:MB-9422 Fix recovery map generation.
  bp:MB-9209: fix unbounded worker stats dictionary
  bp MB-9209: hibernate XMEM processes
  MB-9140: corrected link to contacts
  MB-9073 Always set type to xmem when creating a replication.

Change-Id: Iff0f7ea2123677b2ce8e8bb831f67366958c4148
180b792
Commits on Dec 03, 2013
Junyi Xie bp MB-9612: handling remote_cluster_info error more gracefully
This is a backport of a fix merged into 2.5. The original commit is:

http://review.couchbase.org/#/c/30650/

Change-Id: Iad81f2cc53d71e55ce1a69856099f6031546bba1
Reviewed-on: http://review.couchbase.org/30804
Tested-by: Junyi Xie <junyi.couchbase@gmail.com>
Reviewed-by: Aliaksey Kandratsenka <alkondratenko@gmail.com>
e40736f
Commits on Feb 04, 2014
@alk enforce port 8092 requests to use lhttpc_manager's connections
Change-Id: I2c1b1209e39a7bae8e24194c03fd74bb82e153d7
Reviewed-on: http://review.couchbase.org/32898
Tested-by: Aliaksey Kandratsenka <alkondratenko@gmail.com>
Reviewed-by: Steve Yen <steve.yen@gmail.com>
b2a9a6f
4 include/xdc_replicator.hrl
@@ -39,6 +39,8 @@
-define(XDCR_ERROR_HISTORY, 10).
%% interval (millisecs) to compute rate stats
-define(XDCR_RATE_STAT_INTERVAL_MS, 1000).
+%% sleep time in secs before retry
+-define(XDCR_SLEEP_BEFORE_RETRY, 30).
%% constants used by XMEM
-define(XDCR_XMEM_CONNECTION_ATTEMPTS, 16).
-define(XDCR_XMEM_CONNECTION_TIMEOUT, 120000). %% timeout in ms
@@ -268,6 +270,7 @@
%% options to start xdc replication worker process
-record(rep_worker_option, {
+ worker_id, %% unique id of worker process starting from 1
cp, %% parent vb replicator process
source = #db{}, %% source db
target = #httpdb{}, %% target db
@@ -281,6 +284,7 @@
%% statistics reported from worker process to its parent vbucket replicator
-record(worker_stat, {
+ worker_id,
seq = 0,
worker_meta_latency_aggr = 0,
worker_docs_latency_aggr = 0,
11 src/misc.erl
@@ -1584,3 +1584,14 @@ delaying_crash(DelayBy, Body) ->
timer:sleep(DelayBy),
erlang:raise(T, E, ST)
end.
+
+%% Like lists:split but does not fail if N > length(List).
+safe_split(N, List) ->
+ do_safe_split(N, List, []).
+
+do_safe_split(_, [], Acc) ->
+ {lists:reverse(Acc), []};
+do_safe_split(0, List, Acc) ->
+ {lists:reverse(Acc), List};
+do_safe_split(N, [H|T], Acc) ->
+ do_safe_split(N - 1, T, [H|Acc]).
95 src/recoverer.erl
@@ -16,6 +16,7 @@
-module(recoverer).
-include("ns_common.hrl").
+-include_lib("eunit/include/eunit.hrl").
-export([start_recovery/1,
get_recovery_map/1, commit_vbucket/2, note_commit_vbucket_done/2,
@@ -49,16 +50,7 @@ start_recovery(BucketConfig) ->
Servers = proplists:get_value(servers, BucketConfig),
true = (Servers =/= undefined),
- MissingVBuckets =
- lists:foldr(
- fun ({V, Chain}, Acc) ->
- case Chain of
- [undefined | _] ->
- [V | Acc];
- _ ->
- Acc
- end
- end, [], misc:enumerate(OldMap, 0)),
+ MissingVBuckets = compute_missing_vbuckets(OldMap),
case MissingVBuckets of
[] ->
@@ -176,24 +168,10 @@ compute_recovery_map(OldMap, Servers, NumVBuckets, MissingVBuckets) ->
NodesCount = length(Servers),
- {OverloadedNodesVBucketsCount, UnderloadedNodeToVBucketCounts} =
- lists:foldr(
- fun ({_Node, VBucketsCount} = Pair, {AccCount, Acc}) ->
- case NodesCount * VBucketsCount >= NumVBuckets of
- true ->
- {AccCount + VBucketsCount, Acc};
- false ->
- {AccCount, [Pair | Acc]}
- end
- end, {0, []}, NodeToVBucketCounts),
+ Q = NumVBuckets div NodesCount,
+ R = NumVBuckets rem NodesCount,
- UnderloadedNodesVBucketsCount = NumVBuckets - OverloadedNodesVBucketsCount,
- UnderloadedNodesCount = length(UnderloadedNodeToVBucketCounts),
-
- Q = UnderloadedNodesVBucketsCount div UnderloadedNodesCount,
- R = UnderloadedNodesVBucketsCount - UnderloadedNodesCount * Q,
-
- do_compute_recovery_map(UnderloadedNodeToVBucketCounts, MissingVBuckets, Q, R).
+ do_compute_recovery_map(NodeToVBucketCounts, MissingVBuckets, Q, R).
do_compute_recovery_map([], _, _, _) ->
[];
@@ -208,5 +186,66 @@ do_compute_recovery_map([{Node, Count} | Rest], MissingVBuckets, Q, R) ->
end,
true = (TargetCount > Count),
- {NodeVBuckets, MissingVBuckets1} = lists:split(TargetCount - Count, MissingVBuckets),
+ {NodeVBuckets, MissingVBuckets1} =
+ misc:safe_split(TargetCount - Count, MissingVBuckets),
[{Node, NodeVBuckets} | do_compute_recovery_map(Rest, MissingVBuckets1, Q, R1)].
+
+compute_missing_vbuckets(Map) ->
+ lists:foldr(
+ fun ({V, Chain}, Acc) ->
+ case Chain of
+ [undefined | _] ->
+ [V | Acc];
+ _ ->
+ Acc
+ end
+ end, [], misc:enumerate(Map, 0)).
+
+-ifdef(EUNIT).
+
+-define(NUM_TEST_ATTEMPTS, 500).
+-define(MAX_NUM_SERVERS, 50).
+
+compute_recovery_map_test_() ->
+ random:seed(now()),
+
+ {timeout, 100,
+ {inparallel,
+ [begin
+ NumServers = random:uniform(?MAX_NUM_SERVERS - 1) + 1,
+ NumCopies = random:uniform(4),
+
+ Title = lists:flatten(
+ io_lib:format("NumServers=~p, NumCopies=~p",
+ [NumServers, NumCopies])),
+
+ Fun = fun () ->
+ compute_recovery_map_test__(NumServers, NumCopies)
+ end,
+ {timeout, 100, {Title, Fun}}
+ end || _ <- lists:seq(1, ?NUM_TEST_ATTEMPTS)]}}.
+
+compute_recovery_map_test__(NumServers, NumCopies) ->
+ Servers = lists:seq(1, NumServers),
+ EmptyMap = lists:duplicate(1024,
+ lists:duplicate(NumCopies, undefined)),
+ Map = mb_map:generate_map(EmptyMap, Servers, []),
+
+ FailoverServers = misc:shuffle(tl(Servers)),
+ lists:foldl(
+ fun (Server, AccMap) ->
+ AccMap1 = mb_map:promote_replicas(AccMap, [Server]),
+ MissingVBuckets = compute_missing_vbuckets(AccMap1),
+
+ RecoveryMap = compute_recovery_map(AccMap1, Servers,
+ 1024, MissingVBuckets),
+ RecoveryMapVBuckets =
+ lists:flatten([Vs || {_, Vs} <- RecoveryMap]),
+
+ ?assertEqual(lists:sort(MissingVBuckets),
+ lists:sort(RecoveryMapVBuckets)),
+
+ AccMap1
+ end, Map, FailoverServers).
+
+-endif.
73 src/xdc_vbucket_rep.erl
@@ -129,11 +129,13 @@ handle_info(start_replication, #rep_state{throttle = Throttle,
{noreply, start_replication(St2)}.
handle_call({report_seq_done,
- #worker_stat{seq = Seq,
- worker_item_opt_repd = NumDocsOptRepd,
- worker_item_checked = NumChecked,
- worker_item_replicated = NumWritten,
- worker_data_replicated = WorkerDataReplicated} = WorkerStat}, From,
+ #worker_stat{
+ worker_id = WorkerID,
+ seq = Seq,
+ worker_item_opt_repd = NumDocsOptRepd,
+ worker_item_checked = NumChecked,
+ worker_item_replicated = NumWritten,
+ worker_data_replicated = WorkerDataReplicated} = WorkerStat}, From,
#rep_state{seqs_in_progress = SeqsInProgress,
highest_seq_done = HighestDone,
current_through_seq = ThroughSeq,
@@ -194,8 +196,8 @@ handle_call({report_seq_done,
%% get stats
{ChangesQueueSize, ChangesQueueDocs} = get_changes_queue_stats(State),
- %% update latency stats
- NewWorkersStat = dict:store(From, WorkerStat, AllWorkersStat),
+ %% update latency stats, using worker id as key
+ NewWorkersStat = dict:store(WorkerID, WorkerStat, AllWorkersStat),
%% aggregate weighted latency as well as its weight from each worker
[VbMetaLatencyAggr, VbMetaLatencyWtAggr] = dict:fold(
@@ -492,6 +494,9 @@ update_status_to_parent(#rep_state{parent = Parent,
State#rep_state{status = NewVbStat,
work_start_time = now()}.
+http_db_open(DB, _Options) ->
+ {ok, DB#httpdb{httpc_pool = lhttpc_manager}}.
+
init_replication_state(#init_state{rep = Rep,
vb = Vb,
mode = RepMode,
@@ -503,26 +508,61 @@ init_replication_state(#init_state{rep = Rep,
options = Options
} = Rep,
SrcVbDb = xdc_rep_utils:local_couch_uri_for_vbucket(Src, Vb),
- {ok, CurrRemoteBucket} = remote_clusters_info:get_remote_bucket_by_ref(Tgt,
- false),
+ {ok, CurrRemoteBucket} =
+ case remote_clusters_info:get_remote_bucket_by_ref(Tgt, false) of
+ {ok, RBucket} ->
+ {ok, RBucket};
+ {error, ErrorMsg} ->
+ ?xdcr_error("Error in fetching remot bucket, error: ~p,"
+ "sleep for ~p secs before retry.",
+ [ErrorMsg, (?XDCR_SLEEP_BEFORE_RETRY)]),
+ %% sleep and retry once
+ timer:sleep(1000*?XDCR_SLEEP_BEFORE_RETRY),
+ remote_clusters_info:get_remote_bucket_by_ref(Tgt, false);
+ {error, Error, Msg} ->
+ ?xdcr_error("Error in fetching remot bucket, error: ~p, msg: ~p"
+ "sleep for ~p secs before retry",
+ [Error, Msg, (?XDCR_SLEEP_BEFORE_RETRY)]),
+ %% sleep and retry once
+ timer:sleep(1000*?XDCR_SLEEP_BEFORE_RETRY),
+ remote_clusters_info:get_remote_bucket_by_ref(Tgt, false);
+ {error, Error, Msg, Details} ->
+ ?xdcr_error("Error in fetching remot bucket, error: ~p, msg: ~p, details: ~p"
+ "sleep for ~p secs before retry",
+ [Error, Msg, Details, (?XDCR_SLEEP_BEFORE_RETRY)]),
+ %% sleep and retry once
+ timer:sleep(1000*?XDCR_SLEEP_BEFORE_RETRY),
+ remote_clusters_info:get_remote_bucket_by_ref(Tgt, false)
+ end,
+
TgtURI = hd(dict:fetch(Vb, CurrRemoteBucket#remote_bucket.capi_vbucket_map)),
TgtDb = xdc_rep_utils:parse_rep_db(TgtURI, [], Options),
{ok, Source} = couch_api_wrap:db_open(SrcVbDb, []),
- {ok, Target} = couch_api_wrap:db_open(TgtDb, []),
+ {ok, Target} = http_db_open(TgtDb, []),
{ok, SourceInfo} = couch_api_wrap:get_db_info(Source),
{ok, TargetInfo} = couch_api_wrap:get_db_info(Target),
{ok, SrcMasterDb} = couch_api_wrap:db_open(
xdc_rep_utils:get_master_db(Source),
[]),
- {ok, TgtMasterDb} = couch_api_wrap:db_open(
+ {ok, TgtMasterDb} = http_db_open(
xdc_rep_utils:get_master_db(Target),
[]),
XMemRemote = case RepMode of
"xmem" ->
{ok, {Ip, Port}, LatestRemoteBucket} =
- remote_clusters_info:get_memcached_vbucket_info_by_ref(Tgt, false, Vb),
+ case remote_clusters_info:get_memcached_vbucket_info_by_ref(Tgt, false, Vb) of
+ {ok, RemoteNode, TgtBucket} ->
+ {ok, RemoteNode, TgtBucket};
+ Error2 ->
+ ?xdcr_error("Error in fetching remot memcached vbucket info, error: ~p"
+ "sleep for ~p secs before retry",
+ [Error2, (?XDCR_SLEEP_BEFORE_RETRY)]),
+ timer:sleep(1000*?XDCR_SLEEP_BEFORE_RETRY),
+ remote_clusters_info:get_memcached_vbucket_info_by_ref(Tgt, false, Vb)
+ end,
+
{ok, {_ClusterUUID, BucketName}} = remote_clusters_info:parse_remote_bucket_reference(Tgt),
Password = binary_to_list(LatestRemoteBucket#remote_bucket.password),
#xdc_rep_xmem_remote{ip = binary_to_list(Ip), port = Port,
@@ -661,11 +701,11 @@ start_replication(#rep_state{
BatchSizeItems = get_value(worker_batch_size, Options),
{ok, Source} = couch_api_wrap:db_open(SourceName, []),
TgtURI = xdc_rep_utils:parse_rep_db(TargetName, [], Options),
- {ok, Target} = couch_api_wrap:db_open(TgtURI, []),
+ {ok, Target} = http_db_open(TgtURI, []),
{ok, SrcMasterDb} = couch_api_wrap:db_open(
xdc_rep_utils:get_master_db(Source),
[]),
- {ok, TgtMasterDb} = couch_api_wrap:db_open(
+ {ok, TgtMasterDb} = http_db_open(
xdc_rep_utils:get_master_db(Target), []),
{ok, ChangesQueue} = couch_work_queue:new([
@@ -731,8 +771,9 @@ start_replication(#rep_state{
batch_items = BatchSizeItems},
Workers = lists:map(
- fun(_) ->
- {ok, WorkerPid} = xdc_vbucket_rep_worker:start_link(WorkerOption),
+ fun(WorkerID) ->
+ WorkerOption2 = WorkerOption#rep_worker_option{worker_id = WorkerID},
+ {ok, WorkerPid} = xdc_vbucket_rep_worker:start_link(WorkerOption2),
WorkerPid
end,
lists:seq(1, NumWorkers)),
19 src/xdc_vbucket_rep_worker.erl
@@ -21,6 +21,7 @@
%% the target should always from remote with record #httpdb{}. There is
%% no intra-cluster XDCR
start_link(#rep_worker_option{cp = Cp, source = Source, target = Target,
+ worker_id = WorkerID,
changes_manager = ChangesManager,
opt_rep_threshold = OptRepThreshold,
xmem_server = XMemSrv,
@@ -28,21 +29,21 @@ start_link(#rep_worker_option{cp = Cp, source = Source, target = Target,
batch_items = BatchItems} = _WorkerOption) ->
Pid = spawn_link(fun() ->
erlang:monitor(process, ChangesManager),
- queue_fetch_loop(Source, Target, Cp,
+ queue_fetch_loop(WorkerID, Source, Target, Cp,
ChangesManager, OptRepThreshold,
BatchSize, BatchItems, XMemSrv)
end),
- ?xdcr_trace("create queue_fetch_loop process (pid: ~p) within replicator (pid: ~p) "
+ ?xdcr_trace("create queue_fetch_loop process (worker_id: ~p, pid: ~p) within replicator (pid: ~p) "
"Source: ~p, Target: ~p, ChangesManager: ~p, latency optimized: ~p",
- [Pid, Cp, Source#db.name, misc:sanitize_url(Target#httpdb.url), ChangesManager, OptRepThreshold]),
+ [WorkerID, Pid, Cp, Source#db.name, misc:sanitize_url(Target#httpdb.url), ChangesManager, OptRepThreshold]),
{ok, Pid}.
--spec queue_fetch_loop(#db{}, #httpdb{}, pid(), pid(),
+-spec queue_fetch_loop(integer(), #db{}, #httpdb{}, pid(), pid(),
integer(), integer(), integer(), pid() | nil) -> ok.
-queue_fetch_loop(Source, Target, Cp, ChangesManager,
+queue_fetch_loop(WorkerID, Source, Target, Cp, ChangesManager,
OptRepThreshold, BatchSize, BatchItems, nil) ->
?xdcr_trace("fetch changes from changes manager at ~p (target: ~p)",
[ChangesManager, misc:sanitize_url(Target#httpdb.url)]),
@@ -71,6 +72,7 @@ queue_fetch_loop(Source, Target, Cp, ChangesManager,
%% report seq done and stats to vb replicator
ok = gen_server:call(Cp, {report_seq_done,
#worker_stat{
+ worker_id = WorkerID,
seq = ReportSeq,
worker_meta_latency_aggr = MetaLatency*NumChecked,
worker_docs_latency_aggr = DocLatency*NumWritten,
@@ -82,11 +84,11 @@ queue_fetch_loop(Source, Target, Cp, ChangesManager,
?xdcr_trace("Worker reported completion of seq ~p, num docs written: ~p "
"data replicated: ~p bytes, latency: ~p ms.",
[ReportSeq, NumWritten, DataRepd, DocLatency]),
- queue_fetch_loop(Source, Target, Cp, ChangesManager,
+ queue_fetch_loop(WorkerID, Source, Target, Cp, ChangesManager,
OptRepThreshold, BatchSize, BatchItems, nil)
end;
-queue_fetch_loop(Source, Target, Cp, ChangesManager,
+queue_fetch_loop(WorkerID, Source, Target, Cp, ChangesManager,
OptRepThreshold, BatchSize, BatchItems, XMemSrv) ->
?xdcr_trace("fetch changes from changes manager at ~p (target: ~p)",
[ChangesManager, misc:sanitize_url(Target#httpdb.url)]),
@@ -118,6 +120,7 @@ queue_fetch_loop(Source, Target, Cp, ChangesManager,
%% report seq done and stats to vb replicator
ok = gen_server:call(Cp, {report_seq_done,
#worker_stat{
+ worker_id = WorkerID,
seq = ReportSeq,
worker_meta_latency_aggr = MetaLatency*NumChecked,
worker_docs_latency_aggr = DocLatency*NumWritten,
@@ -129,7 +132,7 @@ queue_fetch_loop(Source, Target, Cp, ChangesManager,
?xdcr_trace("Worker reported completion of seq ~p, num docs written: ~p "
"data replicated: ~p bytes, latency: ~p ms.",
[ReportSeq, NumWritten, DataRepd, DocLatency]),
- queue_fetch_loop(Source, Target, Cp, ChangesManager,
+ queue_fetch_loop(WorkerID, Source, Target, Cp, ChangesManager,
OptRepThreshold, BatchSize, BatchItems, XMemSrv)
end.
2  src/xdc_vbucket_rep_xmem_srv.erl
@@ -156,7 +156,7 @@ handle_call(disconnect, {_Pid, _Tag},
end,
dict:new(),
dict:to_list(Workers)),
- {reply, ok, State#xdc_vb_rep_xmem_srv_state{pid_workers = IdleWorkers}};
+ {reply, ok, State#xdc_vb_rep_xmem_srv_state{pid_workers = IdleWorkers}, hibernate};
handle_call(select_bucket, {_Pid, _Tag},
#xdc_vb_rep_xmem_srv_state{
2  src/xdc_vbucket_rep_xmem_worker.erl
@@ -127,7 +127,7 @@ handle_call({connect, #xdc_rep_xmem_remote{} = Remote}, {_Pid, _Tag},
handle_call(disconnect, {_Pid, _Tag}, #xdc_vb_rep_xmem_worker_state{} = State) ->
State1 = close_connection(State),
- {reply, ok, State1};
+ {reply, ok, State1, hibernate};
handle_call({select_bucket, #xdc_rep_xmem_remote{} = Remote}, {_Pid, _Tag},
#xdc_vb_rep_xmem_worker_state{id = Id, vb = Vb, socket = Socket} = State) ->
