Skip to content

Commit

Permalink
Add an 'onresponse' hook
Browse files Browse the repository at this point in the history
This new protocol option is a fun.

It expects 3 args: the Status code used in the reply (this is the
cowboy_http:status() type, it can be an integer or a binary), the
headers that will be sent in the reply, and the Req. It should
only return a possibly modified Req. This can be used for many
things like error logging or custom error pages.

If a reply is sent inside the hook, then Cowboy will discard the
reply initially sent. Extra caution must be used in the handlers
making use of inline chunked replies as they will throw an error.

This fun cannot be used as a filter, you can either observe the
reply sent or discard it to send a different one instead.

The hook will not be called for replies sent from inside the hook.
  • Loading branch information
Loïc Hoguin committed May 4, 2012
1 parent 7ed93fc commit 57fda14
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 37 deletions.
2 changes: 2 additions & 0 deletions include/http.hrl
Expand Up @@ -52,5 +52,7 @@
fun(() -> {sent, non_neg_integer()})},

%% Functions.
onresponse = undefined :: undefined | fun((cowboy_http:status(),
cowboy_http:headers(), #http_req{}) -> #http_req{}),
urldecode :: {fun((binary(), T) -> binary()), T}
}).
4 changes: 3 additions & 1 deletion src/cowboy_client.erl
Expand Up @@ -158,7 +158,9 @@ response_body_loop(Client, Acc) ->
{ok, Data, Client2} ->
response_body_loop(Client2, << Acc/binary, Data/binary >>);
{done, Client2} ->
{ok, Acc, Client2}
{ok, Acc, Client2};
{error, Reason} ->
{error, Reason}
end.

