Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

`_changes` is now supporting view index updatei events

if the _view filters is passed in the url, the request process subscribe
to the index updates using the new `couch_mrview_events:subscribe/3`
function. All events are then returned to the HTTP api.

Ex:

    $ curl
'http://localhost:5984/testdb/_changes?filter=_view&view=test/test&feed=eventsource&heartbeat=true'
    data:
{"seq":1,"id":"330c284b190e1af436eab034040007cf","changes":[{"rev":"1-ea7a185b492abc69a6c8e0358d244a98"}]}
    id: 1

    data:
{"seq":4,"id":"330c284b190e1af436eab03404001904","changes":[{"rev":"1-e930be0936bead4354f6a20203e9a9dc"}]}
    id: 4

    data:
{"seq":6,"id":"9ec518da154ddc28420682144e000844","changes":[{"rev":"1-2fa51075d2e9812eaaabf98ababfdd3a"}]}
    id: 6
  • Loading branch information...
commit 64ba09f465ce67e0c597a78a7f7f10ce5b5385ae 1 parent 74c933d
@benoitc benoitc authored
View
2  apps/couch_changes/include/couch_changes.hrl
@@ -21,6 +21,8 @@
filter = "",
filter_fun,
filter_args = [],
+ filter_view,
+ view_args = [],
include_docs = false,
fields = [],
conflicts = false,
View
300 apps/couch_changes/src/couch_changes.erl
@@ -14,6 +14,7 @@
-include("couch_changes.hrl").
-include_lib("couch/include/couch_db.hrl").
-include_lib("couch_httpd/include/couch_httpd.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
-export([handle_changes/3]).
@@ -38,16 +39,26 @@
}).
%% @type Req -> #httpd{} | {json_req, JsonObj()}
-handle_changes(Args1, Req, Db0) ->
+handle_changes(Args0, Req, Db0) ->
#changes_args{
style = Style,
filter = FilterName,
feed = Feed,
dir = Dir,
since = Since
- } = Args1,
+ } = Args0,
{FilterFun, FilterArgs} = make_filter_fun(FilterName, Style, Req, Db0),
- Args = Args1#changes_args{filter_fun = FilterFun, filter_args = FilterArgs},
+ Args1 = Args0#changes_args{filter_fun = FilterFun,
+ filter_args = FilterArgs},
+ Args = case FilterName of
+ "_view" ->
+ ViewArgs = parse_view_args(Req),
+ Args1#changes_args{filter_view=parse_view_param(Req),
+ view_args=ViewArgs#mrargs{direction=Dir}};
+ _ ->
+ Args1
+ end,
+
Start = fun() ->
{ok, Db} = couch_db:reopen(Db0),
StartSeq = case Dir of
@@ -70,14 +81,7 @@ handle_changes(Args1, Req, Db0) ->
true ->
fun(CallbackAcc) ->
{Callback, UserAcc} = get_callback_acc(CallbackAcc),
- Self = self(),
- {ok, Notify} = couch_db_update_notifier:start_link(
- fun({_, DbName}) when Db0#db.name == DbName ->
- Self ! db_updated;
- (_) ->
- ok
- end
- ),
+ Notify = subscribe_changes_events(FilterName, Db0, Args),
{Db, StartSeq} = Start(),
UserAcc2 = start_sending_changes(Callback, UserAcc, Feed),
{Timeout, TimeoutFun} = get_changes_timeout(Args, Callback),
@@ -89,7 +93,7 @@ handle_changes(Args1, Req, Db0) ->
Acc0,
true)
after
- couch_db_update_notifier:stop(Notify),
+ unsubscribe_changes_events(Notify, Db0, Args),
get_rest_db_updated(ok) % clean out any remaining update messages
end
end;
@@ -110,6 +114,27 @@ handle_changes(Args1, Req, Db0) ->
end
end.
+subscribe_changes_events("_view", #db{name=DbName},
+ #changes_args{filter_view={DName, _VName}}) ->
+ couch_mrview:subscribe(DbName, <<"_design/", DName/binary>>),
+ nil;
+subscribe_changes_events(_FilterName, Db0, _Args) ->
+ Self = self(),
+ {ok, Notify} = couch_db_update_notifier:start_link(
+ fun({_, DbName}) when Db0#db.name == DbName ->
+ Self ! db_updated;
+ (_) ->
+ ok
+ end
+ ),
+ Notify.
+
+unsubscribe_changes_events(nil, #db{name=DbName},
+ #changes_args{filter_view={DName, _}}) ->
+ couch_mrview:unsubscribe(DbName, <<"_design/", DName/binary>>);
+unsubscribe_changes_events(Pid, _Db, _Args) ->
+ couch_db_update_notifier:stop(Pid).
+
get_callback_acc({Callback, _UserAcc} = Pair) when is_function(Callback, 3) ->
Pair;
get_callback_acc(Callback) when is_function(Callback, 2) ->
@@ -182,9 +207,14 @@ builtin_filter_fun("_doc_ids", Style, #httpd{method='GET'}=Req, _Db) ->
{filter_docids(DocIds, Style), DocIds};
builtin_filter_fun("_design", Style, _Req, _Db) ->
{filter_designdoc(Style), []};
+builtin_filter_fun("_view", Style, {json_req, {Props}}=Req, Db) ->
+ {Params} = couch_util:get_value(<<"query">>, Props),
+ FilterName = ?b2l(couch_util:get_value(<<"view_filter">>, Params,
+ <<"">>)),
+ {os_filter_fun(FilterName, Style, Req, Db), []};
builtin_filter_fun("_view", Style, Req, Db) ->
- {DName, VName} = parse_view_param(Req),
- {filter_view({DName, VName} , Style, Db), []};
+ FilterName = couch_httpd:qs_value(Req, "view_filter", ""),
+ {os_filter_fun(FilterName, Style, Req, Db), []};
builtin_filter_fun(_FilterName, _Style, _Req, _Db) ->
throw({bad_request, "unknown builtin filter name"}).
@@ -208,31 +238,6 @@ filter_designdoc(Style) ->
end
end.
-filter_view({DName, VName}, Style, Db) ->
- DesignId = <<"_design/", DName/binary>>,
- DDoc = couch_httpd_db:couch_doc_open(Db, DesignId, nil, [ejson_body]),
- % validate that the ddoc has the filter fun
- #doc{body={Props}} = DDoc,
- couch_util:get_nested_json_value({Props}, [<<"views">>, VName]),
- fun(Db2, DocInfo) ->
- DocInfos =
- case Style of
- main_only ->
- [DocInfo];
- all_docs ->
- [DocInfo#doc_info{revs=[Rev]}|| Rev <- DocInfo#doc_info.revs]
- end,
- Docs = [Doc || {ok, Doc} <- [
- couch_db:open_doc(Db2, DocInfo2, [deleted, conflicts])
- || DocInfo2 <- DocInfos]],
- {ok, Passes} = couch_query_servers:filter_view(
- DDoc, VName, Docs
- ),
- [{[{<<"rev">>, couch_doc:rev_to_str({RevPos,RevId})}]}
- || {Pass, #doc{revs={RevPos,[RevId|_]}}}
- <- lists:zip(Passes, Docs), Pass == true]
- end.
-
builtin_results(Style, [#rev_info{rev=Rev}|_]=Revs) ->
case Style of
main_only ->
@@ -306,7 +311,9 @@ send_changes(Args, Acc0, FirstRound) ->
#changes_args{
dir = Dir,
filter = FilterName,
- filter_args = FilterArgs
+ filter_args = FilterArgs,
+ filter_view = View,
+ view_args = ViewArgs
} = Args,
#changes_acc{
db = Db,
@@ -321,13 +328,22 @@ send_changes(Args, Acc0, FirstRound) ->
"_design" ->
send_changes_design_docs(
Db, StartSeq, Dir, fun changes_enumerator/2, Acc0);
+ "_view" ->
+ send_changes_view(
+ Db, View, StartSeq, ViewArgs, fun view_changes_enumerator/2, Acc0);
_ ->
couch_db:changes_since(
Db, StartSeq, fun changes_enumerator/2, [{dir, Dir}], Acc0)
end;
false ->
- couch_db:changes_since(
- Db, StartSeq, fun changes_enumerator/2, [{dir, Dir}], Acc0)
+ case FilterName of
+ "_view" ->
+ send_changes_view(
+ Db, View, StartSeq, ViewArgs, fun view_changes_enumerator/2, Acc0);
+ _ ->
+ couch_db:changes_since(
+ Db, StartSeq, fun changes_enumerator/2, [{dir, Dir}], Acc0)
+ end
end.
@@ -352,6 +368,17 @@ send_changes_design_docs(Db, StartSeq, Dir, Fun, Acc0) ->
Db#db.fulldocinfo_by_id_btree, FoldFun, [], KeyOpts),
send_lookup_changes(FullDocInfos, StartSeq, Dir, Db, Fun, Acc0).
+send_changes_view(#db{name=DbName}, {DName, VName}, StartSeq, Args,
+ Fun, Acc0) ->
+ DesignId = <<"_design/", DName/binary>>,
+ case couch_mrview:view_changes_since(DbName, DesignId, VName, StartSeq, Fun,
+ Args, Acc0) of
+ {error, seqs_not_indexed} ->
+ throw({bad_request, "Sequences are not indexed in " ++
+ binary_to_list(DesignId)});
+ Resp ->
+ Resp
+ end.
send_lookup_changes(FullDocInfos, StartSeq, Dir, Db, Fun, Acc0) ->
FoldFun = case Dir of
@@ -400,7 +427,6 @@ send_lookup_changes(FullDocInfos, StartSeq, Dir, Db, Fun, Acc0) ->
{ok, FinalAcc}
end.
-
keep_sending_changes(Args, Acc0, FirstRound) ->
#changes_args{
feed = ResponseType,
@@ -408,40 +434,47 @@ keep_sending_changes(Args, Acc0, FirstRound) ->
db_open_options = DbOptions
} = Args,
- {ok, ChangesAcc} = send_changes(
- Args#changes_args{dir=fwd},
- Acc0,
- FirstRound),
- #changes_acc{
- db = Db, callback = Callback, timeout = Timeout, timeout_fun = TimeoutFun,
- seq = EndSeq, prepend = Prepend2, user_acc = UserAcc2, limit = NewLimit
- } = ChangesAcc,
-
- couch_db:close(Db),
- if Limit > NewLimit, ResponseType == "longpoll" ->
- end_sending_changes(Callback, UserAcc2, EndSeq, ResponseType);
- true ->
- case wait_db_updated(Timeout, TimeoutFun, UserAcc2) of
- {updated, UserAcc4} ->
- DbOptions1 = [{user_ctx, Db#db.user_ctx} | DbOptions],
- case couch_db:open(Db#db.name, DbOptions1) of
- {ok, Db2} ->
- keep_sending_changes(
- Args#changes_args{limit=NewLimit},
- ChangesAcc#changes_acc{
- db = Db2,
- user_acc = UserAcc4,
- seq = EndSeq,
- prepend = Prepend2,
- timeout = Timeout,
- timeout_fun = TimeoutFun},
- false);
- _Else ->
- end_sending_changes(Callback, UserAcc2, EndSeq, ResponseType)
- end;
- {stop, UserAcc4} ->
- end_sending_changes(Callback, UserAcc4, EndSeq, ResponseType)
- end
+ case send_changes(Args#changes_args{dir=fwd}, Acc0, FirstRound) of
+ {ok, ChangesAcc} ->
+ #changes_acc{
+ db = Db, callback = Callback, timeout = Timeout,
+ timeout_fun = TimeoutFun, seq = EndSeq, prepend = Prepend2,
+ user_acc = UserAcc2, limit = NewLimit
+ } = ChangesAcc,
+
+ couch_db:close(Db),
+ if Limit > NewLimit, ResponseType == "longpoll" ->
+ end_sending_changes(Callback, UserAcc2, EndSeq, ResponseType);
+ true ->
+ case wait_db_updated(Timeout, TimeoutFun, UserAcc2) of
+ {updated, UserAcc4} ->
+ DbOptions1 = [{user_ctx, Db#db.user_ctx} | DbOptions],
+ case couch_db:open(Db#db.name, DbOptions1) of
+ {ok, Db2} ->
+ keep_sending_changes(
+ Args#changes_args{limit=NewLimit},
+ ChangesAcc#changes_acc{
+ db = Db2,
+ user_acc = UserAcc4,
+ seq = EndSeq,
+ prepend = Prepend2,
+ timeout = Timeout,
+ timeout_fun = TimeoutFun},
+ false);
+ _Else ->
+ end_sending_changes(Callback, UserAcc2, EndSeq,
+ ResponseType)
+ end;
+ {stop, UserAcc4} ->
+ end_sending_changes(Callback, UserAcc4, EndSeq, ResponseType)
+ end
+ end;
+ Error ->
+ ?LOG_ERROR("Error while getting new changes: ~p~n", [Error]),
+ #changes_acc{
+ callback = Callback, seq = EndSeq, user_acc = UserAcc2
+ } = Acc0,
+ end_sending_changes(Callback, UserAcc2, EndSeq, ResponseType)
end.
end_sending_changes(Callback, UserAcc, EndSeq, ResponseType) ->
@@ -503,6 +536,37 @@ changes_enumerator(DocInfo, Acc) ->
user_acc = UserAcc2, limit = Limit - 1}}
end.
+view_changes_enumerator({{_Key, _Seq}, {Id, _V}}, Acc) ->
+ view_changes_enumerator1(Id, Acc).
+
+view_changes_enumerator1(Id, Acc) ->
+ #changes_acc{
+ filter = FilterFun, callback = Callback, prepend = Prepend,
+ user_acc = UserAcc, limit = Limit, resp_type = ResponseType, db = Db,
+ timeout = Timeout, timeout_fun = TimeoutFun
+ } = Acc,
+ {ok, DocInfo} = couch_db:get_doc_info(Db, Id),
+ #doc_info{high_seq = Seq} = DocInfo,
+ Results0 = FilterFun(Db, DocInfo),
+ Results = [Result || Result <- Results0, Result /= null],
+ Go = if (Limit =< 1) andalso Results =/= [] -> stop; true -> ok end,
+ case Results of
+ [] ->
+ {Done, UserAcc2} = maybe_heartbeat(Timeout, TimeoutFun, UserAcc),
+ case Done of
+ stop ->
+ {stop, Acc#changes_acc{seq = Seq, user_acc = UserAcc2}};
+ ok ->
+ {Go, Acc#changes_acc{seq = Seq, user_acc = UserAcc2}}
+ end;
+ _ ->
+ ChangesRow = changes_row(Results, DocInfo, Acc),
+ UserAcc2 = Callback({change, ChangesRow, Prepend}, ResponseType, UserAcc),
+ reset_heartbeat(),
+ {Go, Acc#changes_acc{
+ seq = Seq, prepend = <<",\n">>,
+ user_acc = UserAcc2, limit = Limit - 1}}
+ end.
changes_row(Results, DocInfo, Acc) ->
#doc_info{
@@ -542,7 +606,11 @@ deleted_item(_) -> [].
wait_db_updated(Timeout, TimeoutFun, UserAcc) ->
receive
db_updated ->
- get_rest_db_updated(UserAcc)
+ get_rest_db_updated(UserAcc);
+ {index_update, _} ->
+ get_rest_db_updated(UserAcc);
+ {index_shutdown, _} ->
+ {stop, UserAcc}
after Timeout ->
{Go, UserAcc2} = TimeoutFun(UserAcc),
case Go of
@@ -556,6 +624,8 @@ wait_db_updated(Timeout, TimeoutFun, UserAcc) ->
get_rest_db_updated(UserAcc) ->
receive
db_updated ->
+ get_rest_db_updated(UserAcc);
+ {index_update, _} ->
get_rest_db_updated(UserAcc)
after 0 ->
{updated, UserAcc}
@@ -612,3 +682,77 @@ filter_doc_fields([Field|Rest], {Props}=Doc, Acc) ->
[{Field, Value} | Acc]
end,
filter_doc_fields(Rest, Doc, Acc1).
+
+parse_view_args({json_req, {Props}}) ->
+ {Query} = couch_util:get_value(<<"query">>, Props, {[]}),
+ parse_view_args1(Query, #mrargs{});
+parse_view_args(Req) ->
+ couch_mrview_http:parse_qs(Req, []).
+
+
+parse_view_args1([], Args) ->
+ Args;
+parse_view_args1([{Key, Val} | Rest], Args) ->
+ Args1 = case Key of
+ <<"reduce">> ->
+ Args#mrargs{reduce=Val};
+ <<"key">> ->
+ Args#mrargs{start_key=Val, end_key=Val};
+ <<"keys">> ->
+ Args#mrargs{keys=Val};
+ <<"startkey">> ->
+ Args#mrargs{start_key=Val};
+ <<"start_key">> ->
+ Args#mrargs{start_key=Val};
+ <<"startkey_docid">> ->
+ Args#mrargs{start_key_docid=Val};
+ <<"start_key_doc_id">> ->
+ Args#mrargs{start_key_docid=Val};
+ <<"endkey">> ->
+ Args#mrargs{end_key=Val};
+ <<"end_key">> ->
+ Args#mrargs{end_key=Val};
+ <<"endkey_docid">> ->
+ Args#mrargs{end_key_docid=Val};
+ <<"end_key_doc_id">> ->
+ Args#mrargs{end_key_docid=Val};
+ <<"limit">> ->
+ Args#mrargs{limit=Val};
+ <<"count">> ->
+ throw({query_parse_error, <<"QS param `count` is not `limit`">>});
+ <<"stale">> when Val == <<"ok">> ->
+ Args#mrargs{stale=ok};
+ <<"stale">> when Val == <<"update_after">> ->
+ Args#mrargs{stale=update_after};
+ <<"stale">> ->
+ throw({query_parse_error, <<"Invalid value for `stale`.">>});
+ <<"descending">> ->
+ case Val of
+ true -> Args#mrargs{direction=rev};
+ _ -> Args#mrargs{direction=fwd}
+ end;
+ <<"skip">> ->
+ Args#mrargs{skip=Val};
+ <<"group">> ->
+ case Val of
+ true -> Args#mrargs{group_level=exact};
+ _ -> Args#mrargs{group_level=0}
+ end;
+ <<"group_level">> ->
+ Args#mrargs{group_level=Val};
+ <<"inclusive_end">> ->
+ Args#mrargs{inclusive_end=Val};
+ <<"include_docs">> ->
+ Args#mrargs{include_docs=Val};
+ <<"update_seq">> ->
+ Args#mrargs{update_seq=Val};
+ <<"conflicts">> ->
+ Args#mrargs{conflicts=Val};
+ <<"list">> ->
+ Args#mrargs{list=Val};
+ <<"callback">> ->
+ Args#mrargs{callback=Val};
+ _ ->
+ Args#mrargs{extra=[{Key, Val} | Args#mrargs.extra]}
+ end,
+ parse_view_args1(Rest, Args1).
Please sign in to comment.
Something went wrong with that request. Please try again.