Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

allows the view changes based replication

  • Loading branch information...
commit e25049fe245c429ad83eafffc6aef4f3336fd65b 1 parent d7f232c
@benoitc benoitc authored
View
18 apps/couch_replicator/src/couch_replicator.erl
@@ -42,6 +42,7 @@
-record(rep_state, {
rep_details,
+ type = db,
source_name,
target_name,
source,
@@ -526,7 +527,8 @@ cancel_timer(#rep_state{timer = Timer} = State) ->
init_state(Rep) ->
#rep{
source = Src, target = Tgt,
- options = Options, user_ctx = UserCtx
+ options = Options, user_ctx = UserCtx,
+ type = Type
} = Rep,
{ok, Source} = couch_replicator_api_wrap:db_open(Src, [{user_ctx, UserCtx}]),
{ok, Target} = couch_replicator_api_wrap:db_open(Tgt, [{user_ctx, UserCtx}],
@@ -543,6 +545,7 @@ init_state(Rep) ->
#doc{body={CheckpointHistory}} = SourceLog,
State = #rep_state{
rep_details = Rep,
+ type = Type,
source_name = couch_replicator_api_wrap:db_uri(Source),
target_name = couch_replicator_api_wrap:db_uri(Target),
source = Source,
@@ -905,7 +908,20 @@ db_monitor(#db{} = Db) ->
db_monitor(_HttpDb) ->
nil.
+source_cur_seq(#rep_state{source = #httpdb{} = Db, type = view} = State) ->
+ #rep_state{rep_details = Rep, source_seq = Seq} = State,
+ case (catch couch_replicator_api_wrap:get_view_seq(Db#httpdb{retries = 3},
+ Rep)) of
+ {ok, {Info}} ->
+ get_value(<<"last_seq">>, Info, Seq);
+ _ ->
+ Seq
+ end;
+source_cur_seq(#rep_state{source = Db, type = view} = State) ->
+ #rep_state{rep_details = Rep} = State,
+ {ok, LastSeq} = couch_replicator_api_wrap:get_view_seq(Db, Rep),
+ LastSeq;
source_cur_seq(#rep_state{source = #httpdb{} = Db, source_seq = Seq}) ->
case (catch couch_replicator_api_wrap:get_db_info(Db#httpdb{retries = 3})) of
{ok, Info} ->
View
2  apps/couch_replicator/src/couch_replicator.hrl
@@ -16,6 +16,8 @@
id,
source,
target,
+ type = db,
+ view = nil,
options,
user_ctx,
doc_id
View
56 apps/couch_replicator/src/couch_replicator_api_wrap.erl
@@ -21,13 +21,16 @@
-include_lib("couch/include/couch_db.hrl").
-include_lib("couch_httpd/include/couch_httpd.hrl").
-include_lib("couch_changes/include/couch_changes.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
-include("couch_replicator_api_wrap.hrl").
+-include("couch_replicator.hrl").
-export([
db_open/2,
db_open/3,
db_close/1,
get_db_info/1,
+ get_view_seq/2,
update_doc/3,
update_doc/4,
update_docs/3,
@@ -120,6 +123,21 @@ get_db_info(#db{name = DbName, user_ctx = UserCtx}) ->
{ok, [{couch_util:to_binary(K), V} || {K, V} <- Info]}.
+get_view_seq(#httpdb{} = Db, Rep) ->
+ #rep{view = {DDoc, VName},
+ options = Options} = Rep,
+ Path = binary_to_list(DDoc) ++ "/_view" ++ binary_to_list(VName) ++
+ "/_last_seq",
+ send_req(Db, [{path, Path}, {qs, view_qargs(Options)}],
+ fun(200, _, {Props}) ->
+ {ok, get_value(<<"last_seq">>, Props)}
+ end);
+get_view_seq(Db, Rep) ->
+ #rep{view = {DDoc, VName},
+ options = Options} = Rep,
+ couch_mrview:get_last_seq(Db#db.name, DDoc, VName,
+ view_mrargs(Options)).
+
ensure_full_commit(#httpdb{} = Db) ->
send_req(
Db,
@@ -384,12 +402,43 @@ changes_since(Db, Style, StartSeq, UserFun, Options) ->
% internal functions
+%
+view_qargs(Options) ->
+ ViewFields0 = [atom_to_list(F) || F <- record_info(fields, mrargs)],
+ ViewFields = ["key" | ViewFields0],
+ {Params} = get_value(query_params, Options, {[]}),
+ lists:foldl(fun({K, V}, Acc) ->
+ Ks = couch_util:to_list(K),
+ case lists:member(Ks, ViewFields) of
+ true ->
+ [{Ks, ?JSON_ENCODE(V)} | Acc];
+ false ->
+ Acc
+ end
+ end, [], Params).
+
+view_mrargs(Options) ->
+ ViewFields = [key | record_info(fields, mrargs)],
+ {Params} = get_value(query_params, Options, {[]}),
+ lists:foldl(fun({K, V}, Acc) ->
+ K1 = list_to_atom(couch_util:to_list(K)),
+ case lists:member(K1, ViewFields) of
+ true ->
+ [{K1, V} | Acc];
+ false ->
+ Acc
+ end
+ end, [], Params).
+
maybe_add_changes_filter_q_args(BaseQS, Options) ->
case get_value(filter, Options) of
undefined ->
BaseQS;
FilterName ->
+ ViewFields0 = [atom_to_list(F) || F <- record_info(fields, mrargs)],
+ ViewFields = ["key" | ViewFields0],
+
{Params} = get_value(query_params, Options, {[]}),
[{"filter", ?b2l(FilterName)} | lists:foldl(
fun({K, V}, QSAcc) ->
@@ -397,6 +446,13 @@ maybe_add_changes_filter_q_args(BaseQS, Options) ->
case lists:keymember(Ks, 1, QSAcc) of
true ->
QSAcc;
+ false when FilterName =:= <<"_view">> ->
+ V1 = case lists:member(Ks, ViewFields) of
+ true ->
+ ?JSON_ENCODE(V);
+ false -> couch_util:to_list(V)
+ end,
+ [{Ks, V1} | QSAcc];
false ->
[{Ks, couch_util:to_list(V)} | QSAcc]
end
View
1  apps/couch_replicator/src/couch_replicator_notifier.erl
@@ -22,6 +22,7 @@
-export([handle_event/2, handle_call/2, handle_info/2]).
-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_changes/include/couch_changes.hrl").
start_link(FunAcc) ->
couch_event_sup:start_link(couch_replication,
View
17 apps/couch_replicator/src/couch_replicator_utils.erl
@@ -38,11 +38,24 @@ parse_rep_doc({Props}, UserCtx) ->
true ->
{ok, #rep{options = Options, user_ctx = UserCtx}};
false ->
- Source = parse_rep_db(get_value(<<"source">>, Props), ProxyParams, Options),
- Target = parse_rep_db(get_value(<<"target">>, Props), ProxyParams, Options),
+ Source = parse_rep_db(get_value(<<"source">>, Props), ProxyParams,
+ Options),
+ Target = parse_rep_db(get_value(<<"target">>, Props), ProxyParams,
+ Options),
+ {Type, DDoc} = case get_value(<<"filter">>, Props) of
+ <<"_view">> ->
+ {QP} = get_value(query_params, Options, {[]}),
+ View = get_value(<<"view">>, QP),
+ [DName, VName] = binary:split(View, <<"/">>),
+ {view, {<< "_design/", DName/binary >>, VName}};
+ _ ->
+ {db, nil}
+ end,
Rep = #rep{
source = Source,
target = Target,
+ type = Type,
+ view = DDoc,
options = Options,
user_ctx = UserCtx,
doc_id = get_value(<<"_id">>, Props, null)
Please sign in to comment.
Something went wrong with that request. Please try again.