Skip to content

Commit

Permalink
add soap server concurrency (Andreas Hellstrom)
Browse files Browse the repository at this point in the history
Add soap_workers param in global conf record, defaulting to 3 workers.  Add
handling of soap_workers config param in yaws_config.  Add
gconf.soap_workers arg in add_yaws_soap_srv/2.  Modify start_link and init
in yaws_soap_srv to receive optional NumOfWorkers arg and start that number
of workers at startup in init_complete().  Send SOAP request/add_wsdl calls
to worker process.  Move some internal functions to new
yaws_soap_srv_worker module. Add yaws_soap_sup to supervise SOAP workers.
Augment documentation to reflect soap_workers config param, and also add
the enable_soap param to the docs since it was missing.
  • Loading branch information
vinoski committed May 9, 2013
1 parent 8ed725e commit cdcd5c7
Show file tree
Hide file tree
Showing 10 changed files with 425 additions and 114 deletions.
15 changes: 12 additions & 3 deletions doc/yaws.tex
Expand Up @@ -2313,15 +2313,24 @@ \section{Global Part}
Fail completely or not if \Yaws\ fails to bind a listen socket
Default is true.

\item \verb+enable_soap = true | false+ ---
If true, a SOAP server will be started when \Yaws\ is
started. Default is false.

\item \verb+soap_workers = integer+ ---
Number of SOAP worker processes to start. A value larger than 1
means that the SOAP server will handle concurrent requests.
Default value is 3.

\item \verb+soap_srv_mods = ListOfModuleSetting+ ---
If \verb+enable_soap+ is true, a startup \Yaws\ will invoke
If \verb+enable_soap+ is true, at startup \Yaws\ will invoke
\verb+yaws_soap_srv:setup()+ to setup modules set
here. \verb+ModuleSetting+ is either a triad like\\
\verb+<Mod, HandlerFun, WsdlFile>+ or a tetrad like
\verb+<Mod, HandlerFun, WsdlFile, Prefix>+\\ which specifies the
prefix. A prefix will be used as argument of
\verb+yaws_soap_lib:initModel()+ and then be used as a XML
namespace prefix. Note, the WsdlFile here should be an
\verb+yaws_soap_lib:initModel()+ and will also be used as a XML
namespace prefix. Note, the \verb+WsdlFile+ here should be an
absolute-path file in local file systems.

For example, we can specify
Expand Down
1 change: 1 addition & 0 deletions include/yaws.hrl
Expand Up @@ -91,6 +91,7 @@
id = "default", % string identifying this instance of yaws

enable_soap = false, % start yaws_soap_srv iff true
soap_workers = 3, % the number of soap workers to start

%% a list of
%% {{Mod, Func}, WsdlFile, Prefix} | {{Mod, Func}, WsdlFile}
Expand Down
22 changes: 14 additions & 8 deletions man/yaws.conf.5
Expand Up @@ -177,18 +177,24 @@ Fail completely or not if Yaws fails to bind a listen socket Default is

.TP
\fBenable_soap = true | false\fR
If true, a soap server will be started at startup of Yaws. Default is
If true, a SOAP server will be started at startup of Yaws. Default is
\fIfalse\fR.

.TP
\fBsoap_workers = integer\fR
Number of SOAP worker processes to start. A value larger than 1 means that
the SOAP server will handle concurrent requests. Default value is \fI3\fR.

