Skip to content

Commit

Permalink
Refactoring nested cases with reduce_while. Fixes #58
Browse files Browse the repository at this point in the history
Also:

- nuke handler supervisor
- get rid of accept timeout
- close connection when no bookish_spork response
  • Loading branch information
tank-bohr committed Nov 21, 2019
1 parent f540c06 commit 960d9ed
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 130 deletions.
22 changes: 7 additions & 15 deletions src/bookish_spork_acceptor.erl
Expand Up @@ -10,12 +10,10 @@
init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2
handle_info/2
]).

-record(state, {
sup :: pid(),
server :: pid(),
listen_socket :: bookish_spork_transport:listen()
}).
Expand All @@ -35,10 +33,8 @@ start_link(Server, ListenSocket) ->

%% @private
init({Server, ListenSocket}) ->
{ok, Sup} = bookish_spork_sup:start_handler_sup(Server),
accept(),
{ok, #state{
sup = Sup,
server = Server,
listen_socket = ListenSocket
}}.
Expand All @@ -48,8 +44,8 @@ handle_call(_Request, _From, State) ->
{reply, {error, unknown_call}, State}.

%% @private
handle_cast(accept, #state{listen_socket = ListenSocket, sup = Sup} = State) ->
accept(ListenSocket, Sup),
handle_cast(accept, #state{listen_socket = ListenSocket, server = Server} = State) ->
accept(ListenSocket, Server),
{noreply, State};
handle_cast(_Msg, State) ->
{noreply, State}.
Expand All @@ -58,17 +54,13 @@ handle_cast(_Msg, State) ->
handle_info(_Info, State) ->
{noreply, State}.

%% @private
terminate(_Reason, #state{sup = Sup}) ->
bookish_spork_sup:stop(Sup).

accept() ->
gen_server:cast(self(), accept).

-spec accept(ListenSocket, Sup) -> ok when
-spec accept(ListenSocket, Server) -> ok when
ListenSocket :: bookish_spork_transport:listen(),
Sup :: pid().
accept(ListenSocket, Sup) ->
Server :: pid().
accept(ListenSocket, Server) ->
Transport = bookish_spork_transport:accept(ListenSocket),
bookish_spork_sup:start_handler(Sup, Transport),
bookish_spork_handler:start_link(Server, Transport),
accept().
161 changes: 76 additions & 85 deletions src/bookish_spork_handler.erl
@@ -1,7 +1,6 @@
-module(bookish_spork_handler).

-export([
child_spec/1,
start_link/2
]).

Expand All @@ -15,35 +14,26 @@

-record(state, {
server :: pid(),
transport :: bookish_spork_transport:t()
transport :: bookish_spork_transport:t(),
request :: undefined | bookish_spork_request:t(),
response :: undefined | bookish_spork_response:t() | bookish_spork:stub_request_fun()
}).

-type state() :: #state{}.

child_spec(Args) ->
#{
id => ?MODULE,
start => {?MODULE, start_link, Args},
restart => temporary,
shutdown => 5000,
type => worker,
modules => [?MODULE]
}.

-spec start_link(Server, Transport) -> {ok, pid()} when
Server :: pid(),
Transport :: bookish_spork_transport:t().
start_link(Server, Transport) ->
Args = {Server, Transport},
gen_server:start_link(?MODULE, Args, []).
gen_server:start_link(?MODULE, {Server, Transport}, []).

-spec init({Server, Transport}) -> {ok, State} when
State :: state(),
Server :: pid(),
Transport :: bookish_spork_transport:t().
%% @private
init({Server, Transport}) ->
handle_connection(),
keepalive_loop(),
{ok, #state{
server = Server,
transport = Transport
Expand All @@ -54,103 +44,104 @@ handle_call(_Request, _From, State) ->
{reply, {error, unknown_call}, State}.

%% @private
handle_cast(handle_connection, State) ->
handle_connection(State);
handle_cast(keepalive_loop, State) ->
keepalive_loop(State);
handle_cast(_Msg, State) ->
{noreply, State}.

%% @private
handle_info(_Info, State) ->
{noreply, State}.

-spec handle_connection() -> ok.
%% @private
handle_connection() ->
gen_server:cast(self(), handle_connection).

-spec handle_connection(state()) -> {noreply, state()} | {stop, normal, state()}.
-spec keepalive_loop() -> ok.
%% @private
handle_connection(#state{transport = Transport, server = Server} = State) ->
case receive_request(State) of
{ok, Request} ->
ok = bookish_spork_server:store_request(Server, Request),
case bookish_spork_server:response(Server) of
{ok, Response} ->
reply(Transport, Response, Request),
complete_connection(State, Request);
{error, no_response} ->
bookish_spork_transport:close(Transport),
{stop, normal, State}

end;
socket_closed ->
{stop, normal, State}
end.
keepalive_loop() ->
gen_server:cast(self(), keepalive_loop).

-spec complete_connection(State, Request) -> {noreply, State} | {stop, normal, State} when
State :: state(),
Request :: bookish_spork_request:t().
-spec keepalive_loop(state()) -> {noreply, state()} | {stop, normal, state()}.
%% @private
complete_connection(#state{transport = Transport} = State, Request) ->
case bookish_spork_request:is_keepalive(Request) of
true ->
handle_connection(),
{noreply, State};
false ->
bookish_spork_transport:shutdown(Transport),
{stop, normal, State}
end.

-spec receive_request(State :: state()) -> Result when
Result :: {ok, Request} | socket_closed,
Request :: bookish_spork_request:t().
%% @private
receive_request(#state{transport = Transport}) ->
keepalive_loop(State) ->
reduce_while(State, [
fun reset/1,
fun receive_request/1,
fun store_request/1,
fun pick_response/1,
fun reply/1,
fun complete_connection/1
]).

reset(#state{server = Server, transport = Transport}) ->
Request = bookish_spork_request:from_transport(Transport),
read_from_socket(Transport, Request).
{cont, #state{
server = Server,
request = Request,
transport = Transport
}}.

-spec read_from_socket(Transport, RequestIn) -> Result when
Transport :: bookish_spork_transport:t(),
RequestIn :: bookish_spork_request:t(),
Result :: {ok, RequestOut} | socket_closed,
RequestOut :: bookish_spork_request:t().
-spec receive_request(state()) -> {cont, state()} | {halt, socket_closed}.
%% @private
read_from_socket(Transport, RequestIn) ->
receive_request(#state{transport = Transport, request = RequestIn} = State) ->
case bookish_spork_transport:recv(Transport) of
{ok, {http_request, Method, {abs_path, Uri}, Version}} ->
RequestOut = bookish_spork_request:request_line(RequestIn, Method, Uri, Version),
read_from_socket(Transport, RequestOut);
?FUNCTION_NAME(State#state{request = RequestOut});
{ok, {http_header, _, Header, _, Value}} ->
RequestOut = bookish_spork_request:add_header(RequestIn, Header, Value),
read_from_socket(Transport, RequestOut);
?FUNCTION_NAME(State#state{request = RequestOut});
{ok, http_eoh} ->
Body = read_body(Transport, bookish_spork_request:content_length(RequestIn)),
ContentLength = bookish_spork_request:content_length(RequestIn),
Body = bookish_spork_transport:read_raw(Transport, ContentLength),
RequestOut = bookish_spork_request:body(RequestIn, Body),
{ok, RequestOut};
{cont, State#state{request = RequestOut}};
{ok, {http_error, HttpError}} ->
erlang:error({http_error, HttpError}, [Transport, RequestIn]);
{error, closed} ->
socket_closed;
{halt, socket_closed};
{error, enotconn} ->
socket_closed
{halt, socket_closed}
end.

-spec read_body(Transport, ContentLength) -> Body when
Transport :: bookish_spork_transport:t(),
ContentLength :: non_neg_integer(),
Body :: binary().
%% @private
read_body(Transport, ContentLength) ->
bookish_spork_transport:read_raw(Transport, ContentLength).
store_request(#state{server = Server, request = Request} = State) ->
bookish_spork_server:store_request(Server, Request),
{cont, State}.

pick_response(#state{server = Server, transport = Transport} = State) ->
case bookish_spork_server:response(Server) of
{ok, Response} ->
{cont, State#state{response = Response}};
{error, no_response} ->
bookish_spork_transport:close(Transport),
{halt, normal}
end.

-spec reply(Transport, Response, Request) -> ok when
Transport :: bookish_spork_transport:t(),
Response :: bookish_spork:stub_request_fun() | bookish_spork_response:t(),
Request :: bookish_spork_request:t().
-spec reply(state()) -> {cont, state()}.
%% @private
reply(Transport, ResponseFun, Request) when is_function(ResponseFun) ->
reply(State = #state{request = Request, response = ResponseFun}) when is_function(ResponseFun) ->
Response = ResponseFun(Request),
reply(Transport, bookish_spork_response:new(Response), Request);
reply(Transport, Response, _Request) ->
?FUNCTION_NAME(State#state{response = bookish_spork_response:new(Response)});
reply(State = #state{transport = Transport, response = Response}) ->
String = bookish_spork_response:write_str(Response, calendar:universal_time()),
bookish_spork_transport:send(Transport, [String]).
bookish_spork_transport:send(Transport, [String]),
{cont, State}.

-spec complete_connection(state()) -> {cont, state()} | {halt, normal}.
%% @private
complete_connection(State = #state{transport = Transport, request = Request}) ->
case bookish_spork_request:is_keepalive(Request) of
true ->
keepalive_loop(),
{cont, State};
false ->
bookish_spork_transport:shutdown(Transport),
{halt, normal}
end.

reduce_while(State, []) ->
{noreply, State};
reduce_while(State, [Fun|Rest]) ->
case Fun(State) of
{cont, NewState} ->
reduce_while(NewState, Rest);
{halt, Reason} ->
{stop, Reason, State}
end.
10 changes: 5 additions & 5 deletions src/bookish_spork_ssl.erl
Expand Up @@ -4,7 +4,7 @@

-export([
listen/2,
accept/2,
accept/1,
recv/2,
send/2,
close/1,
Expand All @@ -27,14 +27,14 @@ listen(Port, Options) ->
ssl:listen(Port, Options ++ ?SSL_OPTIONS ++ ?HELLO).

-ifdef(OTP_RELEASE).
accept(ListenSocket, Timeout) ->
{ok, Socket} = ssl:transport_accept(ListenSocket, Timeout),
accept(ListenSocket) ->
{ok, Socket} = ssl:transport_accept(ListenSocket),
{ok, HsSocket, Ext} = ssl:handshake(Socket),
{ok, SslSocket} = ssl:handshake_continue(HsSocket, []),
{ok, SslSocket, Ext}.
-else.
accept(ListenSocket, Timeout) ->
{ok, Socket} = ssl:transport_accept(ListenSocket, Timeout),
accept(ListenSocket) ->
{ok, Socket} = ssl:transport_accept(ListenSocket),
ok = ssl:ssl_accept(Socket),
{ok, Socket}.
-endif.
Expand Down
26 changes: 6 additions & 20 deletions src/bookish_spork_sup.erl
@@ -1,8 +1,6 @@
-module(bookish_spork_sup).
-export([
start_acceptor_sup/2,
start_handler_sup/1,
start_handler/2,
stop/1
]).

Expand All @@ -11,35 +9,23 @@
init/1
]).

-define(ACCEPTOR_SUP_FLAGS, #{
-define(SUP_FLAGS, #{
strategy => one_for_one,
intensity => 5,
period => 10
}).

-define(HANDLER_SUP_FLAGS, #{
strategy => simple_one_for_one,
intensity => 5,
period => 10
}).

-spec start_acceptor_sup(Server, ListenSocket) -> {ok, pid()} when
Server :: pid(),
ListenSocket :: bookish_spork_transport:listen().
start_acceptor_sup(Server, ListenSocket) ->
supervisor:start_link(?MODULE, {acceptor, [Server, ListenSocket]}).

start_handler_sup(Server) ->
supervisor:start_link(?MODULE, {handler, [Server]}).

start_handler(Sup, Transport) ->
supervisor:start_child(Sup, [Transport]).
supervisor:start_link(?MODULE, [Server, ListenSocket]).

stop(Sup) ->
ok = gen_server:stop(Sup).

%% @private
init({acceptor, Args}) ->
{ok, {?ACCEPTOR_SUP_FLAGS, [bookish_spork_acceptor:child_spec(Args)]}};
init({handler, Args}) ->
{ok, {?HANDLER_SUP_FLAGS, [bookish_spork_handler:child_spec(Args)]}}.
init(Args) ->
{ok, {?SUP_FLAGS, [
bookish_spork_acceptor:child_spec(Args)
]}}.
6 changes: 2 additions & 4 deletions src/bookish_spork_transport.erl
Expand Up @@ -38,9 +38,8 @@
Result :: {ok, ListenSocket} | {error, Reason},
ListenSocket :: socket(),
Reason :: system_limit | inet:posix().
-callback accept(ListenSocket, Timeout) -> Result when
-callback accept(ListenSocket) -> Result when
ListenSocket :: socket(),
Timeout :: timeout(),
Result :: {ok, Socket} | {ok, Socket, Ext} | {error, Reason},
Socket :: socket(),
Ext :: ssl:protocol_extensions(),
Expand Down Expand Up @@ -74,7 +73,6 @@
{active, false},
{reuseaddr, true}
]).
-define(ACCEPT_TIMEOUT, 5000).
-define(IS_SSL_SOCKET(Socket), is_tuple(Socket) andalso element(1, Socket) =:= sslsocket).

-spec listen(callback_module(), inet:port_number()) -> listen().
Expand All @@ -84,7 +82,7 @@ listen(Module, Port) ->

-spec accept(listen()) -> t().
accept(#listen{socket = ListenSocket, module = Module}) ->
case Module:accept(ListenSocket, ?ACCEPT_TIMEOUT) of
case Module:accept(ListenSocket) of
{ok, Socket, Ext} ->
#transport{id = generate_id(), module = Module, socket = Socket, ssl_ext = Ext};
{ok, Socket} ->
Expand Down
2 changes: 1 addition & 1 deletion test/bookish_spork_SUITE.erl
Expand Up @@ -101,7 +101,7 @@ stub_multiple_requests(_Config) ->
{"http://localhost:32002/pulti", []}, [], [{body_format, binary}]),
{ok, Request2} = bookish_spork:capture_request(),
?assertEqual("/pulti", bookish_spork_request:uri(Request2)),
?assertMatch({error, _},
?assertMatch({error, socket_closed_remotely},
httpc:request(get, {"http://localhost:32002", []}, [], [{body_format, binary}])).

stub_with_fun(_Config) ->
Expand Down

0 comments on commit 960d9ed

Please sign in to comment.