Skip to content

Commit

Permalink
more cmd implemented for fs plugin
Browse files Browse the repository at this point in the history
SVN Revision: 1066
  • Loading branch information
nniclausse committed Jan 14, 2010
1 parent a92dc29 commit 297af7d
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 15 deletions.
3 changes: 2 additions & 1 deletion include/ts_fs.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

-record(fs_dyndata,
{
position,
position=0,
iodev
}
).
Expand All @@ -38,6 +38,7 @@
command,
mode,
path,
iodev,
size,
dest,
position
Expand Down
5 changes: 5 additions & 0 deletions src/tsung/ts_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -715,7 +715,12 @@ connect(gen_tcp,Server, Port, Opts) -> gen_tcp:connect(Server, Port, Opts);
connect(ssl,Server, Port,Opts) -> ssl:connect(Server, Port, Opts);
connect(gen_udp,_Server, _Port, Opts)-> gen_udp:open(0,Opts);
connect(erlang,Server,Port,Opts) ->
%% we are running on a slave node, so we must disable proxied IO;
%% to do this, get the groupe leader of a local process and use it
%% as the group leader of the started processes.
{group_leader,GLPid }=process_info(whereis(net_kernel),group_leader),
Pid=spawn_link(ts_erlang,client,[self(),Server,Port,Opts]),
group_leader(GLPid,Pid),
?LOGF("erlang process created with pid ~p~n" ,[Pid],?DEB),
{ok, Pid}.

