From 5d240fa0da761ac8e0041ed887c2b623668f28c3 Mon Sep 17 00:00:00 2001 From: benoitc Date: Wed, 22 Feb 2012 11:52:48 +0100 Subject: [PATCH 01/14] Add `couch_mrview:view_changes_since` function. This functions fetch all changes in a view since last update like the `couch_db:changes_since` function for the databases. It returns an error if the option `seq_indexed` hasn't been set to `true` in the design document: { "_id": "_design/somedoc", "options": { "seq_indexed": "true" }, "views": { "someview": { "map": "function(doc) { if (doc.field) emit(doc.field, null); }" } } } This is done by adding an index to the view group (a view group is the global index associated to a design doc basically). This index associate each sequences & documents ID to a view . `{{ViewId, Seq}, DocId} is stored in the seq_btree, DocId is the value. example of usage: (refuge@127.0.0.1)3> F = fun(KV, Acc) -> {ok, [KV|Acc]} end. (refuge@127.0.0.1)4> (refuge@127.0.0.1)4> couch_mrview:view_changes_since(Db, DDoc, <<"all">>, 9990, F). [info] [<0.220.0>] Opening index for db: testdb idx: _design/test sig: "85b8fa1e39dc7fbe8b11b729b319c6bf" {ok,[{10000,{<<"72f9250618e65adc8cf4552758fff137">>,0}}, {9999,{<<"72f9250618e65adc8cf4552758ffe85d">>,0}}, {9998,{<<"72f9250618e65adc8cf4552758ffdbcf">>,0}}, {9997,{<<"72f9250618e65adc8cf4552758ffd65e">>,0}}, {9996,{<<"72f9250618e65adc8cf4552758ffd611">>,0}}, {9995,{<<"72f9250618e65adc8cf4552758ffd045">>,0}}, {9994,{<<"72f9250618e65adc8cf4552758ffc982">>,0}}, {9993,{<<"72f9250618e65adc8cf4552758ffbfdb">>,0}}, {9992,{<<"72f9250618e65adc8cf4552758ffb1fc">>,0}}, {9991,{<<"72f9250618e65adc8cf4552758ffae18">>,0}}]} (refuge@127.0.0.1)5> (refuge@127.0.0.1)5> couch_mrview:view_changes_since(Db, DDoc, <<"all">>, 10000, F). {ok,[]} (refuge@127.0.0.1)7> couch_mrview:view_changes_since(Db, DDoc, <<"all">>, 10000, F). {ok,[{10002,{<<"31b0c9f8353b6b3e2a86c23d2c000a78">>,0}}]} --- apps/couch_mrview/include/couch_mrview.hrl | 4 +- apps/couch_mrview/src/couch_mrview.erl | 46 ++++++++++ .../src/couch_mrview_compactor.erl | 39 +++++--- .../couch_mrview/src/couch_mrview_updater.erl | 90 +++++++++++++------ apps/couch_mrview/src/couch_mrview_util.erl | 58 ++++++++++-- 5 files changed, 191 insertions(+), 46 deletions(-) diff --git a/apps/couch_mrview/include/couch_mrview.hrl b/apps/couch_mrview/include/couch_mrview.hrl index bf3bcace..9d8cf57c 100644 --- a/apps/couch_mrview/include/couch_mrview.hrl +++ b/apps/couch_mrview/include/couch_mrview.hrl @@ -18,12 +18,13 @@ idx_name, language, design_opts=[], + seq_indexed=false, lib, views, id_btree=nil, + seq_btree=nil, update_seq=0, purge_seq=0, - first_build, partial_resp_pid, doc_acc, @@ -49,6 +50,7 @@ seq=0, purge_seq=0, id_btree_state=nil, + seq_btree_state=nil, view_states=nil }). diff --git a/apps/couch_mrview/src/couch_mrview.erl b/apps/couch_mrview/src/couch_mrview.erl index d31ed18e..4b387c8e 100644 --- a/apps/couch_mrview/src/couch_mrview.erl +++ b/apps/couch_mrview/src/couch_mrview.erl @@ -17,6 +17,8 @@ -export([get_info/2]). -export([compact/2, compact/3, cancel_compaction/2]). -export([cleanup/1]). +-export([view_changes_since/5, view_changes_since/6, + view_changes_since/7]). -include_lib("couch/include/couch_db.hrl"). -include_lib("couch_mrview/include/couch_mrview.hrl"). @@ -86,6 +88,50 @@ query_view(Db, {Type, View}, Args, Callback, Acc) -> red -> red_fold(Db, View, Args, Callback, Acc) end. +view_changes_since(Db, DDoc, VName, StartSeq, Callback) -> + view_changes_since(Db, DDoc, VName, StartSeq, Callback, #mrargs{}, []). + +view_changes_since(Db, DDoc, VName, StartSeq, Callback, Args) -> + view_changes_since(Db, DDoc, VName, StartSeq, Callback, Args, []). + +view_changes_since(Db, DDoc, VName, StartSeq, Callback, Args, Acc) + when is_list(Args) -> + view_changes_since(Db, DDoc, VName, StartSeq, Callback, to_mrargs(Args), + Acc); + +view_changes_since(Db, DDoc, VName, StartSeq, Callback, Args, Acc) -> + #mrargs{direction=Dir} = Args, + + EndSeq = case Dir of + fwd -> 16#10000000; + rev -> 0 + end, + Args1 = Args#mrargs{start_key = {VName, StartSeq + 1}, + end_key = {VName, EndSeq}}, + + {ok, {_, View, _}, State} = couch_mrview_util:get_view_state(Db, DDoc, + VName, Args1), + + #mrst{seq_indexed=SeqIndexed, seq_btree=SeqBtree} = State, + + case SeqIndexed of + false -> + {error, seqs_not_indexed}; + _ -> + #mrview{id_num=Id} = View, + WrapperFun = fun + ({{_ViewID, _Seq}, _DocId}=KV, _Reds, Acc2) -> + Callback(KV, Acc2); + (_, _, Acc2) -> + {ok , Acc2} + end, + + {ok, _R, AccOut} = couch_btree:fold(SeqBtree, WrapperFun, Acc, + [{start_key, {Id, StartSeq + 1}}, {end_key, {Id, EndSeq}}, + {dir, Dir}]), + {ok, AccOut} + end. + get_info(Db, DDoc) -> {ok, Pid} = couch_index_server:get_index(couch_mrview_index, Db, DDoc), diff --git a/apps/couch_mrview/src/couch_mrview_compactor.erl b/apps/couch_mrview/src/couch_mrview_compactor.erl index 508e79cc..775d7692 100644 --- a/apps/couch_mrview/src/couch_mrview_compactor.erl +++ b/apps/couch_mrview/src/couch_mrview_compactor.erl @@ -19,6 +19,8 @@ -record(acc, { btree = nil, + id_btree = nil, + seq_btree = nil, last_id = nil, kvs = [], kvs_size = 0, @@ -37,8 +39,9 @@ compact(State) -> #mrst{ db_name=DbName, idx_name=IdxName, + seq_indexed=SeqIndexed, sig=Sig, - update_seq=Seq, + update_seq=UpdateSeq, id_btree=IdBtree, views=Views } = State, @@ -51,6 +54,7 @@ compact(State) -> #mrst{ id_btree = EmptyIdBtree, + seq_btree = EmptySeqBtree, views = EmptyViews } = EmptyState, @@ -74,7 +78,8 @@ compact(State) -> BufferSize = list_to_integer(BufferSize0), FoldFun = fun({DocId, _} = KV, Acc) -> - #acc{btree = Bt, kvs = Kvs, kvs_size = KvsSize, last_id = LastId} = Acc, + #acc{id_btree = IdBt, seq_btree = SeqBt, kvs = Kvs, kvs_size = + KvsSize, last_id = LastId} = Acc, if DocId =:= LastId -> % COUCHDB-999 regression test ?LOG_ERROR("Duplicate docid `~s` detected in view group `~s`" @@ -86,20 +91,25 @@ compact(State) -> KvsSize2 = KvsSize + ?term_size(KV), case KvsSize2 >= BufferSize of true -> - {ok, Bt2} = couch_btree:add(Bt, lists:reverse([KV | Kvs])), + ToAdd = lists:reverse([KV | Kvs]), + {ok, IdBt2} = couch_btree:add(IdBt, ToAdd), + SeqBt2 = maybe_index_seqs(SeqBt, ToAdd, SeqIndexed), Acc2 = update_task(Acc, 1 + length(Kvs)), - {ok, Acc2#acc{ - btree = Bt2, kvs = [], kvs_size = 0, last_id = DocId}}; + {ok, Acc2#acc{id_btree = IdBt2, seq_btree = SeqBt2, kvs = [], + kvs_size = 0, last_id = DocId}}; _ -> {ok, Acc#acc{ kvs = [KV | Kvs], kvs_size = KvsSize2, last_id = DocId}} end end, - InitAcc = #acc{total_changes = TotalChanges, btree = EmptyIdBtree}, + InitAcc = #acc{total_changes = TotalChanges, id_btree = + EmptyIdBtree, seq_btree=EmptySeqBtree}, {ok, _, FinalAcc} = couch_btree:foldl(IdBtree, FoldFun, InitAcc), - #acc{btree = Bt3, kvs = Uncopied} = FinalAcc, - {ok, NewIdBtree} = couch_btree:add(Bt3, lists:reverse(Uncopied)), + #acc{id_btree = IdBt3, seq_btree= SeqBt3, kvs = Uncopied} = FinalAcc, + Uncopied1 = lists:reverse(Uncopied), + {ok, NewIdBtree} = couch_btree:add(IdBt3, Uncopied1), + NewSeqBtree = maybe_index_seqs(SeqBt3, Uncopied1, SeqIndexed), FinalAcc2 = update_task(FinalAcc, length(Uncopied)), {NewViews, _} = lists:mapfoldl(fun({View, EmptyView}, Acc) -> @@ -109,8 +119,9 @@ compact(State) -> unlink(EmptyState#mrst.fd), {ok, EmptyState#mrst{ id_btree=NewIdBtree, + seq_btree=NewSeqBtree, views=NewViews, - update_seq=Seq + update_seq=UpdateSeq }}. @@ -170,5 +181,13 @@ swap_compacted(OldState, NewState) -> unlink(OldState#mrst.fd), couch_ref_counter:drop(OldState#mrst.refc), {ok, NewRefCounter} = couch_ref_counter:start([NewState#mrst.fd]), - + {ok, NewState#mrst{refc=NewRefCounter}}. + + +maybe_index_seqs(SeqBt, ToAdd, true) -> + SeqsToAdd = couch_mrview_util:to_seqkvs(ToAdd, []), + {ok, SeqBt2} = couch_btree:add(SeqBt, SeqsToAdd), + SeqBt2; +maybe_index_seqs(SeqBt, _ToAdd, _) -> + SeqBt. diff --git a/apps/couch_mrview/src/couch_mrview_updater.erl b/apps/couch_mrview/src/couch_mrview_updater.erl index 178cf3df..2764ead1 100644 --- a/apps/couch_mrview/src/couch_mrview_updater.erl +++ b/apps/couch_mrview/src/couch_mrview_updater.erl @@ -54,15 +54,28 @@ start_update(Partial, State, NumChanges) -> purge(_Db, PurgeSeq, PurgedIdRevs, State) -> #mrst{ + seq_indexed=SeqIndexed, id_btree=IdBtree, + seq_btree=SeqBtree, views=Views } = State, Ids = [Id || {Id, _Revs} <- PurgedIdRevs], {ok, Lookups, IdBtree2} = couch_btree:query_modify(IdBtree, Ids, [], Ids), + SeqBtree2 = case SeqIndexed of + true -> + PurgeSeqs = [{ViewId, Seq} || {{ViewId, Seq}, _Id} <- + couch_mrview_util:to_seqkvs(Lookups, [])], + + {ok, SeqBtree1} = couch_btree:add_remove(SeqBtree, [], PurgeSeqs), + SeqBtree1; + _ -> + nil + end, + MakeDictFun = fun - ({ok, {DocId, ViewNumRowKeys}}, DictAcc) -> + ({ok, {DocId, {_Seq, ViewNumRowKeys}}}, DictAcc) -> FoldFun = fun({ViewNum, RowKey}, DictAcc2) -> dict:append(ViewNum, {RowKey, DocId}, DictAcc2) end, @@ -89,11 +102,11 @@ purge(_Db, PurgeSeq, PurgedIdRevs, State) -> Views2 = lists:map(RemKeysFun, Views), {ok, State#mrst{ id_btree=IdBtree2, + seq_btree=SeqBtree2, views=Views2, purge_seq=PurgeSeq }}. - process_doc(Doc, Seq, #mrst{doc_acc=Acc}=State) when length(Acc) > 100 -> couch_work_queue:queue(State#mrst.doc_queue, lists:reverse(Acc)), process_doc(Doc, Seq, State#mrst{doc_acc=[]}); @@ -144,29 +157,31 @@ compute_map_results(#mrst{qserver = Qs}, Dequeued) -> % Run all the non deleted docs through the view engine and % then pass the results on to the writer process. DocFun = fun - ({nil, Seq, _}, {SeqAcc, AccDel, AccNotDel}) -> - {erlang:max(Seq, SeqAcc), AccDel, AccNotDel}; - ({Id, Seq, deleted}, {SeqAcc, AccDel, AccNotDel}) -> - {erlang:max(Seq, SeqAcc), [{Id, []} | AccDel], AccNotDel}; - ({_Id, Seq, Doc}, {SeqAcc, AccDel, AccNotDel}) -> - {erlang:max(Seq, SeqAcc), AccDel, [Doc | AccNotDel]} + ({nil, Seq, _}, {SeqAcc, AccDel, AccNotDel, AccDocSeq}) -> + {erlang:max(Seq, SeqAcc), AccDel, AccNotDel, AccDocSeq}; + ({Id, Seq, deleted}, {SeqAcc, AccDel, AccNotDel, AccDocSeq}) -> + {erlang:max(Seq, SeqAcc), [{Id, Seq, []} | AccDel], + AccNotDel, AccDocSeq}; + ({Id, Seq, Doc}, {SeqAcc, AccDel, AccNotDel, AccDocSec}) -> + {erlang:max(Seq, SeqAcc), AccDel, [Doc | AccNotDel], [{Id, + Seq} | AccDocSec]} end, FoldFun = fun(Docs, Acc) -> lists:foldl(DocFun, Acc, Docs) end, - {MaxSeq, DeletedResults, Docs} = - lists:foldl(FoldFun, {0, [], []}, Dequeued), + {MaxSeq, DeletedResults, Docs, DocSeqs} = + lists:foldl(FoldFun, {0, [], [], []}, Dequeued), {ok, MapResultList} = couch_query_servers:map_docs_raw(Qs, Docs), NotDeletedResults = lists:zipwith( - fun(#doc{id = Id}, MapResults) -> {Id, MapResults} end, - Docs, + fun({Id, Seq}, MapResults) -> {Id, Seq, MapResults} end, + DocSeqs, MapResultList), AllMapResults = DeletedResults ++ NotDeletedResults, update_task(length(AllMapResults)), {ok, {MaxSeq, AllMapResults}}. -write_results(Parent, State) -> +write_results(Parent, #mrst{db_name=DbName, idx_name=IdxName}=State) -> case couch_work_queue:dequeue(State#mrst.write_queue) of closed -> Parent ! {new_state, State}; @@ -174,6 +189,10 @@ write_results(Parent, State) -> EmptyKVs = [{V#mrview.id_num, []} || V <- State#mrst.views], {Seq, ViewKVs, DocIdKeys} = merge_results(Info, 0, EmptyKVs, []), NewState = write_kvs(State, Seq, ViewKVs, DocIdKeys), + + % notifify the view update + couch_db_update_notifier:notify({view_updated, {DbName, IdxName}}), + send_partial(NewState#mrst.partial_resp_pid, NewState), write_results(Parent, NewState) end. @@ -200,18 +219,18 @@ merge_results([{Seq, Results} | Rest], SeqAcc, ViewKVs, DocIdKeys) -> merge_results(Rest, erlang:max(Seq, SeqAcc), ViewKVs1, DocIdKeys1). -merge_results({DocId, []}, ViewKVs, DocIdKeys) -> - {ViewKVs, [{DocId, []} | DocIdKeys]}; -merge_results({DocId, RawResults}, ViewKVs, DocIdKeys) -> +merge_results({DocId, Seq, []}, ViewKVs, DocIdKeys) -> + {ViewKVs, [{DocId, {Seq, []}} | DocIdKeys]}; +merge_results({DocId, Seq, RawResults}, ViewKVs, DocIdKeys) -> JsonResults = couch_query_servers:raw_to_ejson(RawResults), Results = [[list_to_tuple(Res) || Res <- FunRs] || FunRs <- JsonResults], - {ViewKVs1, ViewIdKeys} = insert_results(DocId, Results, ViewKVs, [], []), + {ViewKVs1, ViewIdKeys} = insert_results(DocId, Seq, Results, ViewKVs, [], []), {ViewKVs1, [ViewIdKeys | DocIdKeys]}. -insert_results(DocId, [], [], ViewKVs, ViewIdKeys) -> - {lists:reverse(ViewKVs), {DocId, ViewIdKeys}}; -insert_results(DocId, [KVs | RKVs], [{Id, VKVs} | RVKVs], VKVAcc, VIdKeys) -> +insert_results(DocId, Seq, [], [], ViewKVs, ViewIdKeys) -> + {lists:reverse(ViewKVs), {DocId, {Seq, ViewIdKeys}}}; +insert_results(DocId, Seq, [KVs | RKVs], [{Id, VKVs} | RVKVs], VKVAcc, VIdKeys) -> CombineDupesFun = fun ({Key, Val}, {[{Key, {dups, Vals}} | Rest], IdKeys}) -> {[{Key, {dups, [Val | Vals]}} | Rest], IdKeys}; @@ -223,18 +242,31 @@ insert_results(DocId, [KVs | RKVs], [{Id, VKVs} | RVKVs], VKVAcc, VIdKeys) -> InitAcc = {[], VIdKeys}, {Duped, VIdKeys0} = lists:foldl(CombineDupesFun, InitAcc, lists:sort(KVs)), FinalKVs = [{{Key, DocId}, Val} || {Key, Val} <- Duped] ++ VKVs, - insert_results(DocId, RKVs, RVKVs, [{Id, FinalKVs} | VKVAcc], VIdKeys0). + insert_results(DocId, Seq, RKVs, RVKVs, [{Id, FinalKVs} | VKVAcc], + VIdKeys0). write_kvs(State, UpdateSeq, ViewKVs, DocIdKeys) -> #mrst{ + seq_indexed=SeqIndexed, id_btree=IdBtree, + seq_btree=SeqBtree, first_build=FirstBuild } = State, {ok, ToRemove, IdBtree2} = update_id_btree(IdBtree, DocIdKeys, FirstBuild), - ToRemByView = collapse_rem_keys(ToRemove, dict:new()), + SeqBtree2 = case SeqIndexed of + true -> + ToRemBySeq = [{ViewId, Seq} || {{ViewId, Seq}, _Id} <- + couch_mrview_util:to_seqkvs(ToRemove, [])], + {ok, SeqBtree1} = update_seq_btree(SeqBtree, DocIdKeys, + ToRemBySeq), + SeqBtree1; + _ -> nil + end, + + ToRemByView = collapse_rem_keys(ToRemove, dict:new()), UpdateView = fun(#mrview{id_num=ViewId}=View, {ViewId, KVs}) -> ToRem = couch_util:dict_find(ViewId, ToRemByView, []), {ok, VBtree2} = couch_btree:add_remove(View#mrview.btree, KVs, ToRem), @@ -248,23 +280,27 @@ write_kvs(State, UpdateSeq, ViewKVs, DocIdKeys) -> State#mrst{ views=lists:zipwith(UpdateView, State#mrst.views, ViewKVs), update_seq=UpdateSeq, - id_btree=IdBtree2 + id_btree=IdBtree2, + seq_btree=SeqBtree2 }. update_id_btree(Btree, DocIdKeys, true) -> - ToAdd = [{Id, DIKeys} || {Id, DIKeys} <- DocIdKeys, DIKeys /= []], + ToAdd = [{Id, {Seq, Keys}} || {Id, {Seq, Keys}} <- DocIdKeys, Keys /= []], couch_btree:query_modify(Btree, [], ToAdd, []); update_id_btree(Btree, DocIdKeys, _) -> ToFind = [Id || {Id, _} <- DocIdKeys], - ToAdd = [{Id, DIKeys} || {Id, DIKeys} <- DocIdKeys, DIKeys /= []], - ToRem = [Id || {Id, DIKeys} <- DocIdKeys, DIKeys == []], + ToAdd = [{Id, {Seq, Keys}} || {Id, {Seq, Keys}} <- DocIdKeys, Keys /= []], + ToRem = [Id || {Id, {_Seq, Keys}} <- DocIdKeys, Keys == []], couch_btree:query_modify(Btree, ToFind, ToAdd, ToRem). +update_seq_btree(Btree, DocIdKeys, ToRemBySeq) -> + ToAdd = couch_mrview_util:to_seqkvs(DocIdKeys, []), + couch_btree:add_remove(Btree, ToAdd, ToRemBySeq). collapse_rem_keys([], Acc) -> Acc; -collapse_rem_keys([{ok, {DocId, ViewIdKeys}} | Rest], Acc) -> +collapse_rem_keys([{ok, {DocId, {_Seq, ViewIdKeys}}} | Rest], Acc) -> NewAcc = lists:foldl(fun({ViewId, Key}, Acc2) -> dict:append(ViewId, {Key, DocId}, Acc2) end, Acc, ViewIdKeys), diff --git a/apps/couch_mrview/src/couch_mrview_util.erl b/apps/couch_mrview/src/couch_mrview_util.erl index 4e5c5103..b79aea7c 100644 --- a/apps/couch_mrview/src/couch_mrview_util.erl +++ b/apps/couch_mrview/src/couch_mrview_util.erl @@ -12,7 +12,7 @@ -module(couch_mrview_util). --export([get_view/4]). +-export([get_view/4, get_view_state/4]). -export([ddoc_to_mrst/2, init_state/4, reset_index/3]). -export([make_header/1]). -export([index_file/2, compaction_file/2, open_file/1]). @@ -24,14 +24,15 @@ -export([calculate_data_size/2]). -export([validate_args/1]). -export([maybe_load_doc/3, maybe_load_doc/4]). +-export([to_seqkvs/2]). -define(MOD, couch_mrview_index). +-define(MRFMT, 0.2). -include_lib("couch/include/couch_db.hrl"). -include_lib("couch_mrview/include/couch_mrview.hrl"). - -get_view(Db, DDoc, ViewName, Args0) -> +get_view_state(Db, DDoc, ViewName, Args0) -> ArgCheck = fun(InitState) -> Args1 = set_view_type(Args0, ViewName, InitState#mrst.views), {ok, validate_args(Args1)} @@ -54,11 +55,16 @@ get_view(Db, DDoc, ViewName, Args0) -> end, #mrst{language=Lang, views=Views} = State, {Type, View, Args3} = extract_view(Lang, Args2, ViewName, Views), + {ok, {Type, View, Args3}, State}. + + +get_view(Db, DDoc, ViewName, Args0) -> + {ok, {Type, View, Args3}, State} = get_view_state(Db, DDoc, + ViewName, Args0), check_range(Args3, view_cmp(View)), Sig = view_sig(Db, State, View, Args3), {ok, {Type, View}, Sig, Args3}. - ddoc_to_mrst(DbName, #doc{id=Id, body={Fields}}) -> MakeDict = fun({Name, {MRFuns}}, DictBySrcAcc) -> case couch_util:get_value(<<"map">>, MRFuns) of @@ -93,6 +99,8 @@ ddoc_to_mrst(DbName, #doc{id=Id, body={Fields}}) -> {DesignOpts} = couch_util:get_value(<<"options">>, Fields, {[]}), {RawViews} = couch_util:get_value(<<"views">>, Fields, {[]}), Lib = couch_util:get_value(<<"lib">>, RawViews, {[]}), + SeqIndexed = couch_util:get_value(<<"seq_indexed">>, DesignOpts, + false), IdxState = #mrst{ db_name=DbName, @@ -100,9 +108,11 @@ ddoc_to_mrst(DbName, #doc{id=Id, body={Fields}}) -> lib=Lib, views=Views, language=Language, - design_opts=DesignOpts + design_opts=DesignOpts, + seq_indexed=SeqIndexed }, - SigInfo = {Views, Language, DesignOpts, couch_index_util:sort_lib(Lib)}, + SigInfo = {?MRFMT, Views, Language, DesignOpts, + couch_index_util:sort_lib(Lib)}, {ok, IdxState#mrst{sig=couch_util:md5(term_to_binary(SigInfo))}}. @@ -165,15 +175,17 @@ init_state(Db, Fd, #mrst{views=Views}=State, nil) -> seq=0, purge_seq=couch_db:get_purge_seq(Db), id_btree_state=nil, + seq_btree_state=nil, view_states=[{nil, 0, 0} || _ <- Views] }, init_state(Db, Fd, State, Header); init_state(Db, Fd, State, Header) -> - #mrst{language=Lang, views=Views} = State, + #mrst{language=Lang, seq_indexed=SeqIndexed, views=Views} = State, #mrheader{ seq=Seq, purge_seq=PurgeSeq, id_btree_state=IdBtreeState, + seq_btree_state=SeqBtreeState, view_states=ViewStates } = Header, @@ -191,6 +203,13 @@ init_state(Db, Fd, State, Header) -> IdBtOpts = [{reduce, IdReduce}, {compression, couch_db:compression(Db)}], {ok, IdBtree} = couch_btree:open(IdBtreeState, Fd, IdBtOpts), + {ok, SeqBtree} = case SeqIndexed of + true -> + SeqBtOpts = [{compression, couch_db:compression(Db)}], + couch_btree:open(SeqBtreeState, Fd, SeqBtOpts); + _ -> {ok, nil} + end, + OpenViewFun = fun(St, View) -> open_view(Db, Fd, Lang, St, View) end, Views2 = lists:zipwith(OpenViewFun, ViewStates2, Views), @@ -199,6 +218,7 @@ init_state(Db, Fd, State, Header) -> update_seq=Seq, purge_seq=PurgeSeq, id_btree=IdBtree, + seq_btree=SeqBtree, views=Views2 }. @@ -479,9 +499,11 @@ view_cmp(View) -> make_header(State) -> #mrst{ + seq_indexed=SeqIndexed, update_seq=Seq, purge_seq=PurgeSeq, id_btree=IdBtree, + seq_btree=SeqBtree, views=Views } = State, ViewStates = [ @@ -493,10 +515,17 @@ make_header(State) -> || V <- Views ], + + SeqBtreeState = case SeqIndexed of + true -> couch_btree:get_state(SeqBtree); + _ -> nil + end, + #mrheader{ seq=Seq, purge_seq=PurgeSeq, id_btree_state=couch_btree:get_state(IdBtree), + seq_btree_state=SeqBtreeState, view_states=ViewStates }. @@ -530,7 +559,7 @@ delete_index_file(DbName, Sig) -> delete_compaction_file(DbName, Sig) -> delete_file(compaction_file(DbName, Sig)). - + delete_file(FName) -> case filelib:is_file(FName) of @@ -554,6 +583,7 @@ reset_state(State) -> qserver=nil, update_seq=0, id_btree=nil, + seq_btree=nil, views=[View#mrview{btree=nil} || View <- State#mrst.views] }. @@ -706,3 +736,15 @@ index_of(Key, [_ | Rest], Idx) -> mrverror(Mesg) -> throw({query_parse_error, Mesg}). + +to_seqkvs([], Acc) -> + lists:reverse(Acc); +to_seqkvs([{not_found, _} | Rest], Acc) -> + to_seqkvs(Rest, Acc); +to_seqkvs([{_Id, {_Seq, []}} | Rest], Acc) -> + to_seqkvs(Rest, Acc); +to_seqkvs([{Id, {Seq, Keys}} | Rest], Acc0) -> + Acc1 = lists:foldl(fun({ViewId, _}, Acc) -> + [{{ViewId, Seq}, Id} | Acc] + end, Acc0, Keys), + to_seqkvs(Rest, Acc1). From ba9246abb9fa6cca9f730ed2dc1da538b2ea6c12 Mon Sep 17 00:00:00 2001 From: benoitc Date: Thu, 23 Feb 2012 19:54:59 +0100 Subject: [PATCH 02/14] `_view` filter is now hanled by couch_mrview:view_changes_since/7 Rather than passing all doc sent y couch_db:changes_since/5 to a transformed view function we are now using the more efficient `couch_mrview:view_changes_since/7` wich use the seq btree in the view group. Also add the parameters `view_filter=DName/FilterName` which allows to filter any view changes using the fiter function `FilterName` from the design document `DName`: GET http:////_changes?filter=_view&view=&view_filter Where variables are: - : URL of the CouchDB node - : Name of the database - : DesignId/ViewNameThe name of the view in the views properties from the design document DesignId` - : DesignId1/FilterName The name of the filter in the filters properties from the design document DesignId1 The difference with the old behaviour is that you really accessing the view index instead of passing each map function to all the changed docs wich is a way more efficient (you do'nt open a system process, serialize the function, pass it to the process, wait for return, test it and eventually return a change). You can also filter the results. Other advantages are that the views are refreshed in real time when the database content changes and the view changes are only triggered if the index is updated. --- apps/couch/include/couch_db.hrl | 16 -- apps/couch_changes/include/couch_changes.hrl | 28 +++ apps/couch_changes/src/couch_changes.erl | 238 ++++++++++++------ .../src/couch_replicator_api_wrap.erl | 1 + .../src/couch_replicator_manager.erl | 1 + .../src/couch_replicator_notifier.erl | 1 + 6 files changed, 192 insertions(+), 93 deletions(-) create mode 100644 apps/couch_changes/include/couch_changes.hrl diff --git a/apps/couch/include/couch_db.hrl b/apps/couch/include/couch_db.hrl index 1ea69cdf..801b5658 100644 --- a/apps/couch/include/couch_db.hrl +++ b/apps/couch/include/couch_db.hrl @@ -221,22 +221,6 @@ % small value used in revision trees to indicate the revision isn't stored -define(REV_MISSING, []). --record(changes_args, { - feed = "normal", - dir = fwd, - since = 0, - limit = 1000000000000000, - style = main_only, - heartbeat, - timeout, - filter = "", - filter_fun, - filter_args = [], - include_docs = false, - conflicts = false, - db_open_options = [] -}). - -record(btree, { fd, root, diff --git a/apps/couch_changes/include/couch_changes.hrl b/apps/couch_changes/include/couch_changes.hrl new file mode 100644 index 00000000..0b4dd4bc --- /dev/null +++ b/apps/couch_changes/include/couch_changes.hrl @@ -0,0 +1,28 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-record(changes_args, { + feed = "normal", + dir = fwd, + since = 0, + limit = 1000000000000000, + style = main_only, + heartbeat, + timeout, + filter = "", + filter_fun, + filter_args = [], + filter_view, + include_docs = false, + conflicts = false, + db_open_options = [] +}). diff --git a/apps/couch_changes/src/couch_changes.erl b/apps/couch_changes/src/couch_changes.erl index 4efa98e5..9646eb8a 100644 --- a/apps/couch_changes/src/couch_changes.erl +++ b/apps/couch_changes/src/couch_changes.erl @@ -11,8 +11,10 @@ % the License. -module(couch_changes). +-include("couch_changes/include/couch_changes.hrl"). -include_lib("couch/include/couch_db.hrl"). -include_lib("couch_httpd/include/couch_httpd.hrl"). +-include_lib("couch_mrview/include/couch_mrview.hrl"). -export([handle_changes/3]). @@ -36,16 +38,24 @@ }). %% @type Req -> #httpd{} | {json_req, JsonObj()} -handle_changes(Args1, Req, Db0) -> +handle_changes(Args0, Req, Db0) -> #changes_args{ style = Style, filter = FilterName, feed = Feed, dir = Dir, since = Since - } = Args1, + } = Args0, {FilterFun, FilterArgs} = make_filter_fun(FilterName, Style, Req, Db0), - Args = Args1#changes_args{filter_fun = FilterFun, filter_args = FilterArgs}, + Args1 = Args0#changes_args{filter_fun = FilterFun, + filter_args = FilterArgs}, + + Args = case FilterName of + "_view" -> + Args1#changes_args{filter_view=parse_view_param(Req)}; + _ -> + Args1 + end, Start = fun() -> {ok, Db} = couch_db:reopen(Db0), StartSeq = case Dir of @@ -68,17 +78,12 @@ handle_changes(Args1, Req, Db0) -> true -> fun(CallbackAcc) -> {Callback, UserAcc} = get_callback_acc(CallbackAcc), - Self = self(), - {ok, Notify} = couch_db_update_notifier:start_link( - fun({_, DbName}) when Db0#db.name == DbName -> - Self ! db_updated; - (_) -> - ok - end - ), + ConsumerFun = make_update_fun(Db0, FilterName, Args), + {ok, Notify} = couch_db_update_notifier:start_link(ConsumerFun), {Db, StartSeq} = Start(), UserAcc2 = start_sending_changes(Callback, UserAcc, Feed), {Timeout, TimeoutFun} = get_changes_timeout(Args, Callback), + Acc0 = build_acc(Args, Callback, UserAcc2, Db, StartSeq, <<"">>, Timeout, TimeoutFun), try @@ -108,6 +113,49 @@ handle_changes(Args1, Req, Db0) -> end end. +make_update_fun(#db{name=DbName0}, "_view", + #changes_args{filter_view={DName, VName}}) -> + Self = self(), + + DesignId = <<"_design/", DName/binary>>, + fun + ({view_updated, {DbName, IdxName}}) + when DbName0 == DbName, IdxName == DesignId -> + Self ! db_updated; + ({ddoc_updated, {DbName, DDocId}}) + when DbName0 == DbName, DDocId == DesignId -> + Self ! ddoc_updated; + ({updated, DbName}) when DbName0 == DbName -> + spawn(fun() -> + case couch_db:open_int(DbName0, []) of + {ok, Db} -> + try + view_trigger(Db, DesignId, VName) + catch + _:_ -> Self ! ddoc_updated + end; + _Else -> + ?LOG_INFO("db error ~p~n", [_Else]), + Self ! ddoc_updated + end + end); + (_Else) -> + ok + end; +make_update_fun(Db0, _FilterName, _ChangeArgs) -> + Self = self(), + fun + ({_, DbName}) when Db0#db.name == DbName -> + Self ! db_updated; + (_) -> + ok + end. + +view_trigger(Db, DesignId, VName) -> + DDoc = couch_httpd_db:couch_doc_open(Db, DesignId, nil, [ejson_body]), + %% try to update view if the db has been updated. + couch_mrview:query_view(Db, DDoc, VName, #mrargs{limit=0}). + get_callback_acc({Callback, _UserAcc} = Pair) when is_function(Callback, 3) -> Pair; get_callback_acc(Callback) when is_function(Callback, 2) -> @@ -156,7 +204,6 @@ os_filter_fun(FilterName, Style, Req, Db) -> parse_view_param({json_req, {Props}}) -> {Params} = couch_util:get_value(<<"query">>, Props), parse_view_param1(couch_util:get_value(<<"view">>, Params)); - parse_view_param(Req) -> parse_view_param1(?l2b(couch_httpd:qs_value(Req, "view", ""))). @@ -180,9 +227,14 @@ builtin_filter_fun("_doc_ids", Style, #httpd{method='GET'}=Req, _Db) -> {filter_docids(DocIds, Style), DocIds}; builtin_filter_fun("_design", Style, _Req, _Db) -> {filter_designdoc(Style), []}; +builtin_filter_fun("_view", Style, {json_req, {Props}}=Req, Db) -> + {Params} = couch_util:get_value(<<"query">>, Props), + FilterName = ?b2l(couch_util:get_value(<<"view_filter">>, Params, + <<"">>)), + {os_filter_fun(FilterName, Style, Req, Db), []}; builtin_filter_fun("_view", Style, Req, Db) -> - {DName, VName} = parse_view_param(Req), - {filter_view({DName, VName} , Style, Db), []}; + FilterName = couch_httpd:qs_value(Req, "view_filter", ""), + {os_filter_fun(FilterName, Style, Req, Db), []}; builtin_filter_fun(_FilterName, _Style, _Req, _Db) -> throw({bad_request, "unknown builtin filter name"}). @@ -206,31 +258,6 @@ filter_designdoc(Style) -> end end. -filter_view({DName, VName}, Style, Db) -> - DesignId = <<"_design/", DName/binary>>, - DDoc = couch_httpd_db:couch_doc_open(Db, DesignId, nil, [ejson_body]), - % validate that the ddoc has the filter fun - #doc{body={Props}} = DDoc, - couch_util:get_nested_json_value({Props}, [<<"views">>, VName]), - fun(Db2, DocInfo) -> - DocInfos = - case Style of - main_only -> - [DocInfo]; - all_docs -> - [DocInfo#doc_info{revs=[Rev]}|| Rev <- DocInfo#doc_info.revs] - end, - Docs = [Doc || {ok, Doc} <- [ - couch_db:open_doc(Db2, DocInfo2, [deleted, conflicts]) - || DocInfo2 <- DocInfos]], - {ok, Passes} = couch_query_servers:filter_view( - DDoc, VName, Docs - ), - [{[{<<"rev">>, couch_doc:rev_to_str({RevPos,RevId})}]} - || {Pass, #doc{revs={RevPos,[RevId|_]}}} - <- lists:zip(Passes, Docs), Pass == true] - end. - builtin_results(Style, [#rev_info{rev=Rev}|_]=Revs) -> case Style of main_only -> @@ -302,7 +329,8 @@ send_changes(Args, Acc0, FirstRound) -> #changes_args{ dir = Dir, filter = FilterName, - filter_args = FilterArgs + filter_args = FilterArgs, + filter_view = View } = Args, #changes_acc{ db = Db, @@ -317,13 +345,22 @@ send_changes(Args, Acc0, FirstRound) -> "_design" -> send_changes_design_docs( Db, StartSeq, Dir, fun changes_enumerator/2, Acc0); + "_view" -> + send_changes_view( + Db, View, StartSeq, Dir, fun view_changes_enumerator/2, Acc0); _ -> couch_db:changes_since( Db, StartSeq, fun changes_enumerator/2, [{dir, Dir}], Acc0) end; false -> - couch_db:changes_since( - Db, StartSeq, fun changes_enumerator/2, [{dir, Dir}], Acc0) + case FilterName of + "_view" -> + send_changes_view( + Db, View, StartSeq, Dir, fun view_changes_enumerator/2, Acc0); + _ -> + couch_db:changes_since( + Db, StartSeq, fun changes_enumerator/2, [{dir, Dir}], Acc0) + end end. @@ -349,6 +386,14 @@ send_changes_design_docs(Db, StartSeq, Dir, Fun, Acc0) -> send_lookup_changes(FullDocInfos, StartSeq, Dir, Db, Fun, Acc0). +send_changes_view(Db, {DName, VName}, StartSeq, Dir, Fun, Acc0) -> + DesignId = <<"_design/", DName/binary>>, + DDoc = couch_httpd_db:couch_doc_open(Db, DesignId, nil, + [ejson_body]), + couch_mrview:view_changes_since( + Db, DDoc, VName, StartSeq, Fun, [{direction, Dir}], Acc0). + + send_lookup_changes(FullDocInfos, StartSeq, Dir, Db, Fun, Acc0) -> FoldFun = case Dir of fwd -> @@ -404,40 +449,47 @@ keep_sending_changes(Args, Acc0, FirstRound) -> db_open_options = DbOptions } = Args, - {ok, ChangesAcc} = send_changes( - Args#changes_args{dir=fwd}, - Acc0, - FirstRound), - #changes_acc{ - db = Db, callback = Callback, timeout = Timeout, timeout_fun = TimeoutFun, - seq = EndSeq, prepend = Prepend2, user_acc = UserAcc2, limit = NewLimit - } = ChangesAcc, - - couch_db:close(Db), - if Limit > NewLimit, ResponseType == "longpoll" -> - end_sending_changes(Callback, UserAcc2, EndSeq, ResponseType); - true -> - case wait_db_updated(Timeout, TimeoutFun, UserAcc2) of - {updated, UserAcc4} -> - DbOptions1 = [{user_ctx, Db#db.user_ctx} | DbOptions], - case couch_db:open(Db#db.name, DbOptions1) of - {ok, Db2} -> - keep_sending_changes( - Args#changes_args{limit=NewLimit}, - ChangesAcc#changes_acc{ - db = Db2, - user_acc = UserAcc4, - seq = EndSeq, - prepend = Prepend2, - timeout = Timeout, - timeout_fun = TimeoutFun}, - false); - _Else -> - end_sending_changes(Callback, UserAcc2, EndSeq, ResponseType) - end; - {stop, UserAcc4} -> - end_sending_changes(Callback, UserAcc4, EndSeq, ResponseType) - end + case send_changes(Args#changes_args{dir=fwd}, Acc0, FirstRound) of + {ok, ChangesAcc} -> + #changes_acc{ + db = Db, callback = Callback, timeout = Timeout, + timeout_fun = TimeoutFun, seq = EndSeq, prepend = Prepend2, + user_acc = UserAcc2, limit = NewLimit + } = ChangesAcc, + + couch_db:close(Db), + if Limit > NewLimit, ResponseType == "longpoll" -> + end_sending_changes(Callback, UserAcc2, EndSeq, ResponseType); + true -> + case wait_db_updated(Timeout, TimeoutFun, UserAcc2) of + {updated, UserAcc4} -> + DbOptions1 = [{user_ctx, Db#db.user_ctx} | DbOptions], + case couch_db:open(Db#db.name, DbOptions1) of + {ok, Db2} -> + keep_sending_changes( + Args#changes_args{limit=NewLimit}, + ChangesAcc#changes_acc{ + db = Db2, + user_acc = UserAcc4, + seq = EndSeq, + prepend = Prepend2, + timeout = Timeout, + timeout_fun = TimeoutFun}, + false); + _Else -> + end_sending_changes(Callback, UserAcc2, EndSeq, + ResponseType) + end; + {stop, UserAcc4} -> + end_sending_changes(Callback, UserAcc4, EndSeq, ResponseType) + end + end; + Error -> + ?LOG_ERROR("Error while getting new changes: ~p~n", [Error]), + #changes_acc{ + callback = Callback, seq = EndSeq, user_acc = UserAcc2 + } = Acc0, + end_sending_changes(Callback, UserAcc2, EndSeq, ResponseType) end. end_sending_changes(Callback, UserAcc, EndSeq, ResponseType) -> @@ -499,6 +551,35 @@ changes_enumerator(DocInfo, Acc) -> user_acc = UserAcc2, limit = Limit - 1}} end. +view_changes_enumerator({{_ViewId, _Seq}, Id}, Acc) -> + #changes_acc{ + filter = FilterFun, callback = Callback, prepend = Prepend, + user_acc = UserAcc, limit = Limit, resp_type = ResponseType, db = Db, + timeout = Timeout, timeout_fun = TimeoutFun + } = Acc, + {ok, DocInfo} = couch_db:get_doc_info(Db, Id), + #doc_info{high_seq = Seq} = DocInfo, + Results0 = FilterFun(Db, DocInfo), + Results = [Result || Result <- Results0, Result /= null], + Go = if (Limit =< 1) andalso Results =/= [] -> stop; true -> ok end, + case Results of + [] -> + {Done, UserAcc2} = maybe_heartbeat(Timeout, TimeoutFun, UserAcc), + case Done of + stop -> + {stop, Acc#changes_acc{seq = Seq, user_acc = UserAcc2}}; + ok -> + {Go, Acc#changes_acc{seq = Seq, user_acc = UserAcc2}} + end; + _ -> + ChangesRow = changes_row(Results, DocInfo, Acc), + UserAcc2 = Callback({change, ChangesRow, Prepend}, ResponseType, UserAcc), + reset_heartbeat(), + {Go, Acc#changes_acc{ + seq = Seq, prepend = <<",\n">>, + user_acc = UserAcc2, limit = Limit - 1}} + end. + changes_row(Results, DocInfo, Acc) -> #doc_info{ @@ -528,7 +609,9 @@ deleted_item(_) -> []. wait_db_updated(Timeout, TimeoutFun, UserAcc) -> receive db_updated -> - get_rest_db_updated(UserAcc) + get_rest_db_updated(UserAcc); + ddoc_updated -> + {stop, UserAcc} after Timeout -> {Go, UserAcc2} = TimeoutFun(UserAcc), case Go of @@ -547,6 +630,7 @@ get_rest_db_updated(UserAcc) -> {updated, UserAcc} end. + reset_heartbeat() -> case get(last_changes_heartbeat) of undefined -> diff --git a/apps/couch_replicator/src/couch_replicator_api_wrap.erl b/apps/couch_replicator/src/couch_replicator_api_wrap.erl index e68cd5c8..8a586715 100644 --- a/apps/couch_replicator/src/couch_replicator_api_wrap.erl +++ b/apps/couch_replicator/src/couch_replicator_api_wrap.erl @@ -20,6 +20,7 @@ -include_lib("couch/include/couch_db.hrl"). -include_lib("couch_httpd/include/couch_httpd.hrl"). +-include_lib("couch_changes/include/couch_changes.hrl"). -include("couch_replicator_api_wrap.hrl"). -export([ diff --git a/apps/couch_replicator/src/couch_replicator_manager.erl b/apps/couch_replicator/src/couch_replicator_manager.erl index 5960d846..f8a86b6f 100644 --- a/apps/couch_replicator/src/couch_replicator_manager.erl +++ b/apps/couch_replicator/src/couch_replicator_manager.erl @@ -23,6 +23,7 @@ -export([code_change/3, terminate/2]). -include_lib("couch/include/couch_db.hrl"). +-include_lib("couch_changes/include/couch_changes.hrl"). -include("couch_replicator.hrl"). -include("couch_replicator_js_functions.hrl"). diff --git a/apps/couch_replicator/src/couch_replicator_notifier.erl b/apps/couch_replicator/src/couch_replicator_notifier.erl index 39fd68b9..26ea2e13 100644 --- a/apps/couch_replicator/src/couch_replicator_notifier.erl +++ b/apps/couch_replicator/src/couch_replicator_notifier.erl @@ -22,6 +22,7 @@ -export([handle_event/2, handle_call/2, handle_info/2]). -include_lib("couch/include/couch_db.hrl"). +-include_lib("couch_changes/include/couch_changes.hrl"). start_link(FunAcc) -> couch_event_sup:start_link(couch_replication, From 282671ec65e3a10265674b8ed5476d3bee2c6930 Mon Sep 17 00:00:00 2001 From: benoitc Date: Wed, 29 Feb 2012 21:31:41 +0100 Subject: [PATCH 03/14] fix _changes test The `_view` filter now need a view group indexed by sequences. --- apps/couch_httpd/share/www/script/test/changes.js | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/apps/couch_httpd/share/www/script/test/changes.js b/apps/couch_httpd/share/www/script/test/changes.js index 5a8154cd..5ec18750 100644 --- a/apps/couch_httpd/share/www/script/test/changes.js +++ b/apps/couch_httpd/share/www/script/test/changes.js @@ -200,7 +200,8 @@ couchTests.changes = function(debug) { "conflicted" : "function(doc, req) { return (doc._conflicts);}" }, options : { - local_seq : true + local_seq : true, + seq_indexed: true }, views : { local_seq : { From e4b75f622fa04f1de0e4b0d9b449397446788d16 Mon Sep 17 00:00:00 2001 From: benoitc Date: Mon, 27 Feb 2012 00:28:37 +0100 Subject: [PATCH 04/14] add support for `include_deleted: true` option in a design document. This option allows a view to map deleted documents. Remember that documents deleted with the DELETE HTTP verb will look like {_id: id, _rev: rev, _deleted: true} to the indexer. If you want to store extra data on the deleted document you can use _bulk_docs or updating a document with the member '_deleted: true' using the HTTP verb PUT. --- apps/couch_index/src/couch_index_updater.erl | 4 +++- apps/couch_mrview/include/couch_mrview.hrl | 1 + apps/couch_mrview/src/couch_mrview.erl | 1 - apps/couch_mrview/src/couch_mrview_index.erl | 5 ++++- apps/couch_mrview/src/couch_mrview_updater.erl | 15 +++++++-------- apps/couch_mrview/src/couch_mrview_util.erl | 7 ++++++- 6 files changed, 21 insertions(+), 12 deletions(-) diff --git a/apps/couch_index/src/couch_index_updater.erl b/apps/couch_index/src/couch_index_updater.erl index 33f4e918..7fe36acf 100644 --- a/apps/couch_index/src/couch_index_updater.erl +++ b/apps/couch_index/src/couch_index_updater.erl @@ -120,11 +120,13 @@ update(Idx, Mod, IdxState) -> UpdateOpts = Mod:get(update_options, IdxState), CommittedOnly = lists:member(committed_only, UpdateOpts), IncludeDesign = lists:member(include_design, UpdateOpts), + IncludeDeleted = lists:member(include_deleted, UpdateOpts), DocOpts = case lists:member(local_seq, UpdateOpts) of true -> [conflicts, deleted_conflicts, local_seq]; _ -> [conflicts, deleted_conflicts] end, + couch_util:with_db(DbName, fun(Db) -> DbUpdateSeq = couch_db:get_update_seq(Db), DbCommittedSeq = couch_db:get_committed_update_seq(Db), @@ -146,7 +148,7 @@ update(Idx, Mod, IdxState) -> case {IncludeDesign, DocId} of {false, <<"_design/", _/binary>>} -> {nil, Seq}; - _ when Deleted -> + _ when Deleted, IncludeDeleted /= true -> {#doc{id=DocId, deleted=true}, Seq}; _ -> {ok, Doc} = couch_db:open_doc_int(Db, DocInfo, DocOpts), diff --git a/apps/couch_mrview/include/couch_mrview.hrl b/apps/couch_mrview/include/couch_mrview.hrl index 9d8cf57c..e116ff20 100644 --- a/apps/couch_mrview/include/couch_mrview.hrl +++ b/apps/couch_mrview/include/couch_mrview.hrl @@ -19,6 +19,7 @@ language, design_opts=[], seq_indexed=false, + include_deleted=false, lib, views, id_btree=nil, diff --git a/apps/couch_mrview/src/couch_mrview.erl b/apps/couch_mrview/src/couch_mrview.erl index 4b387c8e..c950aca0 100644 --- a/apps/couch_mrview/src/couch_mrview.erl +++ b/apps/couch_mrview/src/couch_mrview.erl @@ -113,7 +113,6 @@ view_changes_since(Db, DDoc, VName, StartSeq, Callback, Args, Acc) -> VName, Args1), #mrst{seq_indexed=SeqIndexed, seq_btree=SeqBtree} = State, - case SeqIndexed of false -> {error, seqs_not_indexed}; diff --git a/apps/couch_mrview/src/couch_mrview_index.erl b/apps/couch_mrview/src/couch_mrview_index.erl index 60a526be..69b3ebd3 100644 --- a/apps/couch_mrview/src/couch_mrview_index.erl +++ b/apps/couch_mrview/src/couch_mrview_index.erl @@ -38,8 +38,11 @@ get(Property, State) -> Opts = State#mrst.design_opts, IncDesign = couch_util:get_value(<<"include_design">>, Opts, false), LocalSeq = couch_util:get_value(<<"local_seq">>, Opts, false), + IncludeDeleted = couch_util:get_value(<<"include_deleted">>, + Opts, false), if IncDesign -> [include_design]; true -> [] end - ++ if LocalSeq -> [local_seq]; true -> [] end; + ++ if LocalSeq -> [local_seq]; true -> [] end + ++ if IncludeDeleted -> [include_deleted]; true -> [] end; info -> #mrst{ fd = Fd, diff --git a/apps/couch_mrview/src/couch_mrview_updater.erl b/apps/couch_mrview/src/couch_mrview_updater.erl index 2764ead1..44ef2e10 100644 --- a/apps/couch_mrview/src/couch_mrview_updater.erl +++ b/apps/couch_mrview/src/couch_mrview_updater.erl @@ -67,7 +67,6 @@ purge(_Db, PurgeSeq, PurgedIdRevs, State) -> true -> PurgeSeqs = [{ViewId, Seq} || {{ViewId, Seq}, _Id} <- couch_mrview_util:to_seqkvs(Lookups, [])], - {ok, SeqBtree1} = couch_btree:add_remove(SeqBtree, [], PurgeSeqs), SeqBtree1; _ -> @@ -112,7 +111,8 @@ process_doc(Doc, Seq, #mrst{doc_acc=Acc}=State) when length(Acc) > 100 -> process_doc(Doc, Seq, State#mrst{doc_acc=[]}); process_doc(nil, Seq, #mrst{doc_acc=Acc}=State) -> {ok, State#mrst{doc_acc=[{nil, Seq, nil} | Acc]}}; -process_doc(#doc{id=Id, deleted=true}, Seq, #mrst{doc_acc=Acc}=State) -> +process_doc(#doc{id=Id, deleted=true}, Seq, #mrst{doc_acc=Acc, + include_deleted=false}=State) -> {ok, State#mrst{doc_acc=[{Id, Seq, deleted} | Acc]}}; process_doc(#doc{id=Id}=Doc, Seq, #mrst{doc_acc=Acc}=State) -> {ok, State#mrst{doc_acc=[{Id, Seq, Doc} | Acc]}}. @@ -254,16 +254,15 @@ write_kvs(State, UpdateSeq, ViewKVs, DocIdKeys) -> first_build=FirstBuild } = State, - {ok, ToRemove, IdBtree2} = update_id_btree(IdBtree, DocIdKeys, FirstBuild), - SeqBtree2 = case SeqIndexed of + {ok, ToRemove, IdBtree2} = update_id_btree(IdBtree, DocIdKeys, FirstBuild), + {ok, SeqBtree2} = case SeqIndexed of true -> ToRemBySeq = [{ViewId, Seq} || {{ViewId, Seq}, _Id} <- couch_mrview_util:to_seqkvs(ToRemove, [])], - {ok, SeqBtree1} = update_seq_btree(SeqBtree, DocIdKeys, - ToRemBySeq), - SeqBtree1; - _ -> nil + update_seq_btree(SeqBtree, DocIdKeys, ToRemBySeq); + _ -> + {ok, nil} end, ToRemByView = collapse_rem_keys(ToRemove, dict:new()), diff --git a/apps/couch_mrview/src/couch_mrview_util.erl b/apps/couch_mrview/src/couch_mrview_util.erl index b79aea7c..b78bec2f 100644 --- a/apps/couch_mrview/src/couch_mrview_util.erl +++ b/apps/couch_mrview/src/couch_mrview_util.erl @@ -101,6 +101,8 @@ ddoc_to_mrst(DbName, #doc{id=Id, body={Fields}}) -> Lib = couch_util:get_value(<<"lib">>, RawViews, {[]}), SeqIndexed = couch_util:get_value(<<"seq_indexed">>, DesignOpts, false), + IncludeDeleted = couch_util:get_value(<<"include_deleted">>, DesignOpts, + false), IdxState = #mrst{ db_name=DbName, @@ -109,7 +111,8 @@ ddoc_to_mrst(DbName, #doc{id=Id, body={Fields}}) -> views=Views, language=Language, design_opts=DesignOpts, - seq_indexed=SeqIndexed + seq_indexed=SeqIndexed, + include_deleted=IncludeDeleted }, SigInfo = {?MRFMT, Views, Language, DesignOpts, couch_index_util:sort_lib(Lib)}, @@ -741,6 +744,8 @@ to_seqkvs([], Acc) -> lists:reverse(Acc); to_seqkvs([{not_found, _} | Rest], Acc) -> to_seqkvs(Rest, Acc); +to_seqkvs([{ok, DocIdKeys} | Rest], Acc) -> + to_seqkvs([DocIdKeys | Rest], Acc); to_seqkvs([{_Id, {_Seq, []}} | Rest], Acc) -> to_seqkvs(Rest, Acc); to_seqkvs([{Id, {Seq, Keys}} | Rest], Acc0) -> From c681e1cd3c15a1b4c40ab2c56d7b042b61b152b1 Mon Sep 17 00:00:00 2001 From: benoitc Date: Sun, 26 Feb 2012 18:49:23 +0100 Subject: [PATCH 05/14] Get view changes using a view query. Add the possibility to the function `couch_mrview:view_changes_since` to accept some **view query parameters** like `key`, `start_key` & `end_key`. Instead of fetching all changes in a view from last updated it will fetch changes only from the results of the query. It returns an error if the option `seq_indexed` hasn't been set to `true` in the view options: { "_id": "_design/somedoc", "views": { "someview": { "map": "function(doc) { if (doc.field) emit(doc.field, null); }", "options": { "seq_indexed": "true" } } } } This is done by adding an index (b-tree) to each view. This index associate the view key with the doc sequence. `{{Key, Seq}, {DocId, Value}}` is stored in this btree. --- apps/couch_changes/include/couch_changes.hrl | 1 + apps/couch_changes/src/couch_changes.erl | 94 +++++++++++++- apps/couch_mrview/include/couch_mrview.hrl | 2 + apps/couch_mrview/src/couch_mrview.erl | 50 ++++++-- .../couch_mrview/src/couch_mrview_updater.erl | 119 ++++++++++++------ apps/couch_mrview/src/couch_mrview_util.erl | 70 +++++++++-- .../src/couch_replicator_api_wrap.erl | 10 ++ 7 files changed, 275 insertions(+), 71 deletions(-) diff --git a/apps/couch_changes/include/couch_changes.hrl b/apps/couch_changes/include/couch_changes.hrl index 0b4dd4bc..446045fc 100644 --- a/apps/couch_changes/include/couch_changes.hrl +++ b/apps/couch_changes/include/couch_changes.hrl @@ -22,6 +22,7 @@ filter_fun, filter_args = [], filter_view, + view_args = [], include_docs = false, conflicts = false, db_open_options = [] diff --git a/apps/couch_changes/src/couch_changes.erl b/apps/couch_changes/src/couch_changes.erl index 9646eb8a..dc2e898f 100644 --- a/apps/couch_changes/src/couch_changes.erl +++ b/apps/couch_changes/src/couch_changes.erl @@ -52,7 +52,9 @@ handle_changes(Args0, Req, Db0) -> Args = case FilterName of "_view" -> - Args1#changes_args{filter_view=parse_view_param(Req)}; + ViewArgs = parse_view_args(Req), + Args1#changes_args{filter_view=parse_view_param(Req), + view_args=ViewArgs#mrargs{direction=Dir}}; _ -> Args1 end, @@ -330,7 +332,8 @@ send_changes(Args, Acc0, FirstRound) -> dir = Dir, filter = FilterName, filter_args = FilterArgs, - filter_view = View + filter_view = View, + view_args = ViewArgs } = Args, #changes_acc{ db = Db, @@ -347,7 +350,7 @@ send_changes(Args, Acc0, FirstRound) -> Db, StartSeq, Dir, fun changes_enumerator/2, Acc0); "_view" -> send_changes_view( - Db, View, StartSeq, Dir, fun view_changes_enumerator/2, Acc0); + Db, View, StartSeq, ViewArgs, fun view_changes_enumerator/2, Acc0); _ -> couch_db:changes_since( Db, StartSeq, fun changes_enumerator/2, [{dir, Dir}], Acc0) @@ -356,7 +359,7 @@ send_changes(Args, Acc0, FirstRound) -> case FilterName of "_view" -> send_changes_view( - Db, View, StartSeq, Dir, fun view_changes_enumerator/2, Acc0); + Db, View, StartSeq, ViewArgs, fun view_changes_enumerator/2, Acc0); _ -> couch_db:changes_since( Db, StartSeq, fun changes_enumerator/2, [{dir, Dir}], Acc0) @@ -386,12 +389,12 @@ send_changes_design_docs(Db, StartSeq, Dir, Fun, Acc0) -> send_lookup_changes(FullDocInfos, StartSeq, Dir, Db, Fun, Acc0). -send_changes_view(Db, {DName, VName}, StartSeq, Dir, Fun, Acc0) -> +send_changes_view(Db, {DName, VName}, StartSeq, Args, Fun, Acc0) -> DesignId = <<"_design/", DName/binary>>, DDoc = couch_httpd_db:couch_doc_open(Db, DesignId, nil, [ejson_body]), couch_mrview:view_changes_since( - Db, DDoc, VName, StartSeq, Fun, [{direction, Dir}], Acc0). + Db, DDoc, VName, StartSeq, Fun, Args, Acc0). send_lookup_changes(FullDocInfos, StartSeq, Dir, Db, Fun, Acc0) -> @@ -551,7 +554,12 @@ changes_enumerator(DocInfo, Acc) -> user_acc = UserAcc2, limit = Limit - 1}} end. +view_changes_enumerator({{_ViewId, _Seq}, {Id, _V}}, Acc) -> + view_changes_enumerator1(Id, Acc); view_changes_enumerator({{_ViewId, _Seq}, Id}, Acc) -> + view_changes_enumerator1(Id, Acc). + +view_changes_enumerator1(Id, Acc) -> #changes_acc{ filter = FilterFun, callback = Callback, prepend = Prepend, user_acc = UserAcc, limit = Limit, resp_type = ResponseType, db = Db, @@ -668,3 +676,77 @@ filter_docs(Req, Db, DDoc, FName, Docs) -> [true, Passes] = couch_query_servers:ddoc_prompt(DDoc, [<<"filters">>, FName], [JsonDocs, JsonReq]), {ok, Passes}. + +parse_view_args({json_req, {Props}}) -> + {Query} = couch_util:get_value(<<"query">>, Props, {[]}), + parse_view_args1(Query, #mrargs{}); +parse_view_args(Req) -> + couch_mrview_http:parse_qs(Req, []). + + +parse_view_args1([], Args) -> + Args; +parse_view_args1([{Key, Val} | Rest], Args) -> + Args1 = case Key of + <<"reduce">> -> + Args#mrargs{reduce=Val}; + <<"key">> -> + Args#mrargs{start_key=Val, end_key=Val}; + <<"keys">> -> + Args#mrargs{keys=Val}; + <<"startkey">> -> + Args#mrargs{start_key=Val}; + <<"start_key">> -> + Args#mrargs{start_key=Val}; + <<"startkey_docid">> -> + Args#mrargs{start_key_docid=Val}; + <<"start_key_doc_id">> -> + Args#mrargs{start_key_docid=Val}; + <<"endkey">> -> + Args#mrargs{end_key=Val}; + <<"end_key">> -> + Args#mrargs{end_key=Val}; + <<"endkey_docid">> -> + Args#mrargs{end_key_docid=Val}; + <<"end_key_doc_id">> -> + Args#mrargs{end_key_docid=Val}; + <<"limit">> -> + Args#mrargs{limit=Val}; + <<"count">> -> + throw({query_parse_error, <<"QS param `count` is not `limit`">>}); + <<"stale">> when Val == <<"ok">> -> + Args#mrargs{stale=ok}; + <<"stale">> when Val == <<"update_after">> -> + Args#mrargs{stale=update_after}; + <<"stale">> -> + throw({query_parse_error, <<"Invalid value for `stale`.">>}); + <<"descending">> -> + case Val of + true -> Args#mrargs{direction=rev}; + _ -> Args#mrargs{direction=fwd} + end; + <<"skip">> -> + Args#mrargs{skip=Val}; + <<"group">> -> + case Val of + true -> Args#mrargs{group_level=exact}; + _ -> Args#mrargs{group_level=0} + end; + <<"group_level">> -> + Args#mrargs{group_level=Val}; + <<"inclusive_end">> -> + Args#mrargs{inclusive_end=Val}; + <<"include_docs">> -> + Args#mrargs{include_docs=Val}; + <<"update_seq">> -> + Args#mrargs{update_seq=Val}; + <<"conflicts">> -> + Args#mrargs{conflicts=Val}; + <<"list">> -> + Args#mrargs{list=Val}; + <<"callback">> -> + Args#mrargs{callback=Val}; + _ -> + Args#mrargs{extra=[{Key, Val} | Args#mrargs.extra]} + end, + parse_view_args1(Rest, Args1). diff --git a/apps/couch_mrview/include/couch_mrview.hrl b/apps/couch_mrview/include/couch_mrview.hrl index e116ff20..04c2fafe 100644 --- a/apps/couch_mrview/include/couch_mrview.hrl +++ b/apps/couch_mrview/include/couch_mrview.hrl @@ -37,12 +37,14 @@ -record(mrview, { id_num, + seq_indexed=false, update_seq=0, purge_seq=0, map_names=[], reduce_funs=[], def, btree=nil, + seq_btree=nil, options=[] }). diff --git a/apps/couch_mrview/src/couch_mrview.erl b/apps/couch_mrview/src/couch_mrview.erl index c950aca0..ce781e3d 100644 --- a/apps/couch_mrview/src/couch_mrview.erl +++ b/apps/couch_mrview/src/couch_mrview.erl @@ -101,23 +101,24 @@ view_changes_since(Db, DDoc, VName, StartSeq, Callback, Args, Acc) view_changes_since(Db, DDoc, VName, StartSeq, Callback, Args, Acc) -> #mrargs{direction=Dir} = Args, - EndSeq = case Dir of fwd -> 16#10000000; rev -> 0 end, - Args1 = Args#mrargs{start_key = {VName, StartSeq + 1}, - end_key = {VName, EndSeq}}, - - {ok, {_, View, _}, State} = couch_mrview_util:get_view_state(Db, DDoc, - VName, Args1), - - #mrst{seq_indexed=SeqIndexed, seq_btree=SeqBtree} = State, - case SeqIndexed of + {ok, ViewId} = couch_mrview_util:get_view_id(Db, DDoc, VName), + {WithVSK, StartKey, EndKey} = couch_mrview_util:change_keys(ViewId, Args, + StartSeq, EndSeq), + + Args1 = Args#mrargs{start_key=StartKey, end_key=EndKey}, + case WithVSK of + false -> + {ok, _, State} = couch_mrview_util:get_view_state(Db, DDoc, + VName, Args1), + #mrst{seq_indexed=SeqIndexed, seq_btree=SeqBtree} = State, + case SeqIndexed of false -> {error, seqs_not_indexed}; _ -> - #mrview{id_num=Id} = View, WrapperFun = fun ({{_ViewID, _Seq}, _DocId}=KV, _Reds, Acc2) -> Callback(KV, Acc2); @@ -126,9 +127,34 @@ view_changes_since(Db, DDoc, VName, StartSeq, Callback, Args, Acc) -> end, {ok, _R, AccOut} = couch_btree:fold(SeqBtree, WrapperFun, Acc, - [{start_key, {Id, StartSeq + 1}}, {end_key, {Id, EndSeq}}, - {dir, Dir}]), + [{start_key, StartKey}, {end_key, EndKey}, {dir, Dir}]), + {ok, AccOut} + end; + _ -> + {ok, {_Type, View, _}, _} = couch_mrview_util:get_view_state(Db, + DDoc, VName, Args1), + case View#mrview.seq_indexed of + false -> + {error, seqs_not_indexed}; + _ -> + Acc0 = {Dir, StartSeq, Acc}, + WrapperFun = fun + ({{_Key, Seq}, _DocId}=KV, _Reds, {fwd, LastSeq, Acc2}) + when Seq > LastSeq, Seq =< EndSeq -> + {ok, Acc3} = Callback(KV, Acc2), + {ok, {fwd, Seq, Acc3}}; + ({{_Key, Seq}, _DocId}=KV, _Reds, {D, LastSeq, Acc2}) + when Seq < LastSeq, Seq >= EndSeq -> + {ok, Acc3} = Callback(KV, Acc2), + {ok, {D, Seq, Acc3}}; + (_, _, Acc2) -> + {ok, Acc2} + end, + Opts = [{start_key, StartKey}, {end_key, EndKey}, {dir, Dir}], + {ok, _R, {_, _, AccOut}} = couch_btree:fold(View#mrview.seq_btree, + WrapperFun, Acc0, Opts), {ok, AccOut} + end end. diff --git a/apps/couch_mrview/src/couch_mrview_updater.erl b/apps/couch_mrview/src/couch_mrview_updater.erl index 44ef2e10..7a113ba0 100644 --- a/apps/couch_mrview/src/couch_mrview_updater.erl +++ b/apps/couch_mrview/src/couch_mrview_updater.erl @@ -74,25 +74,38 @@ purge(_Db, PurgeSeq, PurgedIdRevs, State) -> end, MakeDictFun = fun - ({ok, {DocId, {_Seq, ViewNumRowKeys}}}, DictAcc) -> - FoldFun = fun({ViewNum, RowKey}, DictAcc2) -> - dict:append(ViewNum, {RowKey, DocId}, DictAcc2) + ({ok, {DocId, {Seq, ViewNumRowKeys}}}, DictAcc) -> + FoldFun = fun({ViewNum, RowKey}, {Acc, SAcc}) -> + Acc2 = dict:append(ViewNum, {RowKey, DocId}, Acc), + SAcc2 = dict:append(ViewNum, {RowKey, Seq}, SAcc), + {Acc2, SAcc2} end, lists:foldl(FoldFun, DictAcc, ViewNumRowKeys); ({not_found, _}, DictAcc) -> DictAcc end, - KeysToRemove = lists:foldl(MakeDictFun, dict:new(), Lookups), + {KeysToRem, SKeysToRem} = lists:foldl(MakeDictFun, {dict:new(), + dict:new()}, Lookups), - RemKeysFun = fun(#mrview{id_num=Num, btree=Btree}=View) -> - case dict:find(Num, KeysToRemove) of + RemKeysFun = fun(View) -> + #mrview{id_num=Num, btree=Btree, seq_btree=SBtree}=View, + case dict:find(Num, KeysToRem) of {ok, RemKeys} -> {ok, Btree2} = couch_btree:add_remove(Btree, [], RemKeys), + {ok, SBtree2} = case View#mrview.seq_indexed of + nil -> + {ok, nil}; + _ -> + {ok, RemSKeys} = dict:find(Num, SKeysToRem), + couch_btree:add_remove(SBtree, [], RemSKeys) + end, + NewPurgeSeq = case Btree2 /= Btree of true -> PurgeSeq; _ -> View#mrview.purge_seq end, - View#mrview{btree=Btree2, purge_seq=NewPurgeSeq}; + View#mrview{btree=Btree2, seq_btree=SBtree2, + purge_seq=NewPurgeSeq}; error -> View end @@ -187,8 +200,10 @@ write_results(Parent, #mrst{db_name=DbName, idx_name=IdxName}=State) -> Parent ! {new_state, State}; {ok, Info} -> EmptyKVs = [{V#mrview.id_num, []} || V <- State#mrst.views], - {Seq, ViewKVs, DocIdKeys} = merge_results(Info, 0, EmptyKVs, []), - NewState = write_kvs(State, Seq, ViewKVs, DocIdKeys), + {Seq, ViewKVs, ViewSKVs, DocIdKeys} = merge_results(Info, 0, + EmptyKVs, EmptyKVs, []), + + NewState = write_kvs(State, Seq, ViewKVs, ViewSKVs, DocIdKeys), % notifify the view update couch_db_update_notifier:notify({view_updated, {DbName, IdxName}}), @@ -209,28 +224,33 @@ start_query_server(State) -> State#mrst{qserver=QServer}. -merge_results([], SeqAcc, ViewKVs, DocIdKeys) -> - {SeqAcc, ViewKVs, DocIdKeys}; -merge_results([{Seq, Results} | Rest], SeqAcc, ViewKVs, DocIdKeys) -> - Fun = fun(RawResults, {VKV, DIK}) -> - merge_results(RawResults, VKV, DIK) +merge_results([], SeqAcc, ViewKVs, ViewSKVs, DocIdKeys) -> + {SeqAcc, ViewKVs, ViewSKVs, DocIdKeys}; +merge_results([{Seq, Results} | Rest], SeqAcc, ViewKVs, ViewSKVs, DocIdKeys) -> + Fun = fun(RawResults, {VKV, VSKV, DIK}) -> + merge_results(RawResults, VKV, VSKV, DIK) end, - {ViewKVs1, DocIdKeys1} = lists:foldl(Fun, {ViewKVs, DocIdKeys}, Results), - merge_results(Rest, erlang:max(Seq, SeqAcc), ViewKVs1, DocIdKeys1). + {ViewKVs1, ViewSKVs1, DocIdKeys1} = lists:foldl(Fun, {ViewKVs, + ViewSKVs, DocIdKeys}, Results), + merge_results(Rest, erlang:max(Seq, SeqAcc), ViewKVs1, ViewSKVs1, + DocIdKeys1). -merge_results({DocId, Seq, []}, ViewKVs, DocIdKeys) -> - {ViewKVs, [{DocId, {Seq, []}} | DocIdKeys]}; -merge_results({DocId, Seq, RawResults}, ViewKVs, DocIdKeys) -> +merge_results({DocId, Seq, []}, ViewKVs, ViewSKVs, DocIdKeys) -> + {ViewKVs, ViewSKVs, [{DocId, {Seq, []}} | DocIdKeys]}; +merge_results({DocId, Seq, RawResults}, ViewKVs, ViewSKVs, DocIdKeys) -> JsonResults = couch_query_servers:raw_to_ejson(RawResults), Results = [[list_to_tuple(Res) || Res <- FunRs] || FunRs <- JsonResults], - {ViewKVs1, ViewIdKeys} = insert_results(DocId, Seq, Results, ViewKVs, [], []), - {ViewKVs1, [ViewIdKeys | DocIdKeys]}. + {ViewKVs1, ViewSKVs1, ViewIdKeys} = insert_results(DocId, Seq, + Results, ViewKVs, ViewSKVs, [], [], []), + {ViewKVs1, ViewSKVs1, [ViewIdKeys | DocIdKeys]}. -insert_results(DocId, Seq, [], [], ViewKVs, ViewIdKeys) -> - {lists:reverse(ViewKVs), {DocId, {Seq, ViewIdKeys}}}; -insert_results(DocId, Seq, [KVs | RKVs], [{Id, VKVs} | RVKVs], VKVAcc, VIdKeys) -> +insert_results(DocId, Seq, [], [], [], ViewKVs, ViewSKVs, ViewIdKeys) -> + {lists:reverse(ViewKVs), lists:reverse(ViewSKVs), {DocId, {Seq, + ViewIdKeys}}}; +insert_results(DocId, Seq, [KVs | RKVs], [{Id, VKVs} | RVKVs], [{Id, + VSKVs} | RVSKVs], VKVAcc, VSKVAcc, VIdKeys) -> CombineDupesFun = fun ({Key, Val}, {[{Key, {dups, Vals}} | Rest], IdKeys}) -> {[{Key, {dups, [Val | Vals]}} | Rest], IdKeys}; @@ -242,11 +262,12 @@ insert_results(DocId, Seq, [KVs | RKVs], [{Id, VKVs} | RVKVs], VKVAcc, VIdKeys) InitAcc = {[], VIdKeys}, {Duped, VIdKeys0} = lists:foldl(CombineDupesFun, InitAcc, lists:sort(KVs)), FinalKVs = [{{Key, DocId}, Val} || {Key, Val} <- Duped] ++ VKVs, - insert_results(DocId, Seq, RKVs, RVKVs, [{Id, FinalKVs} | VKVAcc], - VIdKeys0). + FinalSKVs = [{{Key, Seq}, {DocId, Val}} || {Key, Val} <- Duped] ++ VSKVs, + insert_results(DocId, Seq, RKVs, RVKVs, RVSKVs, [{Id, FinalKVs} | VKVAcc], + [{Id, FinalSKVs} | VSKVAcc], VIdKeys0). -write_kvs(State, UpdateSeq, ViewKVs, DocIdKeys) -> +write_kvs(State, UpdateSeq, ViewKVs, ViewSKVs, DocIdKeys) -> #mrst{ seq_indexed=SeqIndexed, id_btree=IdBtree, @@ -265,19 +286,35 @@ write_kvs(State, UpdateSeq, ViewKVs, DocIdKeys) -> {ok, nil} end, - ToRemByView = collapse_rem_keys(ToRemove, dict:new()), - UpdateView = fun(#mrview{id_num=ViewId}=View, {ViewId, KVs}) -> + {ToRemByView, ToRemByViewSeq} = collapse_rem_keys(ToRemove, + dict:new(), dict:new()), + + UpdateView = fun(#mrview{id_num=ViewId}=View, {ViewId, KVs}, + {ViewId, SKVs}) -> ToRem = couch_util:dict_find(ViewId, ToRemByView, []), {ok, VBtree2} = couch_btree:add_remove(View#mrview.btree, KVs, ToRem), + + {ok, VSBtree2} = case View#mrview.seq_indexed of + true -> + SeqToRem = couch_util:dict_find(ViewId, ToRemByViewSeq, []), + couch_btree:add_remove(View#mrview.seq_btree, SKVs, + SeqToRem); + _ -> + {ok, nil} + end, + NewUpdateSeq = case VBtree2 =/= View#mrview.btree of true -> UpdateSeq; _ -> View#mrview.update_seq end, - View#mrview{btree=VBtree2, update_seq=NewUpdateSeq} + View#mrview{btree=VBtree2, seq_btree=VSBtree2, + update_seq=NewUpdateSeq} end, + Views = lists:zipwith3(UpdateView, State#mrst.views, ViewKVs, ViewSKVs), + State#mrst{ - views=lists:zipwith(UpdateView, State#mrst.views, ViewKVs), + views= Views, update_seq=UpdateSeq, id_btree=IdBtree2, seq_btree=SeqBtree2 @@ -297,15 +334,17 @@ update_seq_btree(Btree, DocIdKeys, ToRemBySeq) -> ToAdd = couch_mrview_util:to_seqkvs(DocIdKeys, []), couch_btree:add_remove(Btree, ToAdd, ToRemBySeq). -collapse_rem_keys([], Acc) -> - Acc; -collapse_rem_keys([{ok, {DocId, {_Seq, ViewIdKeys}}} | Rest], Acc) -> - NewAcc = lists:foldl(fun({ViewId, Key}, Acc2) -> - dict:append(ViewId, {Key, DocId}, Acc2) - end, Acc, ViewIdKeys), - collapse_rem_keys(Rest, NewAcc); -collapse_rem_keys([{not_found, _} | Rest], Acc) -> - collapse_rem_keys(Rest, Acc). +collapse_rem_keys([], Acc, SAcc) -> + {Acc, SAcc}; +collapse_rem_keys([{ok, {DocId, {Seq, ViewIdKeys}}} | Rest], Acc, SAcc) -> + {NewAcc, NewSAcc} = lists:foldl(fun({ViewId, Key}, {Acc2, SAcc2}) -> + Acc3 = dict:append(ViewId, {Key, DocId}, Acc2), + SAcc3 = dict:append(ViewId, {Key, Seq}, SAcc2), + {Acc3, SAcc3} + end, {Acc, SAcc}, ViewIdKeys), + collapse_rem_keys(Rest, NewAcc, NewSAcc); +collapse_rem_keys([{not_found, _} | Rest], Acc, SAcc) -> + collapse_rem_keys(Rest, Acc, SAcc). send_partial(Pid, State) when is_pid(Pid) -> diff --git a/apps/couch_mrview/src/couch_mrview_util.erl b/apps/couch_mrview/src/couch_mrview_util.erl index b78bec2f..7726a6cd 100644 --- a/apps/couch_mrview/src/couch_mrview_util.erl +++ b/apps/couch_mrview/src/couch_mrview_util.erl @@ -25,6 +25,8 @@ -export([validate_args/1]). -export([maybe_load_doc/3, maybe_load_doc/4]). -export([to_seqkvs/2]). +-export([change_keys/4]). +-export([get_view_id/3]). -define(MOD, couch_mrview_index). -define(MRFMT, 0.2). @@ -65,6 +67,7 @@ get_view(Db, DDoc, ViewName, Args0) -> Sig = view_sig(Db, State, View, Args3), {ok, {Type, View}, Sig, Args3}. + ddoc_to_mrst(DbName, #doc{id=Id, body={Fields}}) -> MakeDict = fun({Name, {MRFuns}}, DictBySrcAcc) -> case couch_util:get_value(<<"map">>, MRFuns) of @@ -83,7 +86,10 @@ ddoc_to_mrst(DbName, #doc{id=Id, body={Fields}}) -> RedFuns = [{Name, RedSrc} | View#mrview.reduce_funs], {View#mrview.map_names, RedFuns} end, - View2 = View#mrview{map_names=MapNames, reduce_funs=RedSrcs}, + SeqIndexed = couch_util:get_value(<<"seq_indexed">>, + ViewOpts, false), + View2 = View#mrview{seq_indexed=SeqIndexed, + map_names=MapNames, reduce_funs=RedSrcs}, dict:store({MapSrc, ViewOpts}, View2, DictBySrcAcc); undefined -> DictBySrcAcc @@ -119,6 +125,14 @@ ddoc_to_mrst(DbName, #doc{id=Id, body={Fields}}) -> {ok, IdxState#mrst{sig=couch_util:md5(term_to_binary(SigInfo))}}. +get_view_id(#db{name=DbName}, DDoc, VName) -> + {ok, #mrst{views=Views}} = ddoc_to_mrst(DbName, DDoc), + ViewNames = lists:foldl(fun(#mrview{id_num=Id, map_names=[Name|_]}, Acc) -> + dict:store(Name, Id, Acc) + end, dict:new(), Views), + dict:find(VName, ViewNames). + + set_view_type(_Args, _ViewName, []) -> throw({not_found, missing_named_view}); set_view_type(Args, ViewName, [View | Rest]) -> @@ -179,7 +193,7 @@ init_state(Db, Fd, #mrst{views=Views}=State, nil) -> purge_seq=couch_db:get_purge_seq(Db), id_btree_state=nil, seq_btree_state=nil, - view_states=[{nil, 0, 0} || _ <- Views] + view_states=[{nil, nil, 0, 0} || _ <- Views] }, init_state(Db, Fd, State, Header); init_state(Db, Fd, State, Header) -> @@ -192,9 +206,10 @@ init_state(Db, Fd, State, Header) -> view_states=ViewStates } = Header, + StateUpdate = fun - ({_, _, _}=St) -> St; - (St) -> {St, 0, 0} + ({_, _, _, _}=St) -> St; + (St) -> {St, nil, 0, 0} end, ViewStates2 = lists:map(StateUpdate, ViewStates), @@ -225,8 +240,7 @@ init_state(Db, Fd, State, Header) -> views=Views2 }. - -open_view(Db, Fd, Lang, {BTState, USeq, PSeq}, View) -> +open_view(Db, Fd, Lang, {BTState, SBTState, USeq, PSeq}, View) -> FunSrcs = [FunSrc || {_Name, FunSrc} <- View#mrview.reduce_funs], ReduceFun = fun(reduce, KVs) -> @@ -250,8 +264,19 @@ open_view(Db, Fd, Lang, {BTState, USeq, PSeq}, View) -> {reduce, ReduceFun}, {compression, couch_db:compression(Db)} ], + {ok, Btree} = couch_btree:open(BTState, Fd, ViewBtOpts), - View#mrview{btree=Btree, update_seq=USeq, purge_seq=PSeq}. + + {ok, SeqBtree} = case View#mrview.seq_indexed of + true -> + ViewSBtOpts = [{compression, couch_db:compression(Db)}], + couch_btree:open(SBTState, Fd, ViewSBtOpts); + _ -> + {ok, nil} + end, + + View#mrview{btree=Btree, seq_btree=SeqBtree, + update_seq=USeq, purge_seq=PSeq}. temp_view_to_ddoc({Props}) -> @@ -509,15 +534,21 @@ make_header(State) -> seq_btree=SeqBtree, views=Views } = State, - ViewStates = [ - { + + ViewStates = lists:foldr(fun(V, Acc) -> + SBTState = case V#mrview.seq_indexed of + true -> + couch_btree:get_state(V#mrview.seq_btree); + _ -> + nil + end, + [{ couch_btree:get_state(V#mrview.btree), + SBTState, V#mrview.update_seq, V#mrview.purge_seq - } - || - V <- Views - ], + } | Acc] + end, [], Views), SeqBtreeState = case SeqIndexed of true -> couch_btree:get_state(SeqBtree); @@ -753,3 +784,16 @@ to_seqkvs([{Id, {Seq, Keys}} | Rest], Acc0) -> [{{ViewId, Seq}, Id} | Acc] end, Acc0, Keys), to_seqkvs(Rest, Acc1). + + +change_keys(ViewId, Args, StartSeq, EndSeq) -> + case {Args#mrargs.start_key, Args#mrargs.end_key} of + {undefined, undefined} -> + {false, {ViewId, StartSeq +1}, {ViewId, EndSeq}}; + {SK, undefined} -> + {true, {SK, StartSeq +1}, {SK, EndSeq}}; + {undefined, EK} -> + {true, {EK, StartSeq +1}, {EK, EndSeq}}; + {SK, EK} -> + {true, {SK, StartSeq +1}, {EK, EndSeq}} + end. diff --git a/apps/couch_replicator/src/couch_replicator_api_wrap.erl b/apps/couch_replicator/src/couch_replicator_api_wrap.erl index 8a586715..fc245e51 100644 --- a/apps/couch_replicator/src/couch_replicator_api_wrap.erl +++ b/apps/couch_replicator/src/couch_replicator_api_wrap.erl @@ -21,6 +21,7 @@ -include_lib("couch/include/couch_db.hrl"). -include_lib("couch_httpd/include/couch_httpd.hrl"). -include_lib("couch_changes/include/couch_changes.hrl"). +-include_lib("couch_mrview/include/couch_mrview.hrl"). -include("couch_replicator_api_wrap.hrl"). -export([ @@ -390,6 +391,8 @@ maybe_add_changes_filter_q_args(BaseQS, Options) -> undefined -> BaseQS; FilterName -> + ViewFields0 = [atom_to_list(F) || F <- record_info(fields, mrargs)], + ViewFields = ["key" | ViewFields0], {Params} = get_value(query_params, Options, {[]}), [{"filter", ?b2l(FilterName)} | lists:foldl( fun({K, V}, QSAcc) -> @@ -397,6 +400,13 @@ maybe_add_changes_filter_q_args(BaseQS, Options) -> case lists:keymember(Ks, 1, QSAcc) of true -> QSAcc; + false when FilterName =:= <<"_view">> -> + V1 = case lists:member(Ks, ViewFields) of + true -> + ?JSON_ENCODE(V); + false -> couch_util:to_list(V) + end, + [{Ks, V1} | QSAcc]; false -> [{Ks, couch_util:to_list(V)} | QSAcc] end From fce9e247c65fa90a58f779390ae33874e0791687 Mon Sep 17 00:00:00 2001 From: benoitc Date: Fri, 23 Mar 2012 16:49:09 +0100 Subject: [PATCH 06/14] fix imports --- apps/couch_changes/src/couch_changes.erl | 2 +- apps/couch_changes/src/couch_httpd_changes.erl | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/apps/couch_changes/src/couch_changes.erl b/apps/couch_changes/src/couch_changes.erl index dc2e898f..9c6c1f10 100644 --- a/apps/couch_changes/src/couch_changes.erl +++ b/apps/couch_changes/src/couch_changes.erl @@ -11,7 +11,7 @@ % the License. -module(couch_changes). --include("couch_changes/include/couch_changes.hrl"). +-include_lib("couch_changes/include/couch_changes.hrl"). -include_lib("couch/include/couch_db.hrl"). -include_lib("couch_httpd/include/couch_httpd.hrl"). -include_lib("couch_mrview/include/couch_mrview.hrl"). diff --git a/apps/couch_changes/src/couch_httpd_changes.erl b/apps/couch_changes/src/couch_httpd_changes.erl index e65a6eb4..514f2145 100644 --- a/apps/couch_changes/src/couch_httpd_changes.erl +++ b/apps/couch_changes/src/couch_httpd_changes.erl @@ -11,6 +11,7 @@ % the License. -module(couch_httpd_changes). +-include_lib("couch_changes/include/couch_changes.hrl"). -include_lib("couch/include/couch_db.hrl"). -include_lib("couch_httpd/include/couch_httpd.hrl"). -export([handle_changes_req/2]). From 37efe0015bdfad352a134f99ee40b2f17ee6b81c Mon Sep 17 00:00:00 2001 From: benoitc Date: Tue, 27 Mar 2012 11:56:04 +0200 Subject: [PATCH 07/14] change order --- apps/couch_changes/src/couch_changes.erl | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/apps/couch_changes/src/couch_changes.erl b/apps/couch_changes/src/couch_changes.erl index 9c6c1f10..5e23ab11 100644 --- a/apps/couch_changes/src/couch_changes.erl +++ b/apps/couch_changes/src/couch_changes.erl @@ -11,9 +11,10 @@ % the License. -module(couch_changes). --include_lib("couch_changes/include/couch_changes.hrl"). + -include_lib("couch/include/couch_db.hrl"). -include_lib("couch_httpd/include/couch_httpd.hrl"). +-include_lib("couch_changes/include/couch_changes.hrl"). -include_lib("couch_mrview/include/couch_mrview.hrl"). -export([handle_changes/3]). From 950bde58cfc8c59b81239e2d43951f3f668f0e44 Mon Sep 17 00:00:00 2001 From: benoitc Date: Tue, 27 Mar 2012 12:31:34 +0200 Subject: [PATCH 08/14] fix include: --- apps/couch_changes/src/couch_changes.app.src | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/couch_changes/src/couch_changes.app.src b/apps/couch_changes/src/couch_changes.app.src index fd0d0079..ca5d635a 100644 --- a/apps/couch_changes/src/couch_changes.app.src +++ b/apps/couch_changes/src/couch_changes.app.src @@ -18,5 +18,5 @@ {description, "handle _changes API"}, {vsn, "0.1"}, {modules, []}, - {applications, [kernel, stdlib, crypto, couch]} + {applications, [kernel, stdlib, crypto, couch, couch_mrview]} ]}. From deb4968846fc6c46b8df5a286e7ee429bf6f5fd2 Mon Sep 17 00:00:00 2001 From: benoitc Date: Tue, 27 Mar 2012 13:40:49 +0200 Subject: [PATCH 09/14] Revert "fix include:" This reverts commit ed0076d3fdbf2b2cf28acb204f48fe07db65440f. --- apps/couch_changes/src/couch_changes.app.src | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/couch_changes/src/couch_changes.app.src b/apps/couch_changes/src/couch_changes.app.src index ca5d635a..fd0d0079 100644 --- a/apps/couch_changes/src/couch_changes.app.src +++ b/apps/couch_changes/src/couch_changes.app.src @@ -18,5 +18,5 @@ {description, "handle _changes API"}, {vsn, "0.1"}, {modules, []}, - {applications, [kernel, stdlib, crypto, couch, couch_mrview]} + {applications, [kernel, stdlib, crypto, couch]} ]}. From 93e1987252d74f824835d459d589e771c47c3d35 Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Tue, 10 Apr 2012 08:30:02 -0400 Subject: [PATCH 10/14] fix purge --- apps/couch_mrview/src/couch_mrview_updater.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/couch_mrview/src/couch_mrview_updater.erl b/apps/couch_mrview/src/couch_mrview_updater.erl index 7a113ba0..26d95d55 100644 --- a/apps/couch_mrview/src/couch_mrview_updater.erl +++ b/apps/couch_mrview/src/couch_mrview_updater.erl @@ -93,7 +93,7 @@ purge(_Db, PurgeSeq, PurgedIdRevs, State) -> {ok, RemKeys} -> {ok, Btree2} = couch_btree:add_remove(Btree, [], RemKeys), {ok, SBtree2} = case View#mrview.seq_indexed of - nil -> + false -> {ok, nil}; _ -> {ok, RemSKeys} = dict:find(Num, SKeysToRem), From ebab8b66211c54d662bdf1cf6befd39bbca9d0a4 Mon Sep 17 00:00:00 2001 From: benoitc Date: Sat, 12 May 2012 14:43:53 +0200 Subject: [PATCH 11/14] filter fields in included doc when returned from a _changes With this changes you can only return a specified list of fields in the included docs by passing the list of them to an optionnal `fields` parameter. For now nested fields are ignored. $ curl -XPUT localhost:5984/testdb {"ok":true} $ curl -XPUT localhost:5984/testdb/test -d'{"f1": 1, "f2": 2, "f3": 3}' {"ok":true,"id":"test","rev":"1-2b23983c05a9ca8015b6ea7bdc1b3478"} $ curl -XPUT localhost:5984/testdb/test1 -d'{"f1": 1, "f2": 4, "f3": 3}' {"ok":true,"id":"test1","rev":"1-2942771a428c4b45315710db0b6aaa7a"} $ curl -XPUT localhost:5984/testdb/test2 -d'{"f1": 1, "f2": 7, "f3": 8}' {"ok":true,"id":"test2","rev":"1-ac4cd62a83a0a98421e9ed74adcadcc3"} $ curl "http://localhost:5984/testdb/_changes?include_docs=true" {"results":[ {"seq":1,"id":"test","changes":[{"rev":"1-2b23983c05a9ca8015b6ea7bdc1b3478"}],"doc":{"_id":"test","_rev":"1-2b23983c05a9ca8015b6ea7bdc1b3478","f1":1,"f2":2,"f3":3}}, {"seq":2,"id":"test1","changes":[{"rev":"1-2942771a428c4b45315710db0b6aaa7a"}],"doc":{"_id":"test1","_rev":"1-2942771a428c4b45315710db0b6aaa7a","f1":1,"f2":4,"f3":3}}, {"seq":3,"id":"test2","changes":[{"rev":"1-ac4cd62a83a0a98421e9ed74adcadcc3"}],"doc":{"_id":"test2","_rev":"1-ac4cd62a83a0a98421e9ed74adcadcc3","f1":1,"f2":7,"f3":8}} ], "last_seq":3} $ curl 'http://localhost:5984/testdb/_changes?include_docs=true&fields=\["f1"\]' {"results":[ {"seq":1,"id":"test","changes":[{"rev":"1-2b23983c05a9ca8015b6ea7bdc1b3478"}],"doc":{"_id":"test","_rev":"1-2b23983c05a9ca8015b6ea7bdc1b3478","f1":1}}, {"seq":2,"id":"test1","changes":[{"rev":"1-2942771a428c4b45315710db0b6aaa7a"}],"doc":{"_id":"test1","_rev":"1-2942771a428c4b45315710db0b6aaa7a","f1":1}}, {"seq":3,"id":"test2","changes":[{"rev":"1-ac4cd62a83a0a98421e9ed74adcadcc3"}],"doc":{"_id":"test2","_rev":"1-ac4cd62a83a0a98421e9ed74adcadcc3","f1":1}} ], "last_seq":3} $ curl 'http://localhost:5984/testdb/_changes?include_docs=true&fields=\["f1","f2"\]' {"results":[ {"seq":1,"id":"test","changes":[{"rev":"1-2b23983c05a9ca8015b6ea7bdc1b3478"}],"doc":{"_id":"test","_rev":"1-2b23983c05a9ca8015b6ea7bdc1b3478","f1":1,"f2":2}}, {"seq":2,"id":"test1","changes":[{"rev":"1-2942771a428c4b45315710db0b6aaa7a"}],"doc":{"_id":"test1","_rev":"1-2942771a428c4b45315710db0b6aaa7a","f1":1,"f2":4}}, {"seq":3,"id":"test2","changes":[{"rev":"1-ac4cd62a83a0a98421e9ed74adcadcc3"}],"doc":{"_id":"test2","_rev":"1-ac4cd62a83a0a98421e9ed74adcadcc3","f1":1,"f2":7}} ], "last_seq":3} --- apps/couch_changes/include/couch_changes.hrl | 1 + apps/couch_changes/src/couch_changes.erl | 31 +++++++++++++++++-- .../couch_changes/src/couch_httpd_changes.erl | 2 ++ 3 files changed, 32 insertions(+), 2 deletions(-) diff --git a/apps/couch_changes/include/couch_changes.hrl b/apps/couch_changes/include/couch_changes.hrl index 446045fc..f118f805 100644 --- a/apps/couch_changes/include/couch_changes.hrl +++ b/apps/couch_changes/include/couch_changes.hrl @@ -24,6 +24,7 @@ filter_view, view_args = [], include_docs = false, + fields = [], conflicts = false, db_open_options = [] }). diff --git a/apps/couch_changes/src/couch_changes.erl b/apps/couch_changes/src/couch_changes.erl index 5e23ab11..8195b5d1 100644 --- a/apps/couch_changes/src/couch_changes.erl +++ b/apps/couch_changes/src/couch_changes.erl @@ -33,6 +33,7 @@ resp_type, limit, include_docs, + fields, conflicts, timeout, timeout_fun @@ -308,6 +309,7 @@ start_sending_changes(Callback, UserAcc, ResponseType) -> build_acc(Args, Callback, UserAcc, Db, StartSeq, Prepend, Timeout, TimeoutFun) -> #changes_args{ include_docs = IncludeDocs, + fields = Fields, conflicts = Conflicts, limit = Limit, feed = ResponseType, @@ -323,6 +325,7 @@ build_acc(Args, Callback, UserAcc, Db, StartSeq, Prepend, Timeout, TimeoutFun) - resp_type = ResponseType, limit = Limit, include_docs = IncludeDocs, + fields = Fields, conflicts = Conflicts, timeout = Timeout, timeout_fun = TimeoutFun @@ -594,7 +597,8 @@ changes_row(Results, DocInfo, Acc) -> #doc_info{ id = Id, high_seq = Seq, revs = [#rev_info{deleted = Del} | _] } = DocInfo, - #changes_acc{db = Db, include_docs = IncDoc, conflicts = Conflicts} = Acc, + #changes_acc{db = Db, include_docs = IncDoc, fields=Fields, + conflicts = Conflicts} = Acc, {[{<<"seq">>, Seq}, {<<"id">>, Id}, {<<"changes">>, Results}] ++ deleted_item(Del) ++ case IncDoc of true -> @@ -605,7 +609,16 @@ changes_row(Results, DocInfo, Acc) -> Doc = couch_index_util:load_doc(Db, DocInfo, Opts), case Doc of null -> [{doc, null}]; - _ -> [{doc, couch_doc:to_json_obj(Doc, [])}] + _ -> + case Fields of + [] -> + + [{doc, couch_doc:to_json_obj(Doc, [])}]; + _ -> + JsonObj = couch_doc:to_json_obj(Doc, []), + [{doc, filter_doc_fields(Fields, + JsonObj, [])}] + end end; false -> [] @@ -678,6 +691,20 @@ filter_docs(Req, Db, DDoc, FName, Docs) -> [JsonDocs, JsonReq]), {ok, Passes}. +filter_doc_fields([], {Props}, Acc) -> + Id = proplists:get_value(<<"_id">>, Props), + Rev = proplists:get_value(<<"_rev">>, Props), + Acc1 = [{<<"_id">>, Id}, {<<"_rev">>, Rev}] ++ lists:reverse(Acc), + {Acc1}; +filter_doc_fields([Field|Rest], {Props}=Doc, Acc) -> + Acc1 = case couch_util:get_value(Field, Props) of + undefined -> + Acc; + Value -> + [{Field, Value} | Acc] + end, + filter_doc_fields(Rest, Doc, Acc1). + parse_view_args({json_req, {Props}}) -> {Query} = couch_util:get_value(<<"query">>, Props, {[]}), parse_view_args1(Query, #mrargs{}); diff --git a/apps/couch_changes/src/couch_httpd_changes.erl b/apps/couch_changes/src/couch_httpd_changes.erl index 514f2145..a715a2a7 100644 --- a/apps/couch_changes/src/couch_httpd_changes.erl +++ b/apps/couch_changes/src/couch_httpd_changes.erl @@ -131,6 +131,8 @@ parse_changes_query(Req) -> Args#changes_args{timeout=list_to_integer(Value)}; {"include_docs", "true"} -> Args#changes_args{include_docs=true}; + {"fields", _} -> + Args#changes_args{fields=?JSON_DECODE(Value)}; {"conflicts", "true"} -> Args#changes_args{conflicts=true}; {"filter", _} -> From 16db7936cb6cc172e155c2e12de11c8a0c395fc3 Mon Sep 17 00:00:00 2001 From: benoitc Date: Wed, 16 May 2012 06:58:29 +0200 Subject: [PATCH 12/14] add a test for changes eventsource feed. --- .../share/www/script/test/changes.js | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/apps/couch_httpd/share/www/script/test/changes.js b/apps/couch_httpd/share/www/script/test/changes.js index 5ec18750..bc90b969 100644 --- a/apps/couch_httpd/share/www/script/test/changes.js +++ b/apps/couch_httpd/share/www/script/test/changes.js @@ -139,6 +139,34 @@ couchTests.changes = function(debug) { // otherwise we'll continue to receive heartbeats forever xhr.abort(); + + if (window.EventSource) { + var source = new EventSource( + "/test_suite_db/_changes?feed=eventsource"); + var results = []; + var sourceListener = function(e) { + var data = JSON.parse(e.data); + results.push(data); + + }; + + source.addEventListener('message', sourceListener , false); + + waitForSuccess(function() { + if (results.length != 3) + throw "bad seq, try again"; + }); + + source.removeEventListener('message', sourceListener, false); + + T(results[0].seq == 1); + T(results[0].id == "foo"); + + T(results[1].seq == 2); + T(results[1].id == "bar"); + T(results[1].changes[0].rev == docBar._rev); + } + // test longpolling xhr = CouchDB.newXhr(); From 4c9bafcc84e9ae104c84c2e09fa85de50a3d43d9 Mon Sep 17 00:00:00 2001 From: benoitc Date: Wed, 16 May 2012 08:00:43 +0200 Subject: [PATCH 13/14] fix whitespaces --- .../share/www/script/test/changes.js | 70 +++++++++---------- 1 file changed, 35 insertions(+), 35 deletions(-) diff --git a/apps/couch_httpd/share/www/script/test/changes.js b/apps/couch_httpd/share/www/script/test/changes.js index bc90b969..ab29f334 100644 --- a/apps/couch_httpd/share/www/script/test/changes.js +++ b/apps/couch_httpd/share/www/script/test/changes.js @@ -30,7 +30,7 @@ couchTests.changes = function(debug) { T(db.save(docFoo).ok); T(db.ensureFullCommit().ok); T(db.open(docFoo._id)._id == docFoo._id); - + req = CouchDB.request("GET", "/test_suite_db/_changes"); var resp = JSON.parse(req.responseText); @@ -39,7 +39,7 @@ couchTests.changes = function(debug) { T(resp.results[0].changes[0].rev == docFoo._rev); // test with callback - + run_on_modified_server( [{section: "httpd", key: "allow_jsonp", @@ -83,7 +83,7 @@ couchTests.changes = function(debug) { var docBar = {_id:"bar", bar:1}; db.save(docBar); - + var lines, change1, change2; waitForSuccess(function() { lines = xhr.responseText.split("\n"); @@ -96,12 +96,12 @@ couchTests.changes = function(debug) { T(change1.seq == 1); T(change1.id == "foo"); - + T(change2.seq == 2); T(change2.id == "bar"); T(change2.changes[0].rev == docBar._rev); - - + + var docBaz = {_id:"baz", baz:1}; db.save(docBaz); @@ -113,7 +113,7 @@ couchTests.changes = function(debug) { throw "bad seq, try again"; } }); - + T(change3.seq == 3); T(change3.id == "baz"); T(change3.changes[0].rev == docBaz._rev); @@ -124,7 +124,7 @@ couchTests.changes = function(debug) { //verify the hearbeat newlines are sent xhr.open("GET", "/test_suite_db/_changes?feed=continuous&heartbeat=10&timeout=500", true); xhr.send(""); - + var str; waitForSuccess(function() { str = xhr.responseText; @@ -151,17 +151,17 @@ couchTests.changes = function(debug) { }; source.addEventListener('message', sourceListener , false); - + waitForSuccess(function() { - if (results.length != 3) + if (results.length != 3) throw "bad seq, try again"; }); - + source.removeEventListener('message', sourceListener, false); T(results[0].seq == 1); T(results[0].id == "foo"); - + T(results[1].seq == 2); T(results[1].id == "bar"); T(results[1].changes[0].rev == docBar._rev); @@ -172,14 +172,14 @@ couchTests.changes = function(debug) { xhr.open("GET", "/test_suite_db/_changes?feed=longpoll", true); xhr.send(""); - + waitForSuccess(function() { lines = xhr.responseText.split("\n"); if (lines[5] != '"last_seq":3}') { throw("still waiting"); } }, "last_seq"); - + xhr = CouchDB.newXhr(); xhr.open("GET", "/test_suite_db/_changes?feed=longpoll&since=3", true); @@ -187,7 +187,7 @@ couchTests.changes = function(debug) { var docBarz = {_id:"barz", bar:1}; db.save(docBarz); - + var parse_changes_line = function(line) { if (line.charAt(line.length-1) == ",") { var linetrimmed = line.substring(0, line.length-1); @@ -196,14 +196,14 @@ couchTests.changes = function(debug) { } return JSON.parse(linetrimmed); }; - + waitForSuccess(function() { lines = xhr.responseText.split("\n"); if (lines[3] != '"last_seq":4}') { throw("still waiting"); } }, "change_lines"); - + var change = parse_changes_line(lines[1]); T(change.seq == 4); T(change.id == "barz"); @@ -212,7 +212,7 @@ couchTests.changes = function(debug) { } - + // test the filtered changes var ddoc = { _id : "_design/changes_filter", @@ -253,15 +253,15 @@ couchTests.changes = function(debug) { db.save({"bop" : "foom"}); db.save({"bop" : false}); - + var req = CouchDB.request("GET", "/test_suite_db/_changes?filter=changes_filter/bop"); var resp = JSON.parse(req.responseText); T(resp.results.length == 1, "filtered/bop"); - + req = CouchDB.request("GET", "/test_suite_db/_changes?filter=changes_filter/dynamic&field=woox"); resp = JSON.parse(req.responseText); T(resp.results.length == 0); - + req = CouchDB.request("GET", "/test_suite_db/_changes?filter=changes_filter/dynamic&field=bop"); resp = JSON.parse(req.responseText); T(resp.results.length == 1, "changes_filter/dynamic&field=bop"); @@ -280,15 +280,15 @@ couchTests.changes = function(debug) { xhr.send(""); db.save({"_id":"falsy", "bop" : ""}); // empty string is falsy db.save({"_id":"bingo","bop" : "bingo"}); - + waitForSuccess(function() { resp = JSON.parse(xhr.responseText); }, "longpoll-since"); - + T(resp.last_seq == 9); T(resp.results && resp.results.length > 0 && resp.results[0]["id"] == "bingo", "filter the correct update"); xhr.abort(); - + var timeout = 500; var last_seq = 10; while (true) { @@ -331,31 +331,31 @@ couchTests.changes = function(debug) { // error conditions // non-existing design doc - var req = CouchDB.request("GET", + var req = CouchDB.request("GET", "/test_suite_db/_changes?filter=nothingtosee/bop"); TEquals(404, req.status, "should return 404 for non existant design doc"); - // non-existing filter - var req = CouchDB.request("GET", + // non-existing filter + var req = CouchDB.request("GET", "/test_suite_db/_changes?filter=changes_filter/movealong"); TEquals(404, req.status, "should return 404 for non existant filter fun"); // both - var req = CouchDB.request("GET", + var req = CouchDB.request("GET", "/test_suite_db/_changes?filter=nothingtosee/movealong"); - TEquals(404, req.status, + TEquals(404, req.status, "should return 404 for non existant design doc and filter fun"); // changes get all_docs style with deleted docs var doc = {a:1}; db.save(doc); db.deleteDoc(doc); - var req = CouchDB.request("GET", + var req = CouchDB.request("GET", "/test_suite_db/_changes?filter=changes_filter/bop&style=all_docs"); var resp = JSON.parse(req.responseText); var expect = (!is_safari && xhr) ? 3: 1; TEquals(expect, resp.results.length, "should return matching rows"); - + // test filter on view function (map) // T(db.save({"_id":"blah", "bop" : "plankton"}).ok); @@ -379,7 +379,7 @@ couchTests.changes = function(debug) { var req = CouchDB.request("GET", "/_session", authOpts); var resp = JSON.parse(req.responseText); - + T(db.save({"user" : "Noah Slater"}).ok); var req = CouchDB.request("GET", "/test_suite_db/_changes?filter=changes_filter/userCtx", authOpts); var resp = JSON.parse(req.responseText); @@ -471,7 +471,7 @@ couchTests.changes = function(debug) { T(resp.results.length === 2); T(resp.results[0].id === "something"); T(resp.results[1].id === "anotherthing"); - + var docids = JSON.stringify(["something", "anotherthing", "andmore"]), req = CouchDB.request("GET", "/test_suite_db/_changes?filter=_doc_ids&doc_ids="+docids, options); var resp = JSON.parse(req.responseText); @@ -490,9 +490,9 @@ couchTests.changes = function(debug) { xhr = CouchDB.newXhr(); xhr.open("POST", "/test_suite_db/_changes?feed=continuous&timeout=500&since=7&filter=_doc_ids", true); xhr.setRequestHeader("Content-Type", "application/json"); - + xhr.send(options.body); - + T(db.save({"_id":"andmore", "bop" : "plankton"}).ok); waitForSuccess(function() { From 56bb34460ec9791eb15499921aa6a05da9f6bdf7 Mon Sep 17 00:00:00 2001 From: benoitc Date: Wed, 16 May 2012 08:01:52 +0200 Subject: [PATCH 14/14] make sure the changes test works on all browsers --- apps/couch_httpd/share/www/script/test/changes.js | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/apps/couch_httpd/share/www/script/test/changes.js b/apps/couch_httpd/share/www/script/test/changes.js index ab29f334..186a3c86 100644 --- a/apps/couch_httpd/share/www/script/test/changes.js +++ b/apps/couch_httpd/share/www/script/test/changes.js @@ -140,14 +140,13 @@ couchTests.changes = function(debug) { xhr.abort(); - if (window.EventSource) { + if (!!window.EventSource) { var source = new EventSource( "/test_suite_db/_changes?feed=eventsource"); var results = []; var sourceListener = function(e) { var data = JSON.parse(e.data); results.push(data); - }; source.addEventListener('message', sourceListener , false);