Skip to content

Commit

Permalink
Make streamed chunk size configurable
Browse files Browse the repository at this point in the history
Defaults to a maximum of 1000000 bytes.

Also standardize the te_identity and te_chunked decoding functions.
Now they both try to read as much as possible (up to the limit),
making body reading much faster when not using chunked encoding.
  • Loading branch information
essen committed Mar 5, 2013
1 parent 55e98f4 commit 233cf43
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 33 deletions.
3 changes: 2 additions & 1 deletion guide/req.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ If you know the request contains a body, and that it is
of appropriate size, then you can read it directly with
either `body/1` or `body_qs/1`. Otherwise, you will want
to stream it with `stream_body/1` and `skip_body/1`, with
the streaming process optionally initialized using `init_stream/4`.
the streaming process optionally initialized using `init_stream/4`
or `init_stream/5`.

Multipart request body
----------------------
Expand Down
8 changes: 5 additions & 3 deletions src/cowboy_http.erl
Original file line number Diff line number Diff line change
Expand Up @@ -853,7 +853,7 @@ authorization_basic_password(<<C, Rest/binary>>, Fun, Acc) ->
%% @doc Decode a stream of chunks.
-spec te_chunked(Bin, TransferState)
-> more | {more, non_neg_integer(), Bin, TransferState}
| {ok, Bin, TransferState} | {ok, Bin, Bin, TransferState}
| {ok, Bin, Bin, TransferState}
| {done, non_neg_integer(), Bin} | {error, badarg}
when Bin::binary(), TransferState::{non_neg_integer(), non_neg_integer()}.
te_chunked(<< "0\r\n\r\n", Rest/binary >>, {0, Streamed}) ->
Expand All @@ -879,11 +879,13 @@ te_chunked(Data, {ChunkRem, Streamed}) ->

%% @doc Decode an identity stream.
-spec te_identity(Bin, TransferState)
-> {ok, Bin, TransferState} | {done, Bin, non_neg_integer(), Bin}
-> {more, non_neg_integer(), Bin, TransferState}
| {done, Bin, non_neg_integer(), Bin}
when Bin::binary(), TransferState::{non_neg_integer(), non_neg_integer()}.
te_identity(Data, {Streamed, Total})
when Streamed + byte_size(Data) < Total ->
{ok, Data, {Streamed + byte_size(Data), Total}};
Streamed2 = Streamed + byte_size(Data),
{more, Total - Streamed2, Data, {Streamed2, Total}};
te_identity(Data, {Streamed, Total}) ->
Size = Total - Streamed,
<< Data2:Size/binary, Rest/binary >> = Data,
Expand Down
69 changes: 40 additions & 29 deletions src/cowboy_req.erl
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
-export([has_body/1]).
-export([body_length/1]).
-export([init_stream/4]).
-export([init_stream/5]).
-export([stream_body/1]).
-export([skip_body/1]).
-export([body/1]).
Expand Down Expand Up @@ -152,7 +153,8 @@
meta = [] :: [{atom(), any()}],

%% Request body.
body_state = waiting :: waiting | done | {stream, fun(), any(), fun()},
body_state = waiting :: waiting | done | {stream,
non_neg_integer(), non_neg_integer(), fun(), any(), fun()},
multipart = undefined :: undefined | {non_neg_integer(), fun()},
buffer = <<>> :: binary(),

Expand Down Expand Up @@ -587,6 +589,12 @@ body_length(Req) ->
{undefined, Req2}
end.

%% @equiv init_stream(1000000, TransferDecode, TransferState, ContentDecode, Req)
-spec init_stream(fun(), any(), fun(), Req)
-> {ok, Req} when Req::req().
init_stream(TransferDecode, TransferState, ContentDecode, Req) ->
init_stream(1000000, TransferDecode, TransferState, ContentDecode, Req).

%% @doc Initialize body streaming and set custom decoding functions.
%%
%% Calling this function is optional. It should only be used if you
Expand All @@ -603,10 +611,11 @@ body_length(Req) ->
%% Content encoding is generally used for compression.
%%
%% Standard encodings can be found in cowboy_http.
-spec init_stream(fun(), any(), fun(), Req) -> {ok, Req} when Req::req().
init_stream(TransferDecode, TransferState, ContentDecode, Req) ->
-spec init_stream(non_neg_integer(), fun(), any(), fun(), Req)
-> {ok, Req} when Req::req().
init_stream(MaxLength, TransferDecode, TransferState, ContentDecode, Req) ->
{ok, Req#http_req{body_state=
{stream, TransferDecode, TransferState, ContentDecode}}}.
{stream, 0, MaxLength, TransferDecode, TransferState, ContentDecode}}}.