.TP
\fBsoap_srv_mods = ListOfModuleSetting\fR
If enable_soap is true, a startup Yaws will invoke \fIyaws_soap_srv:setup()\fR
to setup modules set here. ModuleSetting is either a triad like \fI<Mod,
HandlerFunc, WsdlFile>\fR or a quadruple form like \fI<Mod, HandlerFunc,
WsdlFile, Prefix>\fR which specifies the \fIprefix\fR. A \fIprefix\fR will be
used as argument of \fIyaws_soap_lib:initModel()\fR and then be used as a XML
namespace prefix. Note, the \fIWsdlFile\fR here should be an absolute-path file
in local file systems.
If \fIenable_soap\fR is true, at startup Yaws will invoke
\fIyaws_soap_srv:setup()\fR to setup modules set here. \fIModuleSetting\fR
is either a triad like \fI<Mod, HandlerFunc, WsdlFile>\fR or a quadruple
form like \fI<Mod, HandlerFunc, WsdlFile, Prefix>\fR which specifies the
\fIprefix\fR. A \fIprefix\fR will be used as argument of
\fIyaws_soap_lib:initModel()\fR and then be used as a XML namespace prefix.
Note, the \fIWsdlFile\fR here should be an absolute-path file in local file
systems.

For example, we can specify

Expand Down
4 changes: 4 additions & 0 deletions src/yaws.erl
Expand Up @@ -26,6 +26,7 @@
gconf_mnesia_dir/1, gconf_log_wrap_size/1, gconf_cache_refresh_secs/1,
gconf_include_dir/1, gconf_phpexe/1, gconf_yaws/1, gconf_id/1,
gconf_enable_soap/1, gconf_soap_srv_mods/1, gconf_ysession_mod/1,
gconf_soap_workers/1,
gconf_acceptor_pool_size/1, gconf_mime_types_info/1]).

