Permalink
Browse files

first succesfull run

SVN Revision: 1065
  • Loading branch information...
nniclausse committed Jan 14, 2010
1 parent ff3194d commit a92dc2953b28b6e284eef26cea98c86562303a39
View
@@ -0,0 +1,50 @@
+%%% This code was developped by IDEALX (http://IDEALX.org/) and
+%%% contributors (their names can be found in the CONTRIBUTORS file).
+%%% Copyright (C) 2000-2001 IDEALX
+%%%
+%%% This program is free software; you can redistribute it and/or modify
+%%% it under the terms of the GNU General Public License as published by
+%%% the Free Software Foundation; either version 2 of the License, or
+%%% (at your option) any later version.
+%%%
+%%% This program is distributed in the hope that it will be useful,
+%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
+%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+%%% GNU General Public License for more details.
+%%%
+%%% You should have received a copy of the GNU General Public License
+%%% along with this program; if not, write to the Free Software
+%%% Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
+%%%
+%%% In addition, as a special exception, you have the permission to
+%%% link the code of this program with any library released under
+%%% the EPL license and distribute linked combinations including
+%%% the two.
+
+
+-vc('$Id$ ').
+-author('nicolas.niclausse@niclux.org').
+
+%% use by the client to create the request
+
+-record(fs_dyndata,
+ {
+ position,
+ iodev
+ }
+ ).
+
+-record(fs, {
+ command,
+ mode,
+ path,
+ size,
+ dest,
+ position
+ }).
+
+-record(fs_sess, {
+ fixme
+ }).
+
+
View
@@ -86,7 +86,7 @@ init({#session{id = SessionId,
size = Count,
type = CType}, IP, Server,Id}) ->
?DebugF("Init ... started with count = ~p~n",[Count]),
- ts_utils:init_seed(),
+ ts_utils:init_seed(),
?DebugF("Get dynparams for ~p~n",[CType]),
DynData = CType:init_dynparams(),
@@ -179,6 +179,15 @@ handle_info({NetEvent, _Socket, Data}, wait_ack, State) when NetEvent==tcp;
TimeOut=(NewState#state_rcv.proto_opts)#proto_opts.idle_timeout,
{next_state, wait_ack, NewState#state_rcv{socket=NewSocket}, TimeOut}
end;
+handle_info({erlang, _Socket, Data}, wait_ack, State) ->
+ ?DebugF("erlang function result received: size=~p ~n",[size(term_to_binary(Data))]),
+ case handle_data_msg(Data, State) of
+ {NewState=#state_rcv{ack_done=true}, _Opts} ->
+ handle_next_action(NewState#state_rcv{ack_done=false});
+ {NewState, _Opts} ->
+ TimeOut=(NewState#state_rcv.proto_opts)#proto_opts.idle_timeout,
+ {next_state, wait_ack, NewState, TimeOut}
+ end;
handle_info({udp, Socket,_IP,_InPortNo, Data}, wait_ack, State) ->
?DebugF("UDP packet received: size=~p ~n",[size(Data)]),
%% we don't care about IP,InPortNo, do the same as for a tcp connection:
@@ -541,7 +550,7 @@ handle_next_request(Request, State) ->
_ -> %page already started
State#state_rcv.page_timestamp
end,
- ts_mon:add({ sum, size_sent, size(Message)}),
+ ts_mon:add({ sum, size_sent, size_msg(Message)}),
ts_mon:sendmes({State#state_rcv.dump, self(), Message}),
NewState = State#state_rcv{socket = NewSocket,
protocol = Protocol,
@@ -597,6 +606,12 @@ handle_next_request(Request, State) ->
{stop, normal, State}
end.
+
+%% @spec size_msg(Data::term) -> integer()
+size_msg(Data) when is_binary(Data) ->
+ size(Data);
+size_msg({_Mod,_Fun,_Args,Size}) -> Size.
+
%%----------------------------------------------------------------------
%% Func: finish_session/1
%% Args: State
@@ -688,17 +703,20 @@ reconnect(Socket, _Server, _Port, _Protocol, _IP) ->
send(gen_tcp,Socket,Message,_,_) -> gen_tcp:send(Socket,Message);
send(ssl,Socket,Message,_,_) -> ssl:send(Socket,Message);
send(gen_udp,Socket,Message,Host,Port) ->gen_udp:send(Socket,Host,Port,Message);
-send(erlang,Pid,Message,_,_) -> Pid ! Message.
+send(erlang,Pid,Message,_,_) ->
+ Pid ! Message,
+ ok.
%%----------------------------------------------------------------------
%% Func: connect/4
%% Return: {ok, Socket} | {error, Reason}
%%----------------------------------------------------------------------
connect(gen_tcp,Server, Port, Opts) -> gen_tcp:connect(Server, Port, Opts);
connect(ssl,Server, Port,Opts) -> ssl:connect(Server, Port, Opts);
-connect(gen_udp,_Server, _Port, Opts)-> gen_udp:open(0,Opts).
+connect(gen_udp,_Server, _Port, Opts)-> gen_udp:open(0,Opts);
connect(erlang,Server,Port,Opts) ->
- Pid=spawn(ts_erlang,client,[self(),Server,Port,Opts]),
+ Pid=spawn_link(ts_erlang,client,[self(),Server,Port,Opts]),
+ ?LOGF("erlang process created with pid ~p~n" ,[Pid],?DEB),
{ok, Pid}.
@@ -724,7 +742,8 @@ protocol_options(gen_udp,#proto_opts{udp_rcv_size=Rcv, udp_snd_size=Snd}) ->
{active, once},
{recbuf, Rcv},
{sndbuf, Snd}
- ].
+ ];
+protocol_options(erlang,_) -> [].
@@ -762,7 +781,7 @@ handle_data_msg(Data,State=#state_rcv{request=Req,clienttype=Type,maxcount=MaxCo
ts_mon:rcvmes({State#state_rcv.dump, self(), Data}),
{NewState, Opts, Close} = Type:parse(Data, State),
- NewBuffer=set_new_buffer(Req, State#state_rcv.buffer, Data),
+ NewBuffer=set_new_buffer(Req, NewState#state_rcv.buffer, Data),
?DebugF("Dyndata is now ~p~n",[NewState#state_rcv.dyndata]),
case NewState#state_rcv.ack_done of
@@ -854,9 +873,11 @@ set_new_buffer(#ts_request{match=[], dynvar_specs=[]},_,_) ->
<< >>;
set_new_buffer(_, Buffer,closed) ->
Buffer;
-set_new_buffer(_, OldBuffer, Data) ->
+set_new_buffer(_, OldBuffer, Data) when is_binary(OldBuffer)->
?Debug("Bufferize response~n"),
- << OldBuffer/binary, Data/binary >>.
+ << OldBuffer/binary, Data/binary >>;
+set_new_buffer(_, _, Data) -> % don't need buffer for non binary responses (erlang fun case)
+ Data.
%%----------------------------------------------------------------------
%% Func: set_connected_status/1
View
@@ -23,14 +23,19 @@
-vc('$Id: ts_erlang.erl,v 0.0 2009/08/20 16:31:58 nniclaus Exp $ ').
-author('nniclaus@sophia.inria.fr').
+-define(TIMEOUT,36000000). % 1 hour
-export([client/4]).
+-include("ts_profile.hrl").
+
+-export([client/4]).
client(MasterPid,Server,Port,Opts)->
receive
- {msg, Module, Fun, Args} ->
+ {Module, Fun, Args, Size} ->
+ ?DebugF("Calling ~:~s with args ~p and size ~p~n",[Module,Fun,Args, Size]),
Res=apply(Module,Fun,Args),
- MasterPid ! {erlang,self(),Res},
+ ?DebugF("result: ~p~n",[Res]),
+ MasterPid ! {erlang,self(),{Module,Fun,Args,Res}},
client(MasterPid,Server,Port,Opts)
after ?TIMEOUT ->
MasterPid ! timeout
View
@@ -0,0 +1,168 @@
+%%%
+%%% Copyright 2009 © INRIA
+%%%
+%%% Author : Nicolas Niclausse <nniclaus@sophia.inria.fr>
+%%% Created: 20 août 2009 by Nicolas Niclausse <nniclaus@sophia.inria.fr>
+%%%
+%%% This program is free software; you can redistribute it and/or modify
+%%% it under the terms of the GNU General Public License as published by
+%%% the Free Software Foundation; either version 2 of the License, or
+%%% (at your option) any later version.
+%%%
+%%% This program is distributed in the hope that it will be useful,
+%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
+%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+%%% GNU General Public License for more details.
+%%%
+%%% You should have received a copy of the GNU General Public License
+%%% along with this program; if not, write to the Free Software
+%%% Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
+%%%
+
+-module(ts_fs).
+-vc('$Id: ts_erlang.erl,v 0.0 2009/08/20 16:31:58 nniclaus Exp $ ').
+-author('nniclaus@sophia.inria.fr').
+
+-include("ts_profile.hrl").
+-include("ts_fs.hrl").
+-include_lib("kernel/include/file.hrl").
+
+-export([init_dynparams/0,
+ add_dynparams/4,
+ get_message/1,
+ session_defaults/0,
+ parse/2,
+ parse_config/2,
+ new_session/0]).
+
+
+%%====================================================================
+%% Data Types
+%%====================================================================
+
+%% @type dyndata() = #dyndata{proto=ProtoData::term(),dynvars=list()}.
+%% Dynamic data structure
+%% @end
+
+%% @type server() = {Host::tuple(),Port::integer(),Protocol::atom()}.
+%% Host/Port/Protocol tuple
+%% @end
+
+%% @type param() = {dyndata(), server()}.
+%% Dynamic data structure
+%% @end
+
+%% @type hostdata() = {Host::tuple(),Port::integer()}.
+%% Host/Port pair
+%% @end
+
+%% @type client_data() = binary() | closed.
+%% Data passed to a protocol implementation is either a binary or the
+%% atom closed indicating that the server closed the tcp connection.
+%% @end
+
+%%====================================================================
+%% API
+%%====================================================================
+
+parse_config(El,Config) ->
+ ts_config_fs:parse_config(El, Config).
+
+
+%% @spec session_defaults() -> {ok, Persistent} | {ok, Persistent, Bidi}
+%% Persistent = bool()
+%% Bidi = bool()
+%% @doc Default parameters for sessions of this protocol. Persistent
+%% is true if connections are preserved after the underlying tcp
+%% connection closes. Bidi should be true for bidirectional protocols
+%% where the protocol module needs to reply to data sent from the
+%% server. @end
+session_defaults() ->
+ {ok, true}. % not relevant for erlang type (?).
+
+%% @spec new_session() -> State::term()
+%% @doc Initialises the state for a new protocol session.
+%% @end
+new_session() ->
+ #fs{}.
+
+%% @spec init_dynparams() -> dyndata()
+%% @doc Creates a new record/term for storing dynamic request data.
+%% @end
+init_dynparams() ->
+ #dyndata{proto=#fs_dyndata{}}.
+
+%% @spec add_dynparams(Subst, dyndata(), param(), hostdata()) -> {dyndata(), server()} | dyndata()
+%% Subst = term()
+%% @doc Updates the dynamic request data structure created by
+%% {@link ts_protocol:init_dynparams/0. init_dynparams/0}.
+%% @end
+add_dynparams(false, DynData, Param, HostData) ->
+ add_dynparams(DynData#dyndata.proto, Param, HostData);
+add_dynparams(true, DynData, Param, HostData) ->
+ NewParam = subst(Param, DynData#dyndata.dynvars),
+ add_dynparams(DynData#dyndata.proto,NewParam, HostData).
+
+add_dynparams(Dyn, Param, _HostData) ->
+ ?DebugF("Dyndata=~p, param=~p~n",[Dyn, Param]),
+ Param.
+
+%%----------------------------------------------------------------------
+%% @spec subst(Req, term())
+%% Purpose: Replace on the fly dynamic element of the request.
+%% Returns: #pgsql_request
+%%----------------------------------------------------------------------
+subst(Req, DynData) ->
+ Req.
+%% Req#fs{sql=ts_search:subst(SQL, DynData)}.
+
+
+%% @spec parse(Data::client_data(), State) -> {NewState, Opts, Close}
+%% State = #state_rcv{}
+%% Opts = proplist()
+%% Close = bool()
+%% @doc
+%% Opts is a list of inet:setopts socket options. Don't change the
+%% active/passive mode here as tsung will set {active,once} before
+%% your options.
+%% Setting Close to true will cause tsung to close the connection to
+%% the server.
+%% @end
+parse({file, write_file, Args, ok},State) ->
+ {State#state_rcv{ack_done=true,datasize=0}, [], false};
+parse({file, write_file, [Path,_], {error,Reason}},State) ->
+ ?LOGF("error while writing file: ~p~n",[Path],?ERR),
+ ts_mon:add({count,error_fs_write}),
+ {State#state_rcv{ack_done=true, datasize=0}, [], false};
+parse({file, read_file, [Path], {ok,Res}},State) ->
+ % we don't know the file size
+ Size = case file:read_file_info(Path) of
+ {ok,#file_info{size=S}} -> S;
+ _ -> 0
+ end,
+ {State#state_rcv{ack_done=true,datasize=Size}, [], false};
+parse({file, read_file, [Path], {error,Reason}},State) ->
+ ?LOGF("error while reading file: ~p~n",[Path],?ERR),
+ ts_mon:add({count,error_fs_read}),
+ {State#state_rcv{ack_done=true,datasize=0}, [], false}.
+
+%% @spec parse_bidi(Data, State) -> {nodata, NewState} | {Data, NewState}
+%% Data = client_data()
+%% NewState = term()
+%% State = term()
+%% @doc Parse a block of data from the server. No reply will be sent
+%% if the return value is nodata, otherwise the Data binary will be
+%% sent back to the server immediately.
+%% @end
+parse_bidi(_Data, _State) ->
+ erlang:error(dummy_implementation).
+
+%% @spec get_message(param()) -> Message::binary()
+%% @doc Creates a new message to send to the connected server.
+%% @end
+get_message(#fs{command=read, path=Path}) ->
+ {file,read_file,[Path],0};
+get_message(#fs{command=write,path=Path, size=Size}) ->
+ {file,write_file,[Path,ts_utils:urandomstr(Size)],Size}.
+
+
@@ -102,7 +102,8 @@ parse(Element = #xmlElement{name=server, attributes=Attrs}, Conf=#config{servers
Type = case getAttr(Attrs, type) of
"ssl" -> ssl;
"tcp" -> gen_tcp;
- "udp" -> gen_udp
+ "udp" -> gen_udp;
+ "erlang" -> erlang
end,
lists:foldl(fun parse/2,
Oops, something went wrong.

0 comments on commit a92dc29

Please sign in to comment.