Replace init_stream/5 with stream_body/2
This allows us to change the max chunk length on a per-chunk basis
instead of for the whole stream. It's also much easier to use this
way, even if we don't want to change the chunk size.
essen committed Apr 2, 2013
1 parent 66f7c15 commit ce1d886
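
For context, a minimal sketch (not part of this commit) of how a handler might read a request body with the new `stream_body/2`, using a 64000-byte maximum chunk length; the `read_body` helper and its accumulator loop are illustrative only:

```erlang
%% Illustrative only: read the whole body by streaming it in chunks of
%% at most 64000 bytes. A different maximum could be passed on each call.
read_body(Req) ->
	read_body(Req, <<>>).

read_body(Req, Acc) ->
	case cowboy_req:stream_body(Req, 64000) of
		{ok, Data, Req2} ->
			read_body(Req2, << Acc/binary, Data/binary >>);
		{done, Req2} ->
			{ok, Acc, Req2};
		{error, Reason} ->
			{error, Reason}
	end.
```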
Showing 2 changed files with 39 additions and 34 deletions.
6 changes: 3 additions & 3 deletions guide/req.md
@@ -104,9 +104,9 @@ and `body_qs/2` will return `{error, badlength}`. If the request
contains chunked body, `body/1`, `body/2`, `body_qs/1`
and `body_qs/2` will return `{error, chunked}`.
If you get either of the above two errors, you will want to
-handle the body of the request using `stream_body/1` and
-`skip_body/1`, with the streaming process optionally
-initialized using `init_stream/4` or `init_stream/5`.
+handle the body of the request using `stream_body/1`,
+`stream_body/2` and `skip_body/1`, with the streaming process
+optionally initialized using `init_stream/4`.

Multipart request body
----------------------
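The guide text above tells you to fall back to the streaming functions when `body/1` or `body_qs/1` reject the body. A sketch of that dispatch, assuming `body/1` returns `{ok, Body, Req2}` on success; `handle_body/2` and `stream_loop/1` stand in for application code:

```erlang
%% Illustrative only: small bodies are read in one call; chunked or
%% overly long bodies are streamed instead (see read_body/1 above).
case cowboy_req:body(Req) of
	{ok, Body, Req2} ->
		handle_body(Body, Req2);
	{error, chunked} ->
		stream_loop(Req);
	{error, badlength} ->
		stream_loop(Req)
end
```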
67 changes: 36 additions & 31 deletions src/cowboy_req.erl
@@ -78,8 +78,8 @@
-export([has_body/1]).
-export([body_length/1]).
-export([init_stream/4]).
--export([init_stream/5]).
-export([stream_body/1]).
+-export([stream_body/2]).
-export([skip_body/1]).
-export([body/1]).
-export([body/2]).
@@ -155,8 +155,8 @@
meta = [] :: [{atom(), any()}],

%% Request body.
-body_state = waiting :: waiting | done | {stream,
-non_neg_integer(), non_neg_integer(), fun(), any(), fun()},
+body_state = waiting :: waiting | done
+| {stream, non_neg_integer(), fun(), any(), fun()},
multipart = undefined :: undefined | {non_neg_integer(), fun()},
buffer = <<>> :: binary(),

@@ -591,17 +591,11 @@ body_length(Req) ->
{undefined, Req2}
end.

-%% @equiv init_stream(1000000, TransferDecode, TransferState, ContentDecode, Req)
--spec init_stream(fun(), any(), fun(), Req)
--> {ok, Req} when Req::req().
-init_stream(TransferDecode, TransferState, ContentDecode, Req) ->
-init_stream(1000000, TransferDecode, TransferState, ContentDecode, Req).
-
%% @doc Initialize body streaming and set custom decoding functions.
%%
%% Calling this function is optional. It should only be used if you
%% need to override the default behavior of Cowboy. Otherwise you
-%% should call stream_body/1 directly.
+%% should call stream_body/{1,2} directly.
%%
%% Two decodings happen. First a decoding function is applied to the
%% transferred data, and then another is applied to the actual content.
@@ -613,27 +607,36 @@ init_stream(TransferDecode, TransferState, ContentDecode, Req) ->
%% Content encoding is generally used for compression.
%%
%% Standard encodings can be found in cowboy_http.
--spec init_stream(non_neg_integer(), fun(), any(), fun(), Req)
+-spec init_stream(fun(), any(), fun(), Req)
-> {ok, Req} when Req::req().
-init_stream(MaxLength, TransferDecode, TransferState, ContentDecode, Req) ->
+init_stream(TransferDecode, TransferState, ContentDecode, Req) ->
{ok, Req#http_req{body_state=
-{stream, 0, MaxLength, TransferDecode, TransferState, ContentDecode}}}.
+{stream, 0, TransferDecode, TransferState, ContentDecode}}}.

+%% @equiv stream_body(Req, 1000000)
+-spec stream_body(Req) -> {ok, binary(), Req}
+| {done, Req} | {error, atom()} when Req::req().
+stream_body(Req) ->
+stream_body(Req, 1000000).
+
%% @doc Stream the request's body.
%%
%% This is the most low level function to read the request body.
%%
-%% In most cases, if they weren't defined before using stream_body/4,
+%% In most cases, if they weren't defined before using init_stream/4,
%% this function will guess which transfer and content encodings were
%% used for building the request body, and configure the decoding
%% functions that will be used when streaming.
%%
%% It then starts streaming the body, returning {ok, Data, Req}
%% for each streamed part, and {done, Req} when it's finished streaming.
--spec stream_body(Req) -> {ok, binary(), Req}
+%%
+%% You can limit the size of the chunks being returned by using the
+%% second argument which is the size in bytes. It defaults to 1000000 bytes.
+-spec stream_body(Req, non_neg_integer()) -> {ok, binary(), Req}
| {done, Req} | {error, atom()} when Req::req().
-stream_body(Req=#http_req{body_state=waiting,
-version=Version, transport=Transport, socket=Socket}) ->
+stream_body(Req=#http_req{body_state=waiting, version=Version,
+transport=Transport, socket=Socket}, MaxLength) ->
{ok, ExpectHeader, Req1} = parse_header(<<"expect">>, Req),
case ExpectHeader of
[<<"100-continue">>] ->
@@ -646,34 +649,36 @@ stream_body(Req=#http_req{body_state=waiting,
case parse_header(<<"transfer-encoding">>, Req1) of
{ok, [<<"chunked">>], Req2} ->
stream_body(Req2#http_req{body_state=
-{stream, 0, 1000000,
+{stream, 0,
fun cowboy_http:te_chunked/2, {0, 0},
-fun cowboy_http:ce_identity/1}});
+fun cowboy_http:ce_identity/1}},
+MaxLength);
{ok, [<<"identity">>], Req2} ->
{Length, Req3} = body_length(Req2),
case Length of
0 ->
{done, Req3#http_req{body_state=done}};
Length ->
stream_body(Req3#http_req{body_state=
-{stream, Length, 1000000,
+{stream, Length,
fun cowboy_http:te_identity/2, {0, Length},
-fun cowboy_http:ce_identity/1}})
+fun cowboy_http:ce_identity/1}},
+MaxLength)
end
end;
-stream_body(Req=#http_req{body_state=done}) ->
+stream_body(Req=#http_req{body_state=done}, _) ->
{done, Req};
-stream_body(Req=#http_req{buffer=Buffer})
+stream_body(Req=#http_req{buffer=Buffer}, _)
when Buffer =/= <<>> ->
transfer_decode(Buffer, Req#http_req{buffer= <<>>});
-stream_body(Req) ->
-stream_body_recv(Req).
+stream_body(Req, MaxLength) ->
+stream_body_recv(Req, MaxLength).

--spec stream_body_recv(Req)
+-spec stream_body_recv(Req, non_neg_integer())
-> {ok, binary(), Req} | {error, atom()} when Req::req().
stream_body_recv(Req=#http_req{
transport=Transport, socket=Socket, buffer=Buffer,
-body_state={stream, Length, MaxLength, _, _, _}}) ->
+body_state={stream, Length, _, _, _}}, MaxLength) ->
%% @todo Allow configuring the timeout.
case Transport:recv(Socket, min(Length, MaxLength), 5000) of
{ok, Data} -> transfer_decode(<< Buffer/binary, Data/binary >>,
@@ -683,20 +688,20 @@ stream_body_recv(Req=#http_req{

-spec transfer_decode(binary(), Req)
-> {ok, binary(), Req} | {error, atom()} when Req::req().
-transfer_decode(Data, Req=#http_req{body_state={stream, _, MaxLength,
+transfer_decode(Data, Req=#http_req{body_state={stream, _,
TransferDecode, TransferState, ContentDecode}}) ->
case TransferDecode(Data, TransferState) of
{ok, Data2, Rest, TransferState2} ->
content_decode(ContentDecode, Data2,
-Req#http_req{buffer=Rest, body_state={stream, 0, MaxLength,
+Req#http_req{buffer=Rest, body_state={stream, 0,
TransferDecode, TransferState2, ContentDecode}});
%% @todo {header(s) for chunked
more ->
stream_body_recv(Req#http_req{buffer=Data, body_state={stream,
-0, MaxLength, TransferDecode, TransferState, ContentDecode}});
+0, TransferDecode, TransferState, ContentDecode}}, 0);
{more, Length, Data2, TransferState2} ->
content_decode(ContentDecode, Data2,
-Req#http_req{body_state={stream, Length, MaxLength,
+Req#http_req{body_state={stream, Length,
TransferDecode, TransferState2, ContentDecode}});
{done, Length, Rest} ->
Req2 = transfer_decode_done(Length, Rest, Req),
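
The `init_stream/4` documentation above covers the optional step of overriding the transfer and content decoding functions before streaming. A minimal sketch, assuming an identity-encoded body of known length and omitting error handling; the `stream_identity_body/1` name is a placeholder:

```erlang
%% Illustrative only: explicitly initialize the stream with the identity
%% decoders from cowboy_http, then read one chunk of up to 1000000 bytes.
stream_identity_body(Req) ->
	{Length, Req2} = cowboy_req:body_length(Req),
	{ok, Req3} = cowboy_req:init_stream(
		fun cowboy_http:te_identity/2, {0, Length},
		fun cowboy_http:ce_identity/1, Req2),
	cowboy_req:stream_body(Req3, 1000000).
```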
