Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Loading…

Add chunked response body fun #439

Merged
merged 1 commit into from

2 participants

@fishcakez

Extends cowboy:set_resp_body_fun/3 to allow chunked responses in streaming response funs. This is a backwards compatible change. Useful when the body length is unknown and you want to keep the connection alive.

StreamFun = fun(SendChunk) -> lists:foreach(SendChunk, ["ChunkA", "ChunkB"]) end,
Req2 = cowboy_req:set_resp_body_fun(chunked, StreamFun, Req),
cowboy_req:reply(Status, Req2),

The code attached is (currently) a proof of concept with a (passing) testcase.

@essen
Owner

And now for the hard question: why?

I don't see how this is any different than doing a chunked_reply followed by many chunks. Am I missing something?

@fishcakez

What is the intended way to do a chunked response with a REST handler? Currently you have to use chunked_reply and chunk inside a callback and once done return {halt, Req, State}.

Use case:
Send a body of unknown length and keep the connection open for subsequent requests.

@essen
Owner

I'm OK in principle but you need to rebase/cleanup (there's a ct:log still) first.

@essen
Owner

For info: test handlers have moved in the suite's data directory and their name has been shortened.

@fishcakez

Ok, rebased/removed ct:log.

src/cowboy_req.erl
@@ -959,6 +967,18 @@ reply(Status, Headers, Body, Req=#http_req{
true -> ok
end,
Req2#http_req{connection=RespConn};
+ {chunked, BodyFun} ->
+ %% We stream the response body in chunks.
+ {RespType, Req2} = chunked_response(Status, Headers, Req),
+ if RespType =/= hook, Method =/= <<"HEAD">> ->
+ ChunkFun = fun(IoData) -> chunk(IoData, Req2) end,
+ BodyFun(ChunkFun),
+ %% Empty chunk signals end of body.
+ _ = chunk(<<>>, Req2),
@essen Owner
essen added a note

You don't need to do that, Cowboy will do it later.

No it won't, reply/4 sets the resp_state to done, so ensure_response won't finish it off.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
src/cowboy_req.erl
((6 lines not shown))
-> Req when Req::req().
set_resp_body_fun(StreamLen, StreamFun, Req)
when is_integer(StreamLen), is_function(StreamFun) ->
- Req#http_req{resp_body={StreamLen, StreamFun}}.
+ Req#http_req{resp_body={StreamLen, StreamFun}};
+set_resp_body_fun(chunked, StreamFun, Req) ->
@essen Owner
essen added a note

Missing an is_function.

ok.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@essen
Owner

Can you explicitly send the empty chunk like in ensure_response then, will be clearer.

@fishcakez

Guilty as charged re your last inline comment ;). I've done as you said and added a test for 1.0.

@essen
Owner

+-type resp_chunked_fun() :: fun((fun((iodata()) -> ok | {error, atom})) -> ok).
atom()

@fishcakez fishcakez Add chunked response body fun
Adds a new type of streaming response fun. It can be set in a similar
way to a streaming body fun with known length:

Req2 = cowboy_req:set_resp_body_fun(chunked, StreamFun, Req)

The fun, StreamFun, should accept a fun as its single argument. This
fun, ChunkFun, is used to send chunks of iodata:

ok = ChunkFun(IoData)

ChunkFun should not be called with an empty binary or iolist as this
will cause HTTP 1.1 clients to believe the stream is over. The final (0
length) chunk will be sent automatically - even if it has already been
sent - assuming no exception is raised.

Also note that the connection will close after the last chunk for HTTP
1.0 clients.
c8242ab
@essen essen merged commit c8242ab into ninenines:master
@essen
Owner

