Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Adding support for multiple listeners per gen_nb_server instance

  • Loading branch information...
commit bfea2e584a038ae0139903394e6a131243b2e83a 1 parent aca066e
@bigkevmcd bigkevmcd authored
View
3  .gitignore
@@ -1 +1,2 @@
-ebin/*.beam
+ebin/*.beam
+doc
View
2  priv/example/Makefile
@@ -4,7 +4,7 @@ ebin:
mkdir ebin
compile: ebin
- cd src;erl -pa ../../ebin -make
+ cd src;erl -pa ../../../ebin -make
clean:
rm -rf ebin
View
BIN  priv/example/ebin/example.beam
Binary file not shown
View
BIN  priv/example/ebin/http_server.beam
Binary file not shown
View
BIN  priv/example/ebin/http_worker.beam
Binary file not shown
View
80 priv/example/src/example.erl
@@ -20,52 +20,74 @@
-module(example).
--export([start_link/2]).
+-export([start_link/0,
+ add_listener/3,
+ remove_listener/3]).
--export([init/1, handle_call/3, handle_cast/2, handle_info/2]).
--export([terminate/2, sock_opts/0, new_connection/2]).
+-export([init/2, handle_call/3, handle_cast/2, handle_info/2]).
+-export([terminate/2, sock_opts/0, new_connection/4]).
-behavior(gen_nb_server).
-start_link(IpAddr, Port) ->
- gen_nb_server:start_link(?MODULE, IpAddr, Port, []).
-
-init([]) ->
- {ok, []}.
-
+start_link() ->
+ gen_nb_server:start_link(?MODULE, []).
+
+add_listener(Pid, IpAddr, Port) ->
+ gen_server:call(Pid, {add_listener, IpAddr, Port}).
+
+remove_listener(Pid, IpAddr, Port) ->
+ gen_server:call(Pid, {remove_listener, IpAddr, Port}).
+
+init([], State) ->
+ {ok, State}.
+
+handle_call({add_listener, IpAddr, Port}, _From, State) ->
+ case gen_nb_server:add_listen_socket({IpAddr, Port}, State) of
+ {ok, State1} ->
+ {reply, ok, State1};
+ Error ->
+ {reply, Error, State}
+ end;
+handle_call({remove_listener, IpAddr, Port}, _From, State) ->
+ case gen_nb_server:remove_listen_socket({IpAddr, Port}, State) of
+ {ok, State1} ->
+ {reply, ok, State1};
+ Error ->
+ {reply, Error, State}
+ end;
handle_call(_Msg, _From, State) ->
- {reply, ignored, State}.
+ {reply, ignored, State}.
handle_cast(_Msg, State) ->
- {noreply, State}.
+ {noreply, State}.
handle_info({tcp, Sock, Data}, State) ->
- Me = self(),
- P = spawn(fun() -> worker(Me, Sock, Data) end),
- gen_tcp:controlling_process(Sock, P),
- {noreply, State};
+ Me = self(),
+ P = spawn(fun() -> worker(Me, Sock, Data) end),
+ gen_tcp:controlling_process(Sock, P),
+ {noreply, State};
handle_info(_Msg, State) ->
- {noreply, State}.
+ {noreply, State}.
terminate(_Reason, _State) ->
- ok.
+ ok.
sock_opts() ->
- [binary, {active, once}, {packet, 0}].
+ [binary, {active, once}, {packet, 0}].
-new_connection(Sock, State) ->
- Me = self(),
- P = spawn(fun() -> worker(Me, Sock) end),
- gen_tcp:controlling_process(Sock, P),
- {ok, State}.
+new_connection(_IpAddr, _Port, Sock, State) ->
+ Me = self(),
+ P = spawn(fun() -> worker(Me, Sock) end),
+ gen_tcp:controlling_process(Sock, P),
+ {ok, State}.
worker(Owner, Sock) ->
- gen_tcp:send(Sock, "Hello"),
- inet:setopts(Sock, [{active, once}]),
- gen_tcp:controlling_process(Sock, Owner).
+ gen_tcp:send(Sock, "Hello\n"),
+ inet:setopts(Sock, [{active, once}]),
+ gen_tcp:controlling_process(Sock, Owner).
worker(Owner, Sock, Data) ->
- gen_tcp:send(Sock, Data),
- inet:setopts(Sock, [{active, once}]),
- gen_tcp:controlling_process(Sock, Owner).
+ gen_tcp:send(Sock, Data),
+ inet:setopts(Sock, [{active, once}]),
+ gen_tcp:controlling_process(Sock, Owner).
View
56 priv/example/src/http_server.erl
@@ -1,56 +0,0 @@
-%% Copyright (c) 2009 Hypothetical Labs, Inc.
-
-%% Permission is hereby granted, free of charge, to any person obtaining a copy
-%% of this software and associated documentation files (the "Software"), to deal
-%% in the Software without restriction, including without limitation the rights
-%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-%% copies of the Software, and to permit persons to whom the Software is
-%% furnished to do so, subject to the following conditions:
-%%
-%% The above copyright notice and this permission notice shall be included in
-%% all copies or substantial portions of the Software.
-%%
-%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-%% THE SOFTWARE.
-
--module(http_server).
-
--export([start/0, start_link/2]).
-
--export([init/1, handle_call/3, handle_cast/2, handle_info/2]).
--export([terminate/2, sock_opts/0, new_connection/2]).
-
--behavior(gen_nb_server).
-
-start() ->
- start_link("0.0.0.0", 9292).
-
-start_link(IpAddr, Port) ->
- gen_nb_server:start_link(?MODULE, IpAddr, Port, []).
-
-init([]) ->
- {ok, []}.
-
-handle_call(_Msg, _From, State) ->
- {reply, ignored, State}.
-
-handle_cast(_Msg, State) ->
- {noreply, State}.
-
-handle_info(_Msg, State) ->
- {noreply, State}.
-
-terminate(_Reason, _State) ->
- ok.
-
-sock_opts() ->
- [binary, {active, once}, {packet, 0}].
-
-new_connection(Sock, State) ->
- http_worker:start(Sock),
- {ok, State}.
View
70 priv/example/src/http_worker.erl
@@ -1,70 +0,0 @@
--module(http_worker).
-
--behaviour(gen_server).
-
--define(RESPONSE, "HTTP/1.0 200 OK\r\nConnection: close\r\nContent-Length: 1\r\n\r\nA").
-%% API
--export([start/1]).
-
-%% gen_server callbacks
--export([init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3]).
-
--define(SERVER, ?MODULE).
-
--record(state, {sock}).
-
-configure(Pid) ->
- gen_server:call(Pid, configure).
-
-start(Sock) ->
- {ok, Pid} = gen_server:start(?MODULE, [Sock], []),
- gen_tcp:controlling_process(Sock, Pid),
- configure(Pid).
-
-init([Sock]) ->
- {ok, #state{sock=Sock}}.
-
-
-handle_call(configure, _From, #state{sock=Sock}=State) ->
- inet:setopts(Sock, [{active, once}, {packet, http}, binary]),
- {reply, ok, State};
-
-handle_call(_Request, _From, State) ->
- {reply, ignore, State}.
-
-handle_cast(_Msg, State) ->
- {noreply, State}.
-
-handle_info({http, Sock, http_eoh}, State) ->
- io:format("RESPONSE~n"),
- inet:setopts(Sock, [{packet, 0}]),
- gen_tcp:send(Sock, list_to_binary(?RESPONSE)),
- inet:setopts(Sock, [{packet, http}, {active, once}]),
- {noreply, State};
-
-handle_info({http, Sock, {http_header, _, _, _, _}}, State) ->
- io:format("HEADER~n"),
- inet:setopts(Sock, [{active, once}]),
- {noreply, State};
-
-handle_info({http, Sock, {http_request, _, _, _}}, State) ->
- io:format("REQUEST~n"),
- inet:setopts(Sock, [{active, once}]),
- {noreply, State};
-
-handle_info({http, Sock, Data}, #state{sock=Sock}=State) ->
- io:format("Data: ~p~n", [Data]),
- gen_tcp:close(Sock),
- {noreply, State};
-
-handle_info(_Info, State) ->
- {noreply, State}.
-
-terminate(_Reason, _State) ->
- ok.
-
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-
-%% Internal function
View
243 src/gen_nb_server.erl
@@ -25,128 +25,170 @@
-behaviour(gen_server).
%% API
--export([start_link/4, start_link/5]).
+-export([start_link/2,
+ start_link/3,
+ get_state/1,
+ store_state/2,
+ add_listen_socket/2,
+ remove_listen_socket/2]).
%% Behavior callbacks
-export([behaviour_info/1]).
%% gen_server callbacks
--export([init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3]).
+-export([init/1,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ terminate/2,
+ code_change/3]).
-define(SERVER, ?MODULE).
-record(state, {cb,
- sock,
+ addrs=dict:new(),
+ socks=dict:new(),
server_state}).
%% @hidden
behaviour_info(callbacks) ->
- [{init, 1},
- {handle_call, 3},
- {handle_cast, 2},
- {handle_info, 2},
- {terminate, 2},
- {sock_opts, 0},
- {new_connection, 2}];
+ [{init, 2},
+ {handle_call, 3},
+ {handle_cast, 2},
+ {handle_info, 2},
+ {terminate, 2},
+ {sock_opts, 0},
+ {new_connection, 4}];
behaviour_info(_) ->
- undefined.
+ undefined.
%% @spec start_link(CallbackModule, IpAddr, Port, InitParams) -> Result
%% CallbackModule = atom()
-%% IpAddr = string()
-%% Port = integer()
%% InitParams = [any()]
%% Result = {ok, pid()} | {error, any()}
%% @doc Start server listening on IpAddr:Port
-start_link(CallbackModule, IpAddr, Port, InitParams) ->
- gen_server:start_link(?MODULE, [CallbackModule, IpAddr, Port, InitParams], []).
+start_link(CallbackModule, InitParams) ->
+ gen_server:start_link(?MODULE, [CallbackModule, InitParams], [{fullsweep_after, 0}]).
-%% @spec start_link(CallbackModule, IpAddr, Port, InitParams) -> Result
+%% @spec start_link(Name, CallbackModule, IpAddr, Port, InitParams) -> Result
%% Name = {local, atom()} | {global, atom()}
%% CallbackModule = atom()
-%% IpAddr = string()
-%% Port = integer()
%% InitParams = [any()]
%% Result = {ok, pid()} | {error, any()}
%% @doc Start server listening on IpAddr:Port registered as Name
-start_link(Name, CallbackModule, IpAddr, Port, InitParams) ->
- gen_server:start_link(Name, ?MODULE, [CallbackModule, IpAddr, Port, InitParams], []).
-
-%% @hidden
-init([CallbackModule, IpAddr, Port, InitParams]) ->
- case CallbackModule:init(InitParams) of
- {ok, ServerState} ->
- case listen_on(CallbackModule, IpAddr, Port) of
+start_link(Name, CallbackModule, InitParams) ->
+ gen_server:start_link(Name, ?MODULE, [CallbackModule, InitParams], [{fullsweep_after, 0}]).
+
+%% @spec get_state(#state{}) -> any()
+%% @doc Extracts the callback module's state from the server's overall state
+get_state(#state{server_state=SState}) ->
+ SState.
+
+%% @spec store_state(any(), #state{}) -> #state{}
+%% @doc Stores the callback module's state into the server's state
+store_state(CBState, State) when is_record(State, state) ->
+ State#state{server_state=CBState}.
+
+add_listen_socket({IpAddr, Port}, #state{cb=Callback, addrs=Addrs, socks=Socks}=State) ->
+ Key = {IpAddr, Port},
+ case dict:find(Key, Socks) of
+ {ok, _} ->
+ {error, already_listening};
+ error ->
+ case listen_on(Callback, IpAddr, Port) of
+ {ok, Sock} ->
+ {ok, State#state{socks=dict:store(Key, Sock, Socks),
+ addrs=dict:store(Sock, Key, Addrs)}};
+ Error ->
+ Error
+ end
+ end.
+
+remove_listen_socket({IpAddr, Port}, #state{socks=Socks, addrs=Addrs}=State) ->
+ Key = {IpAddr, Port},
+ case dict:find(Key, Socks) of
+ error ->
+ {error, not_listening};
{ok, Sock} ->
- maybe_call_listening(CallbackModule, ServerState, Sock);
- Error ->
- CallbackModule:terminate(Error, ServerState),
- Error
- end;
- Err ->
- Err
- end.
+ gen_tcp:close(Sock),
+ {ok, State#state{socks=dict:erase(Key, Socks),
+ addrs=dict:erase(Sock, Addrs)}}
+ end.
+
+%% @spec init(#state{}) -> Result
+%% Result = any()
+%% @doc Returns the callback module's state
+init([CallbackModule, InitParams]) ->
+ process_flag(trap_exit, true),
+ State = #state{cb=CallbackModule},
+ case CallbackModule:init(InitParams, State) of
+ {ok, ServerState} ->
+ {ok, ServerState};
+ Err ->
+ Err
+ end.
%% @hidden
-handle_call(Request, From, #state{cb=Callback, server_state=ServerState}=State) ->
- case Callback:handle_call(Request, From, ServerState) of
- {reply, Reply, NewServerState} ->
- {reply, Reply, State#state{server_state=NewServerState}};
- {reply, Reply, NewServerState, Arg} when Arg =:= hibernate orelse is_number(Arg) ->
- {reply, Reply, State#state{server_state=NewServerState}, Arg};
- {noreply, NewServerState} ->
- {noreply, State#state{server_state=NewServerState}};
- {noreply, NewServerState, Arg} when Arg =:= hibernate orelse is_number(Arg) ->
- {noreply, State#state{server_state=NewServerState}, Arg};
- {stop, Reason, NewServerState} ->
- {stop, Reason, State#state{server_state=NewServerState}};
- {stop, Reason, Reply, NewServerState} ->
- {stop, Reason, Reply, State#state{server_state=NewServerState}}
- end.
+handle_call(Request, From, #state{cb=Callback}=State) ->
+ case Callback:handle_call(Request, From, State) of
+ {reply, Reply, NewServerState} ->
+ {reply, Reply, NewServerState};
+ {reply, Reply, NewServerState, Arg} when Arg =:= hibernate orelse is_number(Arg) ->
+ {reply, Reply, NewServerState, Arg};
+ {noreply, NewServerState} ->
+ {noreply, NewServerState};
+ {noreply, NewServerState, Arg} when Arg =:= hibernate orelse is_number(Arg) ->
+ {noreply, NewServerState, Arg};
+ {stop, Reason, NewServerState} ->
+ {stop, Reason, NewServerState};
+ {stop, Reason, Reply, NewServerState} ->
+ {stop, Reason, Reply, NewServerState}
+ end.
%% @hidden
-handle_cast(Msg, #state{cb=Callback, server_state=ServerState}=State) ->
- case Callback:handle_cast(Msg, ServerState) of
- {noreply, NewServerState} ->
- {noreply, State#state{server_state=NewServerState}};
- {noreply, NewServerState, Arg} when Arg =:= hibernate orelse is_number(Arg) ->
- {noreply, State#state{server_state=NewServerState}, Arg};
- {stop, Reason, NewServerState} ->
- {stop, Reason, State#state{server_state=NewServerState}}
- end.
+handle_cast(Msg, #state{cb=Callback}=State) ->
+ case Callback:handle_cast(Msg, State) of
+ {noreply, NewServerState} ->
+ {noreply, NewServerState};
+ {noreply, NewServerState, Arg} when Arg =:= hibernate orelse is_number(Arg) ->
+ {noreply, NewServerState, Arg};
+ {stop, Reason, NewServerState} ->
+ {stop, Reason, NewServerState}
+ end.
%% @hidden
-handle_info({inet_async, ListSock, _Ref, {ok, CliSocket}}, #state{cb=Callback, server_state=ServerState}=State) ->
- inet_db:register_socket(CliSocket, inet_tcp),
- case Callback:new_connection(CliSocket, ServerState) of
- {ok, NewServerState} ->
- prim_inet:async_accept(ListSock, -1),
- {noreply, State#state{server_state=NewServerState}};
- {stop, Reason, NewServerState} ->
- {stop, Reason, State#state{server_state=NewServerState}}
- end;
-
-handle_info(Info, #state{cb=Callback, server_state=ServerState}=State) ->
- case Callback:handle_info(Info, ServerState) of
- {noreply, NewServerState} ->
- {noreply, State#state{server_state=NewServerState}};
- {noreply, NewServerState, Arg} when Arg =:= hibernate orelse is_number(Arg) ->
- {noreply, State#state{server_state=NewServerState}, Arg};
- {stop, Reason, NewServerState} ->
- {stop, Reason, State#state{server_state=NewServerState}}
- end.
+handle_info({inet_async, ListSock, _Ref, {ok, CliSocket}}, #state{cb=Callback, addrs=Addrs}=State) ->
+ inet_db:register_socket(CliSocket, inet_tcp),
+ {IpAddr, Port} = dict:fetch(ListSock, Addrs),
+ case Callback:new_connection(IpAddr, Port, CliSocket, State) of
+ {ok, NewServerState} ->
+ prim_inet:async_accept(ListSock, -1),
+ {noreply, NewServerState};
+ {stop, Reason, NewServerState} ->
+ {stop, Reason, NewServerState}
+ end;
+
+handle_info(Info, #state{cb=Callback}=State) ->
+ case Callback:handle_info(Info, State) of
+ {noreply, NewServerState} ->
+ {noreply, NewServerState};
+ {noreply, NewServerState, Arg} when Arg =:= hibernate orelse is_number(Arg) ->
+ {noreply, NewServerState, Arg};
+ {stop, Reason, NewServerState} ->
+ {stop, Reason, NewServerState}
+ end.
%% @hidden
-terminate(Reason, #state{cb=Callback, sock=Sock, server_state=ServerState}) ->
- gen_tcp:close(Sock),
- Callback:terminate(Reason, ServerState),
- ok.
+terminate(Reason, #state{cb=Callback, addrs=Addrs}=State) ->
+ [gen_tcp:close(Sock) || Sock <- dict:fetch_keys(Addrs)],
+ State1 = State#state{addrs=dict:new(), socks=dict:new()},
+ Callback:terminate(Reason, State1),
+ ok.
%% @hidden
code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
+ {ok, State}.
%% Internal functions
@@ -157,14 +199,14 @@ code_change(_OldVsn, State, _Extra) ->
%% Port = integer()
%% Result = {ok, port()} | {error, any()}
listen_on(CallbackModule, IpAddr, Port) ->
- SockOpts = [{ip, convert(IpAddr)}|CallbackModule:sock_opts()],
- case gen_tcp:listen(Port, SockOpts) of
- {ok, LSock} ->
- {ok, _Ref} = prim_inet:async_accept(LSock, -1),
- {ok, LSock};
- Err ->
- Err
- end.
+ SockOpts = [{ip, convert(IpAddr)}|CallbackModule:sock_opts()],
+ case gen_tcp:listen(Port, SockOpts) of
+ {ok, LSock} ->
+ {ok, _Ref} = prim_inet:async_accept(LSock, -1),
+ {ok, LSock};
+ Err ->
+ Err
+ end.
%% @hidden
%% @spec convert(Addr) -> Result
@@ -172,20 +214,5 @@ listen_on(CallbackModule, IpAddr, Port) ->
%% Result = {integer(), integer(), integer(), integer()}
%% @doc Converts text IP addresses "0.0.0.0" to tuples {0, 0, 0, 0}
convert(Addr) ->
- T = string:tokens(Addr, "."),
- list_to_tuple([list_to_integer(X) || X <- T]).
-
-%% @hidden - call listening/2 if it exists and return gen_server response
-maybe_call_listening(CallbackModule, ServerState, Sock) ->
- case erlang:function_exported(CallbackModule, listening, 2) of
- false ->
- {ok, #state{cb=CallbackModule, sock=Sock, server_state=ServerState}};
- true ->
- case CallbackModule:listening(Sock, ServerState) of
- {ok, ServerState2} ->
- {ok, #state{cb=CallbackModule, sock=Sock, server_state=ServerState2}};
- Error ->
- CallbackModule:terminate(Error, ServerState),
- Error
- end
- end.
+ T = string:tokens(Addr, "."),
+ list_to_tuple([list_to_integer(X) || X <- T]).
Please sign in to comment.
Something went wrong with that request. Please try again.