Expand Down
2 changes: 0 additions & 2 deletions src/tsung/ts_erlang.erl
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,7 @@
client(MasterPid,Server,Port,Opts)->
receive
{Module, Fun, Args, Size} ->
?DebugF("Calling ~:~s with args ~p and size ~p~n",[Module,Fun,Args, Size]),
Res=apply(Module,Fun,Args),
?DebugF("result: ~p~n",[Res]),
MasterPid ! {erlang,self(),{Module,Fun,Args,Res}},
client(MasterPid,Server,Port,Opts)
after ?TIMEOUT ->
Expand Down
49 changes: 42 additions & 7 deletions src/tsung/ts_fs.erl
Original file line number Diff line number Diff line change
Expand Up @@ -103,19 +103,18 @@ add_dynparams(true, DynData, Param, HostData) ->
NewParam = subst(Param, DynData#dyndata.dynvars),
add_dynparams(DynData#dyndata.proto,NewParam, HostData).

add_dynparams(Dyn, Param, _HostData) ->
?DebugF("Dyndata=~p, param=~p~n",[Dyn, Param]),
add_dynparams(#fs_dyndata{position=Pos,iodev=IODevice}, Req=#fs{}, _HostData) when is_integer(Pos)->
Req#fs{position=Pos,iodev=IODevice};
add_dynparams(#fs_dyndata{}, Param, _HostData) ->
Param.

%%----------------------------------------------------------------------
%% @spec subst(Req, term())
%% Purpose: Replace on the fly dynamic element of the request.
%% Returns: #pgsql_request
%% Returns: record()
%%----------------------------------------------------------------------
subst(Req, DynData) ->
Req.
%% Req#fs{sql=ts_search:subst(SQL, DynData)}.


%% @spec parse(Data::client_data(), State) -> {NewState, Opts, Close}
%% State = #state_rcv{}
Expand All @@ -128,12 +127,36 @@ subst(Req, DynData) ->
%% Setting Close to true will cause tsung to close the connection to
%% the server.
%% @end
parse({file, write_file, Args, ok},State) ->
parse({file, write_file, _Args, ok},State) ->
{State#state_rcv{ack_done=true,datasize=0}, [], false};
parse({file, open, Args, {ok,IODevice}},State=#state_rcv{dyndata=DynData}) ->
NewDyn=(DynData#dyndata.proto)#fs_dyndata{iodev=IODevice,position=0},
{State#state_rcv{ack_done=true,datasize=0,dyndata=DynData#dyndata{proto=NewDyn}}, [], false};
parse({file, close, [IODevice], ok},State=#state_rcv{dyndata=DynData}) ->
NewDyn=(DynData#dyndata.proto)#fs_dyndata{iodev=undefined,position=0},
{State#state_rcv{ack_done=true,datasize=0,dyndata=DynData#dyndata{proto=NewDyn}}, [], false};
parse({file, pread, [IODev,Pos,Size], {ok,_Data}},State=#state_rcv{dyndata=DynData,datasize=DataSize}) ->
NewDyn=(DynData#dyndata.proto)#fs_dyndata{position=Pos+Size},
{State#state_rcv{ack_done=true,datasize=DataSize+Size,dyndata=DynData#dyndata{proto=NewDyn}}, [], false};
parse({file, pread, [IODev,Pos,Size], eof},State=#state_rcv{dyndata=DynData,datasize=DataSize}) ->
NewDyn=(DynData#dyndata.proto)#fs_dyndata{position=0},
{State#state_rcv{ack_done=true,datasize=DataSize+Size,dyndata=DynData#dyndata{proto=NewDyn}}, [], false};
parse({file, pwrite, [IODev,Pos,Bytes], ok},State=#state_rcv{dyndata=DynData}) ->
NewDyn=(DynData#dyndata.proto)#fs_dyndata{position=Pos+size(Bytes)},
{State#state_rcv{ack_done=true,datasize=0,dyndata=DynData#dyndata{proto=NewDyn}}, [], false};
parse({file, open, [Path,_], Error},State) ->
?LOGF("error while opening file: ~p~n",[Path],?ERR),
ts_mon:add({count,error_fs_open}),
{State#state_rcv{ack_done=true,datasize=0}, [], false};
parse({file, write_file, [Path,_], {error,Reason}},State) ->
?LOGF("error while writing file: ~p~n",[Path],?ERR),
ts_mon:add({count,error_fs_write}),
{State#state_rcv{ack_done=true, datasize=0}, [], false};
parse({file, delete, [Path], ok},State) ->
{State#state_rcv{ack_done=true, datasize=0}, [], false};
parse({file, delete, [Path], {error,Reason}},State) ->
?LOGF("error while deleting file: ~p (~p)~n",[Path, Reason],?ERR),
{State#state_rcv{ack_done=true, datasize=0}, [], false};
parse({file, read_file, [Path], {ok,Res}},State) ->
% we don't know the file size
Size = case file:read_file_info(Path) of
Expand All @@ -157,11 +180,23 @@ parse({file, read_file, [Path], {error,Reason}},State) ->
parse_bidi(_Data, _State) ->
erlang:error(dummy_implementation).

%% @spec get_message(param()) -> Message::binary()
%% @spec get_message(param()) -> Message::binary()|tuple()
%% @doc Creates a new message to send to the connected server.
%% @end
get_message(#fs{command=read, path=Path}) ->
{file,read_file,[Path],0};
get_message(#fs{command=read_chunk, iodev=IODevice,position=Loc, size=Size}) when is_integer(Loc)->
{file,pread,[IODevice,Loc,Size],0};
get_message(#fs{command=write_chunk, iodev=IODevice,position=Loc, size=Size}) when is_integer(Loc)->
{file,pwrite,[IODevice,Loc,Size],Size};
get_message(#fs{command=open, mode=read,path=Path,position=Loc}) when is_integer(Loc)->
{file,open,[Path,[read,raw,binary]],0};
get_message(#fs{command=open, mode=write,path=Path,position=Loc}) when is_integer(Loc)->
{file,open,[Path,[write,raw,binary]],0};
get_message(#fs{command=close, iodev=IODevice}) ->
{file,close,[IODevice],0};
get_message(#fs{command=delete, path=Path}) ->
{file,delete,[Path],0};
get_message(#fs{command=write,path=Path, size=Size}) ->
{file,write_file,[Path,ts_utils:urandomstr(Size)],Size}.

Expand Down
5 changes: 2 additions & 3 deletions src/tsung_controller/ts_config_fs.erl
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ parse_config(Element = #xmlElement{name=fs},
Cmd = ts_config:getAttr(atom,Element#xmlElement.attributes, cmd, write),
Size = ts_config:getAttr(integer,Element#xmlElement.attributes, size, 1024),
Path = ts_config:getAttr(string,Element#xmlElement.attributes, path),
Mode = ts_config:getAttr(atom,Element#xmlElement.attributes, mode,write),
Mode = ts_config:getAttr(atom,Element#xmlElement.attributes, mode, write),
Dest = ts_config:getAttr(string,Element#xmlElement.attributes, dest),
Position = ts_config:getAttr(integer,Element#xmlElement.attributes, position, undefined),

Expand All @@ -60,8 +60,7 @@ parse_config(Element = #xmlElement{name=fs},
param = Request},

ts_config:mark_prev_req(Id-1, Tab, CurS),
ets:insert(Tab,{{CurS#session.id, Id},Msg#ts_request{endpage=true,
dynvar_specs=DynVar}}),
ets:insert(Tab,{{CurS#session.id, Id},Msg}),
lists:foldl( fun(A,B)->ts_config:parse(A,B) end, Config#config{dynvar=[]},
Element#xmlElement.content);
%% Parsing other elements
Expand Down
4 changes: 2 additions & 2 deletions tsung-1.0.dtd
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,8 @@ repeat | if )*>

<!ELEMENT fs (#PCDATA) >
<!ATTLIST fs
cmd (read|write|open|stats|copy) "write"
path CDATA #REQUIRED
cmd (read|write|open|delete|stats|copy|read_chunk|write_chunk|close) "write"
path CDATA #IMPLIED
size CDATA "1024"
position CDATA #IMPLIED
mode (read | write | append ) #IMPLIED
Expand Down

0 comments on commit 297af7d

Please sign in to comment.