Skip to content
Browse files

Added support for push stream in mochiweb and inets (feel free to qui…

…ck the tires).
  • Loading branch information...
1 parent 2e74b87 commit b2d3d665a7115ae5619b72cdeaa57683e5affdd9 @davide davide committed
Showing with 141 additions and 0 deletions.
  1. +75 −0 src/server_gateways/ewgi_inets.erl
  2. +66 −0 src/server_gateways/ewgi_mochiweb.erl
View
75 src/server_gateways/ewgi_inets.erl
@@ -25,6 +25,12 @@
-module(ewgi_inets).
-export([do/1]).
+-export([
+ stream_process_deliver/2,
+ stream_process_deliver_chunk/2,
+ stream_process_deliver_final_chunk/2,
+ stream_process_end/2
+ ]).
-include_lib("inets/src/httpd.hrl").
-include_lib("ewgi.hrl").
@@ -229,6 +235,10 @@ handle_result(#mod{config_db=Db}=A, Ctx) ->
Headers0 = [{string:to_lower(H), binary_to_list(iolist_to_binary(V))} || {H, V} <- ewgi_api:response_headers(Ctx)],
Headers = lists:foldl(fun fold_header/2, [], Headers0),
case ewgi_api:response_message_body(Ctx) of
+ PushStream when is_pid(PushStream) ->
+ PushStream ! {push_stream_data, ?MODULE, A#mod.socket},
+ ChunkedAllowed = not httpd_response:is_disable_chunked_send(Db),
+ handle_push_stream(A, ChunkedAllowed, Code, Headers, PushStream);
Body when is_function(Body, 0) ->
ChunkedAllowed = not httpd_response:is_disable_chunked_send(Db),
handle_result_wrap_stream(A, ChunkedAllowed, Code, Headers, Body);
@@ -275,6 +285,70 @@ stream_to_list(S) when is_function(S, 0) ->
{} -> []
end.
+handle_push_stream(#mod{http_version=Ver}, ChunkedAllowed, Code, Headers, PushStream)
+ when (Ver =/= "HTTP/1.1") or (not ChunkedAllowed) ->
+ PushStream ! {discard, self()},
+ Body = "HTTP/1.1 required for streaming live data!",
+ Length = {content_length, integer_to_list(erlang:iolist_size(Body))},
+ {proceed, [{response, {response, [{code, Code}, Length] ++ Headers, Body}}]};
+handle_push_stream(A, true, Code, Headers, PushStream) ->
+ ExtraHeaders = httpd_response:cache_headers(A),
+ httpd_response:send_header(A, Code, ExtraHeaders ++ [{transfer_encoding, "chunked"}|Headers]),
+ Socket = A#mod.socket,
+ wait_for_streamcontent_pid(Socket, PushStream).
+
+%% Copied/adapted from yaws_server
+wait_for_streamcontent_pid(CliSock, ContentPid) ->
+ Ref = erlang:monitor(process, ContentPid),
+ gen_tcp:controlling_process(CliSock, ContentPid),
+ ContentPid ! {ok, self()},
+ receive
+ endofstreamcontent ->
+ erlang:demonitor(Ref),
+ %% should just use demonitor [flush] option instead?
+ receive
+ {'DOWN', Ref, _, _, _} ->
+ ok
+ after 0 ->
+ ok
+ end;
+ {'DOWN', Ref, _, _, _} ->
+ ok
+ end,
+ done.
+
+%%--------------------------------------------------------------------
+%% Push Streams API - copied from yaws_api
+%%--------------------------------------------------------------------
+
+%% This won't work for SSL for now
+stream_process_deliver(Sock, IoList) ->
+ gen_tcp:send(Sock, IoList).
+
+%% This won't work for SSL for now either
+stream_process_deliver_chunk(Sock, IoList) ->
+ Chunk = case erlang:iolist_size(IoList) of
+ 0 ->
+ stream_process_deliver_final_chunk(Sock, IoList);
+ S ->
+ [mochihex:to_hex(S), "\r\n", IoList, "\r\n"]
+ end,
+ gen_tcp:send(Sock, Chunk).
+stream_process_deliver_final_chunk(Sock, IoList) ->
+ Chunk = case erlang:iolist_size(IoList) of
+ 0 ->
+ <<"0\r\n\r\n">>;
+ S ->
+ [mochihex:to_hex(S), "\r\n", IoList, "\r\n0\r\n\r\n"]
+ end,
+ gen_tcp:send(Sock, Chunk).
+
+stream_process_end(Sock, ServerPid) ->
+ gen_tcp:controlling_process(Sock, ServerPid),
+ ServerPid ! endofstreamcontent.
+
+%%--------------------------------------------------------------------
+
fold_header({"accept-ranges", V}, Acc) ->
[{accept_ranges, V}|Acc];
fold_header({"allow", V}, Acc) ->
@@ -364,3 +438,4 @@ recv_size(Length, ChunkSz) when Length > 0, Length < ChunkSz ->
Length;
recv_size(_, ChunkSz) ->
ChunkSz.
+
View
66 src/server_gateways/ewgi_mochiweb.erl
@@ -27,6 +27,12 @@
%% ewgi callbacks
-export([run/2]).
+-export([
+ stream_process_deliver/2,
+ stream_process_deliver_chunk/2,
+ stream_process_deliver_final_chunk/2,
+ stream_process_end/2
+ ]).
-include_lib("ewgi.hrl").
@@ -61,6 +67,11 @@ handle_result(Ctx, Req) ->
Body = ewgi_api:response_message_body(Ctx),
handle_result1(Code, Headers, Body, Req).
+handle_result1(Code, Headers, PushStream, Req) when is_pid(PushStream) ->
+ Req:respond({Code, Headers, chunked}),
+ Socket = Req:get(socket),
+ PushStream ! {push_stream_data, ?MODULE, Socket},
+ wait_for_streamcontent_pid(Socket, PushStream);
handle_result1(Code, Headers, F, Req) when is_function(F, 0) ->
MochiResp = Req:respond({Code, Headers, chunked}),
%handle_stream_result(MochiResp, (catch F()));
@@ -90,6 +101,61 @@ handle_stream(R, Generator) ->
error_logger:error_report(io_lib:format("Invalid stream generator: ~p~n", [Generator])),
R:write_chunk([]).
+%% Copied/adapted from yaws_server
+wait_for_streamcontent_pid(CliSock, ContentPid) ->
+ Ref = erlang:monitor(process, ContentPid),
+ gen_tcp:controlling_process(CliSock, ContentPid),
+ ContentPid ! {ok, self()},
+ receive
+ endofstreamcontent ->
+ erlang:demonitor(Ref),
+ %% should just use demonitor [flush] option instead?
+ receive
+ {'DOWN', Ref, _, _, _} ->
+ ok
+ after 0 ->
+ ok
+ end;
+ {'DOWN', Ref, _, _, _} ->
+ ok
+ end,
+ done.
+
+%%--------------------------------------------------------------------
+%% Push Streams API - copied from yaws_api
+%% We could use mochiweb's write_chunk function but that
+%% would require that we copy MochiResp around instead of
+%% just copying the socket.
+%%--------------------------------------------------------------------
+
+%% This won't work for SSL for now
+stream_process_deliver(Sock, IoList) ->
+ gen_tcp:send(Sock, IoList).
+
+%% This won't work for SSL for now either
+stream_process_deliver_chunk(Sock, IoList) ->
+ Chunk = case erlang:iolist_size(IoList) of
+ 0 ->
+ stream_process_deliver_final_chunk(Sock, IoList);
+ S ->
+ [mochihex:to_hex(S), "\r\n", IoList, "\r\n"]
+ end,
+ gen_tcp:send(Sock, Chunk).
+stream_process_deliver_final_chunk(Sock, IoList) ->
+ Chunk = case erlang:iolist_size(IoList) of
+ 0 ->
+ <<"0\r\n\r\n">>;
+ S ->
+ [mochihex:to_hex(S), "\r\n", IoList, "\r\n0\r\n\r\n"]
+ end,
+ gen_tcp:send(Sock, Chunk).
+
+stream_process_end(Sock, ServerPid) ->
+ gen_tcp:controlling_process(Sock, ServerPid),
+ ServerPid ! endofstreamcontent.
+
+%%--------------------------------------------------------------------
+
process_application(Appl, Ctx) when is_list(Appl) ->
Path = ewgi_api:path_info(Ctx),
process_mount_application(Ctx, Path, find_mount(Appl, Path));

0 comments on commit b2d3d66

Please sign in to comment.
Something went wrong with that request. Please try again.