skip_body(Client=#client{state=response_body}) ->
Expand Down
17 changes: 12 additions & 5 deletions src/cowboy_http_protocol.erl
Expand Up @@ -48,6 +48,8 @@
dispatch :: cowboy_dispatcher:dispatch_rules(),
handler :: {module(), any()},
onrequest :: undefined | fun((#http_req{}) -> #http_req{}),
onresponse = undefined :: undefined | fun((cowboy_http:status(),
cowboy_http:headers(), #http_req{}) -> #http_req{}),
urldecode :: {fun((binary(), T) -> binary()), T},
req_empty_lines = 0 :: integer(),
max_empty_lines :: integer(),
Expand Down Expand Up @@ -79,14 +81,16 @@ init(ListenerPid, Socket, Transport, Opts) ->
MaxKeepalive = proplists:get_value(max_keepalive, Opts, infinity),
MaxLineLength = proplists:get_value(max_line_length, Opts, 4096),
OnRequest = proplists:get_value(onrequest, Opts),
OnResponse = proplists:get_value(onresponse, Opts),
Timeout = proplists:get_value(timeout, Opts, 5000),
URLDecDefault = {fun cowboy_http:urldecode/2, crash},
URLDec = proplists:get_value(urldecode, Opts, URLDecDefault),
ok = cowboy:accept_ack(ListenerPid),
wait_request(#state{listener=ListenerPid, socket=Socket, transport=Transport,
dispatch=Dispatch, max_empty_lines=MaxEmptyLines,
max_keepalive=MaxKeepalive, max_line_length=MaxLineLength,
timeout=Timeout, onrequest=OnRequest, urldecode=URLDec}).
timeout=Timeout, onrequest=OnRequest, onresponse=OnResponse,
urldecode=URLDec}).

%% @private
-spec parse_request(#state{}) -> ok.
Expand Down Expand Up @@ -122,24 +126,27 @@ request({http_request, Method, {absoluteURI, _Scheme, _Host, _Port, Path},
request({http_request, Method, {abs_path, AbsPath}, Version},
State=#state{socket=Socket, transport=Transport,
req_keepalive=Keepalive, max_keepalive=MaxKeepalive,
urldecode={URLDecFun, URLDecArg}=URLDec}) ->
onresponse=OnResponse, urldecode={URLDecFun, URLDecArg}=URLDec}) ->
URLDecode = fun(Bin) -> URLDecFun(Bin, URLDecArg) end,
{Path, RawPath, Qs} = cowboy_dispatcher:split_path(AbsPath, URLDecode),
ConnAtom = if Keepalive < MaxKeepalive -> version_to_connection(Version);
true -> close
end,
parse_header(#http_req{socket=Socket, transport=Transport,
connection=ConnAtom, pid=self(), method=Method, version=Version,
path=Path, raw_path=RawPath, raw_qs=Qs, urldecode=URLDec}, State);
path=Path, raw_path=RawPath, raw_qs=Qs, onresponse=OnResponse,
urldecode=URLDec}, State);
request({http_request, Method, '*', Version},
State=#state{socket=Socket, transport=Transport,
req_keepalive=Keepalive, max_keepalive=MaxKeepalive, urldecode=URLDec}) ->
req_keepalive=Keepalive, max_keepalive=MaxKeepalive,
onresponse=OnResponse, urldecode=URLDec}) ->
ConnAtom = if Keepalive < MaxKeepalive -> version_to_connection(Version);
true -> close
end,
parse_header(#http_req{socket=Socket, transport=Transport,
connection=ConnAtom, pid=self(), method=Method, version=Version,
path='*', raw_path= <<"*">>, raw_qs= <<>>, urldecode=URLDec}, State);
path='*', raw_path= <<"*">>, raw_qs= <<>>, onresponse=OnResponse,
urldecode=URLDec}, State);
request({http_request, _Method, _URI, _Version}, State) ->
error_terminate(501, State);
request({http_error, <<"\r\n">>},
Expand Down
67 changes: 42 additions & 25 deletions src/cowboy_http_req.erl
Expand Up @@ -696,26 +696,28 @@ reply(Status, Headers, Req=#http_req{resp_body=Body}) ->
-spec reply(cowboy_http:status(), cowboy_http:headers(), iodata(), #http_req{})
-> {ok, #http_req{}}.
reply(Status, Headers, Body, Req=#http_req{socket=Socket, transport=Transport,
version=Version, connection=Connection, pid=ReqPid,
version=Version, connection=Connection,
method=Method, resp_state=waiting, resp_headers=RespHeaders}) ->
RespConn = response_connection(Headers, Connection),
ContentLen = case Body of {CL, _} -> CL; _ -> iolist_size(Body) end,
HTTP11Headers = case Version of
{1, 1} -> [{<<"Connection">>, atom_to_connection(Connection)}];
_ -> []
end,
response(Status, Headers, RespHeaders, [
{ReplyType, Req2} = response(Status, Headers, RespHeaders, [
{<<"Content-Length">>, integer_to_list(ContentLen)},
{<<"Date">>, cowboy_clock:rfc1123()},
{<<"Server">>, <<"Cowboy">>}
|HTTP11Headers], Req),
case {Method, Body} of
{'HEAD', _} -> ok;
{_, {_, StreamFun}} -> StreamFun();
{_, _} -> Transport:send(Socket, Body)
if Method =:= 'HEAD' -> ok;
ReplyType =:= hook -> ok; %% Hook replied for us, stop there.
true ->
case Body of
{_, StreamFun} -> StreamFun();
_ -> Transport:send(Socket, Body)
end
end,
ReqPid ! {?MODULE, resp_sent},
{ok, Req#http_req{connection=RespConn, resp_state=done,
{ok, Req2#http_req{connection=RespConn, resp_state=done,
resp_headers=[], resp_body= <<>>}}.

%% @equiv chunked_reply(Status, [], Req)
Expand All @@ -729,20 +731,19 @@ chunked_reply(Status, Req) ->
-> {ok, #http_req{}}.
chunked_reply(Status, Headers, Req=#http_req{
version=Version, connection=Connection,
pid=ReqPid, resp_state=waiting, resp_headers=RespHeaders}) ->
resp_state=waiting, resp_headers=RespHeaders}) ->
RespConn = response_connection(Headers, Connection),
HTTP11Headers = case Version of
{1, 1} -> [
{<<"Connection">>, atom_to_connection(Connection)},
{<<"Transfer-Encoding">>, <<"chunked">>}];
_ -> []
end,
response(Status, Headers, RespHeaders, [
{_, Req2} = response(Status, Headers, RespHeaders, [
{<<"Date">>, cowboy_clock:rfc1123()},
{<<"Server">>, <<"Cowboy">>}
|HTTP11Headers], Req),
ReqPid ! {?MODULE, resp_sent},
{ok, Req#http_req{connection=RespConn, resp_state=chunks,
{ok, Req2#http_req{connection=RespConn, resp_state=chunks,
resp_headers=[], resp_body= <<>>}}.

%% @doc Send a chunk of data.
Expand All @@ -762,12 +763,11 @@ chunk(Data, #http_req{socket=Socket, transport=Transport, resp_state=chunks}) ->
-spec upgrade_reply(cowboy_http:status(), cowboy_http:headers(), #http_req{})
-> {ok, #http_req{}}.
upgrade_reply(Status, Headers, Req=#http_req{
pid=ReqPid, resp_state=waiting, resp_headers=RespHeaders}) ->
response(Status, Headers, RespHeaders, [
resp_state=waiting, resp_headers=RespHeaders}) ->
{_, Req2} = response(Status, Headers, RespHeaders, [
{<<"Connection">>, <<"Upgrade">>}
], Req),
ReqPid ! {?MODULE, resp_sent},
{ok, Req#http_req{resp_state=done, resp_headers=[], resp_body= <<>>}}.
{ok, Req2#http_req{resp_state=done, resp_headers=[], resp_body= <<>>}}.

%% Misc API.

Expand Down Expand Up @@ -798,16 +798,33 @@ transport(#http_req{transport=Transport, socket=Socket}) ->
%% Internal.

-spec response(cowboy_http:status(), cowboy_http:headers(),
cowboy_http:headers(), cowboy_http:headers(), #http_req{}) -> ok.
response(Status, Headers, RespHeaders, DefaultHeaders, #http_req{
socket=Socket, transport=Transport, version=Version}) ->
cowboy_http:headers(), cowboy_http:headers(), #http_req{})
-> {normal | hook, #http_req{}}.
response(Status, Headers, RespHeaders, DefaultHeaders, Req=#http_req{
socket=Socket, transport=Transport, version=Version,
pid=ReqPid, onresponse=OnResponse}) ->
FullHeaders = response_merge_headers(Headers, RespHeaders, DefaultHeaders),
%% @todo 'onresponse' hook here.
HTTPVer = cowboy_http:version_to_binary(Version),
StatusLine = << HTTPVer/binary, " ", (status(Status))/binary, "\r\n" >>,
HeaderLines = [[Key, <<": ">>, Value, <<"\r\n">>]
|| {Key, Value} <- FullHeaders],
Transport:send(Socket, [StatusLine, HeaderLines, <<"\r\n">>]).
Req2 = case OnResponse of
undefined -> Req;
OnResponse -> OnResponse(Status, FullHeaders,
%% Don't call 'onresponse' from the hook itself.
Req#http_req{resp_headers=[], resp_body= <<>>,
onresponse=undefined})
end,
ReplyType = case Req2#http_req.resp_state of
waiting ->
HTTPVer = cowboy_http:version_to_binary(Version),
StatusLine = << HTTPVer/binary, " ",
(status(Status))/binary, "\r\n" >>,
HeaderLines = [[Key, <<": ">>, Value, <<"\r\n">>]
|| {Key, Value} <- FullHeaders],
Transport:send(Socket, [StatusLine, HeaderLines, <<"\r\n">>]),
ReqPid ! {?MODULE, resp_sent},
normal;
_ ->
hook
end,
{ReplyType, Req2}.

-spec response_connection(cowboy_http:headers(), keepalive | close)
-> keepalive | close.
Expand Down
45 changes: 39 additions & 6 deletions test/http_SUITE.erl
Expand Up @@ -44,6 +44,7 @@
-export([nc_zero/1]).
-export([onrequest/1]).
-export([onrequest_reply/1]).
-export([onresponse_reply/1]).
-export([pipeline/1]).
-export([rest_keepalive/1]).
-export([rest_keepalive_post/1]).
Expand All @@ -66,7 +67,7 @@
%% ct.

all() ->
[{group, http}, {group, https}, {group, hooks}].
[{group, http}, {group, https}, {group, onrequest}, {group, onresponse}].

groups() ->
Tests = [
Expand Down Expand Up @@ -108,9 +109,12 @@ groups() ->
[
{http, [], Tests},
{https, [], Tests},
{hooks, [], [
{onrequest, [], [
onrequest,
onrequest_reply
]},
{onresponse, [], [
onresponse_reply
]}
].

Expand Down Expand Up @@ -160,10 +164,10 @@ init_per_group(https, Config) ->
{ok, Client} = cowboy_client:init(Opts),
[{scheme, <<"https">>}, {port, Port}, {opts, Opts},
{transport, Transport}, {client, Client}|Config1];
init_per_group(hooks, Config) ->
init_per_group(onrequest, Config) ->
Port = 33082,
Transport = cowboy_tcp_transport,
{ok, _} = cowboy:start_listener(hooks, 100,
{ok, _} = cowboy:start_listener(onrequest, 100,
Transport, [{port, Port}],
cowboy_http_protocol, [
{dispatch, init_dispatch(Config)},
Expand All @@ -172,6 +176,20 @@ init_per_group(hooks, Config) ->
{timeout, 500}
]),
{ok, Client} = cowboy_client:init([]),
[{scheme, <<"http">>}, {port, Port}, {opts, []},
{transport, Transport}, {client, Client}|Config];
init_per_group(onresponse, Config) ->
Port = 33083,
Transport = cowboy_tcp_transport,
{ok, _} = cowboy:start_listener(onresponse, 100,
Transport, [{port, Port}],
cowboy_http_protocol, [
{dispatch, init_dispatch(Config)},
{max_keepalive, 50},
{onresponse, fun onresponse_hook/3},
{timeout, 500}
]),
{ok, Client} = cowboy_client:init([]),
[{scheme, <<"http">>}, {port, Port}, {opts, []},
{transport, Transport}, {client, Client}|Config].

Expand All @@ -185,8 +203,8 @@ end_per_group(https, Config) ->
end_per_group(http, Config) ->
cowboy:stop_listener(http),
end_static_dir(Config);
end_per_group(hooks, _) ->
cowboy:stop_listener(hooks),
end_per_group(Name, _) ->
cowboy:stop_listener(Name),
ok.

%% Dispatch configuration.
Expand Down Expand Up @@ -568,6 +586,21 @@ onrequest_hook(Req) ->
Req3
end.

onresponse_reply(Config) ->
Client = ?config(client, Config),
{ok, Client2} = cowboy_client:request(<<"GET">>,
build_url("/", Config), Client),
{ok, 777, Headers, Client3} = cowboy_client:response(Client2),
{<<"x-hook">>, <<"onresponse">>} = lists:keyfind(<<"x-hook">>, 1, Headers),
%% Make sure we don't get the body initially sent.
{error, closed} = cowboy_client:response_body(Client3).

%% Hook for the above onresponse tests.
onresponse_hook(_, Headers, Req) ->
{ok, Req2} = cowboy_http_req:reply(
<<"777 Lucky">>, [{<<"x-hook">>, <<"onresponse">>}|Headers], Req),
Req2.

pipeline(Config) ->
Client = ?config(client, Config),
{ok, Client2} = cowboy_client:request(<<"GET">>,
Expand Down

0 comments on commit 57fda14

Please sign in to comment.