Skip to content

Commit

Permalink
add support of Server-Sent Events protocol to db changes API.
Browse files Browse the repository at this point in the history
This patch add support for the new specification of w3c by adding a new
feed type named `eventsource`:

http://www.w3.org/TR/2009/WD-eventsource-20090423/

ex:

    $ curl
'http://127.0.0.1:5984/testdb/_changes?feed=eventsource&heartbeat=true&timeout=10000'
    data:
{"seq":1,"id":"test","changes":[{"rev":"1-967a00dff5e02add41819138abb3284d"}]}
    id: 1

    data:
{"seq":2,"id":"test1","changes":[{"rev":"1-967a00dff5e02add41819138abb3284d"}]}
    id: 2
  • Loading branch information
benoitc committed May 10, 2012
1 parent 7428461 commit fdce4ab
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 7 deletions.
15 changes: 10 additions & 5 deletions apps/couch_changes/src/couch_changes.erl
Expand Up @@ -64,7 +64,8 @@ handle_changes(Args1, Req, Db0) ->
put(last_changes_heartbeat, now())
end,

if Feed == "continuous" orelse Feed == "longpoll" ->
case lists:member(Feed, ["continuous", "longpoll", "eventsource"]) of

This comment has been minimized.

Copy link
@janl

janl May 11, 2012

Contributor

I wonder whether we should start to use atoms here, instead of strings. Strings seem error prone to me.

true ->
fun(CallbackAcc) ->
{Callback, UserAcc} = get_callback_acc(CallbackAcc),
Self = self(),
Expand All @@ -90,7 +91,7 @@ handle_changes(Args1, Req, Db0) ->
get_rest_db_updated(ok) % clean out any remaining update messages
end
end;
true ->
false ->
fun(CallbackAcc) ->
{Callback, UserAcc} = get_callback_acc(CallbackAcc),
UserAcc2 = start_sending_changes(Callback, UserAcc, Feed),
Expand Down Expand Up @@ -267,7 +268,9 @@ get_changes_timeout(Args, Callback) ->
fun(UserAcc) -> {ok, Callback(timeout, ResponseType, UserAcc)} end}
end.

start_sending_changes(_Callback, UserAcc, "continuous") ->
start_sending_changes(_Callback, UserAcc, ResponseType)
when ResponseType =:= "continuous"
orelse ResponseType =:= "eventsource" ->
UserAcc;
start_sending_changes(Callback, UserAcc, ResponseType) ->
Callback(start, ResponseType, UserAcc).
Expand Down Expand Up @@ -440,7 +443,9 @@ keep_sending_changes(Args, Acc0, FirstRound) ->
end_sending_changes(Callback, UserAcc, EndSeq, ResponseType) ->
Callback({stop, EndSeq}, ResponseType, UserAcc).

changes_enumerator(DocInfo, #changes_acc{resp_type = "continuous"} = Acc) ->
changes_enumerator(DocInfo, #changes_acc{resp_type = ResponseType} = Acc)
when ResponseType =:= "continuous"
orelse ResponseType =:= "eventsource" ->
#changes_acc{
filter = FilterFun, callback = Callback,
user_acc = UserAcc, limit = Limit, db = Db,
Expand All @@ -462,7 +467,7 @@ changes_enumerator(DocInfo, #changes_acc{resp_type = "continuous"} = Acc) ->
end;
_ ->
ChangesRow = changes_row(Results, DocInfo, Acc),
UserAcc2 = Callback({change, ChangesRow, <<>>}, "continuous", UserAcc),
UserAcc2 = Callback({change, ChangesRow, <<>>}, ResponseType, UserAcc),
reset_heartbeat(),
{Go, Acc#changes_acc{seq = Seq, user_acc = UserAcc2, limit = Limit - 1}}
end;
Expand Down
23 changes: 21 additions & 2 deletions apps/couch_changes/src/couch_httpd_changes.erl
Expand Up @@ -40,14 +40,23 @@ handle_changes_req1(Req, #db{name=DbName}=Db) ->

do_changes_req(Req, Db) ->
MakeCallback = fun(Resp) ->
fun({change, Change, _}, "continuous") ->
fun({change, {ChangeProp}=Change, _}, "eventsource") ->
Seq = proplists:get_value(<<"seq">>, ChangeProp),
couch_httpd:send_chunk(Resp, ["data: ", ?JSON_ENCODE(Change),
"\n", "id: ", ?JSON_ENCODE(Seq),
"\n\n"]);
({change, Change, _}, "continuous") ->
couch_httpd:send_chunk(Resp, [?JSON_ENCODE(Change) | "\n"]);
({change, Change, Prepend}, _) ->
couch_httpd:send_chunk(Resp, [Prepend, ?JSON_ENCODE(Change)]);
(start, "eventsource") ->
ok;
(start, "continuous") ->
ok;
(start, _) ->
couch_httpd:send_chunk(Resp, "{\"results\":[\n");
({stop, _EndSeq}, "eventsource") ->
couch_httpd:end_json_response(Resp);
({stop, EndSeq}, "continuous") ->
couch_httpd:send_chunk(
Resp,
Expand Down Expand Up @@ -85,20 +94,30 @@ do_changes_req(Db, Req, #changes_args{feed="normal"}, ChangesFun, MakeCallback)
[{"ETag", CurrentEtag}]),
ChangesFun(MakeCallback(Resp))
end);
do_changes_req(_Db, Req, #changes_args{feed="eventsource"}, ChangesFun,
MakeCallback) ->
Headers = [
{"Content-Type", "text/event-stream"},
{"Cache-Control", "no-cache"}
],
{ok, Resp} = couch_httpd:start_json_response(Req, 200, Headers),
ChangesFun(MakeCallback(Resp));
do_changes_req(_Db, Req, _ChangesArgs, ChangesFun, MakeCallback) ->
% "longpoll" or "continuous"
{ok, Resp} = couch_httpd:start_json_response(Req, 200),
ChangesFun(MakeCallback(Resp)).

parse_changes_query(Req) ->
lists:foldl(fun({Key, Value}, Args) ->
case {Key, Value} of
case {string:to_lower(Key), Value} of
{"feed", _} ->
Args#changes_args{feed=Value};
{"descending", "true"} ->
Args#changes_args{dir=rev};
{"since", _} ->
Args#changes_args{since=list_to_integer(Value)};
{"last-event-id", _} ->
Args#changes_args{since=list_to_integer(Value)};
{"limit", _} ->
Args#changes_args{limit=list_to_integer(Value)};
{"style", _} ->
Expand Down

0 comments on commit fdce4ab

Please sign in to comment.