Permalink
Browse files

Allow "stream processes" to close the client socket

Yaws allows application processes to take control of the client socket
via the streamcontent_from_pid directive. Sometimes during the course
of its data delivery the application might find that it needs to close
the socket, but the original design did not provide a way for the
application to tell Yaws the socket was closed. The result was that a
message like the one below would show up in the Yaws log whenever a
closed socket was returned to Yaws, due to Yaws trying to continue to
use the socket as if it were still open:

     =ERROR REPORT==== 12-May-2010::00:27:05 ===
     Yaws process died: {{badmatch,{error,einval}},
                         [{yaws,setopts,3},
                          {yaws,http_recv_request,2},
                          {yaws,do_http_get_headers,2},
                          {yaws,http_get_headers,2},
                          {yaws_server,aloop,3},
                          {yaws_server,acceptor0,2},
                          {proc_lib,init_p_do_apply,3}]}

Change yaws_api:stream_process_end/2 to allow the application to pass
the atom 'closed' as the first argument, in place of the socket
argument. This informs Yaws that the socket is closed and prevents it
from trying to keep using it, thus avoiding errors like the one above.
  • Loading branch information...
vinoski committed May 12, 2010
1 parent fd5e602 commit 5da72a3bf0890fbb5a904348a151bcb118b47432
Showing with 113 additions and 19 deletions.
  1. +6 −1 doc/yaws.tex
  2. +10 −1 man/yaws_api.5
  3. +2 −0 src/yaws_api.erl
  4. +15 −8 src/yaws_server.erl
  5. +53 −6 test/t2/app_test.erl
  6. +23 −3 test/t2/streamtest.erl
  7. +4 −0 www/stream.yaws
