Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

streamed_response functionality #471

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
27 changes: 27 additions & 0 deletions guide/req.md
Expand Up @@ -170,6 +170,33 @@ As you can see the call to `chunk/2` does not return a modified
request object. It may return an error, however, so you should
make sure that you match the return value on `ok`.

Streamed response
----------------

For those clients that claim to be HTTP 1.1 compliant yet still
don't support chunked responses there is an equivalent API to
allow you to stream blocks of data without wrapping them in
chunked reponse headers.

As with chunked responses, you must first initiate the response
by calling `streamed_reply/{2,3}` and then calling `stream/2`
as many times as needed as in the following snippet. Note some
use cases could also be handle by calling `set_resp_body_fun/{2,3}`
and making direct writes to the connection socket, but these
functions are particularly useful when working with
[loop handlers](loop_handlers.md)

``` erlang
{ok, Req2} = cowboy_req:streamed_reply(200, Req),
ok = cowboy_req:stream("Hello...", Req2),
ok = cowboy_req:stream("streamed...", Req2),
ok = cowboy_req:stream("world!!", Req2).
```

Again, the call to `stream/2` does not return a modified
request object. It may return an error, however, so you should
make sure that you match the return value on `ok`.

Response preconfiguration
-------------------------

Expand Down
41 changes: 40 additions & 1 deletion src/cowboy_req.erl
Expand Up @@ -103,6 +103,9 @@
-export([chunked_reply/2]).
-export([chunked_reply/3]).
-export([chunk/2]).
-export([streamed_reply/2]).
-export([streamed_reply/3]).
-export([stream/2]).
-export([upgrade_reply/3]).
-export([ensure_response/2]).

Expand Down Expand Up @@ -162,7 +165,7 @@

%% Response.
resp_compress = false :: boolean(),
resp_state = waiting :: locked | waiting | chunks | done,
resp_state = waiting :: locked | waiting | chunks | streamed | done,
resp_headers = [] :: cowboy_http:headers(),
resp_body = <<>> :: iodata() | resp_body_fun()
| {non_neg_integer(), resp_body_fun()},
Expand Down Expand Up @@ -1089,6 +1092,40 @@ chunk(Data, #http_req{socket=Socket, transport=Transport, resp_state=chunks}) ->
Transport:send(Socket, [integer_to_list(iolist_size(Data), 16),
<<"\r\n">>, Data, <<"\r\n">>]).

%% @equiv streamed_reply(Status, [], Req)
-spec streamed_reply(cowboy_http:status(), Req) -> {ok, Req} when Req::req().
streamed_reply(Status, Req) ->
streamed_reply(Status, [], Req).

%% @doc Initiate the sending of a streamed reply to the client.
%% @see cowboy_req:stream/2
-spec streamed_reply(cowboy_http:status(), cowboy_http:headers(), Req)
-> {ok, Req} when Req::req().
streamed_reply(Status, Headers, Req=#http_req{
version=Version, connection=Connection,
resp_state=waiting, resp_headers=RespHeaders}) ->
RespConn = response_connection(Headers, Connection),
HTTP11Headers = case Version of
{1, 1} -> [
{<<"connection">>, atom_to_connection(Connection)}];
_ -> []
end,
{_, Req2} = response(Status, Headers, RespHeaders, [
{<<"date">>, cowboy_clock:rfc1123()},
{<<"server">>, <<"Cowboy">>}
|HTTP11Headers], <<>>, Req),
{ok, Req2#http_req{connection=RespConn, resp_state=streamed,
resp_headers=[], resp_body= <<>>}}.

%% @doc Send a block of data.
%%
%% A streamed reply must have been initiated before calling this function.
-spec stream(iodata(), req()) -> ok | {error, atom()}.
stream(_Data, #http_req{method= <<"HEAD">>}) ->
ok;
stream(Data, #http_req{socket=Socket, transport=Transport}) ->
Transport:send(Socket, Data).

%% @doc Send an upgrade reply.
%% @private
-spec upgrade_reply(cowboy_http:status(), cowboy_http:headers(), Req)
Expand All @@ -1114,6 +1151,8 @@ ensure_response(Req=#http_req{resp_state=waiting}, Status) ->
%% Terminate the chunked body for HTTP/1.1 only.
ensure_response(#http_req{method= <<"HEAD">>, resp_state=chunks}, _) ->
ok;
ensure_response(#http_req{resp_state=streamed}, _) ->
ok;
ensure_response(#http_req{version={1, 0}, resp_state=chunks}, _) ->
ok;
ensure_response(#http_req{socket=Socket, transport=Transport,
Expand Down
14 changes: 14 additions & 0 deletions test/http_SUITE.erl
Expand Up @@ -80,6 +80,7 @@
-export([static_test_file_css/1]).
-export([stream_body_set_resp/1]).
-export([stream_body_set_resp_close/1]).
-export([streamed_response/1]).
-export([te_chunked/1]).
-export([te_chunked_chopped/1]).
-export([te_chunked_delayed/1]).
Expand Down Expand Up @@ -149,6 +150,7 @@ groups() ->
static_test_file_css,
stream_body_set_resp,
stream_body_set_resp_close,
streamed_response,
te_chunked,
te_chunked_chopped,
te_chunked_delayed,
Expand Down Expand Up @@ -318,6 +320,7 @@ init_dispatch(Config) ->
cowboy_router:compile([
{"localhost", [
{"/chunked_response", chunked_handler, []},
{"/streamed_response", streamed_handler, []},
{"/init_shutdown", http_handler_init_shutdown, []},
{"/long_polling", http_handler_long_polling, []},
{"/headers/dupe", http_handler,
Expand Down Expand Up @@ -526,6 +529,17 @@ chunked_response(Config) ->
= Transport:recv(Socket, 44, 1000),
{error, closed} = cowboy_client:response(Client3).

streamed_response(Config) ->
Client = ?config(client, Config),
{ok, Client2} = cowboy_client:request(<<"GET">>,
build_url("/streamed_response", Config), Client),
{ok, 200, Headers, Client3} = cowboy_client:response(Client2),
false = lists:keymember(<<"transfer-encoding">>, 1, Headers),
{ok, Transport, Socket} = cowboy_client:transport(Client3),
{ok, <<"streamed_handler\r\nworks fine!">>}
= Transport:recv(Socket, 29, 1000),
{error, closed} = cowboy_client:response(Client3).

%% Check if sending requests whose size is around the MTU breaks something.
echo_body(Config) ->
Client = ?config(client, Config),
Expand Down
19 changes: 19 additions & 0 deletions test/streamed_handler.erl
@@ -0,0 +1,19 @@
%% Feel free to use, reuse and abuse the code in this file.

-module(streamed_handler).
-behaviour(cowboy_http_handler).
-export([init/3, handle/2, terminate/3]).

init({_Transport, http}, Req, _Opts) ->
{ok, Req, undefined}.

handle(Req, State) ->
{ok, Req2} = cowboy_req:streamed_reply(200, Req),
timer:sleep(100),
cowboy_req:stream("streamed_handler\r\n", Req2),
timer:sleep(100),
cowboy_req:stream("works fine!", Req2),
{ok, Req2, State}.

terminate(_, _, _) ->
ok.