Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

implement dump=protocol for job plugin

  • Loading branch information...
commit 041e001832d284adefa65465851d75f4ca638dab 1 parent c9f2b9b
Nicolas Niclausse nniclausse authored
8 include/ts_job.hrl
View
@@ -53,7 +53,13 @@
-record(job_session, {
jobid,
- submission_time
+ owner,
+ submission_time,
+ queue_time,
+ start_time,
+ end_time,
+ dump,
+ status
}).
14 src/tsung/ts_job.erl
View
@@ -132,8 +132,14 @@ subst(Job=#job{duration=D,req=Req,jobid=Id}, DynVars) ->
jobid=ts_search:subst(Id,DynVars)}.
-dump(A,B) ->
- ts_plugin:dump(A,B).
+dump(protocol,{none,#job_session{jobid=JobId,owner=Owner,submission_time=Sub,queue_time=Q,
+ start_time=Start,end_time=E,status=Status},Name,_,_})->
+ {R,_}=lists:mapfoldl(fun(A,Acc) -> {integer_to_list(round(ts_utils:elapsed(Acc,A))),A} end,Sub,[Q,Start,E]),
+ Date=integer_to_list(round(ts_utils:time2sec_hires(Sub))),
+ Data=ts_utils:join(";",[JobId,Name,Date]++R++[Status]),
+ ts_mon:dump({protocol, Owner, Data });
+dump(_P,_Args) ->
+ ok.
%% @spec parse(Data::client_data(), State) -> {NewState, Opts, Close}
%% State = #state_rcv{}
@@ -146,7 +152,7 @@ dump(A,B) ->
%% Setting Close to true will cause tsung to close the connection to
%% the server.
%% @end
-parse({os, cmd, _Args, Res},State=#state_rcv{session=S}) when is_list(Res)->
+parse({os, cmd, _Args, Res},State=#state_rcv{session=S,dump=Dump}) when is_list(Res)->
?LOGF("os:cmd result: ~p",[Res],?DEB),
%% oarsub output:
%% [ADMISSION RULE] Modify resource description with type constraints
@@ -156,7 +162,7 @@ parse({os, cmd, _Args, Res},State=#state_rcv{session=S}) when is_list(Res)->
case lists:last(Lines) of
"OAR_JOB_ID="++ID ->
?LOGF("OK,job id is ~p",[ID],?INFO),
- ts_job_notify:monitor({ID,self(),S#job_session.submission_time, now()}),
+ ts_job_notify:monitor({ID,self(),S#job_session.submission_time, now(),Dump}),
{State#state_rcv{ack_done=true,datasize=length(Res)}, [], false};
_ ->
{State#state_rcv{ack_done=true,datasize=length(Res)}, [], false}
34 src/tsung_controller/ts_job_notify.erl
View
@@ -30,6 +30,7 @@
-behaviour(gen_server).
-include("ts_profile.hrl").
+-include("ts_job.hrl").
%% API
-export([start_link/0]).
@@ -47,7 +48,6 @@
acceptloop_pid, % The PID of the companion process that blocks
jobs}).
--record(job, {id,owner,queue_time,start_time}).
%%%===================================================================
%%% API
@@ -66,8 +66,8 @@ start_link() ->
listen(Port) ->
gen_server:cast({global, ?MODULE}, {listen, Port}).
-monitor({JobID, OwnerPid, StartTime, QueuedTime}) ->
- gen_server:cast({global, ?MODULE}, {monitor, {JobID, OwnerPid, StartTime, QueuedTime}}).
+monitor({JobID, OwnerPid, StartTime, QueuedTime, Dump}) ->
+ gen_server:cast({global, ?MODULE}, {monitor, {JobID, OwnerPid, StartTime, QueuedTime,Dump}}).
demonitor({JobID}) ->
gen_server:cast({global, ?MODULE}, {monitor, {JobID}}).
@@ -94,7 +94,7 @@ init([]) ->
?LOG("Starting~n",?DEB),
case global:whereis_name(ts_config_server) of
undefined ->
- {ok, #state{jobs=ets:new(jobs,[{keypos, #job.id}])}};
+ {ok, #state{jobs=ets:new(jobs,[{keypos, #job_session.jobid}])}};
_Pid ->
?LOG("Config server is alive !~n",?DEB),
case ts_config_server:get_jobs_state() of
@@ -104,7 +104,7 @@ init([]) ->
{ok, NewState};
Else ->
?LOGF("Got this from config server:~p~n",[Else],?DEB),
- {ok, #state{jobs=ets:new(jobs,[{keypos, #job.id}])}}
+ {ok, #state{jobs=ets:new(jobs,[{keypos, #job_session.jobid}])}}
end
end.
@@ -145,9 +145,9 @@ handle_call(_Request, _From, State) ->
%% {stop, Reason, State}
%% @end
%%--------------------------------------------------------------------
-handle_cast({monitor, {JobID, OwnerPid, SubmitTS, QueuedTS}}, State=#state{jobs=Jobs}) ->
+handle_cast({monitor, {JobID, OwnerPid, SubmitTS, QueuedTS,Dump}}, State=#state{jobs=Jobs}) ->
?LOGF("monitoring job ~p from pid ~p~n",[JobID,OwnerPid],?DEB),
- ets:insert(Jobs,#job{id=JobID,owner=OwnerPid, queue_time=QueuedTS}),
+ ets:insert(Jobs,#job_session{jobid=JobID,owner=OwnerPid, submission_time=SubmitTS, queue_time=QueuedTS,dump=Dump}),
SubmitTime=ts_utils:elapsed(SubmitTS,QueuedTS),
ts_mon:add([{sum,job_queued,1},{sample,tr_job_submit,SubmitTime}]),
{noreply, State};
@@ -210,31 +210,33 @@ handle_info({tcp, Socket, Data}, State=#state{jobs=Jobs}) ->
?LOGF("Job owner of ~p is unknown",[Id],?NOTICE);
[Job] ->
Now=now(),
- Queued=ts_utils:elapsed(Job#job.queue_time,Now),
+ Queued=ts_utils:elapsed(Job#job_session.queue_time,Now),
ts_mon:add([{sample,tr_job_wait,Queued},{sum,job_running,1}, {sum,job_queued,-1}]),
- ets:update_element(Jobs,Id,{#job.start_time,Now})
+ ets:update_element(Jobs,Id,{#job_session.start_time,Now})
end;
- [Id, _Name, "END"|_] ->
+ [Id, Name, "END"|_] ->
case ets:lookup(Jobs,Id) of
[] ->
?LOGF("Job owner of ~p is unknown",[Id],?NOTICE);
[Job]->
Now=now(),
- Duration=ts_utils:elapsed(Job#job.start_time,Now),
+ Duration=ts_utils:elapsed(Job#job_session.start_time,Now),
ts_mon:add([{sample,tr_job_duration,Duration},{sum,job_running,-1}, {sum,ok_job ,1}]),
+ ts_job:dump(Job#job_session.dump,{none,Job#job_session{end_time=Now,status="ok"},Name,undefined,undefined}),
ets:delete_object(Jobs,Job),
- check_jobs(Jobs,Job#job.owner)
+ check_jobs(Jobs,Job#job_session.owner)
end;
- [Id, _Name, "ERROR"|_] ->
+ [Id, Name, "ERROR"|_] ->
case ets:lookup(Jobs,Id) of
[] ->
?LOGF("Job owner of ~p is unknown",[Id],?NOTICE);
[Job]->
Now=now(),
- Duration=ts_utils:elapsed(Job#job.start_time,Now),
+ Duration=ts_utils:elapsed(Job#job_session.start_time,Now),
ts_mon:add([{sample,tr_job_duration,Duration},{sum,job_running,-1}, {sum,error_job,1}]),
+ ts_job:dump(Job#job_session.dump,{none,Job#job_session{end_time=Now,status="error"},Name,undefined,undefined}),
ets:delete_object(Jobs,Job),
- check_jobs(Jobs,Job#job.owner)
+ check_jobs(Jobs,Job#job_session.owner)
end;
[_Id, _Name, "INFO"|_] ->
ok;
@@ -294,7 +296,7 @@ code_change(_OldVsn, State, _Extra) ->
%%%===================================================================
check_jobs(Jobs,Pid)->
- case ets:match_object(Jobs, #job{owner=Pid, _='_'}) of
+ case ets:match_object(Jobs, #job_session{owner=Pid, _='_'}) of
[] ->
?LOGF("no jobs for pid ~p~n",[Pid],?DEB),
Pid ! {erlang, ok, nojobs};
Please sign in to comment.
Something went wrong with that request. Please try again.