Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Merge remote branch 'davide/master'

  • Loading branch information...
commit 4db7537588c6b7cdfb2d51dfb9be26ca2ca84bca 2 parents 6bbb761 + a814b78
@skarab skarab authored
View
110 src/ewgi_api.erl
@@ -61,8 +61,18 @@
-export([server_request_foldl/4]).
%% Utility methods
--export([parse_qs/1, parse_post/1, parse_post/2, urlencode/1, quote/1,
- normalize_header/1, unquote_path/1, path_components/3, urlsplit/1]).
+-export([parse_qs/1, parse_post/1, urlencode/1, quote/1, normalize_header/1,
+ unquote_path/1, path_components/3, urlsplit/1]).
+
+%% Stream methods
+-export([
+ stream_process_init/2,
+ stream_process_init/3,
+ stream_process_deliver/2,
+ stream_process_deliver_chunk/2,
+ stream_process_deliver_final_chunk/2,
+ stream_process_end/1
+ ]).
%%====================================================================
%% API
@@ -454,17 +464,7 @@ parse_qs(ToParse) ->
%% @end
%%--------------------------------------------------------------------
parse_post(ToParse) ->
- parse_data(ToParse, "ISO-8859-1").
-
-%%--------------------------------------------------------------------
-%% @spec parse_post(string()|binary(), InEncoding) -> [proplist()]
-%%
-%% @doc Parse application/x-www-form-urlencoded data.
-%% Calls parse_data to do the job.
-%% @end
-%%--------------------------------------------------------------------
-parse_post(ToParse, InEncoding) ->
- parse_data(ToParse, InEncoding).
+ parse_data(ToParse).
%%--------------------------------------------------------------------
%% @spec parse_data(string()|binary()) -> [proplist()]
@@ -472,25 +472,18 @@ parse_post(ToParse, InEncoding) ->
%% @doc Parse a query string or application/x-www-form-urlencoded data.
%% @end
%%--------------------------------------------------------------------
-parse_data(undefined, _InEncoding) ->
+parse_data(undefined) ->
[];
-parse_data(Data, InEncoding) ->
- UTFData = unicode_data(Data, string:to_lower(InEncoding)),
- kv_data(UTFData, []).
-
-unicode_data(Data, "iso-8859-1") ->
- unicode:characters_to_list(Data, latin1);
-unicode_data(Data, "utf8") ->
- unicode:characters_to_list(Data, utf8);
-unicode_data(Data, "utf-8") ->
- unicode:characters_to_list(Data, utf8).
-%% TODO: add support for more charsets
-
-kv_data([], Acc) ->
+parse_data(Binary) when is_binary(Binary) ->
+ parse_data(binary_to_list(Binary), []);
+parse_data(String) ->
+ parse_data(String, []).
+
+parse_data([], Acc) ->
lists:reverse(Acc);
-kv_data(String, Acc) ->
+parse_data(String, Acc) ->
{{Key, Val}, Rest} = parse_kv(String),
- kv_data(Rest, [{Key, Val} | Acc]).
+ parse_data(Rest, [{Key, Val} | Acc]).
%%--------------------------------------------------------------------
@@ -864,3 +857,62 @@ urlsplit_query("#" ++ Rest, Acc) ->
{lists:reverse(Acc), Rest};
urlsplit_query([C | Rest], Acc) ->
urlsplit_query(Rest, [C | Acc]).
+
+%%--------------------------------------------------------------------
+%% Stream methods
+%%--------------------------------------------------------------------
+%% chunked response
+stream_process_init(Ctx, chunked) when ?IS_EWGI_CONTEXT(Ctx) ->
+ {StatusCode, _} = ewgi_api:response_status(Ctx),
+ Headers = ewgi_api:response_headers(Ctx),
+ ChunkedHeader = {"Transfer-Encoding", "chunked"},
+ wait_for_socket(StatusCode, [ChunkedHeader|Headers], chunked);
+
+%% non chunked response
+stream_process_init(Ctx, CL) when ?IS_EWGI_CONTEXT(Ctx), is_integer(CL) ->
+ {StatusCode, _} = ewgi_api:response_status(Ctx),
+ Headers = ewgi_api:response_headers(Ctx),
+ CLHeader = {"Content-Length", integer_to_list(CL)},
+ wait_for_socket(StatusCode, [CLHeader|Headers], non_chunked).
+
+%% This API is for processes that don't have access the original ewgi_context()
+stream_process_init(StatusCode, Headers, chunked) ->
+ ChunkedHeader = {"Transfer-Encoding", "chunked"},
+ wait_for_socket(StatusCode, [ChunkedHeader|Headers], chunked);
+stream_process_init(StatusCode, Headers, CL) when is_integer(CL) ->
+ CLHeader = {"Content-Length", integer_to_list(CL)},
+ wait_for_socket(StatusCode, [CLHeader|Headers], non_chunked).
+
+-define(STREAM_INIT_TIMEOUT, 5000).
+
+wait_for_socket(StatusCode, Headers, TransferEncoding) ->
+ receive
+ {push_stream_init, ServerModule, ServerPid, Socket} ->
+ ServerPid ! {push_stream_init, self(), StatusCode, Headers, TransferEncoding},
+ Connection = {ServerModule, ServerPid, Socket},
+ %% The server should report back on whether we should send data.
+ %% Sometimes (Method='HEAD') only the headers are sent.
+ receive
+ {ok, ServerPid} ->
+ {ok, Connection};
+ {discard, ServerPid} ->
+ stream_process_end(Connection),
+ discard
+ end
+ after ?STREAM_INIT_TIMEOUT ->
+ error_logger:error_msg(?MODULE_STRING ++": Timeout while trying to init stream process!~n"),
+ discard
+ end.
+
+stream_process_deliver({ServerModule, _ServerPid, Socket}, IoList) ->
+ ServerModule:stream_process_deliver(Socket, IoList).
+
+stream_process_deliver_chunk({ServerModule, _ServerPid, Socket}, IoList) ->
+ ServerModule:stream_process_deliver_chunk(Socket, IoList).
+
+stream_process_deliver_final_chunk({ServerModule, _ServerPid, Socket}, IoList) ->
+ ServerModule:stream_process_deliver_final_chunk(Socket, IoList).
+
+stream_process_end({ServerModule, ServerPid, Socket}) ->
+ ServerModule:stream_process_end(Socket, ServerPid).
+
View
59 src/middleware/ewgi_post/ewgi_post.erl
@@ -51,26 +51,52 @@ post_parse_middleware(MaxLength, App, ErrApp)
parse_ct(L) when is_list(L) ->
case string:tokens(L, ";") of
[CT|Vars] ->
- Vars1 = [string:tokens(VarStr, "=") || VarStr <- Vars],
- Vars2 = [{string:strip(Name), Value} || [Name, Value] <- Vars1],
- %% http://www.w3.org/Protocols/rfc2616/rfc2616-sec3.html ->
- %% When no explicit charset parameter is provided by the sender,
- %% media subtypes of the "text" type are defined to have a default
- %% charset value of "ISO-8859-1" when received via HTTP.
- Charset = proplists:get_value("charset", Vars2, "ISO-8859-1"),
- {CT, Charset};
+ Vars1 = [string:tokens(VarStr, "=") || VarStr <- Vars],
+ Vars2 = [{string:strip(Name), Value} || [Name, Value] <- Vars1],
+ {CT, Vars2};
_ ->
undefined
end.
-parse_post(Ctx, App, ErrApp, {"application/x-www-form-urlencoded", Charset}, Max) ->
+parse_post(Ctx, App, ErrApp, {"application/x-www-form-urlencoded", Vars}, Max) ->
case ewgi_api:content_length(Ctx) of
L when is_integer(L), L > Max ->
- %% shouldn't we set an error message here?
+ %% shouldn't we set an error message here?
ErrApp(Ctx);
L when is_integer(L), L > 0 ->
Input = read_input_string(Ctx, L),
- Vals = ewgi_api:parse_post(Input, Charset),
+ %% http://www.w3.org/Protocols/rfc2616/rfc2616-sec3.html
+ %% When no explicit charset parameter is provided by the sender,
+ %% media subtypes of the "text" type are defined to have a default
+ %% charset value of "ISO-8859-1" when received via HTTP.
+ case proplists:get_value("charset", Vars) of
+ undefined -> InCharset = "iso-8859-1"
+ ;Charset -> InCharset = string:to_lower(Charset)
+ end,
+ UnicodeInput = to_unicode(Input, InCharset),
+ Vals = ewgi_api:parse_post(UnicodeInput),
+ Ctx1 = ewgi_api:remote_user_data(Vals, Ctx),
+ App(Ctx1);
+ _ ->
+ ErrApp(Ctx)
+ end;
+parse_post(Ctx, App, ErrApp, {"application/json", _Vars}, Max) ->
+ case ewgi_api:content_length(Ctx) of
+ L when is_integer(L), L > Max ->
+ %% shouldn't we set an error message here?
+ ErrApp(Ctx);
+ L when is_integer(L), L > 0 ->
+ Input = read_input_string(Ctx, L),
+ %% http://www.ietf.org/rfc/rfc4627.txt
+ %% JSON text SHALL be encoded in Unicode.
+ %% The default encoding is UTF-8.
+ case unicode:bom_to_encoding(Input) of
+ {latin1,0} -> InEncoding = utf8
+ ;{InEncoding, _Length} -> ok
+ end,
+ UnicodeInput = unicode:characters_to_list(Input, InEncoding),
+ {Json, [], _} = ktj_decode:decode(UnicodeInput),
+ Vals = [{"json", Json}],
Ctx1 = ewgi_api:remote_user_data(Vals, Ctx),
App(Ctx1);
_ ->
@@ -91,6 +117,15 @@ read_input_string_cb(Acc) ->
read_input_string_cb([B|Acc])
end.
+%% Transforms the data from the given charset to unicode
+%% Todo: add support for other charset as needed.
+to_unicode(Data, "iso-8859-1") ->
+ unicode:characters_to_list(Data, latin1);
+to_unicode(Data, "utf8") ->
+ unicode:characters_to_list(Data, utf8);
+to_unicode(Data, "utf-8") ->
+ unicode:characters_to_list(Data, utf8).
+
%%
%% example functions on how to use the post handling middleware
%%
@@ -113,7 +148,7 @@ display_form_data({ewgi_context, Request, _Response}=Ctx) ->
Body1 ->
io_lib:format("~p", [Body1])
end,
- ResponseHeaders = [{"Content-type", "text/plain"}],
+ ResponseHeaders = [{"Content-type", "text/html; charset=utf8"}],
Response = {ewgi_response,
{200, "OK"},
ResponseHeaders,
View
126 src/middleware/ewgi_push_stream/ewgi_push_stream.erl
@@ -0,0 +1,126 @@
+%% @author Davide Marquês <nesrait@gmail.com>
+%% @copyright 2009 Davide Marquês <nesrait@gmail.com>
+%%
+%% @doc ewgi Push streams middleware.
+%% The difference for regular [pull-]streams is that those are synchronously polled
+%% by the ewgi server for more data. With pull streams the ewgi server is notified
+%% when there is more data available.
+%% @end
+%%
+%% Licensed under the MIT license:
+%% http://www.opensource.org/licenses/mit-license.php
+%%
+-module(ewgi_push_stream).
+-author('Davide Marquês <nesrait@gmail.com>').
+
+-define(STREAM_INIT_TIMEOUT, 5000).
+
+-define(EX_STREAM_DATA, ["ini\r\n", "mini\r\n", "mini\r\n", "mo\r\n"]).
+
+%% Ewgi Application API
+-export([run/2]).
+
+%% Usage examples
+-export([
+ chunked_stream_example/1,
+ non_chunked_stream_example/1,
+ ewgi_free_stream_example/1
+ ]).
+
+%% The body field is used to specify which process will be producing data.
+%% The various headers and data are all send asynchronously to the
+%% ewgi gateway when available.
+%% The given timeout defines how long to want before issuing a 504 response.
+run(Ctx, [StreamPid]) when is_pid(StreamPid) ->
+ PS = {push_stream, StreamPid, ?STREAM_INIT_TIMEOUT},
+ ewgi_api:response_message_body(PS, Ctx);
+
+run(Ctx, [StreamPid, Timeout]) when is_pid(StreamPid) ->
+ PS = {push_stream, StreamPid, Timeout},
+ ewgi_api:response_message_body(PS, Ctx).
+
+%%--------------------------------------------------------------------
+%% Chunked stream example
+%%--------------------------------------------------------------------
+%% N.B.: there's no point in setting headers in the ewgi_context
+%% passed to ewgi_push_stream:run/2. Those headers will be ignore!
+chunked_stream_example(Ctx) ->
+ StreamPid = spawn(fun() -> chunked_stream(Ctx) end),
+ ?MODULE:run(Ctx, [StreamPid]).
+
+%% The ewgi_context that is passed to ewgi_api:stream_process_init/2 is
+%% the one from where the status code and headers are read from.
+%% The default is a chunked response (see below for non-chunked responses).
+chunked_stream(Ctx0) ->
+ Status = {200, "OK"},
+ H = ewgi_api:response_headers(Ctx0),
+ CTHeader = {"Content-type", "text/plain"},
+ Ctx = ewgi_api:response_headers([CTHeader|H],
+ ewgi_api:response_status(Status, Ctx0)),
+
+ CSEnd = "chunked_stream_end",
+ case ewgi_api:stream_process_init(Ctx, chunked) of
+ {ok, Connection} ->
+ lists:foreach(fun(Word) ->
+ ewgi_api:stream_process_deliver_chunk(Connection, Word),
+ timer:sleep(1000)
+ end, ?EX_STREAM_DATA),
+ ewgi_api:stream_process_deliver_final_chunk(Connection, CSEnd),
+ ewgi_api:stream_process_end(Connection);
+ _ -> ok
+ end.
+
+%%--------------------------------------------------------------------
+%% Non-Chunked stream example
+%%--------------------------------------------------------------------
+%% N.B.: there's no point in setting headers in the ewgi_context
+%% passed to ewgi_push_stream:run/2. Those headers will be ignore!
+non_chunked_stream_example(Ctx) ->
+ StreamPid = spawn(fun() -> non_chunked_stream(Ctx) end),
+ ?MODULE:run(Ctx, [StreamPid]).
+
+%% The ewgi_context that is passed to ewgi_api:stream_process_init/2 is
+%% the one from where the status code and headers are read from.
+%% By passing a content-length we specify that this woun't be a chunked response.
+non_chunked_stream(Ctx0) ->
+ Status = {200, "OK"},
+ H = ewgi_api:response_headers(Ctx0),
+ CTHeader = {"Content-type", "text/plain"},
+ Ctx = ewgi_api:response_headers([CTHeader|H],
+ ewgi_api:response_status(Status, Ctx0)),
+
+ NCSEnd = "non_chunked_stream_end",
+ ContentLength = iolist_size(?EX_STREAM_DATA) + iolist_size(NCSEnd),
+ case ewgi_api:stream_process_init(Ctx, ContentLength) of
+ {ok, Connection} ->
+ lists:foreach(fun(Word) ->
+ ewgi_api:stream_process_deliver(Connection, Word),
+ timer:sleep(1000)
+ end, ?EX_STREAM_DATA),
+ ewgi_api:stream_process_deliver(Connection, NCSEnd),
+ ewgi_api:stream_process_end(Connection);
+ _ -> ok
+ end.
+
+%%--------------------------------------------------------------------
+%% Ewgi-free stream
+%% Just showing off that any process can serve as a push stream
+%%--------------------------------------------------------------------
+ewgi_free_stream_example(Ctx) ->
+ StreamPid = spawn(fun ewgi_free_stream/0),
+ ?MODULE:run(Ctx, [StreamPid]).
+
+ewgi_free_stream() ->
+ EFSEnd = "ewgi_free_stream_end",
+ StatusCode = 200,
+ Headers = [{"Content-type", "text/plain"}],
+ case ewgi_api:stream_process_init(StatusCode, Headers, chunked) of
+ {ok, Connection} ->
+ lists:foreach(fun(Word) ->
+ ewgi_api:stream_process_deliver_chunk(Connection, Word),
+ timer:sleep(1000)
+ end, ?EX_STREAM_DATA),
+ ewgi_api:stream_process_deliver_final_chunk(Connection, EFSEnd),
+ ewgi_api:stream_process_end(Connection);
+ _ -> ok
+ end.
View
127 src/server_gateways/ewgi_inets.erl
@@ -25,6 +25,12 @@
-module(ewgi_inets).
-export([do/1]).
+-export([
+ stream_process_deliver/2,
+ stream_process_deliver_chunk/2,
+ stream_process_deliver_final_chunk/2,
+ stream_process_end/2
+ ]).
-include_lib("inets/src/httpd.hrl").
-include_lib("ewgi.hrl").
@@ -225,18 +231,24 @@ parse_other_header1(K0, {K, V}, Acc) ->
gb_trees:enter(K, [{K0, V}|Ex], Acc).
handle_result(#mod{config_db=Db}=A, Ctx) ->
- {Code, _} = ewgi_api:response_status(Ctx),
- Headers0 = [{string:to_lower(H), binary_to_list(iolist_to_binary(V))} || {H, V} <- ewgi_api:response_headers(Ctx)],
- Headers = lists:foldl(fun fold_header/2, [], Headers0),
- case ewgi_api:response_message_body(Ctx) of
- Body when is_function(Body, 0) ->
- ChunkedAllowed = not httpd_response:is_disable_chunked_send(Db),
- handle_result_wrap_stream(A, ChunkedAllowed, Code, Headers, Body);
- Body0 ->
- Body = [Body0],
- Length = {content_length, integer_to_list(erlang:iolist_size(Body))},
- {proceed, [{response, {response, [{code, Code}, Length] ++ Headers, Body}}]}
- end.
+ case ewgi_api:response_message_body(Ctx) of
+ {push_stream, GeneratorPid, Timeout} when is_pid(GeneratorPid) ->
+ ChunkedAllowed = not httpd_response:is_disable_chunked_send(Db),
+ handle_push_stream(A, Ctx, ChunkedAllowed, GeneratorPid, Timeout);
+ Body0 ->
+ {Code, _} = ewgi_api:response_status(Ctx),
+ Headers0 = [{string:to_lower(H), binary_to_list(iolist_to_binary(V))} || {H, V} <- ewgi_api:response_headers(Ctx)],
+ Headers = lists:foldl(fun fold_header/2, [], Headers0),
+ case Body0 of
+ Body when is_function(Body, 0) ->
+ ChunkedAllowed = not httpd_response:is_disable_chunked_send(Db),
+ handle_result_wrap_stream(A, ChunkedAllowed, Code, Headers, Body);
+ _ ->
+ Body = [Body0],
+ Length = {content_length, integer_to_list(erlang:iolist_size(Body))},
+ {proceed, [{response, {response, [{code, Code}, Length] ++ Headers, Body}}]}
+ end
+ end.
handle_result_wrap_stream(#mod{http_version=Ver}, ChunkedAllowed, Code, Headers, Body0)
when (Ver =/= "HTTP/1.1") or (not ChunkedAllowed) ->
@@ -275,6 +287,96 @@ stream_to_list(S) when is_function(S, 0) ->
{} -> []
end.
+handle_push_stream(#mod{http_version=Ver}, _Ctx, ChunkedAllowed, GeneratorPid, _Timeout)
+ when (Ver =/= "HTTP/1.1") or (not ChunkedAllowed) ->
+ GeneratorPid ! {discard, self()},
+ Body = "HTTP/1.1 required for streaming live data!",
+ Length = {content_length, integer_to_list(erlang:iolist_size(Body))},
+ NotSupported = 505,
+ {proceed, [{response, {response, [{code, NotSupported}, Length], Body}}]};
+handle_push_stream(A, Ctx, true, GeneratorPid, Timeout) ->
+ Socket = A#mod.socket,
+ GeneratorPid ! {push_stream_init, ?MODULE, self(), Socket},
+ receive
+ {push_stream_init, GeneratorPid, Code, Headers0, TransferEncoding} ->
+ Headers1 = [{string:to_lower(H), binary_to_list(iolist_to_binary(V))} || {H, V} <- Headers0],
+ Headers = lists:foldl(fun fold_header/2, [], Headers1),
+ ExtraHeaders = httpd_response:cache_headers(A),
+ httpd_response:send_header(A, Code, ExtraHeaders ++ Headers),
+ case TransferEncoding of
+ chunked ->
+ GeneratorPid ! {ok, self()}
+ ;_ ->
+ %% WARNING: we're depending on the original ewgi_context here!!!!
+ case ewgi_api:request_method(Ctx) of
+ 'HEAD' ->
+ GeneratorPid ! {discard, self()};
+ _ ->
+ GeneratorPid ! {ok, self()}
+ end
+ end,
+ Socket = A#mod.socket,
+ wait_for_streamcontent_pid(Socket, GeneratorPid)
+ after Timeout ->
+ Body = "Gateway Timeout",
+ Length = {content_length, integer_to_list(erlang:iolist_size(Body))},
+ GatewayTimeout = 504,
+ {proceed, [{response, {response, [{code, GatewayTimeout}, Length], Body}}]}
+ end.
+
+%% Copied/adapted from yaws_server
+wait_for_streamcontent_pid(CliSock, ContentPid) ->
+ Ref = erlang:monitor(process, ContentPid),
+ gen_tcp:controlling_process(CliSock, ContentPid),
+ ContentPid ! {ok, self()},
+ receive
+ endofstreamcontent ->
+ ok = gen_tcp:close(CliSock),
+ erlang:demonitor(Ref),
+ %% should just use demonitor [flush] option instead?
+ receive
+ {'DOWN', Ref, _, _, _} ->
+ ok
+ after 0 ->
+ ok
+ end;
+ {'DOWN', Ref, _, _, _} ->
+ ok
+ end,
+ done.
+
+%%--------------------------------------------------------------------
+%% Push Streams API - copied from yaws_api
+%%--------------------------------------------------------------------
+
+%% This won't work for SSL for now
+stream_process_deliver(Sock, IoList) ->
+ gen_tcp:send(Sock, IoList).
+
+%% This won't work for SSL for now either
+stream_process_deliver_chunk(Sock, IoList) ->
+ Chunk = case erlang:iolist_size(IoList) of
+ 0 ->
+ stream_process_deliver_final_chunk(Sock, IoList);
+ S ->
+ [http_util:integer_to_hexlist(S), "\r\n", IoList, "\r\n"]
+ end,
+ gen_tcp:send(Sock, Chunk).
+stream_process_deliver_final_chunk(Sock, IoList) ->
+ Chunk = case erlang:iolist_size(IoList) of
+ 0 ->
+ <<"0\r\n\r\n">>;
+ S ->
+ [http_util:integer_to_hexlist(S), "\r\n", IoList, "\r\n0\r\n\r\n"]
+ end,
+ gen_tcp:send(Sock, Chunk).
+
+stream_process_end(Sock, ServerPid) ->
+ gen_tcp:controlling_process(Sock, ServerPid),
+ ServerPid ! endofstreamcontent.
+
+%%--------------------------------------------------------------------
+
fold_header({"accept-ranges", V}, Acc) ->
[{accept_ranges, V}|Acc];
fold_header({"allow", V}, Acc) ->
@@ -364,3 +466,4 @@ recv_size(Length, ChunkSz) when Length > 0, Length < ChunkSz ->
Length;
recv_size(_, ChunkSz) ->
ChunkSz.
+
View
123 src/server_gateways/ewgi_mochiweb.erl
@@ -23,10 +23,16 @@
%%%
%%% Created : 12 Oct 2007 by Filippo Pacini <filippo.pacini@gmail.com>
%%%-------------------------------------------------------------------
--module(ewgi_mochiweb, [Appl]).
+-module(ewgi_mochiweb).
%% ewgi callbacks
--export([run/1]).
+-export([run/2]).
+-export([
+ stream_process_deliver/2,
+ stream_process_deliver_chunk/2,
+ stream_process_deliver_final_chunk/2,
+ stream_process_end/2
+ ]).
-include_lib("ewgi.hrl").
@@ -35,10 +41,10 @@
%%====================================================================
%% ewgi_server callbacks
%%====================================================================
-run(MochiReq) ->
+run(Appl, MochiReq) ->
try parse_arg(MochiReq) of
Req when ?IS_EWGI_REQUEST(Req) ->
- try process_application(ewgi_api:context(Req, ewgi_api:empty_response())) of
+ try process_application(Appl, ewgi_api:context(Req, ewgi_api:empty_response())) of
not_found ->
MochiReq:not_found();
Ctx when ?IS_EWGI_CONTEXT(Ctx) ->
@@ -56,10 +62,14 @@ run(MochiReq) ->
%% Chunked response if a nullary function is returned
handle_result(Ctx, Req) ->
- {Code, _} = ewgi_api:response_status(Ctx),
- Headers = ewgi_api:response_headers(Ctx),
- Body = ewgi_api:response_message_body(Ctx),
- handle_result1(Code, Headers, Body, Req).
+ case ewgi_api:response_message_body(Ctx) of
+ {push_stream, GeneratorPid, Timeout} when is_pid(GeneratorPid) ->
+ handle_push_stream(Ctx, Req, GeneratorPid, Timeout);
+ Body ->
+ {Code, _} = ewgi_api:response_status(Ctx),
+ Headers = ewgi_api:response_headers(Ctx),
+ handle_result1(Code, Headers, Body, Req)
+ end.
handle_result1(Code, Headers, F, Req) when is_function(F, 0) ->
MochiResp = Req:respond({Code, Headers, chunked}),
@@ -68,6 +78,34 @@ handle_result1(Code, Headers, F, Req) when is_function(F, 0) ->
handle_result1(Code, Headers, L, Req) ->
Req:respond({Code, Headers, L}).
+handle_push_stream(Ctx, Req, GeneratorPid, Timeout) ->
+ Socket = Req:get(socket),
+ GeneratorPid ! {push_stream_init, ?MODULE, self(), Socket},
+ receive
+ {push_stream_init, GeneratorPid, Code, Headers, TransferEncoding} ->
+ case TransferEncoding of
+ chunked ->
+ Req:respond({Code, Headers, chunked}),
+ GeneratorPid ! {ok, self()}
+ ;_ ->
+ %% mochiweb_request:respond/1 expects the full body in order to
+ %% count the content-length but we already have that. What we're
+ %% missing is the [to-be-sent] body.
+ HResponse = mochiweb_headers:make(Headers),
+ Req:start_response({Code, HResponse}),
+ %% WARNING: we're depending on the original ewgi_context here!!!!
+ case ewgi_api:request_method(Ctx) of
+ 'HEAD' ->
+ GeneratorPid ! {discard, self()};
+ _ ->
+ GeneratorPid ! {ok, self()}
+ end
+ end,
+ wait_for_streamcontent_pid(Socket, GeneratorPid)
+ after Timeout ->
+ Req:respond({504, [], <<"Gateway Timeout">>})
+ end.
+
%% Treat a stream with chunked transfer encoding
handle_stream(R, Generator) when is_function(Generator, 0) ->
case (catch Generator()) of
@@ -90,10 +128,66 @@ handle_stream(R, Generator) ->
error_logger:error_report(io_lib:format("Invalid stream generator: ~p~n", [Generator])),
R:write_chunk([]).
-process_application(Ctx) when is_list(Appl) ->
+%% Copied/adapted from yaws_server
+wait_for_streamcontent_pid(CliSock, ContentPid) ->
+ Ref = erlang:monitor(process, ContentPid),
+ gen_tcp:controlling_process(CliSock, ContentPid),
+ ContentPid ! {ok, self()},
+ receive
+ endofstreamcontent ->
+ ok = gen_tcp:close(CliSock),
+ erlang:demonitor(Ref),
+ %% should just use demonitor [flush] option instead?
+ receive
+ {'DOWN', Ref, _, _, _} ->
+ ok
+ after 0 ->
+ ok
+ end;
+ {'DOWN', Ref, _, _, _} ->
+ ok
+ end,
+ done.
+
+%%--------------------------------------------------------------------
+%% Push Streams API - copied from yaws_api
+%% We could use mochiweb's write_chunk function but that
+%% would require that we copy MochiResp around instead of
+%% just copying the socket.
+%%--------------------------------------------------------------------
+
+%% This won't work for SSL for now
+stream_process_deliver(Sock, IoList) ->
+ gen_tcp:send(Sock, IoList).
+
+%% This won't work for SSL for now either
+stream_process_deliver_chunk(Sock, IoList) ->
+ Chunk = case erlang:iolist_size(IoList) of
+ 0 ->
+ stream_process_deliver_final_chunk(Sock, IoList);
+ S ->
+ [mochihex:to_hex(S), "\r\n", IoList, "\r\n"]
+ end,
+ gen_tcp:send(Sock, Chunk).
+stream_process_deliver_final_chunk(Sock, IoList) ->
+ Chunk = case erlang:iolist_size(IoList) of
+ 0 ->
+ <<"0\r\n\r\n">>;
+ S ->
+ [mochihex:to_hex(S), "\r\n", IoList, "\r\n0\r\n\r\n"]
+ end,
+ gen_tcp:send(Sock, Chunk).
+
+stream_process_end(Sock, ServerPid) ->
+ gen_tcp:controlling_process(Sock, ServerPid),
+ ServerPid ! endofstreamcontent.
+
+%%--------------------------------------------------------------------
+
+process_application(Appl, Ctx) when is_list(Appl) ->
Path = ewgi_api:path_info(Ctx),
process_mount_application(Ctx, Path, find_mount(Appl, Path));
-process_application(Ctx) ->
+process_application(Appl, Ctx) ->
ewgi_application:run(Appl, Ctx).
process_mount_application(_, _, {not_found, _}) ->
@@ -184,15 +278,6 @@ parse_element(remote_ident, _Req) ->
parse_element(remote_user, _Req) ->
undefined;
-parse_element(remote_user_data, Req) ->
- case Req:get(method) of
- M when (M=='POST') orelse (M=='PUT') ->
- Req:recv_body(),
- erlang:get(mochiweb_request_body);
- _ ->
- undefined
- end;
-
parse_element(request_method, Req) ->
Req:get(method);
View
67 src/server_gateways/ewgi_yaws.erl
@@ -23,9 +23,15 @@
%%%
%%% Created : 12 Oct 2007 by Filippo Pacini <filippo.pacini@gmail.com>
%%%-------------------------------------------------------------------
--module(ewgi_yaws, [Appl]).
+-module(ewgi_yaws).
--export([run/1]).
+-export([run/2]).
+-export([
+ stream_process_deliver/2,
+ stream_process_deliver_chunk/2,
+ stream_process_deliver_final_chunk/2,
+ stream_process_end/2
+ ]).
-include_lib("yaws_api.hrl").
-include_lib("ewgi.hrl").
@@ -36,13 +42,13 @@
%%====================================================================
%% ewgi_server callbacks
%%====================================================================
-run(Arg) ->
+run(Appl, Arg) ->
try parse_arg(Arg) of
Req when ?IS_EWGI_REQUEST(Req) ->
Ctx0 = ewgi_api:context(Req, ewgi_api:empty_response()),
try Appl(Ctx0) of
Ctx when ?IS_EWGI_CONTEXT(Ctx) ->
- handle_result(?INSPECT_EWGI_RESPONSE(Ctx))
+ handle_result(?INSPECT_EWGI_RESPONSE(Ctx), Arg#arg.clisock)
catch
_:Reason ->
error_logger:error_report(Reason),
@@ -54,18 +60,37 @@ run(Arg) ->
?BAD_REQUEST
end.
-handle_result(Ctx) ->
- {Code, _} = ewgi_api:response_status(Ctx),
- H = ewgi_api:response_headers(Ctx),
- ContentType = get_content_type(H),
- Acc = get_yaws_headers(H),
+handle_result(Ctx, Socket) ->
case ewgi_api:response_message_body(Ctx) of
- Generator when is_function(Generator, 0) ->
- YawsPid = self(),
- spawn(fun() -> handle_stream(Generator, YawsPid) end),
- {streamcontent_with_timeout, ContentType, <<>>, infinity};
+ {push_stream, GeneratorPid, Timeout} when is_pid(GeneratorPid) ->
+ GeneratorPid ! {push_stream_init, ?MODULE, self(), Socket},
+ receive
+ {push_stream_init, GeneratorPid, Code, Headers, _TransferEncoding} ->
+ ContentType = get_content_type(Headers),
+ Acc = get_yaws_headers(Headers),
+ [{status, Code},
+ {allheaders, Acc},
+ {streamcontent_from_pid, ContentType, GeneratorPid}]
+ after Timeout ->
+ [{status, 504}, {content, "text/plain", <<"Gateway Timeout">>}]
+ end;
Body ->
- [{status, Code}, {content, ContentType, Body}|Acc]
+ {Code, _} = ewgi_api:response_status(Ctx),
+ H = ewgi_api:response_headers(Ctx),
+ ContentType = get_content_type(H),
+ Acc = get_yaws_headers(H),
+ case Body of
+ Generator when is_function(Generator, 0) ->
+ YawsPid = self(),
+ spawn(fun() -> handle_stream(Generator, YawsPid) end),
+ [{status, Code},
+ {allheaders, Acc},
+ {streamcontent_with_timeout, ContentType, <<>>, infinity}];
+ _ ->
+ [{status, Code},
+ {allheaders, Acc},
+ {content, ContentType, Body}]
+ end
end.
get_yaws_headers(H) ->
@@ -108,6 +133,20 @@ handle_stream(Generator, YawsPid) ->
error_logger:error_report(io_lib:format("Invalid stream generator: ~p~n", [Generator])),
yaws_api:stream_chunk_end(YawsPid).
+%%--------------------------------------------------------------------
+%% Push Streams API
+%%--------------------------------------------------------------------
+stream_process_deliver(Socket, IoList) ->
+ yaws_api:stream_process_deliver(Socket, IoList).
+
+stream_process_deliver_chunk(Socket, IoList) ->
+ yaws_api:stream_process_deliver_chunk(Socket, IoList).
+
+stream_process_deliver_final_chunk(Socket, IoList) ->
+ yaws_api:stream_process_deliver_final_chunk(Socket, IoList).
+
+stream_process_end(Socket, ServerPid) ->
+ yaws_api:stream_process_end(Socket, ServerPid).
%%--------------------------------------------------------------------
%%% Internal functions
Please sign in to comment.
Something went wrong with that request. Please try again.