Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

MB-7837: continuously tunable optimistic XDCR

This commit introduces "xdcr_optimistic_replication_threshold",
which is 256 bytes by default. This parameter is used by XDCR to
split docs list into two: a list of big docs, whose docs are all
bigger than the threshold parameter, and a list of small docs
whose docs are no greater than it. For small docs, we skip all
revs_diff operations and optimistically send them directly to the
remote cluster. For big docs, we still send revs_diff first and
then only send those docs surviving conflict resolution at remote
node.

In this way, users are able to continuously adjust the parameter
to determine which docs should be replicated optimistically.

A deletion, however, is always treated as a small doc, regardless
of its doc size, because there is no benefit in sending revs_diff for
deletions at all.

The corresponding environment parameter is:

"XDCR_OPTIMISTIC_REPLICATION_THRESHOLD"

and users can always override the ns_server parameter using the env
parameter.

The ns_server parameter "xdcr_optimistic_replication" and its env
parameter will both retire with this commit since the optimistic
replication is naturally encoded in the new parameter, which can
be set to a big number to cover all docs.

Change-Id: I739b2c63cce64e31b18387938acf1c844f023cc6
Reviewed-on: http://review.couchbase.org/24934
Tested-by: Junyi Xie <junyi.couchbase@gmail.com>
Reviewed-by: Damien Katz <damien@couchbase.com>
  • Loading branch information...
