Permalink
Browse files

add `seq_indexed` option to a design document.

If the option `seq_indexed` is **true**, the view indexes will also be
indexed by sequence. Changes can then be retrieved using the
`couch_mrview:view_changes_since/{5,6,7}` functions, which allow you to
get all changes in a view, or the changes in a key range of a view, since
a given sequence. Example:

    couch_mrview:view_changes_since(Db, <<"_design/test">>, <<"test">>,
0, fun(KV, Acc) -> io:format("kv ~p~n", [KV]), {ok, Acc} end,
[{start_key, null}, {end_key, null}]).
    kv {{null,4},
        {<<"1f7d72c4ae8cab487c7efeff050011fe">>,
         {[{<<"_id">>,<<"1f7d72c4ae8cab487c7efeff050011fe">>},
           {<<"_rev">>,<<"1-9b2a5b981a9dd0ad1b3a3e826498e9bb">>},
           {<<"test">>,null}]}}}
    {ok,[]}

If the range is omitted all changes will be retrieved since the last
sequence.

Internals:

When `seq_indexed` is enabled, one main sequence index is created for the
whole view group, in which the key `{ViewId, Seq}` with the value
`{DocId, Key, Val}` is added. There can be multiple rows per sequence.
These rows are retrieved by querying the b-tree in the range
`[{ViewId, StartSeq+1}, {ViewId, EndSeq}]`,
where EndSeq, depending on the direction, is either 0 or a large
upper-bound integer.

To allow querying changes in a key range, a new index is added per
view, in which the composite key `{Key, Seq}` (instead of DocId) is
indexed with the value `{DocId, Val}`. We query this index in the range
`[{StartKey, StartSeq+1}, {EndKey, EndSeq}]` to retrieve the changes.

Caveats:

The size of the indexes will grow significantly with this change, and
compaction will be needed more often.
  • Loading branch information...
