Skip to content

Commit

Permalink
queue incoming connections if no handlers are available
Browse files Browse the repository at this point in the history
  • Loading branch information
mojombo committed Jul 9, 2009
1 parent edc7ded commit d37f6a5
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 22 deletions.
20 changes: 11 additions & 9 deletions elib/asset_pool.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
-behaviour(gen_server).

%% api
-export([start_link/1, start/1, lease_asset/0, return_asset/1]).
-export([start_link/1, start/1, lease/0, return/1]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
Expand All @@ -21,11 +21,11 @@ start_link(Args) ->
start(Args) ->
gen_server:start({global, ?MODULE}, ?MODULE, Args, []).

lease_asset() ->
gen_server:call({global, ?MODULE}, {lease_asset}).
lease() ->
gen_server:call({global, ?MODULE}, {lease}).

return_asset(Asset) ->
gen_server:call({global, ?MODULE}, {return_asset, Asset}).
return(Asset) ->
gen_server:call({global, ?MODULE}, {return, Asset}).

%%====================================================================
%% gen_server callbacks
Expand Down Expand Up @@ -53,10 +53,12 @@ init([Count, Handler]) ->
%% {stop, Reason, State}
%% Description: Handling call messages
%%--------------------------------------------------------------------
handle_call({lease_asset}, _From, State) ->
{{value, Asset}, Assets2} = queue:out(State#state.assets),
{reply, Asset, State#state{assets = Assets2}};
handle_call({return_asset, Asset}, _From, State) ->
handle_call({lease}, _From, State) ->
case queue:out(State#state.assets) of
{{value, Asset}, Assets2} -> {reply, {ok, Asset}, State#state{assets = Assets2}};
{empty, _Assets2} -> {reply, empty, State}
end;
handle_call({return, Asset}, _From, State) ->
Assets2 = queue:in(Asset, State#state.assets),
{reply, ok, State#state{assets = Assets2}};
handle_call(_Request, _From, State) ->
Expand Down
67 changes: 54 additions & 13 deletions elib/ernie_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,14 @@
-behaviour(gen_server).

%% api
-export([start_link/1, start/1]).
-export([start_link/1, start/1, process/1, asset_freed/0]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).

-record(state, {lsock = undefined,
handler = undefined,
ducky = undefined}).
pending = queue:new()}).

%%====================================================================
%% API
Expand All @@ -22,6 +21,12 @@ start_link(Args) ->
start(Args) ->
gen_server:start({global, ?MODULE}, ?MODULE, Args, []).

process(Sock) ->
gen_server:cast({global, ?MODULE}, {process, Sock}).

asset_freed() ->
gen_server:cast({global, ?MODULE}, {asset_freed}).

%%====================================================================
%% gen_server callbacks
%%====================================================================
Expand Down Expand Up @@ -58,13 +63,34 @@ handle_call(_Request, _From, State) ->
%% {stop, Reason, State}
%% Description: Handling cast messages
%%--------------------------------------------------------------------
handle_cast({process, Sock}, State) ->
case queue:is_empty(State#state.pending) of
false ->
Pending2 = queue:in(Sock, State#state.pending),
io:format("Q", []),
{noreply, State#state{pending = Pending2}};
true ->
State2 = try_process_now(Sock, State),
{noreply, State2}
end;
handle_cast({asset_freed}, State) ->
case queue:is_empty(State#state.pending) of
false ->
case asset_pool:lease() of
{ok, Asset} ->
{{value, Sock}, Pending2} = queue:out(State#state.pending),
io:format("d", []),
spawn(fun() -> process_now(Sock, Asset) end),
{noreply, State#state{pending = Pending2}};
empty ->
io:format(".", []),
{noreply, State}
end;
true ->
{noreply, State}
end;
handle_cast(_Msg, State) -> {noreply, State}.

handle_info({'EXIT', _Pid, _Error}, State) ->
error_logger:error_msg("Port closed, restarting port...~n", []),
Handler = State#state.handler,
Ducky = port_wrapper:wrap_link("ruby " ++ Handler),
{noreply, State#state{ducky = Ducky}};
handle_info(Msg, State) ->
error_logger:error_msg("Unexpected message: ~p~n", [Msg]),
{noreply, State}.
Expand Down Expand Up @@ -93,20 +119,35 @@ try_listen(Port, Times) ->

loop(LSock) ->
{ok, Sock} = gen_tcp:accept(LSock),
spawn(fun() -> handle_method(Sock) end),
ernie_server:process(Sock),
loop(LSock).

handle_method(Sock) ->
try_process_now(Sock, State) ->
case asset_pool:lease() of
{ok, Asset} ->
io:format("i", []),
spawn(fun() -> process_now(Sock, Asset) end),
State;
empty ->
io:format("q", []),
Pending2 = queue:in(Sock, State#state.pending),
State#state{pending = Pending2}
end.

process_now(Sock, Asset) ->
case gen_tcp:recv(Sock, 0) of
{ok, BinaryTerm} ->
io:format(".", []),
Asset = asset_pool:lease_asset(),
% io:format(".", []),
% error_logger:info_msg("From Internet: ~p~n", [BinaryTerm]),
{ok, Data} = port_wrapper:rpc(Asset, BinaryTerm),
% error_logger:info_msg("From Port: ~p~n", [Data]),
asset_pool:return_asset(Asset),
asset_pool:return(Asset),
ernie_server:asset_freed(),
gen_tcp:send(Sock, Data),
ok = gen_tcp:close(Sock);
{error, closed} ->
asset_pool:return(Asset),
ernie_server:asset_freed(),
io:format("c", []),
ok = gen_tcp:close(Sock)
end.

0 comments on commit d37f6a5

Please sign in to comment.