Permalink
Browse files

add job plugin (can be used to stress resources manager; currently

works with OAR)
  • Loading branch information...
1 parent a806615 commit a8fc0ab3f9a4ad827188fb1fccd9e8e11cc6990f @nniclausse nniclausse committed May 5, 2011
View
3 include/ts_config.hrl
@@ -68,7 +68,8 @@
seed = now, % random seed: (default= current time)
vhost_file = none, % file server user for virtual host jabber testing
user_server_maxuid = none, % user_id max
- rate_limit
+ rate_limit,
+ job_notify_port
}).
View
59 include/ts_job.hrl
@@ -0,0 +1,59 @@
+%%%
+%%% Copyright 2011 © INRIA
+%%%
+%%% Author : Nicolas Niclausse <nniclaus@sophia.inria.fr>
+%%% Created: 4 mai 2011 by Nicolas Niclausse <nniclaus@sophia.inria.fr>
+%%%
+%%% This program is free software; you can redistribute it and/or modify
+%%% it under the terms of the GNU General Public License as published by
+%%% the Free Software Foundation; either version 2 of the License, or
+%%% (at your option) any later version.
+%%%
+%%% This program is distributed in the hope that it will be useful,
+%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
+%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+%%% GNU General Public License for more details.
+%%%
+%%% You should have received a copy of the GNU General Public License
+%%% along with this program; if not, write to the Free Software
+%%% Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
+%%%
+%%% In addition, as a special exception, you have the permission to
+%%% link the code of this program with any library released under
+%%% the EPL license and distribute linked combinations including
+%%% the two.
+
+
+-author('nicolas.niclausse@inria.fr').
+
+%% use by the client to create the request
+
+-record(job_dyndata,
+ {
+ fixme
+ }
+ ).
+
+-record(job, {
+ script,
+ resources,
+ walltime,
+ queue,
+ duration,
+ jobid,
+ req, % submit|stat|delete
+ type, % oar|torque
+ notify_port,
+ notify_script,
+ name,
+ user,
+ options,
+ args
+ }).
+
+-record(job_session, {
+ jobid,
+ submission_time
+ }).
+
+
View
215 src/tsung/ts_job.erl
@@ -0,0 +1,215 @@
+%%%
+%%% Copyright 2011 © INRIA
+%%%
+%%% Author : Nicolas Niclausse <nicolas.niclausse@inria.fr>
+%%% Created: 4 mai 2011 by Nicolas Niclausse <nicolas.niclausse@inria.fr>
+%%%
+%%% This program is free software; you can redistribute it and/or modify
+%%% it under the terms of the GNU General Public License as published by
+%%% the Free Software Foundation; either version 2 of the License, or
+%%% (at your option) any later version.
+%%%
+%%% This program is distributed in the hope that it will be useful,
+%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
+%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+%%% GNU General Public License for more details.
+%%%
+%%% You should have received a copy of the GNU General Public License
+%%% along with this program; if not, write to the Free Software
+%%% Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
+%%%
+
+%%% In addition, as a special exception, you have the permission to
+%%% link the code of this program with any library released under
+%%% the EPL license and distribute linked combinations including
+%%% the two.
+
+-module(ts_job).
+-author('nicolas.niclausse@inria.fr').
+
+-behaviour(ts_plugin).
+
+-include("ts_profile.hrl").
+-include("ts_job.hrl").
+-include_lib("kernel/include/file.hrl").
+
+-export([init_dynparams/0,
+ add_dynparams/4,
+ get_message/1,
+ session_defaults/0,
+ dump/2,
+ parse/2,
+ parse_bidi/2,
+ parse_config/2,
+ decode_buffer/2,
+ new_session/0]).
+
+
+%%====================================================================
+%% Data Types
+%%====================================================================
+
+%% @type dyndata() = #dyndata{proto=ProtoData::term(),dynvars=list()}.
+%% Dynamic data structure
+%% @end
+
+%% @type server() = {Host::tuple(),Port::integer(),Protocol::atom()}.
+%% Host/Port/Protocol tuple
+%% @end
+
+%% @type param() = {dyndata(), server()}.
+%% Dynamic data structure
+%% @end
+
+%% @type hostdata() = {Host::tuple(),Port::integer()}.
+%% Host/Port pair
+%% @end
+
+%% @type client_data() = binary() | closed.
+%% Data passed to a protocol implementation is either a binary or the
+%% atom closed indicating that the server closed the tcp connection.
+%% @end
+
+%%====================================================================
+%% API
+%%====================================================================
+
+parse_config(El,Config) ->
+ ts_config_job:parse_config(El, Config).
+
+%% @spec session_defaults() -> {ok, Persistent} | {ok, Persistent, Bidi}
+%% Persistent = bool()
+%% Bidi = bool()
+%% @doc Default parameters for sessions of this protocol. Persistent
+%% is true if connections are preserved after the underlying tcp
+%% connection closes. Bidi should be true for bidirectional protocols
+%% where the protocol module needs to reply to data sent from the
+%% server. @end
+session_defaults() ->
+ {ok, true}. % not relevant for erlang type (?).
+
+%% @spec new_session() -> State::term()
+%% @doc Initialises the state for a new protocol session.
+%% @end
+new_session() ->
+ #job_session{}.
+
+%% @spec decode_buffer(Buffer::binary(),Session::record(job)) -> NewBuffer::binary()
+%% @doc We need to decode buffer (remove chunks, decompress ...) for
+%% matching or dyn_variables
+%% @end
+decode_buffer(Buffer,#job_session{}) ->
+ Buffer.
+
+%% @spec init_dynparams() -> dyndata()
+%% @doc Creates a new record/term for storing dynamic request data.
+%% @end
+init_dynparams() ->
+ #dyndata{proto=#job_dyndata{}}.
+
+%% @spec add_dynparams(Subst, dyndata(), param(), hostdata()) -> {dyndata(), server()} | dyndata()
+%% Subst = term()
+%% @doc Updates the dynamic request data structure created by
+%% {@link ts_protocol:init_dynparams/0. init_dynparams/0}.
+%% @end
+add_dynparams(false, DynData, Param, HostData) ->
+ add_dynparams(DynData#dyndata.proto, Param, HostData);
+add_dynparams(true, DynData, Param, HostData) ->
+ NewParam = subst(Param, DynData#dyndata.dynvars),
+ add_dynparams(DynData#dyndata.proto,NewParam, HostData).
+
+add_dynparams(#job_dyndata{}, Param, _HostData) ->
+ Param.
+
+%%----------------------------------------------------------------------
+%% @spec subst(Req, term())
+%% Purpose: Replace on the fly dynamic element of the request.
+%% Returns: record()
+%%----------------------------------------------------------------------
+subst(Job=#job{duration=D,req=Req,jobid=Id}, DynVars) ->
+ Job#job{duration=ts_search:subst(D,DynVars),
+ req=ts_search:subst(Req,DynVars),
+ jobid=ts_search:subst(Id,DynVars)}.
+
+
+dump(A,B) ->
+ ts_plugin:dump(A,B).
+
+%% @spec parse(Data::client_data(), State) -> {NewState, Opts, Close}
+%% State = #state_rcv{}
+%% Opts = proplist()
+%% Close = bool()
+%% @doc
+%% Opts is a list of inet:setopts socket options. Don't change the
+%% active/passive mode here as tsung will set {active,once} before
+%% your options.
+%% Setting Close to true will cause tsung to close the connection to
+%% the server.
+%% @end
+parse({os, cmd, _Args, Res},State) when is_list(Res)->
+ ?LOGF("os:cmd result: ~p",[Res],?DEB),
+ %% oarsub output:
+ %% [ADMISSION RULE] Modify resource description with type constraints
+ %% Generate a job key...
+ %% OAR_JOB_ID=468822
+ Lines = string:tokens(Res,"\n"),
+ case lists:last(Lines) of
+ "OAR_JOB_ID="++ID ->
+ ?LOGF("OK,job id is ~p",[ID],?INFO),
+ Job=#job_session{jobid=ID},
+ ts_job_notify:monitor({ID,self(),now()}),%FIXME: should use time when oarsub is started instead of now()
+ {State#state_rcv{ack_done=true,session=Job,datasize=length(Res)}, [], false};
+ _ ->
+ {State#state_rcv{ack_done=true,datasize=length(Res)}, [], false}
+ end;
+parse(nojobs,State) ->
+ ?LOGF(" no jobs in queue for ~p, stop waiting",[self()],?DEB),
+ {State#state_rcv{ack_done=true}, [], false};
+parse({Mod, Fun, Args, Res},State) ->
+ ?LOGF(" result: ~p",[{Mod, Fun, Args, Res}],?DEB),
+ {State#state_rcv{ack_done=false}, [], false}.
+
+%% @spec parse_bidi(Data, State) -> {nodata, NewState} | {Data, NewState}
+%% Data = client_data()
+%% NewState = term()
+%% State = term()
+%% @doc Parse a block of data from the server. No reply will be sent
+%% if the return value is nodata, otherwise the Data binary will be
+%% sent back to the server immediately.
+%% @end
+parse_bidi(Data, State) ->
+ ts_plugin:parse_bidi(Data,State).
+
+%% @spec get_message(param()) -> Message::binary()|tuple()
+%% @doc Creates a new message to send to the connected server.
+%% @end
+get_message(#job{type=oar,req=wait_jobs}) ->
+ ts_job_notify:wait_jobs(self()),
+ {erlang, now,[], 0}; % not used
+get_message(Job=#job{duration=D}) when is_integer(D)->
+ get_message(Job#job{duration=integer_to_list(D)});
+get_message(Job=#job{notify_port=P}) when is_integer(P)->
+ get_message(Job#job{notify_port=integer_to_list(P)});
+get_message(#job{type=oar,user=U,req=submit, name=N,script=S, resources=R, queue=Q, walltime=W,notify_port=P, notify_script=NS,duration=D,options=Opts}) ->
+ Submit = case U of
+ undefined -> "oarsub ";
+ User -> "sudo -u "++User++" oarsub "
+ end,
+ Queue = case Q of
+ "" -> "";
+ _ -> "-q "++ Q
+ end,
+ Cmd=Submit++Queue++" -l "++R++ ",walltime="++W
+ ++" -n " ++N ++" "
+ ++ Opts ++ " "
+ ++" --notify \"exec:" ++NS++" "++P++"\" "
+ ++"\""++S++" "++D++"\"",
+ ?LOGF("Will run ~p",[Cmd],?INFO),
+ {os, cmd, [Cmd], length(Cmd) }.
+
+
+
+
+
+
+
View
23 src/tsung/ts_utils.erl
@@ -43,7 +43,7 @@
decode_base64/1, encode_base64/1, to_lower/1, release_is_newer_or_eq/1,
randomstr/1,urandomstr/1,urandomstr_noflat/1, eval/1, list_to_number/1,
time2sec/1, time2sec_hires/1, read_file_raw/1, init_seed/1, jsonpath/2, pmap/2,
- concat_atoms/1, ceiling/1
+ concat_atoms/1, ceiling/1, accept_loop/3
]).
level2int("debug") -> ?DEB;
@@ -790,3 +790,24 @@ ceiling(X) ->
_ -> T
end.
+%%--------------------------------------------------------------------
+%% Func: accept_loop/3
+%% Purpose: infinite listen/accept loop, delegating handling of accepts
+%% to the gen_server proper.
+%% Returns: only returns by throwing an exception
+%%--------------------------------------------------------------------
+accept_loop(PPid, Tag, ServerSock)->
+ case
+ case gen_tcp:accept(ServerSock) of
+ {ok, ClientSock} ->
+ ok = gen_tcp:controlling_process(ClientSock, PPid),
+ gen_server:call(PPid, {accepted, Tag, ClientSock});
+ Error ->
+ gen_server:call(PPid, {accept_error, Tag, Error})
+ end
+ of
+ continue ->
+ accept_loop(PPid, Tag, ServerSock);
+ _->
+ normal
+ end.
View
4 src/tsung_controller/ts_config.erl
@@ -664,6 +664,10 @@ parse(Element = #xmlElement{name=option, attributes=Attrs},
lists:foldl( fun parse/2, Conf#config{ports_range={Min,Max}},
Element#xmlElement.content)
end;
+ "job_notify_port" ->
+ Port = getAttr(integer,Attrs, value, ?config(job_notify_port)),
+ lists:foldl( fun parse/2, Conf#config{job_notify_port=Port},
+ Element#xmlElement.content);
"tcp_rcv_buffer" ->
Size = getAttr(integer,Attrs, value, ?config(rcv_size)),
OldProto = Conf#config.proto_opts,
View
76 src/tsung_controller/ts_config_job.erl
@@ -0,0 +1,76 @@
+%%%
+%%% Copyright 2011 © INRIA
+%%%
+%%% Author : Nicolas Niclausse <nicolas.niclausse@inria.fr>
+%%% Created: 4 mai 2011 by Nicolas Niclausse <nicolas.niclausse@inria.fr>
+%%%
+%%% This program is free software; you can redistribute it and/or modify
+%%% it under the terms of the GNU General Public License as published by
+%%% the Free Software Foundation; either version 2 of the License, or
+%%% (at your option) any later version.
+%%%
+%%% This program is distributed in the hope that it will be useful,
+%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
+%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+%%% GNU General Public License for more details.
+%%%
+%%% You should have received a copy of the GNU General Public License
+%%% along with this program; if not, write to the Free Software
+%%% Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
+%%%
+
+-module(ts_config_job).
+-vc('$Id$ ').
+-author('nicolas.niclausse@inria.fr').
+
+-export([parse_config/2]).
+
+-include("ts_profile.hrl").
+-include("ts_http.hrl").
+-include("ts_config.hrl").
+
+-include("xmerl.hrl").
+
+-include("ts_job.hrl").
+
+%% @spec parse_config(#xmlElement{}, Config::term()) -> NewConfig::term()
+%% @doc Parses a tsung.xml configuration file xml element for this
+%% protocol and updates the Config term.
+%% @end
+parse_config(Element = #xmlElement{name=dyn_variable}, Conf = #config{}) ->
+ ts_config:parse(Element,Conf);
+parse_config(Element = #xmlElement{name=job},
+ Config=#config{curid = Id, session_tab = Tab,
+ sessions = [CurS | _], dynvar=DynVar,
+ subst = SubstFlag, match=MatchRegExp}) ->
+ Request = #job{req = ts_config:getAttr(atom,Element#xmlElement.attributes, req, submit),
+ type = ts_config:getAttr(atom,Element#xmlElement.attributes, type, oar),
+ script = ts_config:getAttr(string,Element#xmlElement.attributes, script),
+ notify_script = ts_config:getAttr(string,Element#xmlElement.attributes, notify_script),
+ walltime = ts_config:getAttr(string,Element#xmlElement.attributes, walltime, "1:00:00"),
+ resources = ts_config:getAttr(string,Element#xmlElement.attributes, resources, ""),
+ queue = ts_config:getAttr(string,Element#xmlElement.attributes, queue),
+ notify_port = ts_config:getAttr(integer_or_string,Element#xmlElement.attributes, notify_port),
+ jobid = ts_config:getAttr(integer_or_string,Element#xmlElement.attributes, jobid, undefined),
+ name = ts_config:getAttr(string,Element#xmlElement.attributes, name, "tsung"),
+ user = ts_config:getAttr(string,Element#xmlElement.attributes, user, undefined),
+ options = ts_config:getAttr(string,Element#xmlElement.attributes, options),
+ duration = ts_config:getAttr(integer_or_string,Element#xmlElement.attributes, duration, 3600)
+ },
+ Msg= #ts_request{ack = parse,
+ endpage = true,
+ dynvar_specs = DynVar,
+ subst = SubstFlag,
+ match = MatchRegExp,
+ param = Request},
+
+ ts_config:mark_prev_req(Id-1, Tab, CurS),
+ ets:insert(Tab,{{CurS#session.id, Id},Msg}),
+ lists:foldl( fun(A,B)->ts_config:parse(A,B) end, Config#config{dynvar=[]},
+ Element#xmlElement.content);
+%% Parsing other elements
+parse_config(Element = #xmlElement{}, Conf = #config{}) ->
+ ts_config:parse(Element,Conf);
+%% Parsing non #xmlElement elements
+parse_config(_, Conf = #config{}) ->
+ Conf.
View
1 src/tsung_controller/ts_config_server.erl
@@ -209,6 +209,7 @@ handle_call({read_config, ConfigFile}, _From, State=#state{logdir=LogDir}) ->
spawn(?MODULE, start_file_server, [Config]),
NewConfig=loop_load(sort_static(Config#config{sessions=[NewLast]++Sessions})),
set_max_duration(Config#config.duration),
+ ts_job_notify:listen(Config#config.job_notify_port),
{reply, ok, State#state{config=NewConfig, static_users=NewConfig#config.static_users,total_weight = Sum}};
{error, Reason} ->
?LOGF("Error while checking config: ~p~n",[Reason],?EMERG),
View
4 src/tsung_controller/ts_controller_sup.erl
@@ -83,9 +83,11 @@ init([LogDir]) ->
worker, [ts_msg_server]},
UserSup = {ts_user_server_sup,{ts_user_server_sup,start_link,[]},transient,2000,
supervisor,[ts_user_server_sup]},
+ Notify = {ts_job_notify, {ts_job_notify, start_link, []}, transient, 2000,
+ worker, [ts_job_notify]},
{ok,{{one_for_one,?retries,10},
[Config, Mon, Stats_Mon, Request_Mon, Page_Mon, Connect_Mon, Transaction_Mon,
- Match_Log, Timer, Msg, UserSup, ErlangSup, MuninSup,SNMPSup]}}.
+ Match_Log, Timer, Msg, Notify,UserSup, ErlangSup, MuninSup,SNMPSup]}}.
%%%----------------------------------------------------------------------
%%% Internal functions
View
264 src/tsung_controller/ts_job_notify.erl
@@ -0,0 +1,264 @@
+%%%
+%%% Copyright 2011 © INRIA
+%%%
+%%% Author : Nicolas Niclausse <nicolas.niclausse@inria.fr>
+%%% Created: 04 mai 2011 by Nicolas Niclausse <nicolas.niclausse@inria.fr>
+%%%
+%%% This program is free software; you can redistribute it and/or modify
+%%% it under the terms of the GNU General Public License as published by
+%%% the Free Software Foundation; either version 2 of the License, or
+%%% (at your option) any later version.
+%%%
+%%% This program is distributed in the hope that it will be useful,
+%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
+%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+%%% GNU General Public License for more details.
+%%%
+%%% You should have received a copy of the GNU General Public License
+%%% along with this program; if not, write to the Free Software
+%%% Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
+%%%
+%%% @doc
+%%%
+%%% @end
+
+-module(ts_job_notify).
+-vc('$Id: ts_notify.erl,v 0.0 2011/05/04 11:18:48 nniclaus Exp $ ').
+-author('nicolas.niclausse@inria.fr').
+
+
+-behaviour(gen_server).
+
+-include("ts_profile.hrl").
+
+%% API
+-export([start_link/0]).
+
+-export([listen/1, monitor/1, demonitor/1, wait_jobs/1]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-define(SERVER, ?MODULE).
+
+-record(state, {port, % listen port
+ acceptsock, % The socket we are accept()ing at
+ acceptloop_pid, % The PID of the companion process that blocks
+ jobs}).
+
+-record(job, {id,owner,queue_time,start_time}).
+
+%%%===================================================================
+%%% API
+%%%===================================================================
+
+%%--------------------------------------------------------------------
+%% @doc
+%% Starts the server
+%%
+%% @spec start_link() -> {ok, Pid} | ignore | {error, Error}
+%% @end
+%%--------------------------------------------------------------------
+start_link() ->
+ gen_server:start_link({global, ?MODULE}, ?MODULE, [], []).
+
+listen(Port) ->
+ gen_server:cast({global, ?MODULE}, {listen, Port}).
+
+monitor({JobID, OwnerPid, StartTime}) ->
+ gen_server:cast({global, ?MODULE}, {monitor, {JobID, OwnerPid, StartTime}}).
+
+demonitor({JobID}) ->
+ gen_server:cast({global, ?MODULE}, {monitor, {JobID}}).
+
+wait_jobs(Pid) ->
+ gen_server:cast({global, ?MODULE}, {wait_jobs, Pid}).
+
+%%%===================================================================
+%%% gen_server callbacks
+%%%===================================================================
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Initializes the server
+%%
+%% @spec init(Args) -> {ok, State} |
+%% {ok, State, Timeout} |
+%% ignore |
+%% {stop, Reason}
+%% @end
+%%--------------------------------------------------------------------
+init([]) ->
+ ?LOG("Starting~n",?DEB),
+ {ok, #state{jobs=ets:new(jobs,[{keypos, #job.id}])}}.
+
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Handling call messages
+%%
+%% @spec handle_call(Request, From, State) ->
+%% {reply, Reply, State} |
+%% {reply, Reply, State, Timeout} |
+%% {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, Reply, State} |
+%% {stop, Reason, State}
+%% @end
+%%--------------------------------------------------------------------
+
+handle_call({accepted, _Tag, Sock}, _From, State) ->
+ ?LOGF("New socket:~p~n", [Sock],?DEB),
+ {reply, continue, State#state{}};
+
+handle_call({accept_error, _Tag, Error}, _From, State) ->
+ ?LOGF("accept() failed ~p~n",[Error],?ERR),
+ {stop, Error, stop, State};
+
+handle_call(_Request, _From, State) ->
+ Reply = ok,
+ {reply, Reply, State}.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Handling cast messages
+%%
+%% @spec handle_cast(Msg, State) -> {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, State}
+%% @end
+%%--------------------------------------------------------------------
+handle_cast({monitor, {JobID, OwnerPid, QueueTime}}, State=#state{jobs=Jobs}) ->
+ ?LOGF("monitoring job ~p from pid ~p~n",[JobID,OwnerPid],?DEB),
+ ets:insert(Jobs,#job{id=JobID,owner=OwnerPid, queue_time=QueueTime}),
+ ts_mon:add({sum,job_queued,1}),
+ {noreply, State};
+handle_cast({demonitor, {JobID}}, State=#state{jobs=Jobs}) ->
+ ets:delete(Jobs,JobID),
+ {noreply, State};
+handle_cast({wait_jobs, Pid}, State=#state{jobs=Jobs}) ->
+ %% look for all jobs started by this pid
+ ?LOGF("look for job of ~p~n",[Pid],?DEB),
+ check_jobs(Jobs,Pid),
+ {noreply, State};
+
+handle_cast({listen, {undef_var, Name}}, State) ->
+ ?LOGF("No listen port defined, can't open listening socket ~p~n",[Name],?ERR),
+ {noreply, State};
+handle_cast({listen,Port}, State) ->
+ Opts = [{reuseaddr, true}, {active, once}],
+ case gen_tcp:listen(Port, Opts) of
+ {ok, ListenSock} ->
+ ?LOGF("Listening on port ~p done, start accepting loop~n",[Port],?INFO),
+ {noreply, State#state
+ {acceptsock=ListenSock,
+ acceptloop_pid = spawn_link(ts_utils,
+ accept_loop,
+ [self(), unused, ListenSock])}};
+ {error, Reason} ->
+ ?LOGF("Error when trying to listen to socket: ~p~n",[Reason],?ERR),
+ {noreply, State}
+ end;
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Handling all non call/cast messages
+%%
+%% @spec handle_info(Info, State) -> {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, State}
+%% @end
+%%--------------------------------------------------------------------
+handle_info({tcp, Socket, Data}, State=#state{jobs=Jobs}) ->
+%% OAR:
+%% args are job_id,job_name,TAG,comment
+%% TAG can be:
+%% - RUNNING : when the job is launched
+%% - END : when the job is finished normally
+%% - ERROR : when the job is finished abnormally
+%% - INFO : used when oardel is called on the job
+%% - SUSPENDED : when the job is suspended
+%% - RESUMING : when the job is resumed
+ ?LOGF("received ~p from socket ~p",[Data,Socket],?DEB),
+ case string:tokens(Data," ") of
+ [Id, _Name, "RUNNING"|_] ->
+ ?LOGF("look for job ~p in table",[Id],?DEB),
+ [Job]=ets:lookup(Jobs,Id),
+ Now=now(),
+ Queued=ts_utils:elapsed(Job#job.queue_time,Now),
+ ts_mon:add([{sample,tr_job_wait,Queued},{sum,job_running,1}, {sum,job_queued,-1}]),
+ ets:update_element(Jobs,Id,{#job.start_time,Now});
+ [Id, _Name, "END"|_] ->
+ [Job]=ets:lookup(Jobs,Id),
+ Now=now(),
+ Duration=ts_utils:elapsed(Job#job.start_time,Now),
+ ts_mon:add([{sample,tr_job_duration,Duration},{sum,job_running,-1}, {sum,job_ok,1}]),
+ ets:delete_object(Jobs,Job),
+ check_jobs(Jobs,Job#job.owner);
+ [Id, _Name, "ERROR"|_] ->
+ [Job]=ets:lookup(Jobs,Id),
+ Now=now(),
+ Duration=ts_utils:elapsed(Job#job.start_time,Now),
+ ts_mon:add([{sample,tr_job_duration,Duration},{sum,job_running,-1}, {sum,job_error,1}]),
+ ets:delete_object(Jobs,Job),
+ check_jobs(Jobs,Job#job.owner);
+ [Id, _Name, "INFO"|_] ->
+ ok;
+ [Id, _Name, "SUSPENDED"|_] ->
+ ok;
+ [Id, _Name, "RESUMING"|_] ->
+ ok
+ end,
+ inet:setopts(Socket,[{active,once}]),
+ {noreply, State};
+handle_info({tcp_closed, _Socket}, State) ->
+ {noreply, State};
+handle_info(Info, State) ->
+ ?LOGF("Unexpected message received: ~p", [Info], ?WARN),
+ {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% This function is called by a gen_server when it is about to
+%% terminate. It should be the opposite of Module:init/1 and do any
+%% necessary cleaning up. When it returns, the gen_server terminates
+%% with Reason. The return value is ignored.
+%%
+%% @spec terminate(Reason, State) -> void()
+%% @end
+%%--------------------------------------------------------------------
+terminate(_Reason, _State) ->
+ ok.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Convert process state when code is changed
+%%
+%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState}
+%% @end
+%%--------------------------------------------------------------------
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================
+
+check_jobs(Jobs,Pid)->
+ case ets:match_object(Jobs, #job{owner=Pid, _='_'}) of
+ [] ->
+ ?LOGF("no jobs for pid ~p~n",[Pid],?DEB),
+ Pid ! {erlang, ok, nojobs};
+ PidJobs->
+ ?LOGF("still ~p jobs for pid ~p~n",[length(PidJobs),Pid],?INFO)
+ end.
+
View
25 src/tsung_recorder/ts_proxy_listener.erl
@@ -54,8 +54,6 @@
%% Self callbacks
--export([accept_loop/3]).
-
-record(state, {
plugin,
acceptsock, % The socket we are accept()ing at
@@ -206,7 +204,7 @@ activate(State=#state{plugin=Plugin})->
{ok, ServerSock} ->
{ok, State#state
{acceptsock=ServerSock,
- acceptloop_pid = spawn_link(?MODULE,
+ acceptloop_pid = spawn_link(ts_utils,
accept_loop,
[self(), unused, ServerSock])}};
{error, Reason} ->
@@ -217,27 +215,6 @@ activate(State=#state{plugin=Plugin})->
{ok, State}
end.
-%%--------------------------------------------------------------------
-%% Func: accept_loop/3
-%% Purpose: infinite listen/accept loop, delegating handling of accepts
-%% to the gen_server proper.
-%% Returns: only returns by throwing an exception
-%%--------------------------------------------------------------------
-accept_loop(PPid, Tag, ServerSock)->
- case
- case gen_tcp:accept(ServerSock) of
- {ok, ClientSock} ->
- ok = gen_tcp:controlling_process(ClientSock, PPid),
- gen_server:call(PPid, {accepted, Tag, ClientSock});
- Error ->
- gen_server:call(PPid, {accept_error, Tag, Error})
- end
- of
- continue ->
- accept_loop(PPid, Tag, ServerSock);
- _->
- normal
- end.
%% Local Variables:
%% tab-width:4
View
26 tsung-1.0.dtd
@@ -112,19 +112,20 @@ repeat | if | change_type | foreach | set_option)*>
bidi CDATA #IMPLIED
persistent (true | false) #IMPLIED
probability NMTOKEN #REQUIRED
- type (ts_jabber | ts_http | ts_raw | ts_pgsql | ts_ldap | ts_webdav |ts_mysql| ts_fs |ts_shell) #REQUIRED>
+ type (ts_jabber | ts_http | ts_raw | ts_pgsql | ts_ldap | ts_webdav |ts_mysql| ts_fs |ts_shell|ts_job) #REQUIRED>
<!ELEMENT change_type EMPTY>
<!ATTLIST change_type
- new_type (ts_jabber | ts_http | ts_raw | ts_pgsql | ts_ldap | ts_webdav |ts_mysql| ts_fs | ts_shell) #REQUIRED
+ new_type (ts_jabber | ts_http | ts_raw | ts_pgsql | ts_ldap | ts_webdav |ts_mysql| ts_fs | ts_shell|ts_job) #REQUIRED
host NMTOKEN #REQUIRED
port NMTOKEN #REQUIRED
server_type NMTOKEN #REQUIRED
store ( true | false ) "false"
restore ( true | false ) "false"
>
-<!ELEMENT request ( match*, dyn_variable*, ( http | jabber | raw | pgsql | ldap | mysql |fs | shell ) )>
+<!ELEMENT request ( match*, dyn_variable*, ( http | jabber | raw |
+ pgsql | ldap | mysql |fs | shell | job ) )>
<!ATTLIST request
subst (true|false) "false"
>
@@ -238,6 +239,25 @@ repeat | if | change_type | foreach | set_option)*>
args CDATA ""
>
+<!ELEMENT job EMPTY >
+<!ATTLIST job
+ type (oar|torque) "oar"
+ req (submit|delete|stat|suspend|resume|wait_jobs) #REQUIRED
+ script CDATA #IMPLIED
+ walltime CDATA #IMPLIED
+ duration CDATA #IMPLIED
+ jobid CDATA #IMPLIED
+ script CDATA #IMPLIED
+ resources CDATA #IMPLIED
+ nodes CDATA #IMPLIED
+ queue CDATA #IMPLIED
+ options CDATA #IMPLIED
+ user CDATA #IMPLIED
+ name CDATA "tsung"
+ notify_port CDATA #IMPLIED
+ notify_script CDATA #IMPLIED
+>
+
<!ELEMENT pgsql (#PCDATA) >
<!ATTLIST pgsql
password CDATA #IMPLIED

0 comments on commit a8fc0ab

Please sign in to comment.