1 parent d1850f2 commit 14ad584c993e7a25c000d8db905fd0d0a88a24b4 @benoitc benoitc committed Jun 21, 2012
@@ -19,9 +19,11 @@
language,
design_opts=[],
include_deleted=false,
+ seq_indexed=false,
lib,
views,
id_btree=nil,
+ seq_btree=nil,
update_seq=0,
purge_seq=0,
@@ -36,12 +38,14 @@
-record(mrview, {
id_num,
+ seq_indexed=false,
update_seq=0,
purge_seq=0,
map_names=[],
reduce_funs=[],
def,
btree=nil,
+ seq_btree=nil,
options=[]
}).
@@ -50,6 +54,7 @@
seq=0,
purge_seq=0,
id_btree_state=nil,
+ seq_btree_state=nil,
view_states=nil
}).
@@ -14,6 +14,7 @@
-export([query_all_docs/2, query_all_docs/4]).
-export([query_view/3, query_view/4, query_view/6]).
+-export([view_changes_since/5, view_changes_since/6, view_changes_since/7]).
-export([get_info/2]).
-export([compact/2, compact/3, cancel_compaction/2]).
-export([cleanup/1]).
@@ -72,7 +73,8 @@ query_view(Db, DDoc, VName, Args) ->
query_view(Db, DDoc, VName, Args, Callback, Acc) when is_list(Args) ->
query_view(Db, DDoc, VName, to_mrargs(Args), Callback, Acc);
query_view(Db, DDoc, VName, Args0, Callback, Acc0) ->
- {ok, VInfo, Sig, Args} = couch_mrview_util:get_view(Db, DDoc, VName, Args0),
+ {ok, VInfo, _State, Sig, Args} = couch_mrview_util:get_view(Db, DDoc,
+ VName, Args0),
{ok, Acc1} = case Args#mrargs.preflight_fun of
PFFun when is_function(PFFun, 2) -> PFFun(Sig, Acc0);
_ -> {ok, Acc0}
@@ -86,6 +88,69 @@ query_view(Db, {Type, View}, Args, Callback, Acc) ->
red -> red_fold(Db, View, Args, Callback, Acc)
end.
+%% @doc Stream the rows of view `VName' in design document `DDoc' that are
+%% indexed under a sequence after `StartSeq'.
+%%
+%% `Callback(KV, Acc)' is invoked for each row and must return
+%% `{ok, NewAcc}'. Rows read from the group-wide sequence index are handed
+%% to the callback as `{{Key, Seq}, {DocId, Val}}'; rows read from the
+%% per-view sequence index are passed through unchanged. `Args' is either
+%% an `#mrargs{}' record or an option list accepted by `to_mrargs/1'.
+%%
+%% Returns `{ok, FinalAcc}', or `{error, seqs_not_indexed}' when the view
+%% does not have `seq_indexed' set.
+view_changes_since(Db, DDoc, VName, StartSeq, Callback) ->
+    view_changes_since(Db, DDoc, VName, StartSeq, Callback, #mrargs{}, []).
+
+view_changes_since(Db, DDoc, VName, StartSeq, Callback, Args) ->
+    view_changes_since(Db, DDoc, VName, StartSeq, Callback, Args, []).
+
+view_changes_since(Db, DDoc, VName, StartSeq, Callback, Args, Acc)
+        when is_list(Args) ->
+    view_changes_since(Db, DDoc, VName, StartSeq, Callback,
+                       to_mrargs(Args), Acc);
+view_changes_since(Db, DDoc, VName, StartSeq, Callback,
+                   #mrargs{direction=Dir}=Args0, Acc) ->
+    %% NOTE(review): 16#10000000 (268435456) is the upper bound of a
+    %% forward fold, so rows with a larger sequence are not returned.
+    %% Confirm this bound is intended.
+    EndSeq = case Dir of
+        fwd -> 16#10000000;
+        rev -> 0
+    end,
+    {ok, VInfo, State, _Sig, Args} = couch_mrview_util:get_view(Db, DDoc,
+                                                               VName, Args0),
+
+    {_Type, View} = VInfo,
+    case View#mrview.seq_indexed of
+        true ->
+            Acc0 = {Dir, StartSeq, Acc},
+            WrapperFun = fun
+                %% Row of the group-wide index: reshape it before calling
+                %% the callback and keep the fold direction in the state.
+                ({{_ViewId, Seq}, {DocId, Key, Val}}, _Reds, {D, _, Acc2}) ->
+                    {ok, Acc3} = Callback({{Key, Seq}, {DocId, Val}}, Acc2),
+                    {ok, {D, Seq, Acc3}};
+                %% Rows of the per-view index.
+                %% NOTE(review): the sequence compared against is the one
+                %% of the previously accepted row, not StartSeq - confirm
+                %% this is the intended filter when several keys are
+                %% traversed.
+                ({{_Key, Seq}, _DocId}=KV, _Reds, {fwd, LastSeq, Acc2})
+                        when Seq > LastSeq, Seq =< EndSeq ->
+                    {ok, Acc3} = Callback(KV, Acc2),
+                    {ok, {fwd, Seq, Acc3}};
+                ({{_Key, Seq}, _DocId}=KV, _Reds, {D, LastSeq, Acc2})
+                        when Seq < LastSeq, Seq >= EndSeq ->
+                    {ok, Acc3} = Callback(KV, Acc2),
+                    {ok, {D, Seq, Acc3}};
+                (_, _, Acc2) ->
+                    {ok, Acc2}
+            end,
+            {Btree, RangeOpts} =
+                case {Args#mrargs.start_key, Args#mrargs.end_key} of
+                    {undefined, undefined} ->
+                        %% No key range: use the group-wide index, keyed
+                        %% by {ViewId, Seq}.
+                        #mrst{seq_btree=SeqBtree} = State,
+                        #mrview{id_num=Id} = View,
+                        {SeqBtree, [{start_key, {Id, StartSeq+1}},
+                                    {end_key, {Id, EndSeq}}]};
+                    {SK, undefined} ->
+                        {View#mrview.seq_btree,
+                         [{start_key, {SK, StartSeq+1}},
+                          {end_key, {SK, EndSeq}}]};
+                    {undefined, EK} ->
+                        {View#mrview.seq_btree,
+                         [{start_key, {EK, StartSeq+1}},
+                          {end_key, {EK, EndSeq}}]};
+                    {SK, EK} ->
+                        {View#mrview.seq_btree,
+                         [{start_key, {SK, StartSeq+1}},
+                          {end_key, {EK, EndSeq}}]}
+                end,
+            %% The b-tree folds forward unless told otherwise; without the
+            %% direction a rev request (start key above end key) would
+            %% never visit a row.
+            Opts = [{dir, Dir} | RangeOpts],
+
+            {ok, _R, {_, _, AccOut}} = couch_btree:fold(Btree, WrapperFun,
+                                                        Acc0, Opts),
+            {ok, AccOut};
+        _ ->
+            {error, seqs_not_indexed}
+    end.
+
+
get_info(Db, DDoc) ->
{ok, Pid} = couch_index_server:get_index(couch_mrview_index, Db, DDoc),
@@ -18,13 +18,12 @@
-export([compact/3, swap_compacted/2]).
-record(acc, {
- btree = nil,
- last_id = nil,
- kvs = [],
- kvs_size = 0,
- changes = 0,
- total_changes
-}).
+ btree = nil,
+ last_id = nil,
+ kvs = [],
+ kvs_size = 0,
+ changes = 0,
+ total_changes}).
compact(_Db, State, Opts) ->
@@ -37,9 +36,11 @@ compact(State) ->
#mrst{
db_name=DbName,
idx_name=IdxName,
+ seq_indexed=SeqIndexed,
sig=Sig,
update_seq=Seq,
id_btree=IdBtree,
+ seq_btree=SeqBtree,
views=Views
} = State,
@@ -51,16 +52,25 @@ compact(State) ->
#mrst{
id_btree = EmptyIdBtree,
+ seq_btree = EmptySeqBtree,
views = EmptyViews
} = EmptyState,
{ok, Count} = couch_btree:full_reduce(IdBtree),
- TotalChanges = lists:foldl(
+ TotalChanges0 = lists:foldl(
fun(View, Acc) ->
{ok, Kvs} = couch_mrview_util:get_row_count(View),
Acc + Kvs
end,
Count, Views),
+
+
+ TotalChanges = case SeqIndexed of
+ true ->
+ TotalChanges0 * 2;
+ _ ->
+ TotalChanges0
+ end,
couch_task_status:add_task([
{type, view_compaction},
{database, DbName},
@@ -74,7 +84,8 @@ compact(State) ->
BufferSize = list_to_integer(BufferSize0),
FoldFun = fun({DocId, _} = KV, Acc) ->
- #acc{btree = Bt, kvs = Kvs, kvs_size = KvsSize, last_id = LastId} = Acc,
+ #acc{btree = Bt, kvs = Kvs, kvs_size = KvsSize,
+ last_id = LastId} = Acc,
if DocId =:= LastId ->
% COUCHDB-999 regression test
?LOG_ERROR("Duplicate docid `~s` detected in view group `~s`"
@@ -88,31 +99,64 @@ compact(State) ->
true ->
{ok, Bt2} = couch_btree:add(Bt, lists:reverse([KV | Kvs])),
Acc2 = update_task(Acc, 1 + length(Kvs)),
- {ok, Acc2#acc{
- btree = Bt2, kvs = [], kvs_size = 0, last_id = DocId}};
+ {ok, Acc2#acc{btree = Bt2, kvs = [], kvs_size = 0,
+ last_id = DocId}};
_ ->
- {ok, Acc#acc{
- kvs = [KV | Kvs], kvs_size = KvsSize2, last_id = DocId}}
+ {ok, Acc#acc{kvs = [KV | Kvs], kvs_size = KvsSize2,
+ last_id = DocId}}
end
end,
+ %% compact view group byte
InitAcc = #acc{total_changes = TotalChanges, btree = EmptyIdBtree},
{ok, _, FinalAcc} = couch_btree:foldl(IdBtree, FoldFun, InitAcc),
#acc{btree = Bt3, kvs = Uncopied} = FinalAcc,
- {ok, NewIdBtree} = couch_btree:add(Bt3, lists:reverse(Uncopied)),
+ Uncopied1 = lists:reverse(Uncopied),
+ {ok, NewIdBtree} = couch_btree:add(Bt3, Uncopied1),
FinalAcc2 = update_task(FinalAcc, length(Uncopied)),
- {NewViews, _} = lists:mapfoldl(fun({View, EmptyView}, Acc) ->
+ {NewViews, FinalAcc3} = lists:mapfoldl(fun({View, EmptyView}, Acc) ->
compact_view(View, EmptyView, BufferSize, Acc)
end, FinalAcc2, lists:zip(Views, EmptyViews)),
+ %% compact main seq btree
+ NewSeqBtree = compact_seq_btree(SeqBtree, EmptySeqBtree, BufferSize,
+ FinalAcc3),
+
unlink(EmptyState#mrst.fd),
{ok, EmptyState#mrst{
id_btree=NewIdBtree,
+ seq_btree=NewSeqBtree,
views=NewViews,
update_seq=Seq
}}.
+%% Copy every entry of the group-wide sequence b-tree `SeqBtree' into
+%% `EmptySeqBtree' and return the resulting b-tree, or `nil' when the
+%% group has no sequence b-tree. Entries are buffered and written in
+%% batches once their accumulated term size reaches `BufferSize';
+%% `update_task/2' is called with the number of entries of each batch.
+compact_seq_btree(nil, _, _, _) ->
+    nil;
+compact_seq_btree(SeqBtree, EmptySeqBtree, BufferSize, Acc0) ->
+    CopyFun = fun(Entry, Acc) ->
+        #acc{btree = Target, kvs = Pending, kvs_size = Size} = Acc,
+        NewSize = Size + ?term_size(Entry),
+        Buffered = [Entry | Pending],
+        if
+            NewSize >= BufferSize ->
+                %% Buffer is full: flush it in insertion order.
+                {ok, Flushed} = couch_btree:add(Target,
+                                                lists:reverse(Buffered)),
+                Progressed = update_task(Acc, length(Buffered)),
+                {ok, Progressed#acc{btree = Flushed, kvs = [],
+                                    kvs_size = 0}};
+            true ->
+                {ok, Acc#acc{kvs = Buffered, kvs_size = NewSize}}
+        end
+    end,
+
+    StartAcc = Acc0#acc{kvs = [], kvs_size = 0, btree = EmptySeqBtree},
+    {ok, _, EndAcc} = couch_btree:foldl(SeqBtree, CopyFun, StartAcc),
+    #acc{btree = LastBt, kvs = Leftover} = EndAcc,
+    %% Write whatever is still buffered after the fold.
+    {ok, Compacted} = couch_btree:add(LastBt, lists:reverse(Leftover)),
+    update_task(EndAcc, length(Leftover)),
+    Compacted.
+
recompact(State) ->
link(State#mrst.fd),
@@ -139,12 +183,28 @@ compact_view(View, EmptyView, BufferSize, Acc0) ->
end
end,
+ %% compact main view btree
InitAcc = Acc0#acc{kvs = [], kvs_size = 0, btree = EmptyView#mrview.btree},
{ok, _, FinalAcc} = couch_btree:foldl(View#mrview.btree, Fun, InitAcc),
#acc{btree = Bt3, kvs = Uncopied} = FinalAcc,
{ok, NewBt} = couch_btree:add(Bt3, lists:reverse(Uncopied)),
FinalAcc2 = update_task(FinalAcc, length(Uncopied)),
- {EmptyView#mrview{btree=NewBt}, FinalAcc2}.
+
+ %% compact view seq btree
+ SInitAcc = FinalAcc2#acc{kvs = [], kvs_size = 0,
+ btree = EmptyView#mrview.seq_btree},
+ {NewSBt, FinalAcc3} = case View#mrview.seq_btree of
+ nil ->
+ {nil, FinalAcc2};
+ SeqBtree ->
+ {ok, _, SFinalAcc} = couch_btree:foldl(SeqBtree, Fun, SInitAcc),
+ #acc{btree = SBt3, kvs = SUncopied} = SFinalAcc,
+ {ok, NewSBt1} = couch_btree:add(SBt3, lists:reverse(SUncopied)),
+ SFinalAcc1 = update_task(SFinalAcc, length(SUncopied)),
+ {NewSBt1, SFinalAcc1}
+ end,
+
+ {EmptyView#mrview{btree=NewBt, seq_btree=NewSBt}, FinalAcc3}.
update_task(#acc{changes = Changes, total_changes = Total} = Acc, ChangesInc) ->
@@ -170,5 +230,5 @@ swap_compacted(OldState, NewState) ->
unlink(OldState#mrst.fd),
couch_ref_counter:drop(OldState#mrst.refc),
{ok, NewRefCounter} = couch_ref_counter:start([NewState#mrst.fd]),
-
+
{ok, NewState#mrst{refc=NewRefCounter}}.
@@ -40,10 +40,13 @@ get(Property, State) ->
LocalSeq = couch_util:get_value(<<"local_seq">>, Opts, false),
IncludeDeleted = couch_util:get_value(<<"include_deleted">>,
Opts, false),
+ SeqIndexed = couch_util:get_value(<<"seq_indexed">>,
+ Opts, false),
if IncDesign -> [include_design]; true -> [] end
++ if LocalSeq -> [local_seq]; true -> [] end
- ++ if IncludeDeleted -> [include_deleted]; true -> [] end;
+ ++ if IncludeDeleted -> [include_deleted]; true -> [] end
+ ++ if SeqIndexed -> [seq_indexed]; true -> [] end;
info ->
#mrst{
fd = Fd,
Oops, something went wrong.

0 comments on commit 14ad584

Please sign in to comment.