Merged, thanks!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Apr 26, 2013
  1. @fishcakez

    Add chunked response body fun

    fishcakez authored
    Adds a new type of streaming response fun. It can be set in a similar
    way to a streaming body fun with known length:
    
    Req2 = cowboy_req:set_resp_body_fun(chunked, StreamFun, Req)
    
    The fun, StreamFun, should accept a fun as its single argument. This
    fun, ChunkFun, is used to send chunks of iodata:
    
    ok = ChunkFun(IoData)
    
    ChunkFun should not be called with an empty binary or iolist as this
    will cause HTTP 1.1 clients to believe the stream is over. The final (0
    length) chunk will be sent automatically - even if it has already been
    sent - assuming no exception is raised.
    
    Also note that the connection will close after the last chunk for HTTP
    1.0 clients.
This page is out of date. Refresh to see the latest.
View
66 src/cowboy_req.erl
@@ -122,6 +122,8 @@
-export_type([cookie_opts/0]).
-type resp_body_fun() :: fun((inet:socket(), module()) -> ok).
+-type send_chunk_fun() :: fun((iodata()) -> ok | {error, atom()}).
+-type resp_chunked_fun() :: fun((send_chunk_fun()) -> ok).
-record(http_req, {
%% Transport.
@@ -159,7 +161,8 @@
resp_state = waiting :: locked | waiting | chunks | done,
resp_headers = [] :: cowboy_http:headers(),
resp_body = <<>> :: iodata() | resp_body_fun()
- | {non_neg_integer(), resp_body_fun()},
+ | {non_neg_integer(), resp_body_fun()}
+ | {chunked, resp_chunked_fun()},
%% Functions.
onresponse = undefined :: undefined | already_called
@@ -892,10 +895,15 @@ set_resp_body_fun(StreamFun, Req) when is_function(StreamFun) ->
%% If the body function crashes while writing the response body or writes
%% fewer bytes than declared the behaviour is undefined.
-spec set_resp_body_fun(non_neg_integer(), resp_body_fun(), Req)
+ -> Req when Req::req();
+ (chunked, resp_chunked_fun(), Req)
-> Req when Req::req().
set_resp_body_fun(StreamLen, StreamFun, Req)
when is_integer(StreamLen), is_function(StreamFun) ->
- Req#http_req{resp_body={StreamLen, StreamFun}}.
+ Req#http_req{resp_body={StreamLen, StreamFun}};
+set_resp_body_fun(chunked, StreamFun, Req)
+ when is_function(StreamFun) ->
+ Req#http_req{resp_body={chunked, StreamFun}}.
%% @doc Return whether the given header has been set for the response.
-spec has_resp_header(binary(), req()) -> boolean().
@@ -906,6 +914,8 @@ has_resp_header(Name, #http_req{resp_headers=RespHeaders}) ->
-spec has_resp_body(req()) -> boolean().
has_resp_body(#http_req{resp_body=RespBody}) when is_function(RespBody) ->
true;
+has_resp_body(#http_req{resp_body={chunked, _}}) ->
+ true;
has_resp_body(#http_req{resp_body={Length, _}}) ->
Length > 0;
has_resp_body(#http_req{resp_body=RespBody}) ->
@@ -957,6 +967,20 @@ reply(Status, Headers, Body, Req=#http_req{
true -> ok
end,
Req2#http_req{connection=RespConn};
+ {chunked, BodyFun} ->
+ %% We stream the response body in chunks.
+ {RespType, Req2} = chunked_response(Status, Headers, Req),
+ if RespType =/= hook, Method =/= <<"HEAD">> ->
+ ChunkFun = fun(IoData) -> chunk(IoData, Req2) end,
+ BodyFun(ChunkFun),
+ %% Terminate the chunked body for HTTP/1.1 only.
+ _ = case Version of
+ {1, 0} -> ok;
+ _ -> Transport:send(Socket, <<"0\r\n\r\n">>)
+ end;
+ true -> ok
+ end,
+ Req2;
{ContentLength, BodyFun} ->
%% We stream the response body for ContentLength bytes.
RespConn = response_connection(Headers, Connection),
@@ -1035,22 +1059,9 @@ chunked_reply(Status, Req) ->
%% @see cowboy_req:chunk/2
-spec chunked_reply(cowboy_http:status(), cowboy_http:headers(), Req)
-> {ok, Req} when Req::req().
-chunked_reply(Status, Headers, Req=#http_req{
- version=Version, connection=Connection,
- resp_state=waiting, resp_headers=RespHeaders}) ->
- RespConn = response_connection(Headers, Connection),
- HTTP11Headers = case Version of
- {1, 1} -> [
- {<<"connection">>, atom_to_connection(Connection)},
- {<<"transfer-encoding">>, <<"chunked">>}];
- _ -> []
- end,
- {_, Req2} = response(Status, Headers, RespHeaders, [
- {<<"date">>, cowboy_clock:rfc1123()},
- {<<"server">>, <<"Cowboy">>}
- |HTTP11Headers], <<>>, Req),
- {ok, Req2#http_req{connection=RespConn, resp_state=chunks,
- resp_headers=[], resp_body= <<>>}}.
+chunked_reply(Status, Headers, Req) ->
+ {_, Req2} = chunked_response(Status, Headers, Req),
+ {ok, Req2}.
%% @doc Send a chunk of data.
%%
@@ -1205,6 +1216,25 @@ to_list(Req) ->
%% Internal.
+-spec chunked_response(cowboy_http:status(), cowboy_http:headers(), Req) ->
+ {normal | hook, Req} when Req::req().
+chunked_response(Status, Headers, Req=#http_req{
+ version=Version, connection=Connection,
+ resp_state=waiting, resp_headers=RespHeaders}) ->
+ RespConn = response_connection(Headers, Connection),
+ HTTP11Headers = case Version of
+ {1, 1} -> [
+ {<<"connection">>, atom_to_connection(Connection)},
+ {<<"transfer-encoding">>, <<"chunked">>}];
+ _ -> []
+ end,
+ {RespType, Req2} = response(Status, Headers, RespHeaders, [
+ {<<"date">>, cowboy_clock:rfc1123()},
+ {<<"server">>, <<"Cowboy">>}
+ |HTTP11Headers], <<>>, Req),
+ {RespType, Req2#http_req{connection=RespConn, resp_state=chunks,
+ resp_headers=[], resp_body= <<>>}}.
+
-spec response(cowboy_http:status(), cowboy_http:headers(),
cowboy_http:headers(), cowboy_http:headers(), iodata(), Req)
-> {normal | hook, Req} when Req::req().
View
2  src/cowboy_rest.erl
@@ -945,6 +945,8 @@ set_resp_body(Req, State=#state{handler=Handler, handler_state=HandlerState,
cowboy_req:set_resp_body_fun(StreamFun, Req2);
{stream, Len, StreamFun} ->
cowboy_req:set_resp_body_fun(Len, StreamFun, Req2);
+ {chunked, StreamFun} ->
+ cowboy_req:set_resp_body_fun(chunked, StreamFun, Req2);
_Contents ->
cowboy_req:set_resp_body(Body, Req2)
end,
View
47 test/http_SUITE.erl
@@ -82,6 +82,8 @@
-export([static_test_file_css/1]).
-export([stream_body_set_resp/1]).
-export([stream_body_set_resp_close/1]).
+-export([stream_body_set_resp_chunked/1]).
+-export([stream_body_set_resp_chunked10/1]).
-export([te_chunked/1]).
-export([te_chunked_chopped/1]).
-export([te_chunked_delayed/1]).
@@ -153,6 +155,8 @@ groups() ->
static_test_file_css,
stream_body_set_resp,
stream_body_set_resp_close,
+ stream_body_set_resp_chunked,
+ stream_body_set_resp_chunked10,
te_chunked,
te_chunked_chopped,
te_chunked_delayed,
@@ -338,6 +342,10 @@ init_dispatch(Config) ->
http_stream_body, [
{reply, set_resp_close},
{body, <<"stream_body_set_resp_close">>}]},
+ {"/stream_body/set_resp_chunked",
+ http_stream_body, [
+ {reply, set_resp_chunked},
+ {body, [<<"stream_body">>, <<"_set_resp_chunked">>]}]},
{"/static/[...]", cowboy_static,
[{directory, ?config(static_dir, Config)},
{mimetypes, [{<<".css">>, [<<"text/css">>]}]}]},
@@ -1211,6 +1219,45 @@ stream_body_set_resp_close(Config) ->
end,
{error, closed} = Transport:recv(Socket, 0, 1000).
+stream_body_set_resp_chunked(Config) ->
+ Client = ?config(client, Config),
+ {ok, Client2} = cowboy_client:request(<<"GET">>,
+ build_url("/stream_body/set_resp_chunked", Config), Client),
+ {ok, 200, Headers, Client3} = cowboy_client:response(Client2),
+ {_, <<"chunked">>} = lists:keyfind(<<"transfer-encoding">>, 1, Headers),
+ {ok, Transport, Socket} = cowboy_client:transport(Client3),
+ case element(7, Client3) of
+ <<"B\r\nstream_body\r\n11\r\n_set_resp_chunked\r\n0\r\n\r\n">> ->
+ ok;
+ Buffer ->
+ {ok, Rest} = Transport:recv(Socket, 44 - byte_size(Buffer), 1000),
+ <<"B\r\nstream_body\r\n11\r\n_set_resp_chunked\r\n0\r\n\r\n">>
+ = <<Buffer/binary, Rest/binary>>,
+ ok
+ end.
+
+stream_body_set_resp_chunked10(Config) ->
+ Client = ?config(client, Config),
+ Transport = ?config(transport, Config),
+ {ok, Client2} = cowboy_client:connect(
+ Transport, "localhost", ?config(port, Config), Client),
+ Data = ["GET /stream_body/set_resp_chunked HTTP/1.0\r\n",
+ "Host: localhost\r\n\r\n"],
+ {ok, Client3} = cowboy_client:raw_request(Data, Client2),
+ {ok, 200, Headers, Client4} = cowboy_client:response(Client3),
+ false = lists:keymember(<<"transfer-encoding">>, 1, Headers),
+ {ok, Transport, Socket} = cowboy_client:transport(Client4),
+ case element(7, Client4) of
+ <<"stream_body_set_resp_chunked">> ->
+ ok;
+ Buffer ->
+ {ok, Rest} = Transport:recv(Socket, 28 - byte_size(Buffer), 1000),
+ <<"stream_body_set_resp_chunked">>
+ = <<Buffer/binary, Rest/binary>>,
+ ok
+ end,
+ {error, closed} = Transport:recv(Socket, 0, 1000).
+
te_chunked(Config) ->
Client = ?config(client, Config),
Body = list_to_binary(io_lib:format("~p", [lists:seq(1, 100)])),
View
6 test/http_SUITE_data/http_stream_body.erl
@@ -19,7 +19,11 @@ handle(Req, State=#state{headers=_Headers, body=Body, reply=Reply}) ->
SLen = iolist_size(Body),
cowboy_req:set_resp_body_fun(SLen, SFun, Req);
set_resp_close ->
- cowboy_req:set_resp_body_fun(SFun, Req)
+ cowboy_req:set_resp_body_fun(SFun, Req);
+ set_resp_chunked ->
+ %% Here Body should be a list of chunks, not a binary.
+ SFun2 = fun(SendFun) -> lists:foreach(SendFun, Body) end,
+ cowboy_req:set_resp_body_fun(chunked, SFun2, Req)
end,
{ok, Req3} = cowboy_req:reply(200, Req2),
{ok, Req3, State}.
Something went wrong with that request. Please try again.