%% @doc Stream the request's body.
%%
Expand Down Expand Up @@ -635,56 +644,58 @@ stream_body(Req=#http_req{body_state=waiting,
case parse_header(<<"transfer-encoding">>, Req1) of
{ok, [<<"chunked">>], Req2} ->
stream_body(Req2#http_req{body_state=
{stream, fun cowboy_http:te_chunked/2, {0, 0},
fun cowboy_http:ce_identity/1}});
{stream, 0, 1000000,
fun cowboy_http:te_chunked/2, {0, 0},
fun cowboy_http:ce_identity/1}});
{ok, [<<"identity">>], Req2} ->
{Length, Req3} = body_length(Req2),
case Length of
0 ->
{done, Req3#http_req{body_state=done}};
Length ->
stream_body(Req3#http_req{body_state=
{stream, fun cowboy_http:te_identity/2, {0, Length},
fun cowboy_http:ce_identity/1}})
{stream, Length, 1000000,
fun cowboy_http:te_identity/2, {0, Length},
fun cowboy_http:ce_identity/1}})
end
end;
stream_body(Req=#http_req{buffer=Buffer, body_state={stream, _, _, _}})
stream_body(Req=#http_req{body_state=done}) ->
{done, Req};
stream_body(Req=#http_req{buffer=Buffer})
when Buffer =/= <<>> ->
transfer_decode(Buffer, Req#http_req{buffer= <<>>});
stream_body(Req=#http_req{body_state={stream, _, _, _}}) ->
stream_body_recv(0, Req);
stream_body(Req=#http_req{body_state=done}) ->
{done, Req}.
stream_body(Req) ->
stream_body_recv(Req).

-spec stream_body_recv(non_neg_integer(), Req)
-spec stream_body_recv(Req)
-> {ok, binary(), Req} | {error, atom()} when Req::req().
stream_body_recv(Length, Req=#http_req{
transport=Transport, socket=Socket, buffer=Buffer}) ->
stream_body_recv(Req=#http_req{
transport=Transport, socket=Socket, buffer=Buffer,
body_state={stream, Length, MaxLength, _, _, _}}) ->
%% @todo Allow configuring the timeout.
case Transport:recv(Socket, Length, 5000) of
case Transport:recv(Socket, min(Length, MaxLength), 5000) of
{ok, Data} -> transfer_decode(<< Buffer/binary, Data/binary >>,
Req#http_req{buffer= <<>>});
{error, Reason} -> {error, Reason}
end.

-spec transfer_decode(binary(), Req)
-> {ok, binary(), Req} | {error, atom()} when Req::req().
transfer_decode(Data, Req=#http_req{
body_state={stream, TransferDecode, TransferState, ContentDecode}}) ->
transfer_decode(Data, Req=#http_req{body_state={stream, _, MaxLength,
TransferDecode, TransferState, ContentDecode}}) ->
case TransferDecode(Data, TransferState) of
{ok, Data2, TransferState2} ->
content_decode(ContentDecode, Data2, Req#http_req{body_state=
{stream, TransferDecode, TransferState2, ContentDecode}});
{ok, Data2, Rest, TransferState2} ->
content_decode(ContentDecode, Data2, Req#http_req{
buffer=Rest, body_state=
{stream, TransferDecode, TransferState2, ContentDecode}});
content_decode(ContentDecode, Data2,
Req#http_req{buffer=Rest, body_state={stream, 0, MaxLength,
TransferDecode, TransferState2, ContentDecode}});
%% @todo {header(s) for chunked
more ->
stream_body_recv(0, Req#http_req{buffer=Data});
{more, Length, Rest, TransferState2} ->
stream_body_recv(Length, Req#http_req{buffer=Rest, body_state=
{stream, TransferDecode, TransferState2, ContentDecode}});
stream_body_recv(Req#http_req{buffer=Data, body_state={stream,
0, MaxLength, TransferDecode, TransferState, ContentDecode}});
{more, Length, Data2, TransferState2} ->
content_decode(ContentDecode, Data2,
Req#http_req{body_state={stream, Length, MaxLength,
TransferDecode, TransferState2, ContentDecode}});
{done, Length, Rest} ->
Req2 = transfer_decode_done(Length, Rest, Req),
{done, Req2};
Expand Down

0 comments on commit 233cf43

Please sign in to comment.