View
@@ -1558,7 +1558,12 @@ \section{Stream content}
\verb+yaws_api:stream_process_end(Socket, YawsPid)+ when it finishes
sending data or when it receives the \verb+{discard, YawsPid}+ message
from \Yaws\ --- this is required to inform \Yaws\ that \verb+Pid+ has
-finished with the socket and will not use it directly anymore.
+finished with the socket and will not use it directly anymore. If the
+application has to close the socket while it's in control of it,
+though, it must pass the atom \verb+closed+ as the first argument to
+\verb+yaws_api:stream_process_end+ in place of the socket to inform
+\Yaws\ that the socket has been closed and it should no longer attempt
+to use it.
Applications using \verb+streamcontent_from_pid+ wanting to avoid
chunked transfer encoding for their streams should be sure to include
View
@@ -247,7 +247,16 @@ Application processes delivering data directly to clients must call
this function to inform yaws that they've finished using
\fISocket\fR. The \fIYawsPid\fR argument will have been passed to the
process earlier when yaws sent it a message telling it to proceed with
-data delivery.
+data delivery. Yaws expects \fISocket\fR to be open.
+
+.TP
+\fBstream_process_end(closed, YawsPid)\fR
+Same as the previous function but the application calls this if it
+closes the client socket as part of its data delivery process. This
+allows yaws to continue without assuming the socket is still open and
+encountering errors due to that assumption. The \fIYawsPid\fR argument
+will have been passed to the application process earlier when yaws
+sent it a message telling it to proceed with data delivery.
.TP
\fBparse_query(Arg)\fR
View
@@ -873,6 +873,8 @@ stream_process_deliver_final_chunk(Sock, IoList) ->
end,
stream_process_deliver(Sock, Chunk).
+stream_process_end(closed, YawsPid) ->
+ YawsPid ! {endofstreamcontent, closed};
stream_process_end(Sock={sslsocket,_,_}, YawsPid) ->
ssl:controlling_process(Sock, YawsPid),
YawsPid ! endofstreamcontent;
View
@@ -2668,19 +2668,26 @@ wait_for_streamcontent_pid(Priv, CliSock, ContentPid) ->
end,
receive
endofstreamcontent ->
- erlang:demonitor(Ref),
- %% should just use demonitor [flush] option instead?
- receive
- {'DOWN', Ref, _, _, _} ->
- ok
- after 0 ->
- ok
- end;
+ demonitor_streamcontent_pid(Ref);
+ {endofstreamcontent, closed} ->
+ H = get(outh),
+ put(outh, H#outh{doclose = true}),
+ demonitor_streamcontent_pid(Ref);
{'DOWN', Ref, _, _, _} ->
ok
end,
done_or_continue().
+demonitor_streamcontent_pid(Ref) ->
+ erlang:demonitor(Ref),
+ %% should just use demonitor [flush] option instead?
+ receive
+ {'DOWN', Ref, _, _, _} ->
+ ok
+ after 0 ->
+ ok
+ end.
+
skip_data(List, Fd, Sz) when is_list(List) ->
skip_data(list_to_binary(List), Fd, Sz);
skip_data(Bin, Fd, Sz) when is_binary(Bin) ->
View
@@ -43,7 +43,6 @@ collect_pids(Pids) ->
end.
-
%% max 5 connectors at a time
allow_connects([], _) ->
io:format(" test1: all pids connected \n",[]);
@@ -232,15 +231,63 @@ appmod_test() ->
streamcontent_test() ->
io:format("streamcontent_test\n",[]),
- Uri = "http://localhost:8000/streamtest",
- ?line {ok, "200", Headers, Body} = ibrowse:send_req(Uri, [], get),
- ?line "chunked" = proplists:get_value("Transfer-Encoding", Headers),
- ?line Body = "this is an iolist",
+ Uri1 = "http://localhost:8000/streamtest/1",
+ ?line {ok, "200", Headers1, Body1} = ibrowse:send_req(Uri1, [], get),
+ ?line "chunked" = proplists:get_value("Transfer-Encoding", Headers1),
+ ?line Body1 = "this is an iolist",
+
+ %% The following test attempts to ensure that Yaws doesn't report the
+ %% following problem due to the application closing the socket and then
+ %% handing it back to Yaws via stream_process_end.
+ %%
+ %% =ERROR REPORT==== 12-May-2010::00:27:05 ===
+ %% Yaws process died: {{badmatch,{error,einval}},
+ %% [{yaws,setopts,3},
+ %% {yaws,http_recv_request,2},
+ %% {yaws,do_http_get_headers,2},
+ %% {yaws,http_get_headers,2},
+ %% {yaws_server,aloop,3},
+ %% {yaws_server,acceptor0,2},
+ %% {proc_lib,init_p_do_apply,3}]}
+ %%
+ %% The test uses plain sockets because closing the remote end makes
+ %% ibrowse unhappy. Unfortunately the only way to currently check that
+ %% the above message doesn't appear is to turn on traffic tracing in
+ %% yaws.conf and then visually check the file logs/report.log.
+ %%
+ Path = "/streamtest/2",
+ {ok, Sock} = gen_tcp:connect("localhost", 8000, [binary, {active, false}]),
+ gen_tcp:send(Sock, "GET " ++ Path ++ " HTTP/1.1\r\nHost: localhost\r\n\r\n"),
+ inet:setopts(Sock, [{packet, http}]),
+ {ok, Len} = recv_hdrs(Sock),
+ inet:setopts(Sock, [{packet, raw}, {active, false}]),
+ {ok, <<"closing the socket">>} = gen_tcp:recv(Sock, Len),
+ timer:sleep(10000),
+ gen_tcp:close(Sock),
ok.
+recv_hdrs(Sock) ->
+ recv_hdrs(Sock, 0).
+recv_hdrs(Sock, Len) ->
+ inet:setopts(Sock, [{active, once}]),
+ receive
+ {http, Sock, http_eoh} ->
+ {ok, Len};
+ {http, Sock, {http_error, Error}} ->
+ {error, Error};
+ {http, Sock, {http_header, _, 'Content-Length', _, LenStr}} ->
+ recv_hdrs(Sock, list_to_integer(LenStr));
+ {http, Sock, {http_header, _, _, _, _}} ->
+ recv_hdrs(Sock, Len);
+ {http, Sock, {http_response, _, 200, "OK"}} ->
+ recv_hdrs(Sock, Len);
+ Other ->
+ {error, {"unexpected message", Other}}
+ end.
+
%% used for appmod tests
%%
-out(A) ->
+out(_A) ->
%% add our special header to mark that we were here
[{status, 200},
{header, {?APPMOD_HEADER, "true"}}].
View
@@ -1,12 +1,21 @@
-module(streamtest).
--export([out/1, streamer/1]).
+-export([out/1, streamer/1, close_streamer/2]).
-include("../../include/yaws_api.hrl").
out(Arg) ->
Sock = Arg#arg.clisock,
- Pid = spawn(?MODULE, streamer, [Sock]),
- {streamcontent_from_pid, "text/plain", Pid}.
+ Url = yaws_api:request_url(Arg),
+ case lists:reverse(string:tokens(Url#url.path, "/")) of
+ ["1"|_] ->
+ Pid1 = spawn(?MODULE, streamer, [Sock]),
+ {streamcontent_from_pid, "text/plain", Pid1};
+ ["2"|_] ->
+ Msg = "closing the socket",
+ Pid2 = spawn(?MODULE, close_streamer, [Sock, Msg]),
+ [{header, {content_length, length(Msg)}},
+ {streamcontent_from_pid, "text/plain", Pid2}]
+ end.
streamer(Sock) ->
receive
@@ -19,3 +28,14 @@ streamer(Sock) ->
),
yaws_api:stream_process_end(Sock, YawsPid)
end.
+
+close_streamer(Sock, Msg) ->
+ receive
+ {discard, YawsPid} ->
+ yaws_api:stream_process_end(Sock, YawsPid);
+ {ok, YawsPid} ->
+ gen_tcp:send(Sock, Msg),
+ timer:sleep(3000),
+ gen_tcp:close(Sock),
+ yaws_api:stream_process_end(closed, YawsPid)
+ end.
View
@@ -81,6 +81,10 @@ out(A) ->
{p,[],"This informs yaws that Pid is finished with the socket and will no longer use it directly."},
+ box(" yaws_api:stream_process_end(closed, YawsPid)"),
+
+ {p,[],"This informs yaws that Pid has not only finished with the socket, but has also closed it. Yaws will not attempt to use the socket anymore after the application calls this function."},
+
{p,[],"Applications using streamcontent_from_pid should be sure to set a Content-Length header in their out/1 return value if they want to avoid chunked transfer encoding for their return value. Yaws automatically sets the HTTP Transfer-Encoding header to chunked if it doesn't detect a Content-Length header."},
{p,[],"Here's an example of using streamcontent_from_pid:"},

0 comments on commit 5da72a3

Please sign in to comment.