View changes #1

Closed
wants to merge 15 commits into from

1 participant

Benoit Chesneau
Owner

Pull request to track the view changes branch.

benoitc added some commits
Benoit Chesneau benoitc Add `couch_mrview:view_changes_since` function.
This function fetches all changes in a view since the last update, like the
`couch_db:changes_since` function for databases. It returns an error
if the option `seq_indexed` hasn't been set to `true` in the design
document:

    {
        "_id": "_design/somedoc",
        "options": {
            "seq_indexed": "true"
        },
        "views": {
            "someview": {
                "map": "function(doc) { if (doc.field) emit(doc.field, null); }"
            }
        }
    }

This is done by adding an index to the view group (a view group is basically
the global index associated with a design doc). This index associates each
sequence and document ID with a view: `{{ViewId, Seq}, DocId}` is stored in
the seq_btree, with DocId as the value.

Example of usage:

    (refuge@127.0.0.1)3> F = fun(KV, Acc) -> {ok, [KV|Acc]} end.
    (refuge@127.0.0.1)4>
    (refuge@127.0.0.1)4> couch_mrview:view_changes_since(Db, DDoc, <<"all">>, 9990, F).
    [info] [<0.220.0>] Opening index for db: testdb idx: _design/test sig: "85b8fa1e39dc7fbe8b11b729b319c6bf"
    {ok,[{10000,{<<"72f9250618e65adc8cf4552758fff137">>,0}},
         {9999,{<<"72f9250618e65adc8cf4552758ffe85d">>,0}},
         {9998,{<<"72f9250618e65adc8cf4552758ffdbcf">>,0}},
         {9997,{<<"72f9250618e65adc8cf4552758ffd65e">>,0}},
         {9996,{<<"72f9250618e65adc8cf4552758ffd611">>,0}},
         {9995,{<<"72f9250618e65adc8cf4552758ffd045">>,0}},
         {9994,{<<"72f9250618e65adc8cf4552758ffc982">>,0}},
         {9993,{<<"72f9250618e65adc8cf4552758ffbfdb">>,0}},
         {9992,{<<"72f9250618e65adc8cf4552758ffb1fc">>,0}},
         {9991,{<<"72f9250618e65adc8cf4552758ffae18">>,0}}]}
    (refuge@127.0.0.1)5>
    (refuge@127.0.0.1)5> couch_mrview:view_changes_since(Db, DDoc, <<"all">>, 10000, F).
    {ok,[]}
    (refuge@127.0.0.1)7> couch_mrview:view_changes_since(Db, DDoc, <<"all">>, 10000, F).
    {ok,[{10002,{<<"31b0c9f8353b6b3e2a86c23d2c000a78">>,0}}]}
5d240fa
Benoit Chesneau benoitc `_view` filter is now handled by couch_mrview:view_changes_since/7
Rather than passing all docs sent by couch_db:changes_since/5 to a
transformed view function, we now use the more efficient
`couch_mrview:view_changes_since/7`, which uses the seq btree in the view
group.

Also add the parameter `view_filter=DName/FilterName`, which allows
filtering any view changes using the filter function `FilterName` from the
design document `DName`:

   GET http://<node>/<dbname>/_changes?filter=_view&view=<viewname>&view_filter=<filtername>

Where variables are:

- <node>: URL of the CouchDB node
- <dbname>: Name of the database
- <viewname>: DesignId/ViewName, the name of the view in the views properties
  of the design document DesignId
- <filtername>: DesignId1/FilterName, the name of the filter in the filters
  properties of the design document DesignId1
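
For example, a hypothetical request (reusing the `somedoc`/`someview` design
document from the first commit, and assuming a `somefilter` entry exists in
its filters):

   GET http://127.0.0.1:5984/testdb/_changes?filter=_view&view=somedoc/someview&view_filter=somedoc/somefilter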

The difference from the old behaviour is that you are really accessing the
view index instead of passing every changed doc through the map function,
which is way more efficient (you don't open a system process, serialize
the function, pass it to the process, wait for the return, test it and
eventually return a change). You can also filter the results. Other
advantages are that the views are refreshed in real time when the
database content changes, and the view changes are only triggered when the
index is updated.
ba9246a
Benoit Chesneau benoitc Get view changes using a view query.
Add the possibility for the function `couch_mrview:view_changes_since` to
accept some **view query parameters** like `key`, `start_key` &
`end_key`. Instead of fetching all changes in a view since the last update,
it will fetch changes only from the results of the query. It returns an
error if the option `seq_indexed` hasn't been set to `true` in the
view options:

        {
            "_id": "_design/somedoc",
            "views": {
                "someview": {
                    "map": "function(doc) { if (doc.field)
emit(doc.field, null); }",
                    "options": {
                        "seq_indexed": "true"
                    }
                }
            }
        }

This is done by adding an index (b-tree) to each view. This index
associates the view key with the doc sequence: `{{Key, Seq}, {DocId, Value}}`
is stored in this btree.
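
A minimal sketch of a keyed call (the key range is hypothetical; `Db`, `DDoc`
and the callback `F` are as in the first commit's example):

    {ok, KVs} = couch_mrview:view_changes_since(Db, DDoc, <<"all">>, 0, F,
        [{start_key, <<"a">>}, {end_key, <<"m">>}]).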
c681e1c
Benoit Chesneau benoitc add support for `include_deleted: true` option in a design document.
This option allows a view to map deleted documents.

Remember that documents deleted with the DELETE HTTP verb will look like
{_id: id, _rev: rev, _deleted: true} to the indexer. If you want to
store extra data on the deleted document you can use _bulk_docs or
update the document with the member '_deleted: true' using the HTTP verb
PUT.
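
A minimal sketch of a design document using it (the view name is
hypothetical; the option sits in the design-document `options`, next to
options like `local_seq`):

    {
        "_id": "_design/somedoc",
        "options": {
            "include_deleted": true
        },
        "views": {
            "deleted_docs": {
                "map": "function(doc) { if (doc._deleted) emit(doc._id, null); }"
            }
        }
    }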
e4b75f6
Benoit Chesneau benoitc fix _changes test
The `_view` filter now needs a view group indexed by sequences.
282671e
Benoit Chesneau benoitc fix imports fce9e24
Benoit Chesneau benoitc change order 37efe00
Benoit Chesneau benoitc fix include: 950bde5
Benoit Chesneau benoitc Revert "fix include:"
This reverts commit ed0076d3fdbf2b2cf28acb204f48fe07db65440f.
deb4968
Benoit Chesneau benoitc fix purge 93e1987
Benoit Chesneau benoitc filter fields in included doc when returned from a _changes
With this change you can return only a specified list of fields in the
included docs by passing the list to an optional `fields`
parameter. For now, nested fields are ignored.

    $ curl -XPUT localhost:5984/testdb
    {"ok":true}
    $ curl -XPUT localhost:5984/testdb/test -d'{"f1": 1, "f2": 2, "f3": 3}'
    {"ok":true,"id":"test","rev":"1-2b23983c05a9ca8015b6ea7bdc1b3478"}
    $ curl -XPUT localhost:5984/testdb/test1 -d'{"f1": 1, "f2": 4, "f3": 3}'
    {"ok":true,"id":"test1","rev":"1-2942771a428c4b45315710db0b6aaa7a"}
    $ curl -XPUT localhost:5984/testdb/test2 -d'{"f1": 1, "f2": 7, "f3": 8}'
    {"ok":true,"id":"test2","rev":"1-ac4cd62a83a0a98421e9ed74adcadcc3"}
    $ curl "http://localhost:5984/testdb/_changes?include_docs=true"
    {"results":[
    {"seq":1,"id":"test","changes":[{"rev":"1-2b23983c05a9ca8015b6ea7bdc1b3478"}],"doc":{"_id":"test","_rev":"1-2b23983c05a9ca8015b6ea7bdc1b3478","f1":1,"f2":2,"f3":3}},
    {"seq":2,"id":"test1","changes":[{"rev":"1-2942771a428c4b45315710db0b6aaa7a"}],"doc":{"_id":"test1","_rev":"1-2942771a428c4b45315710db0b6aaa7a","f1":1,"f2":4,"f3":3}},
    {"seq":3,"id":"test2","changes":[{"rev":"1-ac4cd62a83a0a98421e9ed74adcadcc3"}],"doc":{"_id":"test2","_rev":"1-ac4cd62a83a0a98421e9ed74adcadcc3","f1":1,"f2":7,"f3":8}}
    ],
    "last_seq":3}
    $ curl 'http://localhost:5984/testdb/_changes?include_docs=true&fields=\["f1"\]'
    {"results":[
    {"seq":1,"id":"test","changes":[{"rev":"1-2b23983c05a9ca8015b6ea7bdc1b3478"}],"doc":{"_id":"test","_rev":"1-2b23983c05a9ca8015b6ea7bdc1b3478","f1":1}},
    {"seq":2,"id":"test1","changes":[{"rev":"1-2942771a428c4b45315710db0b6aaa7a"}],"doc":{"_id":"test1","_rev":"1-2942771a428c4b45315710db0b6aaa7a","f1":1}},
    {"seq":3,"id":"test2","changes":[{"rev":"1-ac4cd62a83a0a98421e9ed74adcadcc3"}],"doc":{"_id":"test2","_rev":"1-ac4cd62a83a0a98421e9ed74adcadcc3","f1":1}}
    ],
    "last_seq":3}

    $ curl 'http://localhost:5984/testdb/_changes?include_docs=true&fields=\["f1","f2"\]'
    {"results":[
    {"seq":1,"id":"test","changes":[{"rev":"1-2b23983c05a9ca8015b6ea7bdc1b3478"}],"doc":{"_id":"test","_rev":"1-2b23983c05a9ca8015b6ea7bdc1b3478","f1":1,"f2":2}},
    {"seq":2,"id":"test1","changes":[{"rev":"1-2942771a428c4b45315710db0b6aaa7a"}],"doc":{"_id":"test1","_rev":"1-2942771a428c4b45315710db0b6aaa7a","f1":1,"f2":4}},
    {"seq":3,"id":"test2","changes":[{"rev":"1-ac4cd62a83a0a98421e9ed74adcadcc3"}],"doc":{"_id":"test2","_rev":"1-ac4cd62a83a0a98421e9ed74adcadcc3","f1":1,"f2":7}}
    ],
    "last_seq":3}
ebab8b6
benoitc added some commits
Benoit Chesneau benoitc add a test for changes eventsource feed. 16db793
Benoit Chesneau benoitc fix whitespaces 4c9bafc
Benoit Chesneau benoitc make sure the changes test works on all browsers 56bb344
Benoit Chesneau benoitc Merge branch 'master' into view_changes
Conflicts:
	apps/couch_changes/include/couch_changes.hrl
	apps/couch_changes/src/couch_changes.erl
	apps/couch_changes/src/couch_httpd_changes.erl
	apps/couch_replicator/src/couch_replicator_api_wrap.erl
e942d68
Benoit Chesneau
Owner

applied in last head

Benoit Chesneau benoitc closed this
2  apps/couch_changes/include/couch_changes.hrl
@@ -21,6 +21,8 @@
filter = "",
filter_fun,
filter_args = [],
+ filter_view,
+ view_args = [],
include_docs = false,
fields = [],
conflicts = false,
320 apps/couch_changes/src/couch_changes.erl
@@ -11,9 +11,11 @@
% the License.
-module(couch_changes).
+
-include("couch_changes.hrl").
-include_lib("couch/include/couch_db.hrl").
-include_lib("couch_httpd/include/couch_httpd.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
-export([handle_changes/3]).
@@ -38,16 +40,26 @@
}).
%% @type Req -> #httpd{} | {json_req, JsonObj()}
-handle_changes(Args1, Req, Db0) ->
+handle_changes(Args0, Req, Db0) ->
#changes_args{
style = Style,
filter = FilterName,
feed = Feed,
dir = Dir,
since = Since
- } = Args1,
+ } = Args0,
{FilterFun, FilterArgs} = make_filter_fun(FilterName, Style, Req, Db0),
- Args = Args1#changes_args{filter_fun = FilterFun, filter_args = FilterArgs},
+ Args1 = Args0#changes_args{filter_fun = FilterFun,
+ filter_args = FilterArgs},
+
+ Args = case FilterName of
+ "_view" ->
+ ViewArgs = parse_view_args(Req),
+ Args1#changes_args{filter_view=parse_view_param(Req),
+ view_args=ViewArgs#mrargs{direction=Dir}};
+ _ ->
+ Args1
+ end,
Start = fun() ->
{ok, Db} = couch_db:reopen(Db0),
StartSeq = case Dir of
@@ -70,17 +82,12 @@ handle_changes(Args1, Req, Db0) ->
true ->
fun(CallbackAcc) ->
{Callback, UserAcc} = get_callback_acc(CallbackAcc),
- Self = self(),
- {ok, Notify} = couch_db_update_notifier:start_link(
- fun({_, DbName}) when Db0#db.name == DbName ->
- Self ! db_updated;
- (_) ->
- ok
- end
- ),
+ ConsumerFun = make_update_fun(Db0, FilterName, Args),
+ {ok, Notify} = couch_db_update_notifier:start_link(ConsumerFun),
{Db, StartSeq} = Start(),
UserAcc2 = start_sending_changes(Callback, UserAcc, Feed),
{Timeout, TimeoutFun} = get_changes_timeout(Args, Callback),
+
Acc0 = build_acc(Args, Callback, UserAcc2, Db, StartSeq,
<<"">>, Timeout, TimeoutFun),
try
@@ -110,6 +117,49 @@ handle_changes(Args1, Req, Db0) ->
end
end.
+make_update_fun(#db{name=DbName0}, "_view",
+ #changes_args{filter_view={DName, VName}}) ->
+ Self = self(),
+
+ DesignId = <<"_design/", DName/binary>>,
+ fun
+ ({view_updated, {DbName, IdxName}})
+ when DbName0 == DbName, IdxName == DesignId ->
+ Self ! db_updated;
+ ({ddoc_updated, {DbName, DDocId}})
+ when DbName0 == DbName, DDocId == DesignId ->
+ Self ! ddoc_updated;
+ ({updated, DbName}) when DbName0 == DbName ->
+ spawn(fun() ->
+ case couch_db:open_int(DbName0, []) of
+ {ok, Db} ->
+ try
+ view_trigger(Db, DesignId, VName)
+ catch
+ _:_ -> Self ! ddoc_updated
+ end;
+ _Else ->
+ ?LOG_INFO("db error ~p~n", [_Else]),
+ Self ! ddoc_updated
+ end
+ end);
+ (_Else) ->
+ ok
+ end;
+make_update_fun(Db0, _FilterName, _ChangeArgs) ->
+ Self = self(),
+ fun
+ ({_, DbName}) when Db0#db.name == DbName ->
+ Self ! db_updated;
+ (_) ->
+ ok
+ end.
+
+view_trigger(Db, DesignId, VName) ->
+ DDoc = couch_httpd_db:couch_doc_open(Db, DesignId, nil, [ejson_body]),
+ %% try to update view if the db has been updated.
+ couch_mrview:query_view(Db, DDoc, VName, #mrargs{limit=0}).
+
get_callback_acc({Callback, _UserAcc} = Pair) when is_function(Callback, 3) ->
Pair;
get_callback_acc(Callback) when is_function(Callback, 2) ->
@@ -158,7 +208,6 @@ os_filter_fun(FilterName, Style, Req, Db) ->
parse_view_param({json_req, {Props}}) ->
{Params} = couch_util:get_value(<<"query">>, Props),
parse_view_param1(couch_util:get_value(<<"view">>, Params));
-
parse_view_param(Req) ->
parse_view_param1(?l2b(couch_httpd:qs_value(Req, "view", ""))).
@@ -182,9 +231,14 @@ builtin_filter_fun("_doc_ids", Style, #httpd{method='GET'}=Req, _Db) ->
{filter_docids(DocIds, Style), DocIds};
builtin_filter_fun("_design", Style, _Req, _Db) ->
{filter_designdoc(Style), []};
+builtin_filter_fun("_view", Style, {json_req, {Props}}=Req, Db) ->
+ {Params} = couch_util:get_value(<<"query">>, Props),
+ FilterName = ?b2l(couch_util:get_value(<<"view_filter">>, Params,
+ <<"">>)),
+ {os_filter_fun(FilterName, Style, Req, Db), []};
builtin_filter_fun("_view", Style, Req, Db) ->
- {DName, VName} = parse_view_param(Req),
- {filter_view({DName, VName} , Style, Db), []};
+ FilterName = couch_httpd:qs_value(Req, "view_filter", ""),
+ {os_filter_fun(FilterName, Style, Req, Db), []};
builtin_filter_fun(_FilterName, _Style, _Req, _Db) ->
throw({bad_request, "unknown builtin filter name"}).
@@ -208,31 +262,6 @@ filter_designdoc(Style) ->
end
end.
-filter_view({DName, VName}, Style, Db) ->
- DesignId = <<"_design/", DName/binary>>,
- DDoc = couch_httpd_db:couch_doc_open(Db, DesignId, nil, [ejson_body]),
- % validate that the ddoc has the filter fun
- #doc{body={Props}} = DDoc,
- couch_util:get_nested_json_value({Props}, [<<"views">>, VName]),
- fun(Db2, DocInfo) ->
- DocInfos =
- case Style of
- main_only ->
- [DocInfo];
- all_docs ->
- [DocInfo#doc_info{revs=[Rev]}|| Rev <- DocInfo#doc_info.revs]
- end,
- Docs = [Doc || {ok, Doc} <- [
- couch_db:open_doc(Db2, DocInfo2, [deleted, conflicts])
- || DocInfo2 <- DocInfos]],
- {ok, Passes} = couch_query_servers:filter_view(
- DDoc, VName, Docs
- ),
- [{[{<<"rev">>, couch_doc:rev_to_str({RevPos,RevId})}]}
- || {Pass, #doc{revs={RevPos,[RevId|_]}}}
- <- lists:zip(Passes, Docs), Pass == true]
- end.
-
builtin_results(Style, [#rev_info{rev=Rev}|_]=Revs) ->
case Style of
main_only ->
@@ -306,7 +335,9 @@ send_changes(Args, Acc0, FirstRound) ->
#changes_args{
dir = Dir,
filter = FilterName,
- filter_args = FilterArgs
+ filter_args = FilterArgs,
+ filter_view = View,
+ view_args = ViewArgs
} = Args,
#changes_acc{
db = Db,
@@ -321,13 +352,22 @@ send_changes(Args, Acc0, FirstRound) ->
"_design" ->
send_changes_design_docs(
Db, StartSeq, Dir, fun changes_enumerator/2, Acc0);
+ "_view" ->
+ send_changes_view(
+ Db, View, StartSeq, ViewArgs, fun view_changes_enumerator/2, Acc0);
_ ->
couch_db:changes_since(
Db, StartSeq, fun changes_enumerator/2, [{dir, Dir}], Acc0)
end;
false ->
- couch_db:changes_since(
- Db, StartSeq, fun changes_enumerator/2, [{dir, Dir}], Acc0)
+ case FilterName of
+ "_view" ->
+ send_changes_view(
+ Db, View, StartSeq, ViewArgs, fun view_changes_enumerator/2, Acc0);
+ _ ->
+ couch_db:changes_since(
+ Db, StartSeq, fun changes_enumerator/2, [{dir, Dir}], Acc0)
+ end
end.
@@ -353,6 +393,14 @@ send_changes_design_docs(Db, StartSeq, Dir, Fun, Acc0) ->
send_lookup_changes(FullDocInfos, StartSeq, Dir, Db, Fun, Acc0).
+send_changes_view(Db, {DName, VName}, StartSeq, Args, Fun, Acc0) ->
+ DesignId = <<"_design/", DName/binary>>,
+ DDoc = couch_httpd_db:couch_doc_open(Db, DesignId, nil,
+ [ejson_body]),
+ couch_mrview:view_changes_since(
+ Db, DDoc, VName, StartSeq, Fun, Args, Acc0).
+
+
send_lookup_changes(FullDocInfos, StartSeq, Dir, Db, Fun, Acc0) ->
FoldFun = case Dir of
fwd ->
@@ -408,40 +456,47 @@ keep_sending_changes(Args, Acc0, FirstRound) ->
db_open_options = DbOptions
} = Args,
- {ok, ChangesAcc} = send_changes(
- Args#changes_args{dir=fwd},
- Acc0,
- FirstRound),
- #changes_acc{
- db = Db, callback = Callback, timeout = Timeout, timeout_fun = TimeoutFun,
- seq = EndSeq, prepend = Prepend2, user_acc = UserAcc2, limit = NewLimit
- } = ChangesAcc,
-
- couch_db:close(Db),
- if Limit > NewLimit, ResponseType == "longpoll" ->
- end_sending_changes(Callback, UserAcc2, EndSeq, ResponseType);
- true ->
- case wait_db_updated(Timeout, TimeoutFun, UserAcc2) of
- {updated, UserAcc4} ->
- DbOptions1 = [{user_ctx, Db#db.user_ctx} | DbOptions],
- case couch_db:open(Db#db.name, DbOptions1) of
- {ok, Db2} ->
- keep_sending_changes(
- Args#changes_args{limit=NewLimit},
- ChangesAcc#changes_acc{
- db = Db2,
- user_acc = UserAcc4,
- seq = EndSeq,
- prepend = Prepend2,
- timeout = Timeout,
- timeout_fun = TimeoutFun},
- false);
- _Else ->
- end_sending_changes(Callback, UserAcc2, EndSeq, ResponseType)
- end;
- {stop, UserAcc4} ->
- end_sending_changes(Callback, UserAcc4, EndSeq, ResponseType)
- end
+ case send_changes(Args#changes_args{dir=fwd}, Acc0, FirstRound) of
+ {ok, ChangesAcc} ->
+ #changes_acc{
+ db = Db, callback = Callback, timeout = Timeout,
+ timeout_fun = TimeoutFun, seq = EndSeq, prepend = Prepend2,
+ user_acc = UserAcc2, limit = NewLimit
+ } = ChangesAcc,
+
+ couch_db:close(Db),
+ if Limit > NewLimit, ResponseType == "longpoll" ->
+ end_sending_changes(Callback, UserAcc2, EndSeq, ResponseType);
+ true ->
+ case wait_db_updated(Timeout, TimeoutFun, UserAcc2) of
+ {updated, UserAcc4} ->
+ DbOptions1 = [{user_ctx, Db#db.user_ctx} | DbOptions],
+ case couch_db:open(Db#db.name, DbOptions1) of
+ {ok, Db2} ->
+ keep_sending_changes(
+ Args#changes_args{limit=NewLimit},
+ ChangesAcc#changes_acc{
+ db = Db2,
+ user_acc = UserAcc4,
+ seq = EndSeq,
+ prepend = Prepend2,
+ timeout = Timeout,
+ timeout_fun = TimeoutFun},
+ false);
+ _Else ->
+ end_sending_changes(Callback, UserAcc2, EndSeq,
+ ResponseType)
+ end;
+ {stop, UserAcc4} ->
+ end_sending_changes(Callback, UserAcc4, EndSeq, ResponseType)
+ end
+ end;
+ Error ->
+ ?LOG_ERROR("Error while getting new changes: ~p~n", [Error]),
+ #changes_acc{
+ callback = Callback, seq = EndSeq, user_acc = UserAcc2
+ } = Acc0,
+ end_sending_changes(Callback, UserAcc2, EndSeq, ResponseType)
end.
end_sending_changes(Callback, UserAcc, EndSeq, ResponseType) ->
@@ -503,6 +558,40 @@ changes_enumerator(DocInfo, Acc) ->
user_acc = UserAcc2, limit = Limit - 1}}
end.
+view_changes_enumerator({{_ViewId, _Seq}, {Id, _V}}, Acc) ->
+ view_changes_enumerator1(Id, Acc);
+view_changes_enumerator({{_ViewId, _Seq}, Id}, Acc) ->
+ view_changes_enumerator1(Id, Acc).
+
+view_changes_enumerator1(Id, Acc) ->
+ #changes_acc{
+ filter = FilterFun, callback = Callback, prepend = Prepend,
+ user_acc = UserAcc, limit = Limit, resp_type = ResponseType, db = Db,
+ timeout = Timeout, timeout_fun = TimeoutFun
+ } = Acc,
+ {ok, DocInfo} = couch_db:get_doc_info(Db, Id),
+ #doc_info{high_seq = Seq} = DocInfo,
+ Results0 = FilterFun(Db, DocInfo),
+ Results = [Result || Result <- Results0, Result /= null],
+ Go = if (Limit =< 1) andalso Results =/= [] -> stop; true -> ok end,
+ case Results of
+ [] ->
+ {Done, UserAcc2} = maybe_heartbeat(Timeout, TimeoutFun, UserAcc),
+ case Done of
+ stop ->
+ {stop, Acc#changes_acc{seq = Seq, user_acc = UserAcc2}};
+ ok ->
+ {Go, Acc#changes_acc{seq = Seq, user_acc = UserAcc2}}
+ end;
+ _ ->
+ ChangesRow = changes_row(Results, DocInfo, Acc),
+ UserAcc2 = Callback({change, ChangesRow, Prepend}, ResponseType, UserAcc),
+ reset_heartbeat(),
+ {Go, Acc#changes_acc{
+ seq = Seq, prepend = <<",\n">>,
+ user_acc = UserAcc2, limit = Limit - 1}}
+ end.
+
changes_row(Results, DocInfo, Acc) ->
#doc_info{
@@ -542,7 +631,9 @@ deleted_item(_) -> [].
wait_db_updated(Timeout, TimeoutFun, UserAcc) ->
receive
db_updated ->
- get_rest_db_updated(UserAcc)
+ get_rest_db_updated(UserAcc);
+ ddoc_updated ->
+ {stop, UserAcc}
after Timeout ->
{Go, UserAcc2} = TimeoutFun(UserAcc),
case Go of
@@ -561,6 +652,7 @@ get_rest_db_updated(UserAcc) ->
{updated, UserAcc}
end.
+
reset_heartbeat() ->
case get(last_changes_heartbeat) of
undefined ->
@@ -612,3 +704,77 @@ filter_doc_fields([Field|Rest], {Props}=Doc, Acc) ->
[{Field, Value} | Acc]
end,
filter_doc_fields(Rest, Doc, Acc1).
+
+parse_view_args({json_req, {Props}}) ->
+ {Query} = couch_util:get_value(<<"query">>, Props, {[]}),
+ parse_view_args1(Query, #mrargs{});
+parse_view_args(Req) ->
+ couch_mrview_http:parse_qs(Req, []).
+
+
+parse_view_args1([], Args) ->
+ Args;
+parse_view_args1([{Key, Val} | Rest], Args) ->
+ Args1 = case Key of
+ <<"reduce">> ->
+ Args#mrargs{reduce=Val};
+ <<"key">> ->
+ Args#mrargs{start_key=Val, end_key=Val};
+ <<"keys">> ->
+ Args#mrargs{keys=Val};
+ <<"startkey">> ->
+ Args#mrargs{start_key=Val};
+ <<"start_key">> ->
+ Args#mrargs{start_key=Val};
+ <<"startkey_docid">> ->
+ Args#mrargs{start_key_docid=Val};
+ <<"start_key_doc_id">> ->
+ Args#mrargs{start_key_docid=Val};
+ <<"endkey">> ->
+ Args#mrargs{end_key=Val};
+ <<"end_key">> ->
+ Args#mrargs{end_key=Val};
+ <<"endkey_docid">> ->
+ Args#mrargs{end_key_docid=Val};
+ <<"end_key_doc_id">> ->
+ Args#mrargs{end_key_docid=Val};
+ <<"limit">> ->
+ Args#mrargs{limit=Val};
+ <<"count">> ->
+ throw({query_parse_error, <<"QS param `count` is not `limit`">>});
+ <<"stale">> when Val == <<"ok">> ->
+ Args#mrargs{stale=ok};
+ <<"stale">> when Val == <<"update_after">> ->
+ Args#mrargs{stale=update_after};
+ <<"stale">> ->
+ throw({query_parse_error, <<"Invalid value for `stale`.">>});
+ <<"descending">> ->
+ case Val of
+ true -> Args#mrargs{direction=rev};
+ _ -> Args#mrargs{direction=fwd}
+ end;
+ <<"skip">> ->
+ Args#mrargs{skip=Val};
+ <<"group">> ->
+ case Val of
+ true -> Args#mrargs{group_level=exact};
+ _ -> Args#mrargs{group_level=0}
+ end;
+ <<"group_level">> ->
+ Args#mrargs{group_level=Val};
+ <<"inclusive_end">> ->
+ Args#mrargs{inclusive_end=Val};
+ <<"include_docs">> ->
+ Args#mrargs{include_docs=Val};
+ <<"update_seq">> ->
+ Args#mrargs{update_seq=Val};
+ <<"conflicts">> ->
+ Args#mrargs{conflicts=Val};
+ <<"list">> ->
+ Args#mrargs{list=Val};
+ <<"callback">> ->
+ Args#mrargs{callback=Val};
+ _ ->
+ Args#mrargs{extra=[{Key, Val} | Args#mrargs.extra]}
+ end,
+ parse_view_args1(Rest, Args1).
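
As an illustration of the query mapping above (not part of the patch; the
values are hypothetical), a `json_req` query such as the following would be
turned into an `#mrargs{}` record by `parse_view_args1/2`:

    %% sketch: a hypothetical query proplist and the record it parses into
    Query = [{<<"startkey">>, <<"abc">>}, {<<"limit">>, 10}, {<<"descending">>, true}],
    #mrargs{start_key = <<"abc">>, limit = 10, direction = rev} =
        parse_view_args1(Query, #mrargs{}).
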
3  apps/couch_httpd/share/www/script/test/changes.js
@@ -227,7 +227,8 @@ couchTests.changes = function(debug) {
"conflicted" : "function(doc, req) { return (doc._conflicts);}"
},
options : {
- local_seq : true
+ local_seq : true,
+ seq_indexed: true
},
views : {
local_seq : {
4 apps/couch_index/src/couch_index_updater.erl
@@ -120,11 +120,13 @@ update(Idx, Mod, IdxState) ->
UpdateOpts = Mod:get(update_options, IdxState),
CommittedOnly = lists:member(committed_only, UpdateOpts),
IncludeDesign = lists:member(include_design, UpdateOpts),
+ IncludeDeleted = lists:member(include_deleted, UpdateOpts),
DocOpts = case lists:member(local_seq, UpdateOpts) of
true -> [conflicts, deleted_conflicts, local_seq];
_ -> [conflicts, deleted_conflicts]
end,
+
couch_util:with_db(DbName, fun(Db) ->
DbUpdateSeq = couch_db:get_update_seq(Db),
DbCommittedSeq = couch_db:get_committed_update_seq(Db),
@@ -146,7 +148,7 @@ update(Idx, Mod, IdxState) ->
case {IncludeDesign, DocId} of
{false, <<"_design/", _/binary>>} ->
{nil, Seq};
- _ when Deleted ->
+ _ when Deleted, IncludeDeleted /= true ->
{#doc{id=DocId, deleted=true}, Seq};
_ ->
{ok, Doc} = couch_db:open_doc_int(Db, DocInfo, DocOpts),
7 apps/couch_mrview/include/couch_mrview.hrl
@@ -18,12 +18,14 @@
idx_name,
language,
design_opts=[],
+ seq_indexed=false,
+ include_deleted=false,
lib,
views,
id_btree=nil,
+ seq_btree=nil,
update_seq=0,
purge_seq=0,
-
first_build,
partial_resp_pid,
doc_acc,
@@ -35,12 +37,14 @@
-record(mrview, {
id_num,
+ seq_indexed=false,
update_seq=0,
purge_seq=0,
map_names=[],
reduce_funs=[],
def,
btree=nil,
+ seq_btree=nil,
options=[]
}).
@@ -49,6 +53,7 @@
seq=0,
purge_seq=0,
id_btree_state=nil,
+ seq_btree_state=nil,
view_states=nil
}).
71 apps/couch_mrview/src/couch_mrview.erl
@@ -17,6 +17,8 @@
-export([get_info/2]).
-export([compact/2, compact/3, cancel_compaction/2]).
-export([cleanup/1]).
+-export([view_changes_since/5, view_changes_since/6,
+ view_changes_since/7]).
-include_lib("couch/include/couch_db.hrl").
-include_lib("couch_mrview/include/couch_mrview.hrl").
@@ -86,6 +88,75 @@ query_view(Db, {Type, View}, Args, Callback, Acc) ->
red -> red_fold(Db, View, Args, Callback, Acc)
end.
+view_changes_since(Db, DDoc, VName, StartSeq, Callback) ->
+ view_changes_since(Db, DDoc, VName, StartSeq, Callback, #mrargs{}, []).
+
+view_changes_since(Db, DDoc, VName, StartSeq, Callback, Args) ->
+ view_changes_since(Db, DDoc, VName, StartSeq, Callback, Args, []).
+
+view_changes_since(Db, DDoc, VName, StartSeq, Callback, Args, Acc)
+ when is_list(Args) ->
+ view_changes_since(Db, DDoc, VName, StartSeq, Callback, to_mrargs(Args),
+ Acc);
+
+view_changes_since(Db, DDoc, VName, StartSeq, Callback, Args, Acc) ->
+ #mrargs{direction=Dir} = Args,
+ EndSeq = case Dir of
+ fwd -> 16#10000000;
+ rev -> 0
+ end,
+ {ok, ViewId} = couch_mrview_util:get_view_id(Db, DDoc, VName),
+ {WithVSK, StartKey, EndKey} = couch_mrview_util:change_keys(ViewId, Args,
+ StartSeq, EndSeq),
+
+ Args1 = Args#mrargs{start_key=StartKey, end_key=EndKey},
+ case WithVSK of
+ false ->
+ {ok, _, State} = couch_mrview_util:get_view_state(Db, DDoc,
+ VName, Args1),
+ #mrst{seq_indexed=SeqIndexed, seq_btree=SeqBtree} = State,
+ case SeqIndexed of
+ false ->
+ {error, seqs_not_indexed};
+ _ ->
+ WrapperFun = fun
+ ({{_ViewID, _Seq}, _DocId}=KV, _Reds, Acc2) ->
+ Callback(KV, Acc2);
+ (_, _, Acc2) ->
+ {ok , Acc2}
+ end,
+
+ {ok, _R, AccOut} = couch_btree:fold(SeqBtree, WrapperFun, Acc,
+ [{start_key, StartKey}, {end_key, EndKey}, {dir, Dir}]),
+ {ok, AccOut}
+ end;
+ _ ->
+ {ok, {_Type, View, _}, _} = couch_mrview_util:get_view_state(Db,
+ DDoc, VName, Args1),
+ case View#mrview.seq_indexed of
+ false ->
+ {error, seqs_not_indexed};
+ _ ->
+ Acc0 = {Dir, StartSeq, Acc},
+ WrapperFun = fun
+ ({{_Key, Seq}, _DocId}=KV, _Reds, {fwd, LastSeq, Acc2})
+ when Seq > LastSeq, Seq =< EndSeq ->
+ {ok, Acc3} = Callback(KV, Acc2),
+ {ok, {fwd, Seq, Acc3}};
+ ({{_Key, Seq}, _DocId}=KV, _Reds, {D, LastSeq, Acc2})
+ when Seq < LastSeq, Seq >= EndSeq ->
+ {ok, Acc3} = Callback(KV, Acc2),
+ {ok, {D, Seq, Acc3}};
+ (_, _, Acc2) ->
+ {ok, Acc2}
+ end,
+ Opts = [{start_key, StartKey}, {end_key, EndKey}, {dir, Dir}],
+ {ok, _R, {_, _, AccOut}} = couch_btree:fold(View#mrview.seq_btree,
+ WrapperFun, Acc0, Opts),
+ {ok, AccOut}
+ end
+ end.
+
get_info(Db, DDoc) ->
{ok, Pid} = couch_index_server:get_index(couch_mrview_index, Db, DDoc),
39 apps/couch_mrview/src/couch_mrview_compactor.erl
@@ -19,6 +19,8 @@
-record(acc, {
btree = nil,
+ id_btree = nil,
+ seq_btree = nil,
last_id = nil,
kvs = [],
kvs_size = 0,
@@ -37,8 +39,9 @@ compact(State) ->
#mrst{
db_name=DbName,
idx_name=IdxName,
+ seq_indexed=SeqIndexed,
sig=Sig,
- update_seq=Seq,
+ update_seq=UpdateSeq,
id_btree=IdBtree,
views=Views
} = State,
@@ -51,6 +54,7 @@ compact(State) ->
#mrst{
id_btree = EmptyIdBtree,
+ seq_btree = EmptySeqBtree,
views = EmptyViews
} = EmptyState,
@@ -74,7 +78,8 @@ compact(State) ->
BufferSize = list_to_integer(BufferSize0),
FoldFun = fun({DocId, _} = KV, Acc) ->
- #acc{btree = Bt, kvs = Kvs, kvs_size = KvsSize, last_id = LastId} = Acc,
+ #acc{id_btree = IdBt, seq_btree = SeqBt, kvs = Kvs, kvs_size =
+ KvsSize, last_id = LastId} = Acc,
if DocId =:= LastId ->
% COUCHDB-999 regression test
?LOG_ERROR("Duplicate docid `~s` detected in view group `~s`"
@@ -86,20 +91,25 @@ compact(State) ->
KvsSize2 = KvsSize + ?term_size(KV),
case KvsSize2 >= BufferSize of
true ->
- {ok, Bt2} = couch_btree:add(Bt, lists:reverse([KV | Kvs])),
+ ToAdd = lists:reverse([KV | Kvs]),
+ {ok, IdBt2} = couch_btree:add(IdBt, ToAdd),
+ SeqBt2 = maybe_index_seqs(SeqBt, ToAdd, SeqIndexed),
Acc2 = update_task(Acc, 1 + length(Kvs)),
- {ok, Acc2#acc{
- btree = Bt2, kvs = [], kvs_size = 0, last_id = DocId}};
+ {ok, Acc2#acc{id_btree = IdBt2, seq_btree = SeqBt2, kvs = [],
+ kvs_size = 0, last_id = DocId}};
_ ->
{ok, Acc#acc{
kvs = [KV | Kvs], kvs_size = KvsSize2, last_id = DocId}}
end
end,
- InitAcc = #acc{total_changes = TotalChanges, btree = EmptyIdBtree},
+ InitAcc = #acc{total_changes = TotalChanges, id_btree =
+ EmptyIdBtree, seq_btree=EmptySeqBtree},
{ok, _, FinalAcc} = couch_btree:foldl(IdBtree, FoldFun, InitAcc),
- #acc{btree = Bt3, kvs = Uncopied} = FinalAcc,
- {ok, NewIdBtree} = couch_btree:add(Bt3, lists:reverse(Uncopied)),
+ #acc{id_btree = IdBt3, seq_btree= SeqBt3, kvs = Uncopied} = FinalAcc,
+ Uncopied1 = lists:reverse(Uncopied),
+ {ok, NewIdBtree} = couch_btree:add(IdBt3, Uncopied1),
+ NewSeqBtree = maybe_index_seqs(SeqBt3, Uncopied1, SeqIndexed),
FinalAcc2 = update_task(FinalAcc, length(Uncopied)),
{NewViews, _} = lists:mapfoldl(fun({View, EmptyView}, Acc) ->
@@ -109,8 +119,9 @@ compact(State) ->
unlink(EmptyState#mrst.fd),
{ok, EmptyState#mrst{
id_btree=NewIdBtree,
+ seq_btree=NewSeqBtree,
views=NewViews,
- update_seq=Seq
+ update_seq=UpdateSeq
}}.
@@ -170,5 +181,13 @@ swap_compacted(OldState, NewState) ->
unlink(OldState#mrst.fd),
couch_ref_counter:drop(OldState#mrst.refc),
{ok, NewRefCounter} = couch_ref_counter:start([NewState#mrst.fd]),
-
+
{ok, NewState#mrst{refc=NewRefCounter}}.
+
+
+maybe_index_seqs(SeqBt, ToAdd, true) ->
+ SeqsToAdd = couch_mrview_util:to_seqkvs(ToAdd, []),
+ {ok, SeqBt2} = couch_btree:add(SeqBt, SeqsToAdd),
+ SeqBt2;
+maybe_index_seqs(SeqBt, _ToAdd, _) ->
+ SeqBt.
5 apps/couch_mrview/src/couch_mrview_index.erl
@@ -38,8 +38,11 @@ get(Property, State) ->
Opts = State#mrst.design_opts,
IncDesign = couch_util:get_value(<<"include_design">>, Opts, false),
LocalSeq = couch_util:get_value(<<"local_seq">>, Opts, false),
+ IncludeDeleted = couch_util:get_value(<<"include_deleted">>,
+ Opts, false),
if IncDesign -> [include_design]; true -> [] end
- ++ if LocalSeq -> [local_seq]; true -> [] end;
+ ++ if LocalSeq -> [local_seq]; true -> [] end
+ ++ if IncludeDeleted -> [include_deleted]; true -> [] end;
info ->
#mrst{
fd = Fd,
188 apps/couch_mrview/src/couch_mrview_updater.erl
@@ -54,33 +54,58 @@ start_update(Partial, State, NumChanges) ->
purge(_Db, PurgeSeq, PurgedIdRevs, State) ->
#mrst{
+ seq_indexed=SeqIndexed,
id_btree=IdBtree,
+ seq_btree=SeqBtree,
views=Views
} = State,
Ids = [Id || {Id, _Revs} <- PurgedIdRevs],
{ok, Lookups, IdBtree2} = couch_btree:query_modify(IdBtree, Ids, [], Ids),
+ SeqBtree2 = case SeqIndexed of
+ true ->
+ PurgeSeqs = [{ViewId, Seq} || {{ViewId, Seq}, _Id} <-
+ couch_mrview_util:to_seqkvs(Lookups, [])],
+ {ok, SeqBtree1} = couch_btree:add_remove(SeqBtree, [], PurgeSeqs),
+ SeqBtree1;
+ _ ->
+ nil
+ end,
+
MakeDictFun = fun
- ({ok, {DocId, ViewNumRowKeys}}, DictAcc) ->
- FoldFun = fun({ViewNum, RowKey}, DictAcc2) ->
- dict:append(ViewNum, {RowKey, DocId}, DictAcc2)
+ ({ok, {DocId, {Seq, ViewNumRowKeys}}}, DictAcc) ->
+ FoldFun = fun({ViewNum, RowKey}, {Acc, SAcc}) ->
+ Acc2 = dict:append(ViewNum, {RowKey, DocId}, Acc),
+ SAcc2 = dict:append(ViewNum, {RowKey, Seq}, SAcc),
+ {Acc2, SAcc2}
end,
lists:foldl(FoldFun, DictAcc, ViewNumRowKeys);
({not_found, _}, DictAcc) ->
DictAcc
end,
- KeysToRemove = lists:foldl(MakeDictFun, dict:new(), Lookups),
+ {KeysToRem, SKeysToRem} = lists:foldl(MakeDictFun, {dict:new(),
+ dict:new()}, Lookups),
- RemKeysFun = fun(#mrview{id_num=Num, btree=Btree}=View) ->
- case dict:find(Num, KeysToRemove) of
+ RemKeysFun = fun(View) ->
+ #mrview{id_num=Num, btree=Btree, seq_btree=SBtree}=View,
+ case dict:find(Num, KeysToRem) of
{ok, RemKeys} ->
{ok, Btree2} = couch_btree:add_remove(Btree, [], RemKeys),
+ {ok, SBtree2} = case View#mrview.seq_indexed of
+ false ->
+ {ok, nil};
+ _ ->
+ {ok, RemSKeys} = dict:find(Num, SKeysToRem),
+ couch_btree:add_remove(SBtree, [], RemSKeys)
+ end,
+
NewPurgeSeq = case Btree2 /= Btree of
true -> PurgeSeq;
_ -> View#mrview.purge_seq
end,
- View#mrview{btree=Btree2, purge_seq=NewPurgeSeq};
+ View#mrview{btree=Btree2, seq_btree=SBtree2,
+ purge_seq=NewPurgeSeq};
error ->
View
end
@@ -89,17 +114,18 @@ purge(_Db, PurgeSeq, PurgedIdRevs, State) ->
Views2 = lists:map(RemKeysFun, Views),
{ok, State#mrst{
id_btree=IdBtree2,
+ seq_btree=SeqBtree2,
views=Views2,
purge_seq=PurgeSeq
}}.
-
process_doc(Doc, Seq, #mrst{doc_acc=Acc}=State) when length(Acc) > 100 ->
couch_work_queue:queue(State#mrst.doc_queue, lists:reverse(Acc)),
process_doc(Doc, Seq, State#mrst{doc_acc=[]});
process_doc(nil, Seq, #mrst{doc_acc=Acc}=State) ->
{ok, State#mrst{doc_acc=[{nil, Seq, nil} | Acc]}};
-process_doc(#doc{id=Id, deleted=true}, Seq, #mrst{doc_acc=Acc}=State) ->
+process_doc(#doc{id=Id, deleted=true}, Seq, #mrst{doc_acc=Acc,
+ include_deleted=false}=State) ->
{ok, State#mrst{doc_acc=[{Id, Seq, deleted} | Acc]}};
process_doc(#doc{id=Id}=Doc, Seq, #mrst{doc_acc=Acc}=State) ->
{ok, State#mrst{doc_acc=[{Id, Seq, Doc} | Acc]}}.
@@ -144,36 +170,44 @@ compute_map_results(#mrst{qserver = Qs}, Dequeued) ->
% Run all the non deleted docs through the view engine and
% then pass the results on to the writer process.
DocFun = fun
- ({nil, Seq, _}, {SeqAcc, AccDel, AccNotDel}) ->
- {erlang:max(Seq, SeqAcc), AccDel, AccNotDel};
- ({Id, Seq, deleted}, {SeqAcc, AccDel, AccNotDel}) ->
- {erlang:max(Seq, SeqAcc), [{Id, []} | AccDel], AccNotDel};
- ({_Id, Seq, Doc}, {SeqAcc, AccDel, AccNotDel}) ->
- {erlang:max(Seq, SeqAcc), AccDel, [Doc | AccNotDel]}
+ ({nil, Seq, _}, {SeqAcc, AccDel, AccNotDel, AccDocSeq}) ->
+ {erlang:max(Seq, SeqAcc), AccDel, AccNotDel, AccDocSeq};
+ ({Id, Seq, deleted}, {SeqAcc, AccDel, AccNotDel, AccDocSeq}) ->
+ {erlang:max(Seq, SeqAcc), [{Id, Seq, []} | AccDel],
+ AccNotDel, AccDocSeq};
+ ({Id, Seq, Doc}, {SeqAcc, AccDel, AccNotDel, AccDocSec}) ->
+ {erlang:max(Seq, SeqAcc), AccDel, [Doc | AccNotDel], [{Id,
+ Seq} | AccDocSec]}
end,
FoldFun = fun(Docs, Acc) ->
lists:foldl(DocFun, Acc, Docs)
end,
- {MaxSeq, DeletedResults, Docs} =
- lists:foldl(FoldFun, {0, [], []}, Dequeued),
+ {MaxSeq, DeletedResults, Docs, DocSeqs} =
+ lists:foldl(FoldFun, {0, [], [], []}, Dequeued),
{ok, MapResultList} = couch_query_servers:map_docs_raw(Qs, Docs),
NotDeletedResults = lists:zipwith(
- fun(#doc{id = Id}, MapResults) -> {Id, MapResults} end,
- Docs,
+ fun({Id, Seq}, MapResults) -> {Id, Seq, MapResults} end,
+ DocSeqs,
MapResultList),
AllMapResults = DeletedResults ++ NotDeletedResults,
update_task(length(AllMapResults)),
{ok, {MaxSeq, AllMapResults}}.
-write_results(Parent, State) ->
+write_results(Parent, #mrst{db_name=DbName, idx_name=IdxName}=State) ->
case couch_work_queue:dequeue(State#mrst.write_queue) of
closed ->
Parent ! {new_state, State};
{ok, Info} ->
EmptyKVs = [{V#mrview.id_num, []} || V <- State#mrst.views],
- {Seq, ViewKVs, DocIdKeys} = merge_results(Info, 0, EmptyKVs, []),
- NewState = write_kvs(State, Seq, ViewKVs, DocIdKeys),
+ {Seq, ViewKVs, ViewSKVs, DocIdKeys} = merge_results(Info, 0,
+ EmptyKVs, EmptyKVs, []),
+
+ NewState = write_kvs(State, Seq, ViewKVs, ViewSKVs, DocIdKeys),
+
+ % notifify the view update
+ couch_db_update_notifier:notify({view_updated, {DbName, IdxName}}),
+
send_partial(NewState#mrst.partial_resp_pid, NewState),
write_results(Parent, NewState)
end.
@@ -190,28 +224,33 @@ start_query_server(State) ->
State#mrst{qserver=QServer}.
-merge_results([], SeqAcc, ViewKVs, DocIdKeys) ->
- {SeqAcc, ViewKVs, DocIdKeys};
-merge_results([{Seq, Results} | Rest], SeqAcc, ViewKVs, DocIdKeys) ->
- Fun = fun(RawResults, {VKV, DIK}) ->
- merge_results(RawResults, VKV, DIK)
+merge_results([], SeqAcc, ViewKVs, ViewSKVs, DocIdKeys) ->
+ {SeqAcc, ViewKVs, ViewSKVs, DocIdKeys};
+merge_results([{Seq, Results} | Rest], SeqAcc, ViewKVs, ViewSKVs, DocIdKeys) ->
+ Fun = fun(RawResults, {VKV, VSKV, DIK}) ->
+ merge_results(RawResults, VKV, VSKV, DIK)
end,
- {ViewKVs1, DocIdKeys1} = lists:foldl(Fun, {ViewKVs, DocIdKeys}, Results),
- merge_results(Rest, erlang:max(Seq, SeqAcc), ViewKVs1, DocIdKeys1).
+ {ViewKVs1, ViewSKVs1, DocIdKeys1} = lists:foldl(Fun, {ViewKVs,
+ ViewSKVs, DocIdKeys}, Results),
+ merge_results(Rest, erlang:max(Seq, SeqAcc), ViewKVs1, ViewSKVs1,
+ DocIdKeys1).
-merge_results({DocId, []}, ViewKVs, DocIdKeys) ->
- {ViewKVs, [{DocId, []} | DocIdKeys]};
-merge_results({DocId, RawResults}, ViewKVs, DocIdKeys) ->
+merge_results({DocId, Seq, []}, ViewKVs, ViewSKVs, DocIdKeys) ->
+ {ViewKVs, ViewSKVs, [{DocId, {Seq, []}} | DocIdKeys]};
+merge_results({DocId, Seq, RawResults}, ViewKVs, ViewSKVs, DocIdKeys) ->
JsonResults = couch_query_servers:raw_to_ejson(RawResults),
Results = [[list_to_tuple(Res) || Res <- FunRs] || FunRs <- JsonResults],
- {ViewKVs1, ViewIdKeys} = insert_results(DocId, Results, ViewKVs, [], []),
- {ViewKVs1, [ViewIdKeys | DocIdKeys]}.
+ {ViewKVs1, ViewSKVs1, ViewIdKeys} = insert_results(DocId, Seq,
+ Results, ViewKVs, ViewSKVs, [], [], []),
+ {ViewKVs1, ViewSKVs1, [ViewIdKeys | DocIdKeys]}.
-insert_results(DocId, [], [], ViewKVs, ViewIdKeys) ->
- {lists:reverse(ViewKVs), {DocId, ViewIdKeys}};
-insert_results(DocId, [KVs | RKVs], [{Id, VKVs} | RVKVs], VKVAcc, VIdKeys) ->
+insert_results(DocId, Seq, [], [], [], ViewKVs, ViewSKVs, ViewIdKeys) ->
+ {lists:reverse(ViewKVs), lists:reverse(ViewSKVs), {DocId, {Seq,
+ ViewIdKeys}}};
+insert_results(DocId, Seq, [KVs | RKVs], [{Id, VKVs} | RVKVs], [{Id,
+ VSKVs} | RVSKVs], VKVAcc, VSKVAcc, VIdKeys) ->
CombineDupesFun = fun
({Key, Val}, {[{Key, {dups, Vals}} | Rest], IdKeys}) ->
{[{Key, {dups, [Val | Vals]}} | Rest], IdKeys};
@@ -223,54 +262,89 @@ insert_results(DocId, [KVs | RKVs], [{Id, VKVs} | RVKVs], VKVAcc, VIdKeys) ->
InitAcc = {[], VIdKeys},
{Duped, VIdKeys0} = lists:foldl(CombineDupesFun, InitAcc, lists:sort(KVs)),
FinalKVs = [{{Key, DocId}, Val} || {Key, Val} <- Duped] ++ VKVs,
- insert_results(DocId, RKVs, RVKVs, [{Id, FinalKVs} | VKVAcc], VIdKeys0).
+ FinalSKVs = [{{Key, Seq}, {DocId, Val}} || {Key, Val} <- Duped] ++ VSKVs,
+ insert_results(DocId, Seq, RKVs, RVKVs, RVSKVs, [{Id, FinalKVs} | VKVAcc],
+ [{Id, FinalSKVs} | VSKVAcc], VIdKeys0).
-write_kvs(State, UpdateSeq, ViewKVs, DocIdKeys) ->
+write_kvs(State, UpdateSeq, ViewKVs, ViewSKVs, DocIdKeys) ->
#mrst{
+ seq_indexed=SeqIndexed,
id_btree=IdBtree,
+ seq_btree=SeqBtree,
first_build=FirstBuild
} = State,
+
{ok, ToRemove, IdBtree2} = update_id_btree(IdBtree, DocIdKeys, FirstBuild),
- ToRemByView = collapse_rem_keys(ToRemove, dict:new()),
+ {ok, SeqBtree2} = case SeqIndexed of
+ true ->
+ ToRemBySeq = [{ViewId, Seq} || {{ViewId, Seq}, _Id} <-
+ couch_mrview_util:to_seqkvs(ToRemove, [])],
+ update_seq_btree(SeqBtree, DocIdKeys, ToRemBySeq);
+ _ ->
+ {ok, nil}
+ end,
+
+ {ToRemByView, ToRemByViewSeq} = collapse_rem_keys(ToRemove,
+ dict:new(), dict:new()),
- UpdateView = fun(#mrview{id_num=ViewId}=View, {ViewId, KVs}) ->
+ UpdateView = fun(#mrview{id_num=ViewId}=View, {ViewId, KVs},
+ {ViewId, SKVs}) ->
ToRem = couch_util:dict_find(ViewId, ToRemByView, []),
{ok, VBtree2} = couch_btree:add_remove(View#mrview.btree, KVs, ToRem),
+
+ {ok, VSBtree2} = case View#mrview.seq_indexed of
+ true ->
+ SeqToRem = couch_util:dict_find(ViewId, ToRemByViewSeq, []),
+ couch_btree:add_remove(View#mrview.seq_btree, SKVs,
+ SeqToRem);
+ _ ->
+ {ok, nil}
+ end,
+
NewUpdateSeq = case VBtree2 =/= View#mrview.btree of
true -> UpdateSeq;
_ -> View#mrview.update_seq
end,
- View#mrview{btree=VBtree2, update_seq=NewUpdateSeq}
+ View#mrview{btree=VBtree2, seq_btree=VSBtree2,
+ update_seq=NewUpdateSeq}
end,
+ Views = lists:zipwith3(UpdateView, State#mrst.views, ViewKVs, ViewSKVs),
+
State#mrst{
- views=lists:zipwith(UpdateView, State#mrst.views, ViewKVs),
+ views= Views,
update_seq=UpdateSeq,
- id_btree=IdBtree2
+ id_btree=IdBtree2,
+ seq_btree=SeqBtree2
}.
update_id_btree(Btree, DocIdKeys, true) ->
- ToAdd = [{Id, DIKeys} || {Id, DIKeys} <- DocIdKeys, DIKeys /= []],
+ ToAdd = [{Id, {Seq, Keys}} || {Id, {Seq, Keys}} <- DocIdKeys, Keys /= []],
couch_btree:query_modify(Btree, [], ToAdd, []);
update_id_btree(Btree, DocIdKeys, _) ->
ToFind = [Id || {Id, _} <- DocIdKeys],
- ToAdd = [{Id, DIKeys} || {Id, DIKeys} <- DocIdKeys, DIKeys /= []],
- ToRem = [Id || {Id, DIKeys} <- DocIdKeys, DIKeys == []],
+ ToAdd = [{Id, {Seq, Keys}} || {Id, {Seq, Keys}} <- DocIdKeys, Keys /= []],
+ ToRem = [Id || {Id, {_Seq, Keys}} <- DocIdKeys, Keys == []],
couch_btree:query_modify(Btree, ToFind, ToAdd, ToRem).
-
-collapse_rem_keys([], Acc) ->
- Acc;
-collapse_rem_keys([{ok, {DocId, ViewIdKeys}} | Rest], Acc) ->
- NewAcc = lists:foldl(fun({ViewId, Key}, Acc2) ->
- dict:append(ViewId, {Key, DocId}, Acc2)
- end, Acc, ViewIdKeys),
- collapse_rem_keys(Rest, NewAcc);
-collapse_rem_keys([{not_found, _} | Rest], Acc) ->
- collapse_rem_keys(Rest, Acc).
+update_seq_btree(Btree, DocIdKeys, ToRemBySeq) ->
+ ToAdd = couch_mrview_util:to_seqkvs(DocIdKeys, []),
+ couch_btree:add_remove(Btree, ToAdd, ToRemBySeq).
+
+collapse_rem_keys([], Acc, SAcc) ->
+ {Acc, SAcc};
+collapse_rem_keys([{ok, {DocId, {Seq, ViewIdKeys}}} | Rest], Acc, SAcc) ->
+ {NewAcc, NewSAcc} = lists:foldl(fun({ViewId, Key}, {Acc2, SAcc2}) ->
+ Acc3 = dict:append(ViewId, {Key, DocId}, Acc2),
+ SAcc3 = dict:append(ViewId, {Key, Seq}, SAcc2),
+ {Acc3, SAcc3}
+ end, {Acc, SAcc}, ViewIdKeys),
+ collapse_rem_keys(Rest, NewAcc, NewSAcc);
+collapse_rem_keys([{not_found, _} | Rest], Acc, SAcc) ->
+ collapse_rem_keys(Rest, Acc, SAcc).
send_partial(Pid, State) when is_pid(Pid) ->
131 apps/couch_mrview/src/couch_mrview_util.erl
@@ -12,7 +12,7 @@
-module(couch_mrview_util).
--export([get_view/4]).
+-export([get_view/4, get_view_state/4]).
-export([ddoc_to_mrst/2, init_state/4, reset_index/3]).
-export([make_header/1]).
-export([index_file/2, compaction_file/2, open_file/1]).
@@ -24,14 +24,17 @@
-export([calculate_data_size/2]).
-export([validate_args/1]).
-export([maybe_load_doc/3, maybe_load_doc/4]).
+-export([to_seqkvs/2]).
+-export([change_keys/4]).
+-export([get_view_id/3]).
-define(MOD, couch_mrview_index).
+-define(MRFMT, 0.2).
-include_lib("couch/include/couch_db.hrl").
-include_lib("couch_mrview/include/couch_mrview.hrl").
-
-get_view(Db, DDoc, ViewName, Args0) ->
+get_view_state(Db, DDoc, ViewName, Args0) ->
ArgCheck = fun(InitState) ->
Args1 = set_view_type(Args0, ViewName, InitState#mrst.views),
{ok, validate_args(Args1)}
@@ -54,6 +57,12 @@ get_view(Db, DDoc, ViewName, Args0) ->
end,
#mrst{language=Lang, views=Views} = State,
{Type, View, Args3} = extract_view(Lang, Args2, ViewName, Views),
+ {ok, {Type, View, Args3}, State}.
+
+
+get_view(Db, DDoc, ViewName, Args0) ->
+ {ok, {Type, View, Args3}, State} = get_view_state(Db, DDoc,
+ ViewName, Args0),
check_range(Args3, view_cmp(View)),
Sig = view_sig(Db, State, View, Args3),
{ok, {Type, View}, Sig, Args3}.
@@ -77,7 +86,10 @@ ddoc_to_mrst(DbName, #doc{id=Id, body={Fields}}) ->
RedFuns = [{Name, RedSrc} | View#mrview.reduce_funs],
{View#mrview.map_names, RedFuns}
end,
- View2 = View#mrview{map_names=MapNames, reduce_funs=RedSrcs},
+ SeqIndexed = couch_util:get_value(<<"seq_indexed">>,
+ ViewOpts, false),
+ View2 = View#mrview{seq_indexed=SeqIndexed,
+ map_names=MapNames, reduce_funs=RedSrcs},
dict:store({MapSrc, ViewOpts}, View2, DictBySrcAcc);
undefined ->
DictBySrcAcc
@@ -93,6 +105,10 @@ ddoc_to_mrst(DbName, #doc{id=Id, body={Fields}}) ->
{DesignOpts} = couch_util:get_value(<<"options">>, Fields, {[]}),
{RawViews} = couch_util:get_value(<<"views">>, Fields, {[]}),
Lib = couch_util:get_value(<<"lib">>, RawViews, {[]}),
+ SeqIndexed = couch_util:get_value(<<"seq_indexed">>, DesignOpts,
+ false),
+ IncludeDeleted = couch_util:get_value(<<"include_deleted">>, DesignOpts,
+ false),
IdxState = #mrst{
db_name=DbName,
@@ -100,12 +116,23 @@ ddoc_to_mrst(DbName, #doc{id=Id, body={Fields}}) ->
lib=Lib,
views=Views,
language=Language,
- design_opts=DesignOpts
+ design_opts=DesignOpts,
+ seq_indexed=SeqIndexed,
+ include_deleted=IncludeDeleted
},
- SigInfo = {Views, Language, DesignOpts, couch_index_util:sort_lib(Lib)},
+ SigInfo = {?MRFMT, Views, Language, DesignOpts,
+ couch_index_util:sort_lib(Lib)},
{ok, IdxState#mrst{sig=couch_util:md5(term_to_binary(SigInfo))}}.
+get_view_id(#db{name=DbName}, DDoc, VName) ->
+ {ok, #mrst{views=Views}} = ddoc_to_mrst(DbName, DDoc),
+ ViewNames = lists:foldl(fun(#mrview{id_num=Id, map_names=[Name|_]}, Acc) ->
+ dict:store(Name, Id, Acc)
+ end, dict:new(), Views),
+ dict:find(VName, ViewNames).
+
+
set_view_type(_Args, _ViewName, []) ->
throw({not_found, missing_named_view});
set_view_type(Args, ViewName, [View | Rest]) ->
@@ -165,21 +192,24 @@ init_state(Db, Fd, #mrst{views=Views}=State, nil) ->
seq=0,
purge_seq=couch_db:get_purge_seq(Db),
id_btree_state=nil,
- view_states=[{nil, 0, 0} || _ <- Views]
+ seq_btree_state=nil,
+ view_states=[{nil, nil, 0, 0} || _ <- Views]
},
init_state(Db, Fd, State, Header);
init_state(Db, Fd, State, Header) ->
- #mrst{language=Lang, views=Views} = State,
+ #mrst{language=Lang, seq_indexed=SeqIndexed, views=Views} = State,
#mrheader{
seq=Seq,
purge_seq=PurgeSeq,
id_btree_state=IdBtreeState,
+ seq_btree_state=SeqBtreeState,
view_states=ViewStates
} = Header,
+
StateUpdate = fun
- ({_, _, _}=St) -> St;
- (St) -> {St, 0, 0}
+ ({_, _, _, _}=St) -> St;
+ (St) -> {St, nil, 0, 0}
end,
ViewStates2 = lists:map(StateUpdate, ViewStates),
@@ -191,6 +221,13 @@ init_state(Db, Fd, State, Header) ->
IdBtOpts = [{reduce, IdReduce}, {compression, couch_db:compression(Db)}],
{ok, IdBtree} = couch_btree:open(IdBtreeState, Fd, IdBtOpts),
+ {ok, SeqBtree} = case SeqIndexed of
+ true ->
+ SeqBtOpts = [{compression, couch_db:compression(Db)}],
+ couch_btree:open(SeqBtreeState, Fd, SeqBtOpts);
+ _ -> {ok, nil}
+ end,
+
OpenViewFun = fun(St, View) -> open_view(Db, Fd, Lang, St, View) end,
Views2 = lists:zipwith(OpenViewFun, ViewStates2, Views),
@@ -199,11 +236,11 @@ init_state(Db, Fd, State, Header) ->
update_seq=Seq,
purge_seq=PurgeSeq,
id_btree=IdBtree,
+ seq_btree=SeqBtree,
views=Views2
}.
-
-open_view(Db, Fd, Lang, {BTState, USeq, PSeq}, View) ->
+open_view(Db, Fd, Lang, {BTState, SBTState, USeq, PSeq}, View) ->
FunSrcs = [FunSrc || {_Name, FunSrc} <- View#mrview.reduce_funs],
ReduceFun =
fun(reduce, KVs) ->
@@ -227,8 +264,19 @@ open_view(Db, Fd, Lang, {BTState, USeq, PSeq}, View) ->
{reduce, ReduceFun},
{compression, couch_db:compression(Db)}
],
+
{ok, Btree} = couch_btree:open(BTState, Fd, ViewBtOpts),
- View#mrview{btree=Btree, update_seq=USeq, purge_seq=PSeq}.
+
+ {ok, SeqBtree} = case View#mrview.seq_indexed of
+ true ->
+ ViewSBtOpts = [{compression, couch_db:compression(Db)}],
+ couch_btree:open(SBTState, Fd, ViewSBtOpts);
+ _ ->
+ {ok, nil}
+ end,
+
+ View#mrview{btree=Btree, seq_btree=SeqBtree,
+ update_seq=USeq, purge_seq=PSeq}.
temp_view_to_ddoc({Props}) ->
@@ -479,24 +527,39 @@ view_cmp(View) ->
make_header(State) ->
#mrst{
+ seq_indexed=SeqIndexed,
update_seq=Seq,
purge_seq=PurgeSeq,
id_btree=IdBtree,
+ seq_btree=SeqBtree,
views=Views
} = State,
- ViewStates = [
- {
+
+ ViewStates = lists:foldr(fun(V, Acc) ->
+ SBTState = case V#mrview.seq_indexed of
+ true ->
+ couch_btree:get_state(V#mrview.seq_btree);
+ _ ->
+ nil
+ end,
+ [{
couch_btree:get_state(V#mrview.btree),
+ SBTState,
V#mrview.update_seq,
V#mrview.purge_seq
- }
- ||
- V <- Views
- ],
+ } | Acc]
+ end, [], Views),
+
+ SeqBtreeState = case SeqIndexed of
+ true -> couch_btree:get_state(SeqBtree);
+ _ -> nil
+ end,
+
#mrheader{
seq=Seq,
purge_seq=PurgeSeq,
id_btree_state=couch_btree:get_state(IdBtree),
+ seq_btree_state=SeqBtreeState,
view_states=ViewStates
}.
@@ -530,7 +593,7 @@ delete_index_file(DbName, Sig) ->
delete_compaction_file(DbName, Sig) ->
delete_file(compaction_file(DbName, Sig)).
-
+
delete_file(FName) ->
case filelib:is_file(FName) of
@@ -554,6 +617,7 @@ reset_state(State) ->
qserver=nil,
update_seq=0,
id_btree=nil,
+ seq_btree=nil,
views=[View#mrview{btree=nil} || View <- State#mrst.views]
}.
@@ -706,3 +770,30 @@ index_of(Key, [_ | Rest], Idx) ->
mrverror(Mesg) ->
throw({query_parse_error, Mesg}).
+
+to_seqkvs([], Acc) ->
+ lists:reverse(Acc);
+to_seqkvs([{not_found, _} | Rest], Acc) ->
+ to_seqkvs(Rest, Acc);
+to_seqkvs([{ok, DocIdKeys} | Rest], Acc) ->
+ to_seqkvs([DocIdKeys | Rest], Acc);
+to_seqkvs([{_Id, {_Seq, []}} | Rest], Acc) ->
+ to_seqkvs(Rest, Acc);
+to_seqkvs([{Id, {Seq, Keys}} | Rest], Acc0) ->
+ Acc1 = lists:foldl(fun({ViewId, _}, Acc) ->
+ [{{ViewId, Seq}, Id} | Acc]
+ end, Acc0, Keys),
+ to_seqkvs(Rest, Acc1).
+
+
+change_keys(ViewId, Args, StartSeq, EndSeq) ->
+ case {Args#mrargs.start_key, Args#mrargs.end_key} of
+ {undefined, undefined} ->
+ {false, {ViewId, StartSeq +1}, {ViewId, EndSeq}};
+ {SK, undefined} ->
+ {true, {SK, StartSeq +1}, {SK, EndSeq}};
+ {undefined, EK} ->
+ {true, {EK, StartSeq +1}, {EK, EndSeq}};
+ {SK, EK} ->
+ {true, {SK, StartSeq +1}, {EK, EndSeq}}
+ end.
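
For reference (not part of the patch; doc id, sequence and view id are made
up), `to_seqkvs/2` flattens the per-doc `{DocId, {Seq, ViewIdKeys}}` entries
into the `{{ViewId, Seq}, DocId}` pairs stored in the view-group seq btree:

    %% sketch: one doc at seq 42 that emitted key <<"k">> in view 0
    [{{0, 42}, <<"doc1">>}] =
        couch_mrview_util:to_seqkvs([{<<"doc1">>, {42, [{0, <<"k">>}]}}], []).
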
10 apps/couch_replicator/src/couch_replicator_api_wrap.erl
@@ -21,6 +21,7 @@
-include_lib("couch/include/couch_db.hrl").
-include_lib("couch_httpd/include/couch_httpd.hrl").
-include_lib("couch_changes/include/couch_changes.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
-include("couch_replicator_api_wrap.hrl").
-export([
@@ -390,6 +391,8 @@ maybe_add_changes_filter_q_args(BaseQS, Options) ->
undefined ->
BaseQS;
FilterName ->
+ ViewFields0 = [atom_to_list(F) || F <- record_info(fields, mrargs)],
+ ViewFields = ["key" | ViewFields0],
{Params} = get_value(query_params, Options, {[]}),
[{"filter", ?b2l(FilterName)} | lists:foldl(
fun({K, V}, QSAcc) ->
@@ -397,6 +400,13 @@ maybe_add_changes_filter_q_args(BaseQS, Options) ->
case lists:keymember(Ks, 1, QSAcc) of
true ->
QSAcc;
+ false when FilterName =:= <<"_view">> ->
+ V1 = case lists:member(Ks, ViewFields) of
+ true ->
+ ?JSON_ENCODE(V);
+ false -> couch_util:to_list(V)
+ end,
+ [{Ks, V1} | QSAcc];
false ->
[{Ks, couch_util:to_list(V)} | QSAcc]
end
1  apps/couch_replicator/src/couch_replicator_notifier.erl
@@ -22,6 +22,7 @@
-export([handle_event/2, handle_call/2, handle_info/2]).
-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_changes/include/couch_changes.hrl").
start_link(FunAcc) ->
couch_event_sup:start_link(couch_replication,