Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Fetching contributors…

Cannot retrieve contributors at this time

351 lines (315 sloc) 12.784 kb
-module(ernie_server).
-behaviour(gen_server).
-include_lib("ernie.hrl").
%% api
-export([start_link/1, start/1, process/1, enqueue_request/1, kick/0, fin/0]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
%%====================================================================
%% API
%%====================================================================
%% Starts the server linked to the calling process and registers it locally
%% under the module name. Args is handed to init/1, which matches it as
%% [Port, Configs].
start_link(Args) ->
gen_server:start_link({local, ?MODULE}, ?MODULE, Args, []).
%% Same as start_link/1 but without linking the server to the caller.
start(Args) ->
gen_server:start({local, ?MODULE}, ?MODULE, Args, []).
%% Asynchronously hands a socket to the server; handled by the
%% {process, Sock} clause of handle_cast/2.
process(Sock) ->
gen_server:cast(?MODULE, {process, Sock}).
%% Synchronously adds Request (a #request{} record) to the server's high or
%% low priority queue, chosen by the record's priority field.
enqueue_request(Request) ->
gen_server:call(?MODULE, {enqueue_request, Request}).
%% Asynchronously asks the server to take the next queued request (high
%% priority queue first) and dispatch it.
kick() ->
gen_server:cast(?MODULE, kick).
%% Asynchronously tells the server that one dispatched request has finished;
%% the server bumps its zcount counter in handle_cast/2.
fin() ->
gen_server:cast(?MODULE, fin).
%%====================================================================
%% gen_server callbacks
%%====================================================================
%%--------------------------------------------------------------------
%% Function: init(Args) -> {ok, State} |
%% {ok, State, Timeout} |
%% ignore |
%% {stop, Reason}
%% Description: Initiates the server
%%--------------------------------------------------------------------
%% Opens the listening socket on Port, starts the acceptor process and builds
%% the module map from Configs (a list of proplists, see extract_mapping/1).
%% Crashes with badmatch if the port cannot be opened after 500 attempts.
init([Port, Configs]) ->
    process_flag(trap_exit, true),
    error_logger:info_msg("~p starting~n", [?MODULE]),
    {ok, LSock} = try_listen(Port, 500),
    %% The acceptor is linked so that it cannot silently die (its exit shows
    %% up in handle_info/2 because exits are trapped) and so that it does not
    %% outlive the server, which owns the listening socket.
    spawn_link(fun() -> loop(LSock) end),
    Map = init_map(Configs),
    io:format("pidmap = ~p~n", [Map]),
    {ok, #state{lsock = LSock, map = Map}}.
%%--------------------------------------------------------------------
%% Function: handle_call(Request, From, State) -> {reply, Reply, State} |
%% {reply, Reply, State, Timeout} |
%% {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, Reply, State} |
%% {stop, Reason, State}
%% Description: Handling call messages
%%--------------------------------------------------------------------
%% {enqueue_request, Request}: appends Request to the queue selected by its
%% priority field (high -> hq, low -> lq) and replies ok. Any other priority
%% value fails with case_clause.
%% Every other call is acknowledged with ok and leaves the state untouched.
handle_call({enqueue_request, Request}, _From, State) ->
    Queued =
        case Request#request.priority of
            high ->
                State#state{hq = queue:in(Request, State#state.hq)};
            low ->
                State#state{lq = queue:in(Request, State#state.lq)}
        end,
    {reply, ok, Queued};
handle_call(_Request, _From, State) ->
    {reply, ok, State}.
%%--------------------------------------------------------------------
%% Function: handle_cast(Msg, State) -> {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State}
%% Description: Handling cast messages
%%--------------------------------------------------------------------
%% {process, Sock}: records the current queue lengths and the accept time in
%% a #log{}, wraps the socket in a #request{} and spawns a receiver for it.
handle_cast({process, Sock}, State) ->
    HighLen = queue:len(State#state.hq),
    LowLen = queue:len(State#state.lq),
    Log = #log{hq = HighLen, lq = LowLen, taccept = erlang:now()},
    Request = #request{sock = Sock, log = Log},
    spawn(fun() -> receive_term(Request, State) end),
    logger:debug("Spawned receiver~n", []),
    {noreply, State};
%% kick: dispatches the head of the high priority queue, or, when that queue
%% is empty, the head of the low priority queue. No-op when both are empty.
handle_cast(kick, State) ->
    NextState =
        case queue:out(State#state.hq) of
            {{value, HighRequest}, HighRest} ->
                process_request(HighRequest, hq, HighRest, State);
            {empty, _} ->
                case queue:out(State#state.lq) of
                    {{value, LowRequest}, LowRest} ->
                        process_request(LowRequest, lq, LowRest, State);
                    {empty, _} ->
                        State
                end
        end,
    {noreply, NextState};
%% fin: counts one more finished request. When the listen flag is false and
%% the finished count has caught up with the dispatched count, the whole
%% runtime is halted.
handle_cast(fin, State) ->
    Listen = State#state.listen,
    Count = State#state.count,
    ZCount = State#state.zcount + 1,
    logger:debug("Fin; Listen = ~p (~p/~p)~n", [Listen, Count, ZCount]),
    case {Listen, ZCount} of
        {false, Count} ->
            halt();
        _StillBusy ->
            {noreply, State#state{zcount = ZCount}}
    end;
%% Anything else is ignored.
handle_cast(_Msg, State) ->
    {noreply, State}.
%% Logs any out-of-band message as an error and keeps the state unchanged.
%% Exits are trapped (see init/1), so exit signals from linked processes
%% also arrive here as messages.
handle_info(Msg, State) ->
error_logger:error_msg("Unexpected message: ~p~n", [Msg]),
{noreply, State}.
%% gen_server shutdown hook; performs no cleanup of its own.
terminate(_Reason, _State) -> ok.
%% Hot code upgrade hook; the state is carried over unchanged.
code_change(_OldVersion, State, _Extra) -> {ok, State}.
%%====================================================================
%% Internal
%%====================================================================
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% Module mapping
%% Turns the list of handler configs into a list of {Module, Id} pairs,
%% preserving order.
init_map(Configs) ->
    [extract_mapping(Config) || Config <- Configs].
%% Reads the module and id entries of one handler config (a proplist) and
%% returns them as {Module, Id}. A missing entry yields undefined in that
%% position.
extract_mapping(Config) ->
    {proplists:get_value(module, Config), proplists:get_value(id, Config)}.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% Listen and loop
%% Tries to open a listening socket on Port, retrying every five seconds.
%% Times is the number of attempts left; when it reaches 0 the failure is
%% logged and {error, "Could not listen on port"} is returned.
%% Returns {ok, ListenSocket} on success.
try_listen(Port, 0) ->
    error_logger:error_msg("Could not listen on port ~p~n", [Port]),
    {error, "Could not listen on port"};
try_listen(Port, Times) ->
    Options = [binary, {packet, 4}, {active, false}, {reuseaddr, true}, {backlog, 128}],
    case gen_tcp:listen(Port, Options) of
        {error, Reason} ->
            error_logger:info_msg("Could not listen on port ~p: ~p~n", [Port, Reason]),
            timer:sleep(5000),
            try_listen(Port, Times - 1);
        {ok, _LSock} = Listening ->
            error_logger:info_msg("Listening on port ~p~n", [Port]),
            Listening
    end.
%% Accept loop. Every accepted connection is handed to the server via
%% ernie_server:process/1. Accept errors are logged and the loop continues,
%% except when the listening socket has been closed: then the process parks
%% itself forever instead of spinning.
loop(LSock) ->
    case gen_tcp:accept(LSock) of
        {ok, Client} ->
            logger:debug("Accepted socket: ~p~n", [Client]),
            ernie_server:process(Client),
            loop(LSock);
        {error, closed} ->
            logger:debug("Listen socket closed~n", []),
            timer:sleep(infinity);
        {error, Reason} ->
            logger:debug("Connection accept error: ~p~n", [Reason]),
            loop(LSock)
    end.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% Receive and process
%% Reads one packet from the request's socket, decodes it and acts on it
%% (see handle_received_term/4). Runs in its own process per connection.
%%
%% The socket is closed on every failure path: peer closed, any other
%% receive error, or a payload that is not a valid external term. Without
%% that the receiver would crash and leave the socket open, because the
%% socket is owned by the long-lived acceptor process rather than by this one.
%%
%% NOTE(review): binary_to_term/1 is applied to data straight off the
%% network. The [safe] option is not used here because it would reject atoms
%% the node has not seen yet; confirm whether callers can be trusted.
receive_term(Request, State) ->
    Sock = Request#request.sock,
    case gen_tcp:recv(Sock, 0) of
        {ok, BinaryTerm} ->
            logger:debug("Got binary term: ~p~n", [BinaryTerm]),
            try binary_to_term(BinaryTerm) of
                Term ->
                    logger:info("Got term: ~p~n", [Term]),
                    handle_received_term(Term, BinaryTerm, Request, State)
            catch
                error:badarg ->
                    logger:debug("Closing ~p: payload is not a valid term~n", [Sock]),
                    ok = gen_tcp:close(Sock)
            end;
        {error, closed} ->
            ok = gen_tcp:close(Sock);
        {error, Reason} ->
            logger:debug("Closing ~p: receive failed: ~p~n", [Sock, Reason]),
            ok = gen_tcp:close(Sock)
    end.

%% Acts on one decoded term:
%%  - an '__admin__' call is passed to ernie_admin:process/4;
%%  - an info packet is remembered on the request, applied via
%%    process_info/3, and the next packet is read;
%%  - anything else is treated as the action: it is stored (still encoded)
%%    on the request, the request is queued and the server is kicked.
handle_received_term({call, '__admin__', Fun, Args}, _BinaryTerm, Request, State) ->
    ernie_admin:process(Request#request.sock, Fun, Args, State);
handle_received_term({info, Command, Args}, BinaryTerm, Request, State) ->
    Infos = [BinaryTerm | Request#request.infos],
    Request2 = process_info(Request#request{infos = Infos}, Command, Args),
    receive_term(Request2, State);
handle_received_term(Term, BinaryTerm, Request, _State) ->
    Request2 = Request#request{action = BinaryTerm},
    close_if_cast(Term, Request2),
    ernie_server:enqueue_request(Request2),
    ernie_server:kick().
%% Applies one info command to the request.
%% Only `priority` with a single argument of `high` or `low` is honoured.
%% The value comes from the client, and enqueueing (handle_call/3) only
%% understands those two atoms, so any other value would crash the server
%% there. Unknown commands and invalid priorities leave the request as is.
process_info(Request, priority, [Priority]) when Priority =:= high; Priority =:= low ->
    Request#request{priority = Priority};
process_info(Request, _Command, _Args) ->
    Request.
%% Dispatches one dequeued request. Priority (hq | lq) names the queue the
%% request came from and Q2 is that queue without the request; both are
%% passed on so that the handler can commit the dequeue via finish/3.
%% Returns the new server state.
%%
%% The action is client-supplied. If it does not decode to a
%% {Type, Mod, Fun, Args} tuple, the client gets a ServerError reply and the
%% connection is closed, instead of the server process crashing on a badmatch.
process_request(Request, Priority, Q2, State) ->
    case bert:decode(Request#request.action) of
        {_Type, Mod, _Fun, _Args} = ActionTerm ->
            Specs = lists:filter(fun({X, _Id}) -> Mod =:= X end, State#state.map),
            case Specs of
                [] -> no_module(Mod, Request, Priority, Q2, State);
                _Else -> process_module(ActionTerm, Specs, Request, Priority, Q2, State)
            end;
        Malformed ->
            logger:debug("Malformed request ~p~n", [Malformed]),
            Sock = Request#request.sock,
            Class = <<"ServerError">>,
            Message = <<"Malformed request">>,
            %% Best effort, same as the other error replies in this module.
            gen_tcp:send(Sock, term_to_binary({error, [server, 0, Class, Message, []]})),
            ok = gen_tcp:close(Sock),
            finish(Priority, Q2, State)
    end.
%% Answers a request whose module is not in the module map: sends a
%% ServerError reply (best effort), closes the socket and commits the
%% dequeue. Returns the new server state.
no_module(Mod, Request, Priority, Q2, State) ->
    logger:debug("No such module ~p~n", [Mod]),
    Text = list_to_binary(io_lib:format("No such module '~p'", [Mod])),
    Reply = term_to_binary({error, [server, 0, <<"ServerError">>, Text, []]}),
    Client = Request#request.sock,
    gen_tcp:send(Client, Reply),
    ok = gen_tcp:close(Client),
    finish(Priority, Q2, State).
%% Walks the candidate handler specs for the requested module in order and
%% dispatches to the first one that accepts the request.
%%  - No candidates left: reply with a ServerError, close the socket and
%%    commit the dequeue.
%%  - Id =:= native: see select_native/3; either run natively or move on to
%%    the next candidate.
%%  - Id is a pid: hand the request to that external pool.
%% Returns the new server state.
process_module(ActionTerm, [], Request, Priority, Q2, State) ->
    {_Type, Mod, Fun, _Args} = ActionTerm,
    logger:debug("No such function ~p:~p~n", [Mod, Fun]),
    Client = Request#request.sock,
    Text = list_to_binary(io_lib:format("No such function '~p:~p'", [Mod, Fun])),
    Reply = term_to_binary({error, [server, 0, <<"ServerError">>, Text, []]}),
    gen_tcp:send(Client, Reply),
    ok = gen_tcp:close(Client),
    finish(Priority, Q2, State);
process_module(ActionTerm, Specs, Request, Priority, Q2, State) ->
    [{_Mod, Id} | Remaining] = Specs,
    case Id of
        native ->
            logger:debug("Dispatching to native module~n", []),
            {_Type, Mod, Fun, Args} = ActionTerm,
            case select_native(Mod, Fun, Args) of
                skip ->
                    process_module(ActionTerm, Remaining, Request, Priority, Q2, State);
                run ->
                    process_native_request(ActionTerm, Request, Priority, Q2, State)
            end;
        PoolPid when is_pid(PoolPid) ->
            logger:debug("Found external pid ~p~n", [PoolPid]),
            process_external_request(PoolPid, Request, Priority, Q2, State)
    end.

%% Decides whether the native module should serve Mod:Fun(Args).
%% Returns skip when the function is not exported with that arity, otherwise
%% whatever the optional <Fun>_pred predicate decides.
select_native(Mod, Fun, Args) ->
    Arity = length(Args),
    case erlang:function_exported(Mod, Fun, Arity) of
        false ->
            logger:debug("Not found in native module ~p~n", [Mod]),
            skip;
        true ->
            PredFun = list_to_atom(atom_to_list(Fun) ++ "_pred"),
            logger:debug("Checking ~p:~p(~p) for selection.~n", [Mod, PredFun, Args]),
            check_native_predicate(Mod, PredFun, Args, Arity)
    end.

%% Returns run when there is no predicate or the predicate returns true,
%% skip when it returns false. The predicate is called with the request's
%% own arguments.
check_native_predicate(Mod, PredFun, Args, Arity) ->
    case erlang:function_exported(Mod, PredFun, Arity) of
        false ->
            logger:debug("No such predicate function ~p:~p(~p).~n", [Mod, PredFun, Args]),
            run;
        true ->
            case apply(Mod, PredFun, Args) of
                false ->
                    logger:debug("Predicate ~p:~p(~p) returned false.~n", [Mod, PredFun, Args]),
                    skip;
                true ->
                    logger:debug("Predicate ~p:~p(~p) returned true.~n", [Mod, PredFun, Args]),
                    run
            end
    end.
%% For a cast the client does not wait for a result: it is answered with
%% {noreply} right away and its socket is closed. Any other term is left
%% alone and ok is returned.
close_if_cast({cast, _Mod, _Fun, _Args}, Request) ->
    Client = Request#request.sock,
    gen_tcp:send(Client, term_to_binary({noreply})),
    ok = gen_tcp:close(Client),
    logger:debug("Closed cast.~n", []);
close_if_cast(_Any, _Request) ->
    ok.
%% Commits a dequeue: stores Q2 as the new contents of the queue named by
%% the first argument (hq or lq) and returns the updated state.
finish(hq, Q2, State) ->
    State#state{hq = Q2};
finish(lq, Q2, State) ->
    State#state{lq = Q2}.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% Native
%% Runs the request in a native module: bumps the dispatched-request
%% counter, stamps the request log with type native and the processing start
%% time, spawns ernie_native:process/2 for it and commits the dequeue.
%% Returns the new server state.
process_native_request(ActionTerm, Request, Priority, Q2, State) ->
    NewCount = State#state.count + 1,
    Counted = State#state{count = NewCount},
    logger:debug("Count = ~p~n", [NewCount]),
    Stamped = (Request#request.log)#log{type = native, tprocess = erlang:now()},
    Tracked = Request#request{log = Stamped},
    spawn(fun() -> ernie_native:process(ActionTerm, Tracked) end),
    finish(Priority, Q2, Counted).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% External
%% Runs the request on an external handler leased from the pool Pid.
%% With an asset available: bumps the dispatched-request counter, stamps the
%% request log with type external and the processing start time, spawns the
%% worker and commits the dequeue.
%% With the pool empty: returns the state untouched, so the request stays in
%% its queue and the counter is not bumped.
process_external_request(Pid, Request, Priority, Q2, State) ->
    NewCount = State#state.count + 1,
    logger:debug("Count = ~p~n", [NewCount]),
    case asset_pool:lease(Pid) of
        empty ->
            State;
        {ok, Asset} ->
            logger:debug("Leased asset for pool ~p~n", [Pid]),
            Stamped = (Request#request.log)#log{type = external, tprocess = erlang:now()},
            Tracked = Request#request{log = Stamped},
            spawn(fun() -> process_now(Pid, Tracked, Asset) end),
            finish(Priority, Q2, State#state{count = NewCount})
    end.
%% Worker body for an external request. Runs the request on the leased
%% asset and writes an access-log entry (acc on success, err on any
%% exception), stamped with the completion time.
%% Whatever happens, the asset is returned to the pool, the server is told
%% the request finished and is kicked for the next one, and the client
%% socket is closed.
process_now(Pid, Request, Asset) ->
    %% Evaluated lazily so the completion time is taken after the work is done.
    MarkDone = fun() ->
        Log = Request#request.log,
        Request#request{log = Log#log{tdone = erlang:now()}}
    end,
    try unsafe_process_now(Request, Asset) of
        _AnyResponse ->
            ernie_access_logger:acc(MarkDone())
    catch
        AnyClass:AnyError ->
            ernie_access_logger:err(MarkDone(), "External process error ~w: ~w", [AnyClass, AnyError])
    after
        asset_pool:return(Pid, Asset),
        ernie_server:fin(),
        ernie_server:kick(),
        logger:debug("Returned asset ~p~n", [Asset]),
        gen_tcp:close(Request#request.sock),
        logger:debug("Closed socket ~p~n", [Request#request.sock])
    end.
%% Forwards the still-encoded action to the asset's port.
%% For a call the port's answer is sent back on the client socket; for a
%% cast the answer is discarded. Any failure (unexpected term, bad asset,
%% failed rpc or send) raises; the caller is expected to catch it.
unsafe_process_now(Request, Asset) ->
    Payload = Request#request.action,
    case binary_to_term(Payload) of
        {call, Mod, Fun, Args} ->
            logger:debug("Calling ~p:~p(~p)~n", [Mod, Fun, Args]),
            Client = Request#request.sock,
            {ok, Data} = rpc_on_asset(Asset, Payload),
            ok = gen_tcp:send(Client, Data);
        {cast, Mod, Fun, Args} ->
            logger:debug("Casting ~p:~p(~p)~n", [Mod, Fun, Args]),
            {ok, _Data} = rpc_on_asset(Asset, Payload)
    end.

%% Logs the asset's port and token and performs the rpc on its port.
rpc_on_asset(Asset, Payload) ->
    {asset, Port, Token} = Asset,
    logger:debug("Asset: ~p ~p~n", [Port, Token]),
    port_wrapper:rpc(Port, Payload).
Jump to Line
Something went wrong with that request. Please try again.