commit ee5334a784c8887821c0b13638c67d279f104b25 1 parent 527994c
Junyi Xie authored Damienkatz committed
View
2  include/xdc_replicator.hrl
@@ -232,7 +232,7 @@
target = #httpdb{}, %% target db
changes_manager, %% process to queue changes from storage
max_conns, %% max connections
- latency_opt %% latenty optimized option
+ opt_rep_threshold %% optimistic replication threshold
}).
%% statistics reported from worker process to its parent vbucket replicator
View
7 src/ns_config_default.erl
@@ -72,10 +72,9 @@ default() ->
{xdcr_num_http_connections, 20}, % max # of http conns
{xdcr_num_retries_per_request, 2}, % # of retries
- %% an option to optimistically send mutations to the destination
- %% without first doing a look-up to see if the mutation is needed
- %% on the other side without issuing a getwithmeta().
- {xdcr_optimistic_replication, false},
+ %% document body size threshold (bytes) to trigger optimistic replication
+ %% when doc body size is no greater than the threshold
+ {xdcr_optimistic_replication_threshold, 256},
{directory, path_config:component_path(data, "config")},
{index_aware_rebalance_disabled, false},
View
6 src/xdc_rep_manager.erl
@@ -320,10 +320,10 @@ dump_parameters() ->
{value, DefaultRestartWaitTime} = ns_config:search(xdcr_failure_restart_interval),
RestartWaitTime = misc:getenv_int("XDCR_FAILURE_RESTART_INTERVAL", DefaultRestartWaitTime),
- LatencyOpt = xdc_rep_utils:is_latency_optimized(),
+ OptRepThreshold = xdc_rep_utils:get_opt_replication_threshold(),
?xdcr_debug("default XDCR parameters:~n \t"
- "latency optimization mode: ~p;~n \t"
+ "optimistic replication threshold: ~p bytes;~n \t"
"number of max concurrent reps per bucket: ~p;~n \t"
"checkpoint interval in secs: ~p;~n \t"
"limit of replication batch size: ~p docs, ~p kilobytes;~n \t"
@@ -332,7 +332,7 @@ dump_parameters() ->
"max number HTTP connections per vb replicator: ~p;~n \t"
"max number retries per connection: ~p;~n \t"
"vb replicator waiting time before restart: ~p ",
- [LatencyOpt,
+ [OptRepThreshold,
MaxConcurrentReps,
IntervalSecs,
DefBatchSize, DocBatchSizeKB,
View
39 src/xdc_rep_utils.erl
@@ -23,7 +23,7 @@
-export([parse_rep_db/1,parse_rep_db/3]).
-export([split_dbname/1]).
-export([get_master_db/1, get_checkpoint_log_id/2]).
--export([is_latency_optimized/0]).
+-export([get_opt_replication_threshold/0]).
-include("xdc_replicator.hrl").
@@ -162,16 +162,16 @@ make_options(Props) ->
couch_config:get("replicator", "socket_options",
"[{keepalive, true}, {nodelay, false}]")),
- LatencyOpt= is_latency_optimized(),
+ OptRepThreshold = get_opt_replication_threshold(),
?xdcr_debug("Options for replication:["
- "latency optimized: ~p, "
+ "optimistic replication threshold: ~p bytes, "
"worker processes: ~p, "
"worker batch size (# of mutations): ~p, "
"HTTP connections: ~p, "
"connection timeout (ms): ~p,"
"num of retries per request: ~p]",
- [LatencyOpt, DefWorkers, DefBatchSize, DefConns, DefTimeout, DefRetries]),
+ [OptRepThreshold, DefWorkers, DefBatchSize, DefConns, DefTimeout, DefRetries]),
lists:ukeymerge(1, Options, lists:keysort(1, [
{connection_timeout, DefTimeout},
@@ -181,7 +181,7 @@ make_options(Props) ->
{socket_options, DefSocketOptions},
{worker_batch_size, DefBatchSize},
{worker_processes, DefWorkers},
- {latency_opt, LatencyOpt}
+ {opt_rep_threshold, OptRepThreshold}
])).
@@ -310,25 +310,14 @@ unsplit_uuid({DbName, undefined}) ->
unsplit_uuid({DbName, UUID}) ->
DbName ++ ";" ++ UUID.
--spec is_latency_optimized() -> boolean().
-is_latency_optimized() ->
- {value, DefaultLatencyOpt} = ns_config:search(xdcr_optimistic_replication),
-
- EnvVar = case (catch string:to_lower(os:getenv("XDCR_OPTIMISTIC_REPLICATION"))) of
- "true" ->
- true;
- "false" ->
- false;
- _ ->
- undefined
- end,
-
- %% env var overrides ns_config parameter, use default ns_config parameter
- %% only when env var is undefined
- case EnvVar of
- undefined ->
- DefaultLatencyOpt;
+-spec get_opt_replication_threshold() -> integer().
+get_opt_replication_threshold() ->
+ {value, DefaultOptRepThreshold} = ns_config:search(xdcr_optimistic_replication_threshold),
+ Threshold = misc:getenv_int("XDCR_OPTIMISTIC_REPLICATION_THRESHOLD", DefaultOptRepThreshold),
+
+ case Threshold of
+ V when V < 0 ->
+ 0;
_ ->
- EnvVar
+ Threshold
end.
-
View
4 src/xdc_vbucket_rep.erl
@@ -543,7 +543,7 @@ start_replication(#rep_state{
%% a batch of _changes rows to process -> check which revs are missing in the
%% target, and for the missing ones, it copies them from the source to the target.
MaxConns = get_value(http_connections, Options),
- LatencyOpt = get_value(latency_opt, Options),
+ OptRepThreshold = get_value(opt_rep_threshold, Options),
?xdcr_info("changes reader process (PID: ~p) and manager process (PID: ~p) "
"created, now starting worker processes...",
@@ -554,7 +554,7 @@ start_replication(#rep_state{
WorkerOption = #rep_worker_option{
cp = self(), source = Source, target = Target,
changes_manager = ChangesManager, max_conns = MaxConns,
- latency_opt = LatencyOpt},
+ opt_rep_threshold = OptRepThreshold},
Workers = lists:map(
fun(_) ->
View
103 src/xdc_vbucket_rep_worker.erl
@@ -21,18 +21,18 @@
%% the target should always from remote with record #httpdb{}. There is
%% no intra-cluster XDCR
start_link(#rep_worker_option{cp = Cp, source = Source, target = Target,
- changes_manager = ChangesManager, latency_opt = LatencyOptimized} = _WorkerOption) ->
+ changes_manager = ChangesManager, opt_rep_threshold = OptRepThreshold} = _WorkerOption) ->
Pid = spawn_link(fun() ->
erlang:monitor(process, ChangesManager),
- queue_fetch_loop(Source, Target, Cp, ChangesManager, LatencyOptimized)
+ queue_fetch_loop(Source, Target, Cp, ChangesManager, OptRepThreshold)
end),
?xdcr_debug("create queue_fetch_loop process (pid: ~p) within replicator (pid: ~p) "
"Source: ~p, Target: ~p, ChangesManager: ~p, latency optimized: ~p",
- [Pid, Cp, Source#db.name, Target#httpdb.url, ChangesManager, LatencyOptimized]),
+ [Pid, Cp, Source#db.name, Target#httpdb.url, ChangesManager, OptRepThreshold]),
{ok, Pid}.
-queue_fetch_loop(Source, Target, Cp, ChangesManager, LatencyOptimized) ->
+queue_fetch_loop(Source, Target, Cp, ChangesManager, OptRepThreshold) ->
?xdcr_debug("fetch changes from changes manager at ~p (target: ~p)",
[ChangesManager, Target#httpdb.url]),
ChangesManager ! {get_changes, self()},
@@ -41,7 +41,7 @@ queue_fetch_loop(Source, Target, Cp, ChangesManager, LatencyOptimized) ->
ok = gen_server:call(Cp, {worker_done, self()}, infinity);
{changes, ChangesManager, Changes, ReportSeq} ->
%% get docinfo of missing ids
- {MissingDocInfoList, MetaLatency} = find_missing(Changes, Target, LatencyOptimized),
+ {MissingDocInfoList, MetaLatency} = find_missing(Changes, Target, OptRepThreshold),
NumChecked = length(Changes),
NumWritten = length(MissingDocInfoList),
%% use ptr in docinfo to fetch document from storage
@@ -61,7 +61,7 @@ queue_fetch_loop(Source, Target, Cp, ChangesManager, LatencyOptimized) ->
worker_item_checked = NumChecked,
worker_item_replicated = NumWritten}}, infinity),
?xdcr_debug("Worker reported completion of seq ~p", [ReportSeq]),
- queue_fetch_loop(Source, Target, Cp, ChangesManager, LatencyOptimized)
+ queue_fetch_loop(Source, Target, Cp, ChangesManager, OptRepThreshold)
end.
local_process_batch([], _Cp, _Src, _Tgt, #batch{docs = []}) ->
@@ -139,24 +139,48 @@ flush_docs(Target, DocList) ->
%% return list of Docsinfos of missing keys
-spec find_missing(list(), #httpdb{}, boolean()) -> {list(), integer()}.
-find_missing(DocInfos, Target, LatencyOptimized) ->
+find_missing(DocInfos, Target, OptRepThreshold) ->
Start = now(),
- {IdRevs, AllRevsCount} = lists:foldr(
- fun(#doc_info{id = Id, rev = Rev}, {IdRevAcc, CountAcc}) ->
- {[{Id, Rev} | IdRevAcc], CountAcc + 1}
- end,
- {[], 0}, DocInfos),
-
- %% if latency optimized, skip the getMeta ops and send all docs
- Missing = case LatencyOptimized of
- false ->
- {ok, MissingIdRevs} = couch_api_wrap:get_missing_revs(Target, IdRevs),
- MissingIdRevs;
- _ ->
- IdRevs
- end,
-
- %%build list of docinfo for all missing ids
+
+ %% depending on doc body size, we separate all keys into two groups:
+ %% keys with doc body size greater than the threshold, and keys with doc body
+ %% smaller than or equal to threshold.
+ {BigDocIdRevs, SmallDocIdRevs, DelCount, BigDocCount,
+ SmallDocCount, AllRevsCount} = lists:foldr(
+ fun(#doc_info{id = Id, rev = Rev, deleted = Deleted, size = DocSize},
+ {BigIdRevAcc, SmallIdRevAcc, DelAcc, BigAcc, SmallAcc, CountAcc}) ->
+ %% deleted doc is always treated as small doc, regardless of doc size
+ {BigIdRevAcc1, SmallIdRevAcc1, DelAcc1, BigAcc1, SmallAcc1} =
+ case Deleted of
+ true ->
+ {BigIdRevAcc, [{Id, Rev} | SmallIdRevAcc], DelAcc + 1,
+ BigAcc, SmallAcc + 1};
+ _ ->
+ %% for all other mutations, check its doc size
+ case DocSize > OptRepThreshold of
+ true ->
+ {[{Id, Rev} | BigIdRevAcc], SmallIdRevAcc, DelAcc,
+ BigAcc + 1, SmallAcc};
+ _ ->
+ {BigIdRevAcc, [{Id, Rev} | SmallIdRevAcc], DelAcc,
+ BigAcc, SmallAcc + 1}
+ end
+ end,
+ {BigIdRevAcc1, SmallIdRevAcc1, DelAcc1, BigAcc1, SmallAcc1, CountAcc + 1}
+ end,
+ {[], [], 0, 0, 0, 0}, DocInfos),
+
+ %% metadata operation for big docs only
+ {Missing, MissingBigDocCount} =
+ case length(BigDocIdRevs) of
+ V when V > 0 ->
+ {ok , MissingBigIdRevs} = couch_api_wrap:get_missing_revs(Target, BigDocIdRevs),
+ {lists:flatten([SmallDocIdRevs | MissingBigIdRevs]), length(MissingBigIdRevs)};
+ _ ->
+ {SmallDocIdRevs, 0}
+ end,
+
+ %% build list of docinfo for all missing keys
MissingDocInfoList = lists:filter(
fun(#doc_info{id = Id, rev = _Rev} = _DocInfo) ->
case lists:keyfind(Id, 1, Missing) of
@@ -169,23 +193,16 @@ find_missing(DocInfos, Target, LatencyOptimized) ->
end,
DocInfos),
- case LatencyOptimized of
- false ->
- ?xdcr_debug("after conflict resolution at target (~p), out of all ~p docs "
- "the number of docs we need to replicate is: ~p",
- [Target#httpdb.url, AllRevsCount, length(Missing)]);
- _ ->
- ?xdcr_debug("latency optimized mode, no conflict resolution at target (~p), "
- "all ~p docs will be replicated",
- [Target#httpdb.url, length(IdRevs)])
- end,
-
- %% latency in millisecond, 0 if latency opt mode
- Latency = case LatencyOptimized of
- false ->
- (timer:now_diff(now(), Start) div 1000);
- _ ->
- 0
- end,
-
- {MissingDocInfoList, round(Latency)}.
+ %% latency in millisecond
+ Latency = round(timer:now_diff(now(), Start) div 1000),
+
+ ?xdcr_debug("out of all ~p docs, number of small docs (including dels: ~p) is ~p, "
+ "number of big docs is ~p, threshold is ~p bytes, ~n\t"
+ "after conflict resolution at target (~p), out of all big ~p docs "
+ "the number of docs we need to replicate is: ~p; ~n\t "
+ "total # of docs to be replicated is: ~p, total latency: ~p ms",
+ [AllRevsCount, DelCount, SmallDocCount, BigDocCount, OptRepThreshold,
+ Target#httpdb.url, BigDocCount, MissingBigDocCount,
+ length(MissingDocInfoList), Latency]),
+
+ {MissingDocInfoList, Latency}.
Please sign in to comment.
Something went wrong with that request. Please try again.