Browse files

Use a custom supervisor for ranch_conns_sup

This change was designed so that we don't have this supervisor
and ranch_listener performing the same job, namely monitoring
connection processes, the first through links and the second
through monitors.

This change also makes possible various optimizations:

 *  Acceptors don't need to know about options, maximum number
    of connections, or anything else. They can just accept,
    pass the socket to the supervisor, and when the supervisor
    replies continue accepting connections.

 *  The supervisor holds most of the information that will be
    passed to created processes. This reduces copying.

 *  The supervisor temporarily takes ownership of the socket,
    then creates the connection process and gives it ownership,
    streamlining the creation.

 *  The supervisor can hold acceptors in their receive loop if
    max_connections is reached. When this number gets below the
    limit it can then send a message to a sleeping acceptor to
    make it resume its operations.

 *  Because we know that all connection process creations are made
    from the local Erlang node, we can greatly reduce the number
    operations to be made when calling the supervisor.

 *  Because all acceptors die if this supervisor dies, we can
    remove even more operations from the calling code. We do not
    need to monitor or wait for a timeout. This reduces the call
    code to two statements: send and receive. (Thanks James Fish
    for helping with that.)

 *  The supervisor only needs to keep track of a list of pids.
    There is no children specification to be maintained, we do
    not need to handle restart strategy (no process can be
    restarted because the socket dies with it). We are using
    the process dictionary for storing the pids as it proved
    to be the simplest and fastest solution.

 *  The supervisor maintains a count of current connections,
    but also of processes (including the ones that removed
    themselves from the pool), making any query of these values
    very fast.

The supervisor should still be compatible with OTP principles.
It responds to calls from the supervisor module as expected,
although some of them are disabled and an error will be returned,
for example supervisor:start_child/2. It is also started with
proc_lib and handles system messages. sys:get_status/1 can thus
be used as expected.

We can see a great increase in the number of requests/s, a great
improvement in the latency of requests, and we can simply accept
requests faster than before. It will probably have a bigger increase
under virtualized environments, although that's only a guess.

As a result of this, we don't write much anymore in the ranch_server
ets table, so the write_concurrency option was removed. Tests were
also slightly improved to prevent race conditions.
  • Loading branch information...
1 parent 809a12f commit 33db3b0d1aafcfbc9aadbad622a4014c021ef10c @essen essen committed Mar 30, 2013
Showing with 353 additions and 352 deletions.
  1. +18 −76 src/ranch_acceptor.erl
  2. +8 −9 src/ranch_acceptors_sup.erl
  3. +157 −16 src/ranch_conns_sup.erl
  4. +11 −48 src/ranch_listener.erl
  5. +8 −8 src/ranch_listener_sup.erl
  6. +14 −88 src/ranch_server.erl
  7. +1 −1 src/ranch_sup.erl
  8. +136 −106 test/acceptor_SUITE.erl
