diff --git a/apps/couch_changes/src/couch_changes.erl b/apps/couch_changes/src/couch_changes.erl index 1a0c7833..4efa98e5 100644 --- a/apps/couch_changes/src/couch_changes.erl +++ b/apps/couch_changes/src/couch_changes.erl @@ -64,7 +64,8 @@ handle_changes(Args1, Req, Db0) -> put(last_changes_heartbeat, now()) end, - if Feed == "continuous" orelse Feed == "longpoll" -> + case lists:member(Feed, ["continuous", "longpoll", "eventsource"]) of + true -> fun(CallbackAcc) -> {Callback, UserAcc} = get_callback_acc(CallbackAcc), Self = self(), @@ -90,7 +91,7 @@ handle_changes(Args1, Req, Db0) -> get_rest_db_updated(ok) % clean out any remaining update messages end end; - true -> + false -> fun(CallbackAcc) -> {Callback, UserAcc} = get_callback_acc(CallbackAcc), UserAcc2 = start_sending_changes(Callback, UserAcc, Feed), @@ -267,7 +268,9 @@ get_changes_timeout(Args, Callback) -> fun(UserAcc) -> {ok, Callback(timeout, ResponseType, UserAcc)} end} end. -start_sending_changes(_Callback, UserAcc, "continuous") -> +start_sending_changes(_Callback, UserAcc, ResponseType) + when ResponseType =:= "continuous" + orelse ResponseType =:= "eventsource" -> UserAcc; start_sending_changes(Callback, UserAcc, ResponseType) -> Callback(start, ResponseType, UserAcc). @@ -440,7 +443,9 @@ keep_sending_changes(Args, Acc0, FirstRound) -> end_sending_changes(Callback, UserAcc, EndSeq, ResponseType) -> Callback({stop, EndSeq}, ResponseType, UserAcc). -changes_enumerator(DocInfo, #changes_acc{resp_type = "continuous"} = Acc) -> +changes_enumerator(DocInfo, #changes_acc{resp_type = ResponseType} = Acc) + when ResponseType =:= "continuous" + orelse ResponseType =:= "eventsource" -> #changes_acc{ filter = FilterFun, callback = Callback, user_acc = UserAcc, limit = Limit, db = Db, @@ -462,7 +467,7 @@ changes_enumerator(DocInfo, #changes_acc{resp_type = "continuous"} = Acc) -> end; _ -> ChangesRow = changes_row(Results, DocInfo, Acc), - UserAcc2 = Callback({change, ChangesRow, <<>>}, "continuous", UserAcc), + UserAcc2 = Callback({change, ChangesRow, <<>>}, ResponseType, UserAcc), reset_heartbeat(), {Go, Acc#changes_acc{seq = Seq, user_acc = UserAcc2, limit = Limit - 1}} end; diff --git a/apps/couch_changes/src/couch_httpd_changes.erl b/apps/couch_changes/src/couch_httpd_changes.erl index 581cdc33..e65a6eb4 100644 --- a/apps/couch_changes/src/couch_httpd_changes.erl +++ b/apps/couch_changes/src/couch_httpd_changes.erl @@ -40,14 +40,23 @@ handle_changes_req1(Req, #db{name=DbName}=Db) -> do_changes_req(Req, Db) -> MakeCallback = fun(Resp) -> - fun({change, Change, _}, "continuous") -> + fun({change, {ChangeProp}=Change, _}, "eventsource") -> + Seq = proplists:get_value(<<"seq">>, ChangeProp), + couch_httpd:send_chunk(Resp, ["data: ", ?JSON_ENCODE(Change), + "\n", "id: ", ?JSON_ENCODE(Seq), + "\n\n"]); + ({change, Change, _}, "continuous") -> couch_httpd:send_chunk(Resp, [?JSON_ENCODE(Change) | "\n"]); ({change, Change, Prepend}, _) -> couch_httpd:send_chunk(Resp, [Prepend, ?JSON_ENCODE(Change)]); + (start, "eventsource") -> + ok; (start, "continuous") -> ok; (start, _) -> couch_httpd:send_chunk(Resp, "{\"results\":[\n"); + ({stop, _EndSeq}, "eventsource") -> + couch_httpd:end_json_response(Resp); ({stop, EndSeq}, "continuous") -> couch_httpd:send_chunk( Resp, @@ -85,6 +94,14 @@ do_changes_req(Db, Req, #changes_args{feed="normal"}, ChangesFun, MakeCallback) [{"ETag", CurrentEtag}]), ChangesFun(MakeCallback(Resp)) end); +do_changes_req(_Db, Req, #changes_args{feed="eventsource"}, ChangesFun, + MakeCallback) -> + Headers = [ + {"Content-Type", "text/event-stream"}, + {"Cache-Control", "no-cache"} + ], + {ok, Resp} = couch_httpd:start_json_response(Req, 200, Headers), + ChangesFun(MakeCallback(Resp)); do_changes_req(_Db, Req, _ChangesArgs, ChangesFun, MakeCallback) -> % "longpoll" or "continuous" {ok, Resp} = couch_httpd:start_json_response(Req, 200), @@ -92,13 +109,15 @@ do_changes_req(_Db, Req, _ChangesArgs, ChangesFun, MakeCallback) -> parse_changes_query(Req) -> lists:foldl(fun({Key, Value}, Args) -> - case {Key, Value} of + case {string:to_lower(Key), Value} of {"feed", _} -> Args#changes_args{feed=Value}; {"descending", "true"} -> Args#changes_args{dir=rev}; {"since", _} -> Args#changes_args{since=list_to_integer(Value)}; + {"last-event-id", _} -> + Args#changes_args{since=list_to_integer(Value)}; {"limit", _} -> Args#changes_args{limit=list_to_integer(Value)}; {"style", _} ->