Permalink
Browse files

Push Streams middleware (currrently only supported in Yaws).

  • Loading branch information...
1 parent 4428d77 commit 82d3eb0b1220703de6f9d3453355d30c624f344f @davide davide committed Nov 3, 2009
Showing with 150 additions and 3 deletions.
  1. +46 −0 src/ewgi_api.erl
  2. +80 −0 src/middleware/ewgi_push_stream/ewgi_push_stream.erl
  3. +24 −3 src/server_gateways/ewgi_yaws.erl
View
@@ -64,6 +64,15 @@
-export([parse_qs/1, parse_post/1, parse_post/2, urlencode/1, quote/1,
normalize_header/1, unquote_path/1, path_components/3, urlsplit/1]).
+%% Stream methods
+-export([
+ stream_process_init/0,
+ stream_process_deliver/2,
+ stream_process_deliver_chunk/2,
+ stream_process_deliver_final_chunk/2,
+ stream_process_end/1
+ ]).
+
%%====================================================================
%% API
%%====================================================================
@@ -864,3 +873,40 @@ urlsplit_query("#" ++ Rest, Acc) ->
{lists:reverse(Acc), Rest};
urlsplit_query([C | Rest], Acc) ->
urlsplit_query(Rest, [C | Acc]).
+
+%%--------------------------------------------------------------------
+%% Stream methods
+%%--------------------------------------------------------------------
+-define(STREAM_INIT_TIMEOUT, 5000).
+stream_process_init() ->
+ receive
+ {push_stream_data, ServerModule, Socket} ->
+ receive
+ {ok, ServerPid} ->
+ Connection = {ServerModule, ServerPid, Socket},
+ {ok, Connection};
+ {discard, ServerPid} ->
+ Connection = {ServerModule, ServerPid, Socket},
+ stream_process_end(Connection),
+ discard
+ end
+ after ?STREAM_INIT_TIMEOUT ->
+ error_logger:error_msg(?MODULE_STRING ++": Timeout while trying to init stream process!~n"),
+ discard
+ end.
+
+stream_process_deliver({ServerModule, _ServerPid, Socket}, IoList) ->
+ A = ServerModule:new(undefined),
+ A:stream_process_deliver(Socket, IoList).
+
+stream_process_deliver_chunk({ServerModule, _ServerPid, Socket}, IoList) ->
+ A = ServerModule:new(undefined),
+ A:stream_process_deliver_chunk(Socket, IoList).
+
+stream_process_deliver_final_chunk({ServerModule, _ServerPid, Socket}, IoList) ->
+ A = ServerModule:new(undefined),
+ A:stream_process_deliver_final_chunk(Socket, IoList).
+
+stream_process_end({ServerModule, ServerPid, Socket}) ->
+ A = ServerModule:new(undefined),
+ A:stream_process_end(ServerPid, Socket).
@@ -0,0 +1,80 @@
+%% @author Davide Marquês <nesrait@gmail.com>
+%% @copyright 2009 Davide Marquês <nesrait@gmail.com>
+%%
+%% @doc ewgi Push streams middleware.
+%% The difference for regular [pull-]streams is that those are synchronously polled
+%% by the ewgi server for more data. With pull streams the ewgi server is notified
+%% when there is more data available.
+%% @end
+%%
+%% Licensed under the MIT license:
+%% http://www.opensource.org/licenses/mit-license.php
+%%
+-module(ewgi_push_stream).
+-author('Davide Marquês <nesrait@gmail.com>').
+
+%% Ewgi Application API
+-export([run/2]).
+
+%% Usage examples
+-export([chunked_stream_example/1, non_chunked_stream_example/1]).
+
+run(Ctx, [Status, CT, StreamFun]) when is_function(StreamFun, 0) ->
+ StreamPid = spawn(StreamFun),
+ run(Ctx, [Status, CT, StreamPid]);
+
+run(Ctx, [Status, CT, StreamFun]) when is_function(StreamFun, 1) ->
+ StreamPid = spawn(fun() -> StreamFun(Ctx) end),
+ run(Ctx, [Status, CT, StreamPid]);
+
+run(Ctx, [Status, ContentType, StreamPid]) when is_pid(StreamPid) ->
+ ewgi_api:response_status(Status,
+ ewgi_api:response_headers([{"Content-type", ContentType}],
+ ewgi_api:response_message_body(StreamPid, Ctx))).
+
+%%--------------------------------------------------------------------
+%% Usage Examples
+%%--------------------------------------------------------------------
+chunked_stream_example(Ctx) ->
+ Status = {200, "OK"},
+ ContentType = "text/plain",
+ StreamFun = stream_function(fun chunked_stream/3),
+ ?MODULE:run(Ctx, [Status, ContentType, StreamFun]).
+
+non_chunked_stream_example(Ctx) ->
+ Status = {200, "OK"},
+ ContentType = "text/plain",
+ StreamFun = stream_function(fun non_chunked_stream/3),
+ ?MODULE:run(Ctx, [Status, ContentType, StreamFun]).
+
+stream_function(Generator) ->
+ fun(Ctx) ->
+ case ewgi_api:stream_process_init() of
+ {ok, Connection} ->
+ Generator(Ctx, Connection, 5),
+ ewgi_api:stream_process_end(Connection);
+ _ -> ok
+ end
+ end.
+
+non_chunked_stream(Ctx, Connection, 0) ->
+ IoList = ["done"],
+ ewgi_api:stream_process_deliver(Connection, IoList);
+non_chunked_stream(Ctx, Connection, Times) ->
+ IoList = [data()],
+ ewgi_api:stream_process_deliver(Connection, IoList),
+ timer:sleep(2000),
+ non_chunked_stream(Ctx, Connection, Times - 1).
+
+chunked_stream(Ctx, Connection, 0) ->
+ IoList = ["done"],
+ ewgi_api:stream_process_deliver_final_chunk(Connection, IoList);
+chunked_stream(Ctx, Connection, Times) ->
+ IoList = [data()],
+ ewgi_api:stream_process_deliver_chunk(Connection, IoList),
+ timer:sleep(2000),
+ chunked_stream(Ctx, Connection, Times - 1).
+
+data() ->
+ httpd_util:rfc1123_date(erlang:localtime()) ++ "\r\n".
+
@@ -26,13 +26,31 @@
-module(ewgi_yaws, [Appl]).
-export([run/1]).
+-export([
+ stream_process_deliver/2,
+ stream_process_deliver_chunk/2,
+ stream_process_deliver_final_chunk/2,
+ stream_process_end/2
+ ]).
-include_lib("yaws_api.hrl").
-include_lib("ewgi.hrl").
-define(INTERNAL_ERROR, [{status, 500}, {content, "text/plain", <<"Internal Server Error">>}]).
-define(BAD_REQUEST, [{status, 400}, {content, "text/plain", <<"Bad Request">>}]).
+stream_process_deliver(Socket, IoList) ->
+ yaws_api:stream_process_deliver(Socket, IoList).
+
+stream_process_deliver_chunk(Socket, IoList) ->
+ yaws_api:stream_process_deliver_chunk(Socket, IoList).
+
+stream_process_deliver_final_chunk(Socket, IoList) ->
+ yaws_api:stream_process_deliver_final_chunk(Socket, IoList).
+
+stream_process_end(YawsPid, Socket) ->
+ yaws_api:stream_process_end(Socket, YawsPid).
+
%%====================================================================
%% ewgi_server callbacks
%%====================================================================
@@ -42,7 +60,7 @@ run(Arg) ->
Ctx0 = ewgi_api:context(Req, ewgi_api:empty_response()),
try Appl(Ctx0) of
Ctx when ?IS_EWGI_CONTEXT(Ctx) ->
- handle_result(?INSPECT_EWGI_RESPONSE(Ctx))
+ handle_result(?INSPECT_EWGI_RESPONSE(Ctx), Arg#arg.clisock)
catch
_:Reason ->
error_logger:error_report(Reason),
@@ -54,16 +72,19 @@ run(Arg) ->
?BAD_REQUEST
end.
-handle_result(Ctx) ->
+handle_result(Ctx, Socket) ->
{Code, _} = ewgi_api:response_status(Ctx),
H = ewgi_api:response_headers(Ctx),
ContentType = get_content_type(H),
Acc = get_yaws_headers(H),
case ewgi_api:response_message_body(Ctx) of
+ PushStream when is_pid(PushStream) ->
+ PushStream ! {push_stream_data, ?MODULE, Socket},
+ [{status, Code}, {streamcontent_from_pid, ContentType, PushStream}];
Generator when is_function(Generator, 0) ->
YawsPid = self(),
spawn(fun() -> handle_stream(Generator, YawsPid) end),
- {streamcontent_with_timeout, ContentType, <<>>, infinity};
+ [{status, Code}, {streamcontent_with_timeout, ContentType, <<>>, infinity}];
Body ->
[{status, Code}, {content, ContentType, Body}|Acc]
end.

0 comments on commit 82d3eb0

Please sign in to comment.