View
94 src/ranch_acceptor.erl
@@ -16,89 +16,31 @@
-module(ranch_acceptor).
%% API.
--export([start_link/6]).
+-export([start_link/3]).
%% Internal.
--export([init/7]).
--export([loop/7]).
+-export([loop/3]).
%% API.
--spec start_link(any(), inet:socket(), module(), module(), pid(), pid())
+-spec start_link(inet:socket(), module(), pid())
-> {ok, pid()}.
-start_link(Ref, LSocket, Transport, Protocol, ListenerPid, ConnsSup) ->
- {ok, MaxConns} = ranch_listener:get_max_connections(ListenerPid),
- {ok, Opts} = ranch_listener:get_protocol_options(ListenerPid),
- Pid = spawn_link(?MODULE, init,
- [LSocket, Transport, Protocol, MaxConns, Opts, ListenerPid, ConnsSup]),
- ok = ranch_server:add_acceptor(Ref, Pid),
+start_link(LSocket, Transport, ConnsSup) ->
+ Pid = spawn_link(?MODULE, loop, [LSocket, Transport, ConnsSup]),
{ok, Pid}.
%% Internal.
--spec init(inet:socket(), module(), module(),
- non_neg_integer(), any(), pid(), pid()) -> no_return().
-init(LSocket, Transport, Protocol, MaxConns, Opts, ListenerPid, ConnsSup) ->
- async_accept(LSocket, Transport),
- loop(LSocket, Transport, Protocol, MaxConns, Opts, ListenerPid, ConnsSup).
-
--spec loop(inet:socket(), module(), module(),
- ranch:max_conns(), any(), pid(), pid()) -> no_return().
-loop(LSocket, Transport, Protocol, MaxConns, Opts, ListenerPid, ConnsSup) ->
- receive
- %% We couldn't accept the socket but it's safe to continue.
- {accept, continue} ->
- ?MODULE:init(LSocket, Transport, Protocol,
- MaxConns, Opts, ListenerPid, ConnsSup);
- %% Found my sockets!
- {accept, CSocket} ->
- {ok, ConnPid} = supervisor:start_child(ConnsSup,
- [ListenerPid, CSocket, Transport, Protocol, Opts]),
- Transport:controlling_process(CSocket, ConnPid),
- ConnPid ! {shoot, ListenerPid},
- {ok, MaxConns2} = case MaxConns of
- infinity ->
- {ok, infinity};
- _ ->
- NbConns = ranch_listener:add_connection(ListenerPid, ConnPid),
- maybe_wait(ListenerPid, MaxConns, NbConns)
- end,
- ?MODULE:init(LSocket, Transport, Protocol,
- MaxConns2, Opts, ListenerPid, ConnsSup);
- %% Upgrade the max number of connections allowed concurrently.
- {set_max_conns, MaxConns2} ->
- ?MODULE:loop(LSocket, Transport, Protocol,
- MaxConns2, Opts, ListenerPid, ConnsSup);
- %% Upgrade the protocol options.
- {set_opts, Opts2} ->
- ?MODULE:loop(LSocket, Transport, Protocol,
- MaxConns, Opts2, ListenerPid, ConnsSup)
- end.
-
--spec maybe_wait(pid(), MaxConns, non_neg_integer())
- -> {ok, MaxConns} when MaxConns::ranch:max_conns().
-maybe_wait(_, MaxConns, NbConns) when MaxConns > NbConns ->
- {ok, MaxConns};
-maybe_wait(ListenerPid, MaxConns, NbConns) ->
- receive
- {set_max_conns, MaxConns2} ->
- maybe_wait(ListenerPid, MaxConns2, NbConns)
- after 0 ->
- NbConns2 = ranch_server:count_connections(ListenerPid),
- maybe_wait(ListenerPid, MaxConns, NbConns2)
- end.
-
--spec async_accept(inet:socket(), module()) -> ok.
-async_accept(LSocket, Transport) ->
- AcceptorPid = self(),
- _ = spawn_link(fun() ->
- case Transport:accept(LSocket, infinity) of
- {ok, CSocket} ->
- Transport:controlling_process(CSocket, AcceptorPid),
- AcceptorPid ! {accept, CSocket};
- %% We want to crash if the listening socket got closed.
- {error, Reason} when Reason =/= closed ->
- AcceptorPid ! {accept, continue}
- end
- end),
- ok.
+-spec loop(inet:socket(), module(), pid()) -> no_return().
+loop(LSocket, Transport, ConnsSup) ->
+ _ = case Transport:accept(LSocket, infinity) of
+ {ok, CSocket} ->
+ Transport:controlling_process(CSocket, ConnsSup),
+ %% This call will not return until process has been started
+ %% AND we are below the maximum number of connections.
+ ranch_conns_sup:start_protocol(ConnsSup, CSocket);
+ %% We want to crash if the listening socket got closed.
+ {error, Reason} when Reason =/= closed ->
+ ok
+ end,
+ ?MODULE:loop(LSocket, Transport, ConnsSup).
View
17 src/ranch_acceptors_sup.erl
@@ -17,24 +17,23 @@
-behaviour(supervisor).
%% API.
--export([start_link/5]).
+-export([start_link/4]).
%% supervisor.
-export([init/1]).
%% API.
--spec start_link(any(), non_neg_integer(), module(), any(),
- module()) -> {ok, pid()}.
-start_link(Ref, NbAcceptors, Transport, TransOpts, Protocol) ->
- supervisor:start_link(?MODULE, [Ref, NbAcceptors, Transport, TransOpts,
- Protocol]).
+-spec start_link(any(), non_neg_integer(), module(), any())
+ -> {ok, pid()}.
+start_link(Ref, NbAcceptors, Transport, TransOpts) ->
+ supervisor:start_link(?MODULE, [Ref, NbAcceptors, Transport, TransOpts]).
%% supervisor.
-init([Ref, NbAcceptors, Transport, TransOpts, Protocol]) ->
+init([Ref, NbAcceptors, Transport, TransOpts]) ->
ListenerPid = ranch_server:lookup_listener(Ref),
- ConnsPid = ranch_server:lookup_connections_sup(Ref),
+ ConnsSup = ranch_server:lookup_connections_sup(Ref),
LSocket = case proplists:get_value(socket, TransOpts) of
undefined ->
{ok, Socket} = Transport:listen(TransOpts),
@@ -46,7 +45,7 @@ init([Ref, NbAcceptors, Transport, TransOpts, Protocol]) ->
ranch_listener:set_port(ListenerPid, Port),
Procs = [
{{acceptor, self(), N}, {ranch_acceptor, start_link, [
- Ref, LSocket, Transport, Protocol, ListenerPid, ConnsPid
+ LSocket, Transport, ConnsSup
]}, permanent, brutal_kill, worker, []}
|| N <- lists:seq(1, NbAcceptors)],
{ok, {{one_for_one, 10, 10}, Procs}}.
View
173 src/ranch_conns_sup.erl
@@ -13,30 +13,171 @@
%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
%% @private
+%%
+%% Make sure to never reload this module outside a release upgrade,
+%% as calling l(ranch_conns_sup) twice will kill the process and all
+%% the currently open connections.
-module(ranch_conns_sup).
--behaviour(supervisor).
%% API.
--export([start_link/1]).
--export([start_protocol/5]).
+-export([start_link/3]).
+-export([start_protocol/2]).
+-export([active_connections/1]).
+
+%% Supervisor internals.
+-export([init/4]).
+-export([system_continue/3]).
+-export([system_terminate/4]).
+-export([system_code_change/4]).
-%% supervisor.
--export([init/1]).
+-record(state, {
+ parent = undefined :: pid(),
+ listener_pid = undefined :: pid(),
+ transport = undefined :: module(),
+ protocol = undefined :: module(),
+ opts :: any(),
+ max_conns = undefined :: non_neg_integer() | infinity
+}).
%% API.
--spec start_link(any()) -> {ok, pid()}.
-start_link(Ref) ->
- supervisor:start_link(?MODULE, Ref).
+-spec start_link(any(), module(), module()) -> {ok, pid()}.
+start_link(Ref, Transport, Protocol) ->
+ proc_lib:start_link(?MODULE, init, [self(), Ref, Transport, Protocol]).
+
+%% We can safely assume we are on the same node as the supervisor.
+%%
+%% We can also safely avoid having a monitor and a timeout here
+%% because only three things can happen:
+%% * The supervisor died; rest_for_one strategy killed all acceptors
+%% so this very calling process is going to di--
+%% * There's too many connections, the supervisor will resume the
+%% acceptor only when we get below the limit again.
+%% * The supervisor is overloaded, there's either too many acceptors
+%% or the max_connections limit is too large. It's better if we
+%% don't keep accepting connections because this leaves
+%% more room for the situation to be resolved.
+%%
+%% We do not need the reply, we only need the ok from the supervisor
+%% to continue. The supervisor sends its own pid when the acceptor can
+%% continue.
+-spec start_protocol(pid(), inet:socket()) -> ok.
+start_protocol(SupPid, Socket) ->
+ SupPid ! {?MODULE, start_protocol, self(), Socket},
+ receive SupPid -> ok end.
--spec start_protocol(pid(), inet:socket(), module(), module(), any())
- -> {ok, pid()}.
-start_protocol(ListenerPid, Socket, Transport, Protocol, Opts) ->
- Protocol:start_link(ListenerPid, Socket, Transport, Opts).
+%% We can't make the above assumptions here. This function might be
+%% called from anywhere.
+-spec active_connections(pid()) -> non_neg_integer().
+active_connections(SupPid) ->
+ Tag = erlang:monitor(process, SupPid),
+ erlang:send(SupPid, {?MODULE, active_connections, self(), Tag},
+ [noconnect]),
+ receive
+ {Tag, Ret} ->
+ erlang:demonitor(Tag, [flush]),
+ Ret;
+ {'DOWN', Tag, _, _, noconnection} ->
+ exit({nodedown, node(SupPid)});
+ {'DOWN', Tag, _, _, Reason} ->
+ exit(Reason)
+ after 5000 ->
+ erlang:demonitor(Tag, [flush]),
+ exit(timeout)
+ end.
-%% supervisor.
+%% Supervisor internals.
-init(Ref) ->
+-spec init(pid(), any(), module(), module()) -> no_return().
+init(Parent, Ref, Transport, Protocol) ->
+ process_flag(trap_exit, true),
ok = ranch_server:set_connections_sup(Ref, self()),
- {ok, {{simple_one_for_one, 0, 1}, [{?MODULE, {?MODULE, start_protocol, []},
- temporary, brutal_kill, worker, [?MODULE]}]}}.
+ ListenerPid = ranch_server:lookup_listener(Ref),
+ {ok, MaxConns} = ranch_listener:get_max_connections(ListenerPid),
+ {ok, Opts} = ranch_listener:get_protocol_options(ListenerPid),
+ ok = proc_lib:init_ack(Parent, {ok, self()}),
+ loop(#state{parent=Parent, listener_pid=ListenerPid, transport=Transport,
+ protocol=Protocol, opts=Opts, max_conns=MaxConns}, 0, 0, []).
+
+loop(State=#state{parent=Parent, listener_pid=ListenerPid,
+ transport=Transport, protocol=Protocol, opts=Opts,
+ max_conns=MaxConns}, CurConns, NbChildren, Sleepers) ->
+ receive
+ {?MODULE, start_protocol, To, Socket} ->
+ case Protocol:start_link(ListenerPid, Socket, Transport, Opts) of
+ {ok, Pid} ->
+ Transport:controlling_process(Socket, Pid),
+ Pid ! {shoot, ListenerPid},
+ put(Pid, true),
+ CurConns2 = CurConns + 1,
+ if CurConns2 < MaxConns ->
+ To ! self(),
+ loop(State, CurConns2, NbChildren + 1,
+ Sleepers);
+ true ->
+ loop(State, CurConns2, NbChildren + 1,
+ [To|Sleepers])
+ end;
+ _ ->
+ To ! self(),
+ loop(State, CurConns, NbChildren, Sleepers)
+ end;
+ {?MODULE, active_connections, To, Tag} ->
+ To ! {Tag, CurConns},
+ loop(State, CurConns, NbChildren, Sleepers);
+ %% Remove a connection from the count of connections.
+ {remove_connection, ListenerPid} ->
+ loop(State, CurConns - 1, NbChildren, Sleepers);
+ %% Upgrade the max number of connections allowed concurrently.
+ %% We resume all sleeping acceptors if this number increases.
+ {set_max_conns, MaxConns2} when MaxConns2 > MaxConns ->
+ _ = [To ! self() || To <- Sleepers],
+ loop(State#state{max_conns=MaxConns2},
+ CurConns, NbChildren, []);
+ {set_max_conns, MaxConns2} ->
+ loop(State#state{max_conns=MaxConns2},
+ CurConns, NbChildren, Sleepers);
+ %% Upgrade the protocol options.
+ {set_opts, Opts2} ->
+ loop(State#state{opts=Opts2},
+ CurConns, NbChildren, Sleepers);
+ {'EXIT', Parent, Reason} ->
+ exit(Reason);
+ {'EXIT', Pid, _} when Sleepers =:= [] ->
+ erase(Pid),
+ loop(State, CurConns - 1, NbChildren - 1, Sleepers);
+ %% Resume a sleeping acceptor if needed.
+ {'EXIT', Pid, _} ->
+ erase(Pid),
+ [To|Sleepers2] = Sleepers,
+ To ! self(),
+ loop(State, CurConns - 1, NbChildren - 1, Sleepers2);
+ {system, From, Request} ->
+ sys:handle_system_msg(Request, From, Parent, ?MODULE, [],
+ {State, CurConns, NbChildren, Sleepers});
+ %% Calls from the supervisor module.
+ {'$gen_call', {To, Tag}, which_children} ->
+ Pids = get_keys(true),
+ Children = [{Protocol, Pid, worker, [Protocol]}
+ || Pid <- Pids],
+ To ! {Tag, Children},
+ loop(State, CurConns, NbChildren, Sleepers);
+ {'$gen_call', {To, Tag}, count_children} ->
+ Counts = [{specs, 1}, {active, NbChildren},
+ {supervisors, 0}, {workers, NbChildren}],
+ To ! {Tag, Counts},
+ loop(State, CurConns, NbChildren, Sleepers);
+ {'$gen_call', {To, Tag}, _} ->
+ To ! {Tag, {error, ?MODULE}},
+ loop(State, CurConns, NbChildren, Sleepers)
+ end.
+
+system_continue(_, _, {State, CurConns, NbChildren, Sleepers}) ->
+ loop(State, CurConns, NbChildren, Sleepers).
+
+-spec system_terminate(any(), _, _, _) -> no_return().
+system_terminate(Reason, _, _, _) ->
+ exit(Reason).
+
+system_code_change(Misc, _, _, _) ->
+ {ok, Misc}.
View
59 src/ranch_listener.erl
@@ -19,7 +19,6 @@
%% API.
-export([start_link/3]).
-export([stop/1]).
--export([add_connection/2]).
-export([remove_connection/1]).
-export([get_port/1]).
-export([set_port/2]).
@@ -40,8 +39,7 @@
ref :: any(),
max_conns = undefined :: ranch:max_conns(),
port = undefined :: undefined | inet:port_number(),
- proto_opts = undefined :: any(),
- rm_diff = 0 :: non_neg_integer()
+ proto_opts = undefined :: any()
}).
%% API.
@@ -56,27 +54,16 @@ start_link(Ref, MaxConns, ProtoOpts) ->
stop(ServerPid) ->
gen_server:call(ServerPid, stop).
-%% @doc Add a connection to the listener's pool.
--spec add_connection(pid(), pid()) -> non_neg_integer().
-add_connection(ServerPid, ConnPid) ->
- ok = gen_server:cast(ServerPid, {add_connection, ConnPid}),
- ranch_server:add_connection(ServerPid).
-
%% @doc Remove this process' connection from the pool.
%%
%% Useful if you have long-lived connections that aren't taking up
%% resources and shouldn't be counted in the limited number of running
%% connections.
--spec remove_connection(pid()) -> non_neg_integer().
+-spec remove_connection(pid()) -> ok.
remove_connection(ServerPid) ->
- try
- Count = ranch_server:remove_connection(ServerPid),
- ok = gen_server:cast(ServerPid, remove_connection),
- Count
- catch
- error:badarg -> % Max conns = infinity
- 0
- end.
+ ConnsSup = ranch_server:find_connections_sup(ServerPid),
+ ConnsSup ! {remove_connection, ServerPid},
+ ok.
%% @doc Return the listener's port.
-spec get_port(pid()) -> {ok, inet:port_number()}.
@@ -111,12 +98,8 @@ set_protocol_options(ServerPid, ProtoOpts) ->
%% gen_server.
%% @private
-init([Ref, infinity, ProtoOpts]) ->
- ok = ranch_server:insert_listener(Ref, self()),
- {ok, #state{ref=Ref, max_conns=infinity, proto_opts=ProtoOpts}};
init([Ref, MaxConns, ProtoOpts]) ->
ok = ranch_server:insert_listener(Ref, self()),
- ranch_server:add_connections_counter(self()),
{ok, #state{ref=Ref, max_conns=MaxConns, proto_opts=ProtoOpts}}.
%% @private
@@ -125,48 +108,28 @@ handle_call(get_port, _From, State=#state{port=Port}) ->
handle_call(get_max_connections, _From, State=#state{max_conns=MaxConns}) ->
{reply, {ok, MaxConns}, State};
handle_call({set_max_connections, MaxConnections}, _From,
- State=#state{ref=Ref, max_conns=CurrMax, rm_diff=CurrDiff}) ->
- RmDiff = case {MaxConnections, CurrMax} of
- {infinity, _} -> % moving to infinity, delete connection key
- ranch_server:remove_connections_counter(self()),
- 0;
- {_, infinity} -> % moving away from infinity, create connection key
- ranch_server:add_connections_counter(self()),
- CurrDiff;
- {_, _} -> % stay current
- CurrDiff
- end,
- ranch_server:send_to_acceptors(Ref, {set_max_conns, MaxConnections}),
- {reply, ok, State#state{max_conns=MaxConnections, rm_diff=RmDiff}};
+ State=#state{ref=Ref}) ->
+ ConnsSup = ranch_server:lookup_connections_sup(Ref),
+ ConnsSup ! {set_max_conns, MaxConnections},
+ {reply, ok, State#state{max_conns=MaxConnections}};
handle_call(get_protocol_options, _From, State=#state{proto_opts=ProtoOpts}) ->
{reply, {ok, ProtoOpts}, State};
handle_call({set_protocol_options, ProtoOpts}, _From, State=#state{ref=Ref}) ->
- ranch_server:send_to_acceptors(Ref, {set_opts, ProtoOpts}),
+ ConnsSup = ranch_server:lookup_connections_sup(Ref),
+ ConnsSup ! {set_opts, ProtoOpts},
{reply, ok, State#state{proto_opts=ProtoOpts}};
handle_call(stop, _From, State) ->
{stop, normal, stopped, State};
handle_call(_, _From, State) ->
{reply, ignored, State}.
%% @private
-handle_cast({add_connection, ConnPid}, State) ->
- _ = erlang:monitor(process, ConnPid),
- {noreply, State};
-handle_cast(remove_connection, State=#state{max_conns=infinity}) ->
- {noreply, State};
-handle_cast(remove_connection, State=#state{rm_diff=RmDiff}) ->
- {noreply, State#state{rm_diff=RmDiff + 1}};
handle_cast({set_port, Port}, State) ->
{noreply, State#state{port=Port}};
handle_cast(_Msg, State) ->
{noreply, State}.
%% @private
-handle_info({'DOWN', _, process, _, _}, State=#state{rm_diff=0}) ->
- _ = ranch_server:remove_connection(self()),
- {noreply, State};
-handle_info({'DOWN', _, process, _, _}, State=#state{rm_diff=RmDiff}) ->
- {noreply, State#state{rm_diff=RmDiff - 1}};
handle_info(_Info, State) ->
{noreply, State}.
View
16 src/ranch_listener_sup.erl
@@ -30,23 +30,23 @@ start_link(Ref, NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts) ->
MaxConns = proplists:get_value(max_connections, TransOpts, 1024),
supervisor:start_link(?MODULE, {
Ref, NbAcceptors, MaxConns, Transport, TransOpts, Protocol, ProtoOpts
- }).
+ }).
%% supervisor.
init({Ref, NbAcceptors, MaxConns, Transport, TransOpts, Protocol, ProtoOpts}) ->
ChildSpecs = [
%% listener
{ranch_listener, {ranch_listener, start_link,
- [Ref, MaxConns, ProtoOpts]},
- permanent, 5000, worker, [ranch_listener]},
+ [Ref, MaxConns, ProtoOpts]},
+ permanent, 5000, worker, [ranch_listener]},
%% conns_sup
- {ranch_conns_sup, {ranch_conns_sup, start_link, [Ref]},
- permanent, infinity, supervisor, [ranch_conns_sup]},
+ {ranch_conns_sup, {ranch_conns_sup, start_link,
+ [Ref, Transport, Protocol]},
+ permanent, infinity, supervisor, [ranch_conns_sup]},
%% acceptors_sup
{ranch_acceptors_sup, {ranch_acceptors_sup, start_link,
- [Ref, NbAcceptors, Transport, TransOpts, Protocol]
- }, permanent, infinity, supervisor, [ranch_acceptors_sup]}
+ [Ref, NbAcceptors, Transport, TransOpts]
+ }, permanent, infinity, supervisor, [ranch_acceptors_sup]}
],
{ok, {{rest_for_one, 10, 10}, ChildSpecs}}.
-
View
102 src/ranch_server.erl
@@ -22,13 +22,8 @@
-export([lookup_listener/1]).
-export([set_connections_sup/2]).
-export([lookup_connections_sup/1]).
--export([add_acceptor/2]).
--export([send_to_acceptors/2]).
--export([add_connection/1]).
+-export([find_connections_sup/1]).
-export([count_connections/1]).
--export([remove_connection/1]).
--export([add_connections_counter/1]).
--export([remove_connections_counter/1]).
%% gen_server.
-export([init/1]).
@@ -40,8 +35,7 @@
-define(TAB, ?MODULE).
--type key() :: {listener | acceptors, any()}.
--type monitors() :: [{{reference(), pid()}, key()}].
+-type monitors() :: [{{reference(), pid()}, any()}].
-record(state, {
monitors = [] :: monitors()
}).
@@ -68,63 +62,23 @@ lookup_listener(Ref) ->
-spec set_connections_sup(any(), pid()) -> ok.
set_connections_sup(Ref, Pid) ->
true = ets:update_element(?TAB, {listener, Ref}, {3, Pid}),
+ true = ets:insert_new(?TAB, {{conns_sup, lookup_listener(Ref)}, Pid}),
ok.
%% @doc Lookup a connection supervisor used by specific listener.
-spec lookup_connections_sup(any()) -> pid() | undefined.
lookup_connections_sup(Ref) ->
ets:lookup_element(?TAB, {listener, Ref}, 3).
-%% @doc Add an acceptor for the given listener.
--spec add_acceptor(any(), pid()) -> ok.
-add_acceptor(Ref, Pid) ->
- gen_server:cast(?MODULE, {add_acceptor, Ref, Pid}).
-
-%% @doc Send a message to all acceptors of the given listener.
--spec send_to_acceptors(any(), any()) -> ok.
-send_to_acceptors(Ref, Msg) ->
- Acceptors = ets:lookup_element(?TAB, {acceptors, Ref}, 2),
- _ = [Pid ! Msg || Pid <- Acceptors],
- ok.
-
-%% @doc Add a connection to the connection pool.
-%%
-%% Also return the number of connections in the pool after this operation.
--spec add_connection(pid()) -> non_neg_integer().
-add_connection(ListenerPid) ->
- ets:update_counter(?TAB, {connections, ListenerPid}, 1).
+%% @doc Find a connection supervisor using the listener pid.
+-spec find_connections_sup(pid()) -> pid().
+find_connections_sup(Pid) ->
+ ets:lookup_element(?TAB, {conns_sup, Pid}, 2).
%% @doc Count the number of connections in the connection pool.
--spec count_connections(pid()) -> non_neg_integer().
-count_connections(ListenerPid) ->
- try
- ets:update_counter(?TAB, {connections, ListenerPid}, 0)
- catch
- error:badarg -> % Max conns = infinity
- 0
- end.
-
-%% @doc Remove a connection from the connection pool.
-%%
-%% Also return the number of connections in the pool after this operation.
--spec remove_connection(pid()) -> non_neg_integer().
-remove_connection(ListenerPid) ->
- ets:update_counter(?TAB, {connections, ListenerPid}, -1).
-
-
-%% @doc Add a connections counter to the connection pool
-%%
-%% Should only be used by ranch listeners when settings regarding the max
-%% number of connections change.
-add_connections_counter(Pid) ->
- true = ets:insert_new(?TAB, {{connections, Pid}, 0}).
-
-%% @doc remove a connections counter from the connection pool
-%%
-%% Should only be used by ranch listeners when settings regarding the max
-%% number of connections change.
-remove_connections_counter(Pid) ->
- true = ets:delete(?TAB, {connections, Pid}).
+-spec count_connections(any()) -> non_neg_integer().
+count_connections(Ref) ->
+ ranch_conns_sup:active_connections(lookup_connections_sup(Ref)).
%% gen_server.
@@ -138,27 +92,18 @@ handle_call(_Request, _From, State) ->
%% @private
handle_cast({insert_listener, Ref, Pid}, State=#state{monitors=Monitors}) ->
- true = ets:insert_new(?TAB, {{acceptors, Ref}, []}),
MonitorRef = erlang:monitor(process, Pid),
{noreply, State#state{
- monitors=[{{MonitorRef, Pid}, {listener, Ref}}|Monitors]}};
-handle_cast({add_acceptor, Ref, Pid}, State=#state{monitors=Monitors}) ->
- MonitorRef = erlang:monitor(process, Pid),
- Acceptors = ets:lookup_element(?TAB, {acceptors, Ref}, 2),
- true = ets:insert(?TAB, {{acceptors, Ref}, [Pid|Acceptors]}),
- {noreply, State#state{
- monitors=[{{MonitorRef, Pid}, {acceptors, Ref}}|Monitors]}};
-handle_cast({add_connection, Pid}, State) ->
- _ = erlang:monitor(process, Pid),
- {noreply, State};
+ monitors=[{{MonitorRef, Pid}, Ref}|Monitors]}};
handle_cast(_Request, State) ->
{noreply, State}.
%% @private
handle_info({'DOWN', MonitorRef, process, Pid, _},
State=#state{monitors=Monitors}) ->
- {_, Key} = lists:keyfind({MonitorRef, Pid}, 1, Monitors),
- Monitors2 = remove_process(Key, MonitorRef, Pid, Monitors),
+ {_, Ref} = lists:keyfind({MonitorRef, Pid}, 1, Monitors),
+ true = ets:delete(?TAB, {listener, Ref}),
+ Monitors2 = lists:keydelete({MonitorRef, Pid}, 1, Monitors),
{noreply, State#state{monitors=Monitors2}};
handle_info(_Info, State) ->
{noreply, State}.
@@ -170,22 +115,3 @@ terminate(_Reason, _State) ->
%% @private
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-
-%% Internal.
-
--spec remove_process(key(), reference(), pid(), Monitors)
- -> Monitors when Monitors::monitors() .
-remove_process(Key = {listener, Ref}, MonitorRef, Pid, Monitors) ->
- true = ets:delete(?TAB, Key),
- true = ets:delete(?TAB, {acceptors, Ref}),
- remove_connections_counter(Pid),
- lists:keydelete({MonitorRef, Pid}, 1, Monitors);
-remove_process(Key = {acceptors, _}, MonitorRef, Pid, Monitors) ->
- try
- Acceptors = ets:lookup_element(?TAB, Key, 2),
- true = ets:update_element(?TAB, Key, {2, lists:delete(Pid, Acceptors)})
- catch
- error:_ ->
- ok
- end,
- lists:keydelete({MonitorRef, Pid}, 1, Monitors).
View
2 src/ranch_sup.erl
@@ -34,7 +34,7 @@ start_link() ->
init([]) ->
ranch_server = ets:new(ranch_server, [
- ordered_set, public, named_table, {write_concurrency, true}]),
+ ordered_set, public, named_table]),
Procs = [
{ranch_server, {ranch_server, start_link, []},
permanent, 5000, worker, [ranch_server]}
View
242 test/acceptor_SUITE.erl
@@ -111,215 +111,242 @@ misc_bad_transport(_) ->
%% ssl.
ssl_accept_error(Config) ->
- {ok, _} = ranch:start_listener(ssl_accept_error, 1,
+ Name = ssl_accept_error,
+ {ok, ListenerSup} = ranch:start_listener(Name, 1,
ranch_ssl, [{port, 0},
{certfile, ?config(data_dir, Config) ++ "cert.pem"}],
echo_protocol, []),
- Port = ranch:get_port(ssl_accept_error),
- [AcceptorPid] = ets:lookup_element(ranch_server,
- {acceptors, ssl_accept_error}, 2),
+ Port = ranch:get_port(Name),
+ ListenerSupChildren = supervisor:which_children(ListenerSup),
+ {_, AcceptorsSup, _, _}
+ = lists:keyfind(ranch_acceptors_sup, 1, ListenerSupChildren),
+ [{{acceptor, _, _}, AcceptorPid, _, _}]
+ = supervisor:which_children(AcceptorsSup),
true = is_process_alive(AcceptorPid),
{ok, Socket} = gen_tcp:connect("localhost", Port,
[binary, {active, false}, {packet, raw}]),
ok = gen_tcp:close(Socket),
receive after 500 -> ok end,
- true = is_process_alive(AcceptorPid).
+ true = is_process_alive(AcceptorPid),
+ ranch:stop_listener(Name).
ssl_accept_socket(Config) ->
%%% XXX we can't do the spawn to test the controlling process change
%%% because of the bug in ssl
+ Name = ssl_accept_socket,
{ok, S} = ssl:listen(0,
[{certfile, ?config(data_dir, Config) ++ "cert.pem"}, binary,
{active, false}, {packet, raw}, {reuseaddr, true}]),
- {ok, _} = ranch:start_listener(ssl_accept_socket, 1,
+ {ok, _} = ranch:start_listener(Name, 1,
ranch_ssl, [{socket, S}], echo_protocol, []),
- Port = ranch:get_port(ssl_accept_socket),
+ Port = ranch:get_port(Name),
{ok, Socket} = ssl:connect("localhost", Port,
[binary, {active, false}, {packet, raw},
{certfile, ?config(data_dir, Config) ++ "cert.pem"}]),
ok = ssl:send(Socket, <<"TCP Ranch is working!">>),
{ok, <<"TCP Ranch is working!">>} = ssl:recv(Socket, 21, 1000),
- ok = ranch:stop_listener(ssl_accept_socket),
+ ok = ranch:stop_listener(Name),
{error, closed} = ssl:recv(Socket, 0, 1000),
%% Make sure the listener stopped.
- {'EXIT', _} = begin catch ranch:get_port(ssl_accept_socket) end,
+ {'EXIT', _} = begin catch ranch:get_port(Name) end,
ok.
ssl_active_echo(Config) ->
- {ok, _} = ranch:start_listener(ssl_active_echo, 1,
+ Name = ssl_active_echo,
+ {ok, _} = ranch:start_listener(Name, 1,
ranch_ssl, [{port, 0},
{certfile, ?config(data_dir, Config) ++ "cert.pem"}],
active_echo_protocol, []),
- Port = ranch:get_port(ssl_active_echo),
+ Port = ranch:get_port(Name),
{ok, Socket} = ssl:connect("localhost", Port,
[binary, {active, false}, {packet, raw},
{certfile, ?config(data_dir, Config) ++ "cert.pem"}]),
ok = ssl:send(Socket, <<"SSL Ranch is working!">>),
{ok, <<"SSL Ranch is working!">>} = ssl:recv(Socket, 21, 1000),
- ok = ranch:stop_listener(ssl_active_echo),
+ ok = ranch:stop_listener(Name),
{error, closed} = ssl:recv(Socket, 0, 1000),
%% Make sure the listener stopped.
- {'EXIT', _} = begin catch ranch:get_port(ssl_active_echo) end,
+ {'EXIT', _} = begin catch ranch:get_port(Name) end,
ok.
ssl_echo(Config) ->
- {ok, _} = ranch:start_listener(ssl_echo, 1,
+ Name = ssl_echo,
+ {ok, _} = ranch:start_listener(Name, 1,
ranch_ssl, [{port, 0},
{certfile, ?config(data_dir, Config) ++ "cert.pem"}],
echo_protocol, []),
- Port = ranch:get_port(ssl_echo),
+ Port = ranch:get_port(Name),
{ok, Socket} = ssl:connect("localhost", Port,
[binary, {active, false}, {packet, raw},
{certfile, ?config(data_dir, Config) ++ "cert.pem"}]),
ok = ssl:send(Socket, <<"SSL Ranch is working!">>),
{ok, <<"SSL Ranch is working!">>} = ssl:recv(Socket, 21, 1000),
- ok = ranch:stop_listener(ssl_echo),
+ ok = ranch:stop_listener(Name),
{error, closed} = ssl:recv(Socket, 0, 1000),
%% Make sure the listener stopped.
- {'EXIT', _} = begin catch ranch:get_port(ssl_echo) end,
+ {'EXIT', _} = begin catch ranch:get_port(Name) end,
ok.
%% tcp.
tcp_accept_socket(_) ->
+ Name = tcp_accept_socket,
Ref = make_ref(),
Parent = self(),
spawn(fun() ->
- {ok, S} = gen_tcp:listen(0, [binary, {active, false}, {packet, raw},
- {reuseaddr, true}]),
- {ok, _} = ranch:start_listener(tcp_accept_socket, 1,
- ranch_tcp, [{socket, S}], echo_protocol, []),
- Parent ! Ref
- end),
+ {ok, S} = gen_tcp:listen(0, [binary, {active, false}, {packet, raw},
+ {reuseaddr, true}]),
+ {ok, _} = ranch:start_listener(Name, 1,
+ ranch_tcp, [{socket, S}], echo_protocol, []),
+ Parent ! Ref
+ end),
receive
Ref -> ok
end,
-
- Port = ranch:get_port(tcp_accept_socket),
+ Port = ranch:get_port(Name),
{ok, Socket} = gen_tcp:connect("localhost", Port,
[binary, {active, false}, {packet, raw}]),
ok = gen_tcp:send(Socket, <<"TCP Ranch is working!">>),
{ok, <<"TCP Ranch is working!">>} = gen_tcp:recv(Socket, 21, 1000),
- ok = ranch:stop_listener(tcp_accept_socket),
+ ok = ranch:stop_listener(Name),
{error, closed} = gen_tcp:recv(Socket, 0, 1000),
%% Make sure the listener stopped.
- {'EXIT', _} = begin catch ranch:get_port(tcp_accept_socket) end,
+ {'EXIT', _} = begin catch ranch:get_port(Name) end,
ok.
tcp_active_echo(_) ->
- {ok, _} = ranch:start_listener(tcp_active_echo, 1,
+ Name = tcp_active_echo,
+ {ok, _} = ranch:start_listener(Name, 1,
ranch_tcp, [{port, 0}], active_echo_protocol, []),
- Port = ranch:get_port(tcp_active_echo),
+ Port = ranch:get_port(Name),
{ok, Socket} = gen_tcp:connect("localhost", Port,
[binary, {active, false}, {packet, raw}]),
ok = gen_tcp:send(Socket, <<"TCP Ranch is working!">>),
{ok, <<"TCP Ranch is working!">>} = gen_tcp:recv(Socket, 21, 1000),
- ok = ranch:stop_listener(tcp_active_echo),
+ ok = ranch:stop_listener(Name),
{error, closed} = gen_tcp:recv(Socket, 0, 1000),
%% Make sure the listener stopped.
- {'EXIT', _} = begin catch ranch:get_port(tcp_active_echo) end,
+ {'EXIT', _} = begin catch ranch:get_port(Name) end,
ok.
tcp_echo(_) ->
- {ok, _} = ranch:start_listener(tcp_echo, 1,
+ Name = tcp_echo,
+ {ok, _} = ranch:start_listener(Name, 1,
ranch_tcp, [{port, 0}], echo_protocol, []),
- Port = ranch:get_port(tcp_echo),
+ Port = ranch:get_port(Name),
{ok, Socket} = gen_tcp:connect("localhost", Port,
[binary, {active, false}, {packet, raw}]),
ok = gen_tcp:send(Socket, <<"TCP Ranch is working!">>),
{ok, <<"TCP Ranch is working!">>} = gen_tcp:recv(Socket, 21, 1000),
- ok = ranch:stop_listener(tcp_echo),
+ ok = ranch:stop_listener(Name),
{error, closed} = gen_tcp:recv(Socket, 0, 1000),
%% Make sure the listener stopped.
- {'EXIT', _} = begin catch ranch:get_port(tcp_echo) end,
+ {'EXIT', _} = begin catch ranch:get_port(Name) end,
ok.
tcp_max_connections(_) ->
- {ok, _} = ranch:start_listener(tcp_max_connections, 1,
+ Name = tcp_max_connections,
+ {ok, _} = ranch:start_listener(Name, 1,
ranch_tcp, [{port, 0}, {max_connections, 10}],
notify_and_wait_protocol, [{msg, connected}, {pid, self()}]),
- Port = ranch:get_port(tcp_max_connections),
- %% @todo We'll probably want a more direct interface to count_connections.
- ListenerPid = ranch_server:lookup_listener(tcp_max_connections),
+ Port = ranch:get_port(Name),
ok = connect_loop(Port, 11, 150),
- 10 = ranch_server:count_connections(ListenerPid),
+ 10 = ranch_server:count_connections(Name),
10 = receive_loop(connected, 400),
- 1 = receive_loop(connected, 1000).
+ 1 = receive_loop(connected, 1000),
+ ranch:stop_listener(Name).
tcp_max_connections_and_beyond(_) ->
- {ok, _} = ranch:start_listener(tcp_max_connections_and_beyond, 1,
+ Name = tcp_max_connections_and_beyond,
+ {ok, _} = ranch:start_listener(Name, 1,
ranch_tcp, [{port, 0}, {max_connections, 10}],
remove_conn_and_wait_protocol, [{remove, true}]),
- Port = ranch:get_port(tcp_max_connections_and_beyond),
- %% @todo We'll probably want a more direct interface to count_connections.
- ListenerPid = ranch_server:lookup_listener(tcp_max_connections_and_beyond),
+ Port = ranch:get_port(Name),
ok = connect_loop(Port, 10, 0),
- 0 = ranch_server:count_connections(ListenerPid),
- ranch:set_protocol_options(tcp_max_connections_and_beyond,
- [{remove, false}]),
- receive after 500 -> ok end,
+ receive after 250 -> ok end,
+ 0 = ranch_server:count_connections(Name),
+ 10 = length(supervisor:which_children(
+ ranch_server:lookup_connections_sup(Name))),
+ Counts = supervisor:count_children(
+ ranch_server:lookup_connections_sup(Name)),
+ {_, 1} = lists:keyfind(specs, 1, Counts),
+ {_, 0} = lists:keyfind(supervisors, 1, Counts),
+ {_, 10} = lists:keyfind(active, 1, Counts),
+ {_, 10} = lists:keyfind(workers, 1, Counts),
+ ranch:set_protocol_options(Name, [{remove, false}]),
+ receive after 250 -> ok end,
ok = connect_loop(Port, 10, 0),
- receive after 500 -> ok end,
- 10 = ranch_server:count_connections(ListenerPid).
+ receive after 250 -> ok end,
+ 10 = ranch_server:count_connections(Name),
+ 20 = length(supervisor:which_children(
+ ranch_server:lookup_connections_sup(Name))),
+ Counts2 = supervisor:count_children(
+ ranch_server:lookup_connections_sup(Name)),
+ {_, 20} = lists:keyfind(active, 1, Counts2),
+ {_, 20} = lists:keyfind(workers, 1, Counts2),
+ ranch:stop_listener(Name).
tcp_set_max_connections(_) ->
- {ok, _} = ranch:start_listener(tcp_set_max_connections, 1,
+ Name = tcp_set_max_connections,
+ {ok, _} = ranch:start_listener(Name, 1,
ranch_tcp, [{port, 0}, {max_connections, 10}],
notify_and_wait_protocol, [{msg, connected}, {pid, self()}]),
- Port = ranch:get_port(tcp_set_max_connections),
- %% @todo We'll probably want a more direct interface to count_connections.
- ListenerPid = ranch_server:lookup_listener(tcp_set_max_connections),
+ Port = ranch:get_port(Name),
ok = connect_loop(Port, 20, 0),
- 10 = ranch_server:count_connections(ListenerPid),
+ 10 = ranch_server:count_connections(Name),
10 = receive_loop(connected, 1000),
- 10 = ranch:get_max_connections(tcp_set_max_connections),
- ranch:set_max_connections(tcp_set_max_connections, 20),
+ 10 = ranch:get_max_connections(Name),
+ ranch:set_max_connections(Name, 20),
10 = receive_loop(connected, 1000),
- 20 = ranch:get_max_connections(tcp_set_max_connections).
+ 20 = ranch:get_max_connections(Name),
+ ranch:stop_listener(Name).
tcp_infinity_max_connections(_) ->
- {ok, _} = ranch:start_listener(tcp_infinity_max_connections, 1,
+ Name = tcp_infinity_max_connections,
+ {ok, _} = ranch:start_listener(Name, 1,
ranch_tcp, [{port, 0}, {max_connections, 10}],
notify_and_wait_protocol, [{msg, connected}, {pid, self()}]),
- Port = ranch:get_port(tcp_infinity_max_connections),
- %% @todo We'll probably want a more direct interface to count_connections.
- ListenerPid = ranch_server:lookup_listener(tcp_infinity_max_connections),
+ Port = ranch:get_port(Name),
ok = connect_loop(Port, 20, 0),
- 10 = ranch_server:count_connections(ListenerPid),
+ 10 = ranch_server:count_connections(Name),
10 = receive_loop(connected, 1000),
- 10 = ranch:get_max_connections(tcp_infinity_max_connections),
- ranch:set_max_connections(tcp_infinity_max_connections, infinity),
- 0 = ranch_server:count_connections(ListenerPid),
- infinity = ranch:get_max_connections(tcp_infinity_max_connections),
- ranch:set_max_connections(tcp_infinity_max_connections, 10),
- 0 = ranch_server:count_connections(ListenerPid),
+ 10 = ranch_server:count_connections(Name),
+ 10 = ranch:get_max_connections(Name),
+ ranch:set_max_connections(Name, infinity),
+ receive after 250 -> ok end,
+ 20 = ranch_server:count_connections(Name),
+ infinity = ranch:get_max_connections(Name),
+ ranch:set_max_connections(Name, 10),
+ 20 = ranch_server:count_connections(Name),
10 = receive_loop(connected, 1000),
- 10 = ranch_server:count_connections(ListenerPid). % count could be off
+ ranch:stop_listener(Name).
tcp_upgrade(_) ->
- {ok, _} = ranch:start_listener(tcp_upgrade, 1,
+ Name = tcp_upgrade,
+ {ok, _} = ranch:start_listener(Name, 1,
ranch_tcp, [{port, 0}],
notify_and_wait_protocol, [{msg, connected}, {pid, self()}]),
- Port = ranch:get_port(tcp_upgrade),
+ Port = ranch:get_port(Name),
ok = connect_loop(Port, 1, 0),
receive connected -> ok after 1000 -> error(timeout) end,
- ranch:set_protocol_options(tcp_upgrade, [{msg, upgraded}, {pid, self()}]),
+ ranch:set_protocol_options(Name, [{msg, upgraded}, {pid, self()}]),
ok = connect_loop(Port, 1, 0),
- receive upgraded -> ok after 1000 -> error(timeout) end.
+ receive upgraded -> ok after 1000 -> error(timeout) end,
+ ranch:stop_listener(Name).
%% Supervisor tests
supervisor_clean_restart(_) ->
%% There we verify that mature listener death will not let
%% whole supervisor down and also the supervisor itself will
%% restart everything properly.
- Ref = supervisor_clean_restart,
+ Name = supervisor_clean_restart,
NbAcc = 4,
- {ok, Pid} = ranch:start_listener(Ref,
+ {ok, Pid} = ranch:start_listener(Name,
NbAcc, ranch_tcp, [{port, 0}], echo_protocol, []),
%% Trace supervisor spawns.
1 = erlang:trace(Pid, true, [procs, set_on_spawn]),
- ListenerPid0 = ranch_server:lookup_listener(Ref),
+ ListenerPid0 = ranch_server:lookup_listener(Name),
erlang:exit(ListenerPid0, kill),
receive after 1000 -> ok end,
%% Verify that supervisor is alive
@@ -339,22 +366,24 @@ supervisor_clean_restart(_) ->
error(invalid_restart)
after 1000 -> ok end,
%% Verify that new children registered themselves properly.
- ListenerPid = ranch_server:lookup_listener(Ref),
+ ListenerPid = ranch_server:lookup_listener(Name),
_ = erlang:trace(all, false, [all]),
- ok = clean_traces().
+ ok = clean_traces(),
+ ranch:stop_listener(Name).
supervisor_clean_child_restart(_) ->
%% Then we verify that only parts of the supervision tree
%% restarted in the case of failure.
- Ref = supervisor_clean_child_restart,
+ Name = supervisor_clean_child_restart,
%% Trace socket allocations.
_ = erlang:trace(new, true, [call]),
- 1 = erlang:trace_pattern({ranch_tcp, listen, 1}, [{'_', [], [{return_trace}]}], [global]),
- {ok, Pid} = ranch:start_listener(Ref,
+ 1 = erlang:trace_pattern({ranch_tcp, listen, 1},
+ [{'_', [], [{return_trace}]}], [global]),
+ {ok, Pid} = ranch:start_listener(Name,
1, ranch_tcp, [{port, 0}], echo_protocol, []),
%% Trace supervisor spawns.
1 = erlang:trace(Pid, true, [procs, set_on_spawn]),
- ListenerPid0 = ranch_server:lookup_listener(Ref),
+ ListenerPid = ranch_server:lookup_listener(Name),
%% Manually shut the listening socket down.
LSocket = receive
{trace, _, return_from, {ranch_tcp, listen, 1}, {ok, Socket}} ->
@@ -366,39 +395,39 @@ supervisor_clean_child_restart(_) ->
receive after 1000 -> ok end,
%% Verify that supervisor and its first two children are alive.
true = is_process_alive(Pid),
- true = is_process_alive(ListenerPid0),
+ true = is_process_alive(ListenerPid),
%% Check that acceptors_sup is restarted properly.
AccSupPid = receive {trace, Pid, spawn, Pid1, _} -> Pid1 end,
- AccPid = receive {trace, AccSupPid, spawn, Pid2, _} -> Pid2 end,
- receive {trace, AccPid, spawn, _, _} -> ok end,
+ receive {trace, AccSupPid, spawn, _, _} -> ok end,
%% No more traces then.
receive
{trace, _, spawn, _, _} -> error(invalid_restart)
after 1000 -> ok end,
%% Verify that children still registered right.
- ListenerPid0 = ranch_server:lookup_listener(Ref),
+ ListenerPid = ranch_server:lookup_listener(Name),
_ = erlang:trace_pattern({ranch_tcp, listen, 1}, false, []),
_ = erlang:trace(all, false, [all]),
ok = clean_traces(),
- ok.
+ ranch:stop_listener(Name).
supervisor_conns_alive(_) ->
%% And finally we make sure that in the case of partial failure
%% live connections are not being killed.
- Ref = supervisor_conns_alive,
+ Name = supervisor_conns_alive,
_ = erlang:trace(new, true, [call]),
- 1 = erlang:trace_pattern({ranch_tcp, listen, 1}, [{'_', [], [{return_trace}]}], [global]),
- {ok, _} = ranch:start_listener(Ref,
- 1, ranch_tcp, [{port, 0}], remove_conn_and_wait_protocol, [{remove, false}]),
- ok,
+ 1 = erlang:trace_pattern({ranch_tcp, listen, 1},
+ [{'_', [], [{return_trace}]}], [global]),
+ {ok, _} = ranch:start_listener(Name, 1,
+ ranch_tcp, [{port, 0}],
+ remove_conn_and_wait_protocol, [{remove, false}]),
%% Get the listener socket
LSocket = receive
{trace, _, return_from, {ranch_tcp, listen, 1}, {ok, S}} ->
S
after 0 ->
error(lsocket_unknown)
end,
- TcpPort = ranch:get_port(Ref),
+ TcpPort = ranch:get_port(Name),
{ok, Socket} = gen_tcp:connect("localhost", TcpPort,
[binary, {active, true}, {packet, raw}]),
%% Shut the socket down
@@ -408,17 +437,8 @@ supervisor_conns_alive(_) ->
ok = gen_tcp:send(Socket, <<"poke">>),
receive {tcp_closed, _} -> ok end,
_ = erlang:trace(all, false, [all]),
- ok = clean_traces().
-
-clean_traces() ->
- receive
- {trace, _, _, _} ->
- clean_traces();
- {trace, _, _, _, _} ->
- clean_traces()
- after 0 ->
- ok
- end.
+ ok = clean_traces(),
+ ranch:stop_listener(Name).
%% Utility functions.
@@ -438,3 +458,13 @@ receive_loop(Message, Timeout, N) ->
after Timeout ->
N
end.
+
+clean_traces() ->
+ receive
+ {trace, _, _, _} ->
+ clean_traces();
+ {trace, _, _, _, _} ->
+ clean_traces()
+ after 0 ->
+ ok
+ end.

0 comments on commit 33db3b0

Please sign in to comment.