-export([sconf_port/1, sconf_flags/1, sconf_redirect_map/1, sconf_rhost/1,
Expand Down Expand Up @@ -235,6 +236,7 @@ gconf_yaws (#gconf{yaws = X}) -> X.
gconf_id (#gconf{id = X}) -> X.
gconf_enable_soap (#gconf{enable_soap = X}) -> X.
gconf_soap_srv_mods (#gconf{soap_srv_mods = X}) -> X.
gconf_soap_workers (#gconf{soap_workers = X}) -> X.
gconf_ysession_mod (#gconf{ysession_mod = X}) -> X.
gconf_acceptor_pool_size (#gconf{acceptor_pool_size = X}) -> X.
gconf_mime_types_info (#gconf{mime_types_info = X}) -> X.
Expand Down Expand Up @@ -508,6 +510,8 @@ setup_gconf(GL, GC) ->
enable_soap = lkup(enable_soap, GL, GC#gconf.enable_soap),
soap_srv_mods = lkup(soap_srv_mods, GL,
GC#gconf.soap_srv_mods),
soap_workers = lkup(soap_workers, GL,
GC#gconf.soap_workers),
ysession_mod = lkup(ysession_mod, GL,
GC#gconf.ysession_mod),
acceptor_pool_size = lkup(acceptor_pool_size, GL,
Expand Down
12 changes: 11 additions & 1 deletion src/yaws_config.erl
Expand Up @@ -90,7 +90,8 @@ add_yaws_soap_srv(GC) when GC#gconf.enable_soap == true ->
add_yaws_soap_srv(_GC) ->
[].
add_yaws_soap_srv(GC, false) when GC#gconf.enable_soap == true ->
[{yaws_soap_srv, {yaws_soap_srv, start_link, [GC#gconf.soap_srv_mods]},
[{yaws_soap_srv, {yaws_soap_srv, start_link,
[GC#gconf.soap_srv_mods, GC#gconf.soap_workers]},
permanent, 5000, worker, [yaws_soap_srv]}];
add_yaws_soap_srv(GC, true) when GC#gconf.enable_soap == true ->
Spec = add_yaws_soap_srv(GC, false),
Expand Down Expand Up @@ -795,6 +796,15 @@ fload(FD, globals, GC, C, Cs, Lno, Chars) ->
{error, ?F("~s at line ~w", [Str, Lno])}
end;

["soap_workers", '=', Int] ->
case catch list_to_integer(Int) of
I when is_integer(I) ->
fload(FD, globals, GC#gconf{soap_workers = I},
C, Cs, Lno+1, Next);
_ ->
{error, ?F("Expect integer at line ~w", [Lno])}
end;

["max_connections", '=', Int] ->
case (catch list_to_integer(Int)) of
I when is_integer(I) ->
Expand Down
181 changes: 81 additions & 100 deletions src/yaws_soap_srv.erl
Expand Up @@ -8,8 +8,9 @@
-behaviour(gen_server).

%% API
-export([start_link/0, start_link/1,
-export([start_link/0, start_link/1, start_link/2,
setup/1, setup/2, setup/3,
worker/1,
handler/4
]).

Expand All @@ -25,14 +26,20 @@

%% State
-record(s, {
wsdl_list = [] % list of {Id, WsdlModel} tuples, where Id == {M,F}
num_of_workers = 0,
workers = [], % list of Pids
busy_workers = [], % list of {Pids, From} pairs.
queue = [], % list of waiting jobs
wsdl_list = [] % list of {Id, WsdlModel} tuples, where Id == {M,F}
}).

-define(OK_CODE, 200).
-define(BAD_MESSAGE_CODE, 400).
%% -define(METHOD_NOT_ALLOWED_CODE, 405).
-define(SERVER_ERROR_CODE, 500).

-define(DEFAULT_NUM_OF_WORKERS, 3).

%%====================================================================
%% API
%%====================================================================
Expand All @@ -42,7 +49,13 @@
%%--------------------------------------------------------------------
start_link() ->
start_link([]).
%%
start_link(N) when is_integer(N) ->
start_link([], N);
start_link(L) ->
start_link(L, ?DEFAULT_NUM_OF_WORKERS).
%%
start_link(L, N) ->
%% We are dependent on erlsom
case code:ensure_loaded(erlsom) of
{error, _} ->
Expand All @@ -51,7 +64,7 @@ start_link(L) ->
[?MODULE, Emsg]),
{error, Emsg};
{module, erlsom} ->
gen_server:start_link({local, ?SERVER}, ?MODULE, L, [])
gen_server:start_link({local, ?SERVER}, ?MODULE, {L, N}, [])
end.

%%% To be called from yaws_rpc.erl
Expand Down Expand Up @@ -89,7 +102,9 @@ setup(Id, WsdlFile, PrefixOrOptions) when is_tuple(Id),size(Id)==2 ->
Wsdl = yaws_soap_lib:initModel(WsdlFile, PrefixOrOptions),
gen_server:call(?SERVER, {add_wsdl, Id, Wsdl}, infinity).


%% Send message to worker
worker(X) ->
gen_server:cast(?MODULE, {worker, X, self()}).

%%====================================================================
%% gen_server callbacks
Expand All @@ -102,11 +117,12 @@ setup(Id, WsdlFile, PrefixOrOptions) when is_tuple(Id),size(Id)==2 ->
%% {stop, Reason}
%% Description: Initiates the server
%%--------------------------------------------------------------------
init(L) -> %% [ {{Mod,Handler}, WsdlFile} ]
WsdlList = lists:foldl( fun( SoapSrvMod, OldList) ->
setup_on_init( SoapSrvMod, OldList )
end,[],L),
{ok, #s{wsdl_list = WsdlList}}.
init({L, N}) -> % { [ {{Mod,Handler}, WsdlFile} ] , NumOfWorkers }
WsdlList = lists:foldl(fun(SoapSrvMod, OldList) ->
setup_on_init( SoapSrvMod, OldList )
end,[],L),
gen_server:cast(?MODULE, complete_init),
{ok, #s{wsdl_list = WsdlList, num_of_workers = N}}.

setup_on_init( {Id, WsdlFile}, OldList ) when is_tuple(Id),size(Id) == 2 ->
Wsdl = yaws_soap_lib:initModel(WsdlFile),
Expand All @@ -126,19 +142,46 @@ setup_on_init( {Id, WsdlFile, Prefix}, OldList ) when is_tuple(Id),
%% Description: Handling call messages
%%--------------------------------------------------------------------
handle_call({add_wsdl, Id, WsdlModel}, _From, State) ->
yaws_soap_sup:setup({Id, WsdlModel}),
NewWsdlList = uinsert({Id, WsdlModel}, State#s.wsdl_list),
{reply, ok, State#s{wsdl_list = NewWsdlList}};
%%
handle_call( {request, Id, Payload, SessionValue, SoapAction}, _From, State) ->
Reply = request(State, Id, Payload, SessionValue, SoapAction),
{reply, Reply, State}.
handle_call(Req = {request, _Id, _Payload, _SessionValue, _SoapAction}, From, State) ->
{noreply, call_worker({int_request, Req, From}, State)};
handle_call(_, _, State) ->
{noreply, State}.

%%--------------------------------------------------------------------
%% Function: handle_cast(Msg, State) -> {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State}
%% Description: Handling cast messages
%%--------------------------------------------------------------------
handle_cast(complete_init, State) ->
{ok, _} = supervisor:start_child(yaws_sup, yaws_soap_sup:child_spec()),
yaws_soap_sup:start_children(State#s.num_of_workers),
{noreply, State};
%%
handle_cast({worker, started, Pid}, State = #s{busy_workers = Busy}) ->
erlang:monitor(process, Pid),
case State#s.wsdl_list of
[] -> ok;
Wsdls -> yaws_soap_srv_worker:setup(Pid, init, Wsdls)
end,
{noreply, State#s{busy_workers = [{Pid, dummy} | Busy]}};
%%
handle_cast({worker, done, Pid}, State) ->
#s{workers = Workers,
busy_workers = Busy,
queue = Queue} = State,
State1 = State#s{workers = [Pid | Workers],
busy_workers = lists:keydelete(Pid, 1, Busy)},
State2 = case Queue of
[] -> State1;
[Q | Qs] -> call_worker(Q, State1#s{queue = Qs})
end,
{noreply, State2};
%%
handle_cast(_Msg, State) ->
{noreply, State}.

Expand All @@ -148,6 +191,18 @@ handle_cast(_Msg, State) ->
%% {stop, Reason, State}
%% Description: Handling all non call/cast messages
%%--------------------------------------------------------------------
handle_info({'DOWN', _, process, Pid, Info}, State) ->
#s{workers = Workers,
busy_workers = Busy} = State,
case lists:keysearch(Pid, 1, Busy) of
{value, {Pid, From}} ->
gen_server:reply(
From, srv_error(f("Process termination: ~p", [Info])));
_ -> ok
end,
{noreply, State#s{workers = lists:delete(Pid, Workers),
busy_workers = lists:keydelete(Pid, 1, Busy)}};
%%
handle_info(_Info, State) ->
{noreply, State}.

Expand All @@ -172,102 +227,28 @@ code_change(_OldVsn, State, _Extra) ->
%%% Internal functions
%%--------------------------------------------------------------------

request(State, {M,F} = Id, {Req, Attachments}, SessionValue, Action) ->
{ok, Model} = get_model(State, Id),
%%error_logger:info_report([?MODULE, {payload, Req}]),
case catch yaws_soap_lib:parseMessage(Req, Model) of
{ok, Header, Body} ->
%% call function
result(Model, catch apply(M, F, [Header, Body,
Action, SessionValue,
Attachments]));
{error, Error} ->
cli_error(Error);
OtherError ->
srv_error(io_lib:format("Error parsing message: ~p", [OtherError]))
end;
request(State, {M,F} = Id, Req, SessionValue, Action) ->
%%error_logger:info_report([?MODULE, {payload, Req}]),
{ok, Model} = get_model(State, Id),
Umsg = (catch erlsom_lib:toUnicode(Req)),
case catch yaws_soap_lib:parseMessage(Umsg, Model) of
{ok, Header, Body} ->
%% call function
result(Model, catch apply(M, F, [Header, Body,
Action, SessionValue]));
{error, Error} ->
cli_error(Error);
OtherError ->
srv_error(io_lib:format("Error parsing message: ~p", [OtherError]))
end.

%%% Analyse the result and produce some output
result(Model, {ok, ResHeader, ResBody, ResCode, SessVal}) ->
return(Model, ResHeader, ResBody, ResCode, SessVal, undefined);
result(Model, {ok, ResHeader, ResBody}) ->
return(Model, ResHeader, ResBody, ?OK_CODE, undefined, undefined);
result(Model, {ok, ResHeader, ResBody, Files}) ->
return(Model, ResHeader, ResBody, ?OK_CODE, undefined, Files);
result(_Model, {error, client, ClientMssg}) ->
cli_error(ClientMssg);
result(_Model, false) -> % soap notify !
false;
result(_Model, Error) ->
srv_error(io_lib:format("Error processing message: ~p", [Error])).

return(#wsdl{model = Model}, ResHeader, ResBody, ResCode, SessVal, Files) ->
return(Model, ResHeader, ResBody, ResCode, SessVal, Files);
return(Model, ResHeader, ResBody, ResCode, SessVal, Files)
when not is_list(ResBody) ->
return(Model, ResHeader, [ResBody], ResCode, SessVal, Files);
return(Model, ResHeader, ResBody, ResCode, SessVal, Files) ->
%% add envelope
Header2 = case ResHeader of
undefined -> undefined;
_ -> #'soap:Header'{choice = ResHeader}
end,
Envelope = #'soap:Envelope'{'Body' = #'soap:Body'{choice = ResBody},
'Header' = Header2},
case catch erlsom:write(Envelope, Model) of
{ok, XmlDoc} ->
case Files of
undefined ->
{ok, XmlDoc, ResCode, SessVal};
_ ->
DIME = yaws_dime:encode(XmlDoc, Files),
{ok, DIME, ResCode, SessVal}
end;
{error, WriteError} ->
srv_error(f("Error writing XML: ~p", [WriteError]));
OtherWriteError ->
error_logger:error_msg("~p(~p): OtherWriteError=~p~n",
[?MODULE, ?LINE, OtherWriteError]),
srv_error(f("Error writing XML: ~p", [OtherWriteError]))
end.

f(S,A) -> lists:flatten(io_lib:format(S,A)).

cli_error(Error) ->
error_logger:error_msg("~p(~p): Cli Error: ~p~n",
[?MODULE, ?LINE, Error]),
Fault = yaws_soap_lib:makeFault("Client", "Client error"),
{error, Fault, ?BAD_MESSAGE_CODE}.

srv_error(Error) ->
error_logger:error_msg("~p(~p): Srv Error: ~p~n",
[?MODULE, ?LINE, Error]),
Fault = yaws_soap_lib:makeFault("Server", "Server error"),
{error, Fault, ?SERVER_ERROR_CODE}.




get_model(State, Id) ->
case lists:keysearch(Id, 1, State#s.wsdl_list) of
{value, {_, Model}} -> {ok, Model};
_ -> {error, "model not found"}
end.

uinsert({K,_} = E, [{K,_}|T]) -> [E|T];
uinsert(E, [H|T]) -> [H|uinsert(E,T)];
uinsert(E, []) -> [E].

call_worker(Req = {int_request, _, From},
State = #s{workers = [Worker | Workers],
busy_workers = Busy})->
case catch yaws_soap_srv_worker:call(Worker, Req) of
ok ->
State#s{workers = Workers,
busy_workers = [{Worker, From} | Busy]};
{'EXIT',{noproc,_}} ->
call_worker(Req, State#s{workers = Workers})
end;
%%
call_worker(Req, State = #s{queue = Queue}) ->
State#s{queue = [Req | Queue]}.

0 comments on commit cdcd5c7

Please sign in to comment.