Skip to content
Browse files

handle large non-chunked content in yaws_revproxy

Use partial_post_size as a block size to handle large non-chunked content
from backend servers in yaws_revproxy. Read the content block by block,
returning each block to the client as it arrives. This avoids having to
read the entire backend response into memory before replying to the
client. Add a test for this new functionality and fix .gitignore to ignore
the new content file used for the test.
  • Loading branch information...
1 parent f1f858c commit b10e49286940658a90003ab831cb1a067785effe @vinoski vinoski committed Jun 10, 2012
Showing with 117 additions and 35 deletions.
  1. +1 −0 .gitignore
  2. +96 −33 src/yaws_revproxy.erl
  3. +7 −1 test/conf/revproxy.conf
  4. +1 −0 test/t4/Makefile
  5. +12 −1 test/t4/app_test.erl
View
1 .gitignore
@@ -28,6 +28,7 @@ test/support/include.sh
test/t1/localhost:8000/
test/t[12345]/logs/
test/t[12345]/yaws.conf
+test/t4/www2/8388608.bin
www/yaws.pdf
www/yaws.ps
www/*.txt
View
129 src/yaws_revproxy.erl
@@ -31,10 +31,10 @@
r_meth, %% what req method are we processing
r_host, %% and value of Host: for the cli request
- resp, %% response reveiced from the server
+ resp, %% response received from the server
headers, %% and associated headers
srvdata, %% the server data
- is_chunked}). %% true is the response is chunked
+ is_chunked}). %% true if the response is chunked
%% TODO: Activate proxy keep-alive with a new option ?
@@ -43,7 +43,7 @@
%% Initialize the connection to the backend server. If an error occured, return
%% an error 404.
-out(Arg = #arg{req=Req, headers=Hdrs, state={Prefix,URL}}) ->
+out(#arg{req=Req, headers=Hdrs, state={Prefix,URL}}=Arg) ->
case connect(URL) of
{ok, Sock, Type} ->
?Debug("Connection established on ~p: Socket=~p, Type=~p~n",
@@ -64,7 +64,7 @@ out(Arg = #arg{req=Req, headers=Hdrs, state={Prefix,URL}}) ->
%% Send the client request to the server then check if the request content is
%% chunked or not
-out(Arg = #arg{state=RPState}) when RPState#revproxy.state == sendheaders ->
+out(#arg{state=RPState}=Arg) when RPState#revproxy.state == sendheaders ->
?Debug("Send request headers to backend server: ~n"
" - ~s~n", [?format_record(Arg#arg.req, http_request)]),
@@ -95,7 +95,7 @@ out(Arg = #arg{state=RPState}) when RPState#revproxy.state == sendheaders ->
%% Send the request content to the server. Here the content is not chunked. But
%% it can be splitted because of 'partial_post_size' value.
-out(Arg = #arg{state=RPState}) when RPState#revproxy.state == sendcontent ->
+out(#arg{state=RPState}=Arg) when RPState#revproxy.state == sendcontent ->
case Arg#arg.clidata of
{partial, Bin} ->
?Debug("Send partial content to backend server: ~p bytes~n",
@@ -137,7 +137,7 @@ out(Arg = #arg{state=RPState}) when RPState#revproxy.state == sendcontent ->
%% Send the request content to the server. Here the content is chunked, so we
%% must rebuild the chunk before sending it. Chunks can have different size than
%% the original request because of 'partial_post_size' value.
-out(Arg = #arg{state=RPState}) when RPState#revproxy.state == sendchunk ->
+out(#arg{state=RPState}=Arg) when RPState#revproxy.state == sendchunk ->
case Arg#arg.clidata of
{partial, Bin} ->
?Debug("Send chunked content to backend server: ~p bytes~n",
@@ -175,7 +175,7 @@ out(Arg = #arg{state=RPState}) when RPState#revproxy.state == sendchunk ->
%% The request and its content were sent. Now, we try to read the response
%% headers. Then we check if the response content is chunked or not.
-out(Arg = #arg{state=RPState}) when RPState#revproxy.state == recvheaders ->
+out(#arg{state=RPState}=Arg) when RPState#revproxy.state == recvheaders ->
Res = yaws:http_get_headers(RPState#revproxy.srvsock,
RPState#revproxy.type),
case Res of
@@ -240,32 +240,55 @@ out(Arg = #arg{state=RPState}) when RPState#revproxy.state == recvheaders ->
end;
-%% The reponse content is not chunked.
-%% TODO: use partial_post_size to split huge content and avoid memory
-%% exhaustion.
-out(Arg = #arg{state=RPState}) when RPState#revproxy.state == recvcontent ->
+%% The response content is not chunked.
+out(#arg{state=RPState}=Arg) when RPState#revproxy.state == recvcontent ->
Len = list_to_integer((RPState#revproxy.headers)#headers.content_length),
- case read(RPState, Len) of
- {ok, Data} ->
- ?Debug("Response content received from the backend server : "
- "~p bytes~n", [size(Data)]),
- RPState1 = RPState#revproxy{state = terminate,
+ SC=get(sc),
+ if
+ Len =< SC#sconf.partial_post_size ->
+ case read(RPState, Len) of
+ {ok, Data} ->
+ ?Debug("Response content received from the backend server : "
+ "~p bytes~n", [size(Data)]),
+ RPState1 = RPState#revproxy{state = terminate,
+ is_chunked = false,
+ srvdata = {content, Data}},
+ out(Arg#arg{state=RPState1});
+ {error, Reason} ->
+ ?Debug("TCP error: ~p~n", [Reason]),
+ case Reason of
+ closed -> close(RPState);
+ _ -> ok
+ end,
+ outXXX(500, Arg)
+ end;
+ true ->
+ BlockSize = SC#sconf.partial_post_size,
+ BlockCount = Len div BlockSize,
+ LastBlock = Len rem BlockSize,
+ GC = get(gc),
+ Pid = spawn(
+ fun() ->
+ put(gc, GC),
+ receive
+ {ok, YawsPid} ->
+ recv_blocks(YawsPid, RPState, Arg,
+ BlockCount, BlockSize, LastBlock);
+ {discard, YawsPid} ->
+ Socket = Arg#arg.clisock,
+ yaws_api:stream_process_end(Socket, YawsPid)
+ end
+ end),
+ RPState1 = RPState#revproxy{state = terminate,
is_chunked = false,
- srvdata = {content, Data}},
- out(Arg#arg{state=RPState1});
- {error, Reason} ->
- ?Debug("TCP error: ~p~n", [Reason]),
- case Reason of
- closed -> close(RPState);
- _ -> ok
- end,
- outXXX(500, Arg)
+ srvconn_status = "keep-alive",
+ srvdata = {streamcontent_from_pid, Pid}},
+ out(Arg#arg{state=RPState1})
end;
-
-%% The reponse content is chunked. Read the first chunk here and spawn a process
+%% The response content is chunked. Read the first chunk here and spawn a process
%% to read others.
-out(Arg = #arg{state=RPState}) when RPState#revproxy.state == recvchunk ->
+out(#arg{state=RPState}=Arg) when RPState#revproxy.state == recvchunk ->
case read_chunk(RPState) of
{ok, Data} ->
?Debug("First chunk received from the backend server : "
@@ -286,7 +309,7 @@ out(Arg = #arg{state=RPState}) when RPState#revproxy.state == recvchunk ->
%% Now, we return the result and we let yaws_server deals with it. If it is
%% possible, we try to cache the connection.
-out(Arg = #arg{state=RPState}) when RPState#revproxy.state == terminate ->
+out(#arg{state=RPState}=Arg) when RPState#revproxy.state == terminate ->
case RPState#revproxy.srvconn_status of
"close" when RPState#revproxy.is_chunked == false -> close(RPState);
_ -> cache_connection(RPState)
@@ -321,13 +344,18 @@ out(Arg = #arg{state=RPState}) when RPState#revproxy.state == terminate ->
spawn(fun() -> put(gc, GC), recv_next_chunk(Self, Arg) end),
MimeType = (RPState#revproxy.headers)#headers.content_type,
Res ++ [{streamcontent, MimeType, Chunk}];
+
+ {streamcontent_from_pid, Pid} ->
+ MimeType = (RPState#revproxy.headers)#headers.content_type,
+ Res ++ [{streamcontent_from_pid, MimeType, Pid}];
+
_ ->
Res
end;
%% Catch unexpected state by sending an error 500
-out(Arg = #arg{state=RPState}) ->
+out(#arg{state=RPState}=Arg) ->
?Debug("Unexpected revproxy state:~n - ~s~n",
[?format_record(RPState, revproxy)]),
case RPState#revproxy.srvsock of
@@ -355,7 +383,7 @@ outXXX(Code, _Arg) ->
%%==========================================================================
%% This function is used to read a chunk and to stream it to the client.
-recv_next_chunk(YawsPid, Arg = #arg{state=RPState}) ->
+recv_next_chunk(YawsPid, #arg{state=RPState}=Arg) ->
case read_chunk(RPState) of
{ok, <<>>} ->
?Debug("Last chunk received from the backend server~n", []),
@@ -378,6 +406,43 @@ recv_next_chunk(YawsPid, Arg = #arg{state=RPState}) ->
outXXX(500, Arg)
end.
+%%==========================================================================
+%% This function reads blocks from the server and streams them to the client.
+recv_blocks(YawsPid, RPState, #arg{clisock=Sock}=Arg, 0, _BlockSize, LastBlock) ->
+ case read(RPState, LastBlock) of
+ {ok, Data} ->
+ ?Debug("Response content received from the backend server : "
+ "~p bytes~n", [size(Data)]),
+ ok = yaws_api:stream_process_deliver(Sock, Data),
+ yaws_api:stream_process_end(Sock, YawsPid);
+ {error, Reason} ->
+ ?Debug("TCP error: ~p~n", [Reason]),
+ case Reason of
+ closed ->
+ close(RPState),
+ yaws_api:stream_process_end(closed, YawsPid);
+ _ ->
+ yaws_api:stream_process_end(Arg#arg.clisock, YawsPid)
+ end
+ end;
+recv_blocks(YawsPid, RPState, Arg, BlockCount, BlockSize, LastBlock) ->
+ Sock = Arg#arg.clisock,
+ case read(RPState, BlockSize) of
+ {ok, Data} ->
+ ?Debug("Response content received from the backend server : "
+ "~p bytes~n", [size(Data)]),
+ ok = yaws_api:stream_process_deliver(Sock, Data),
+ recv_blocks(YawsPid, RPState, Arg, BlockCount-1, BlockSize, LastBlock);
+ {error, Reason} ->
+ ?Debug("TCP error: ~p~n", [Reason]),
+ case Reason of
+ closed ->
+ close(RPState),
+ yaws_api:stream_process_end(closed, YawsPid);
+ _ ->
+ yaws_api:stream_process_end(Sock, YawsPid)
+ end
+ end.
%%==========================================================================
%% TODO: find a better way to cache connections to backend servers. Here we can
@@ -461,13 +526,11 @@ do_connect(URL) ->
{error, unsupported_protocol}
end.
-
send(#revproxy{srvsock=Sock, type=ssl}, Data) ->
ssl:send(Sock, Data);
send(#revproxy{srvsock=Sock, type=nossl}, Data) ->
gen_tcp:send(Sock, Data).
-
read(RPState, Len) ->
yaws:setopts(RPState#revproxy.srvsock, [{packet, raw}, binary],
RPState#revproxy.type),
View
8 test/conf/revproxy.conf
@@ -140,4 +140,10 @@ use_fdsrv = false
docroot = %YTOP%/www
</server>
-
+<server localhost>
+ port = 8004
+ listen = 0.0.0.0
+ listen_backlog = 512
+ docroot = %YTOP%/www
+ revproxy = /revproxy http://localhost:8002
+</server>
View
1 test/t4/Makefile
@@ -18,6 +18,7 @@ test: all start
dd if=/dev/zero of=../../www/2000.txt bs=1024 count=2000 >/dev/null 2>&1
dd if=/dev/zero of=../../www/3000.txt bs=1024 count=3000 >/dev/null 2>&1
dd if=/dev/zero of=../../www/10000.txt bs=1024 count=10000 >/dev/null 2>&1
+ dd if=/dev/urandom of=www2/8388608.bin bs=1 count=8388608 >/dev/null 2>&1
ul=`ulimit -n` ; \
val=`expr $$ul '<' $(ULIMIT)` ; \
if [ $$val = 1 ] ; then \
View
13 test/t4/app_test.erl
@@ -27,6 +27,7 @@ start() ->
streamcontent_revproxy_test(),
keepalive_revproxy_test(),
rewrite_revproxy_test(),
+ large_content_revproxy_test(),
fwdproxy_test(),
ok
catch
@@ -194,6 +195,17 @@ rewrite_revproxy_test() ->
?line Res = Body2,
ok.
+large_content_revproxy_test() ->
+ io:format("large_content_revproxy_test\n", []),
+ Uri = "http://localhost:8004/revproxy/8388608.bin",
+
+ ?line {ok, Bin} = file:read_file("www2/8388608.bin"),
+ ?line {ok, "200", Hdrs, Body0} = ibrowse:send_req(Uri, [], get),
+ ?line "8388608" = proplists:get_value("Content-Length", Hdrs),
+ Body = list_to_binary(Body0),
+ ?line true = (size(Body) == 8388608),
+ ?line Bin = Body,
+ ok.
fwdproxy_test() ->
io:format("fwdproxy_test\n", []),
@@ -209,7 +221,6 @@ fwdproxy_test() ->
?line Res = Body2,
ok.
-
recv_hdrs(Sock) ->
inet:setopts(Sock, [{packet, http}]),
recv_hdrs(Sock, 0).

0 comments on commit b10e492

Please sign in to comment.
Something went wrong with that request. Please try again.