Skip to content
Browse files

add gen_server to choose local ports to bypass the 64K limit.

  • Loading branch information...
1 parent 239c2c3 commit c1163bd53427d1dbfe26ac32fcfc4ec7661acaf9 @nniclausse nniclausse committed Mar 5, 2010
View
8 src/test/ts_test_config.erl
@@ -113,14 +113,6 @@ read_config_maxusers_test() ->
{ok,{[{_,Max4},{_,_}],_,_}}=ts_config_server:get_client_config("client4"),
?assert(Max1+Max2+Max3+Max4 =< MaxNumber).
-choose_port_test() ->
- myset_env(),
- {Dict,3} = ts_config_server:choose_port('client',undefined,{3,5}),
- {Dict2,4} = ts_config_server:choose_port('client',Dict,{3,5}),
- {Dict3,5} = ts_config_server:choose_port('client',Dict2,{3,5}),
- {Dict4,3} = ts_config_server:choose_port('client2',Dict3,{3,5}),
- ?assertMatch({_,3}, ts_config_server:choose_port('client',Dict4,{3,5})).
-
myset_env()->
myset_env(0).
myset_env(Level)->
View
45 src/tsung/ts_client.erl
@@ -40,6 +40,9 @@
-define(MAX_RETRIES,3). % max number of connection retries
-define(RETRY_TIMEOUT,10000). % waiting time between retries (msec)
+-define(CLIENT_PORT_MIN,1025).
+-define(CLIENT_PORT_MAX,65535).
+
%% External exports
-export([start/1, next/1]).
@@ -95,6 +98,13 @@ init({#session{id = SessionId,
NewDynData = DynData#dyndata{dynvars=NewDynVars},
StartTime= now(),
set_thinktime(?short_timeout),
+ NewIP = case IP of
+ { RealIP, -1 } ->
+ {ok, MyHostName} = ts_utils:node_to_hostname(node()),
+ {RealIP, "cport-" ++ MyHostName};
+ Val ->
+ Val
+ end,
{ok, think, #state_rcv{ port = Server#server.port,
host = Server#server.host,
session_id = SessionId,
@@ -107,7 +117,7 @@ init({#session{id = SessionId,
dump = ?config(dump),
proto_opts = ProtoOpts,
count = Count,
- ip = IP,
+ ip = NewIP,
id = Id,
hibernate = Hibernate,
maxcount = Count,
@@ -599,7 +609,8 @@ handle_next_request(Request, State) ->
host=Host,
port=Port});
{error, Reason} ->
- ?LOGF("Error: Unable to send data, reason: ~p~n",[Reason],?ERR),
+ %% LOG only at INFO level since we report also an error to ts_mon
+ ?LOGF("Error: Unable to send data, reason: ~p~n",[Reason],?INFO),
CountName="error_send_"++atom_to_list(Reason),
ts_mon:add({ count, list_to_atom(CountName) }),
handle_timeout_while_sending(State);
@@ -703,7 +714,9 @@ set_profile(MaxCount, Count, ProfileId) when is_integer(ProfileId) ->
%% {stop, Reason}
%% purpose: try to reconnect if this is needed (when the socket is set to none)
%%----------------------------------------------------------------------
-reconnect(none, ServerName, Port, {Protocol, Proto_opts}, {IP,CPort}) ->
+reconnect(none, ServerName, Port, {Protocol, Proto_opts}, {IP,0}) ->
+ reconnect(none, ServerName, Port, {Protocol, Proto_opts}, {IP,0,0});
+reconnect(none, ServerName, Port, {Protocol, Proto_opts}, {IP,CPort, Try}) when is_integer(CPort)->
?DebugF("Try to (re)connect to: ~p:~p from ~p using protocol ~p~n",
[ServerName,Port,IP,Protocol]),
Opts = protocol_options(Protocol, Proto_opts) ++ [{ip, IP},{port,CPort}],
@@ -716,16 +729,32 @@ reconnect(none, ServerName, Port, {Protocol, Proto_opts}, {IP,CPort}) ->
?Debug("(Re)connected~n"),
{ok, Socket};
{error, Reason} ->
- {A,B,C,D} = IP,
- ?LOGF("(Re)connect from ~p.~p.~p.~p to ~s:~p, Error: ~p~n",
- [A,B,C,D, ServerName, Port, Reason],?ERR),
- CountName="error_connect_"++atom_to_list(Reason),
- ts_mon:add({ count, list_to_atom(CountName) }),
+ {A,B,C,D} = IP,
+ %% LOG only at INFO level since we report also an error to ts_mon
+ ?LOGF("(Re)connect from ~p.~p.~p.~p:~p to ~s:~p, Error: ~p~n",
+ [A,B,C,D, CPort, ServerName, Port , Reason],?INFO),
+ case {Reason,CPort,Try} of
+ {eaddrinuse, Val,CPortServer} when Val == 0; CPortServer == undefined ->
+ %% already retry once, don't try again.
+ ts_mon:add({ count, error_connect_eaddrinuse });
+ {eaddrinuse, Val,CPortServer} when Val > 0 ->
+ %% retry once when tsung allocates port number
+ NewCPort = ts_cport:get_port(CPortServer,IP),
+ ?LOGF("Connect failed with client port ~p, retry with ~p~n",[CPort, NewCPort],?INFO),
+ reconnect(none, ServerName, Port, {Protocol, Proto_opts}, {IP,NewCPort, undefined});
+ _ ->
+ CountName="error_connect_"++atom_to_list(Reason),
+ ts_mon:add({ count, list_to_atom(CountName) })
+ end,
{error, Reason}
end;
+reconnect(none, ServerName, Port, {Protocol, Proto_opts}, {IP,CPortServer}) ->
+ CPort = ts_cport:get_port(CPortServer,IP),
+ reconnect(none, ServerName, Port, {Protocol, Proto_opts}, {IP,CPort,CPortServer});
reconnect(Socket, _Server, _Port, _Protocol, _IP) ->
{ok, Socket}.
+
%%----------------------------------------------------------------------
%% Func: send/5
%% Purpose: wrapper function for send
View
158 src/tsung/ts_cport.erl
@@ -0,0 +1,158 @@
+%%%
+%%% Copyright 2009 © Nicolas Niclausse
+%%%
+%%% Author : Nicolas Niclausse <nicolas.nniclausse@niclux.org>
+%%% Created: 17 mar 2009 by Nicolas Niclausse <nicolas.nniclausse@niclux.org>
+%%%
+%%% This program is free software; you can redistribute it and/or modify
+%%% it under the terms of the GNU General Public License as published by
+%%% the Free Software Foundation; either version 2 of the License, or
+%%% (at your option) any later version.
+%%%
+%%% This program is distributed in the hope that it will be useful,
+%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
+%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+%%% GNU General Public License for more details.
+%%%
+%%% You should have received a copy of the GNU General Public License
+%%% along with this program; if not, write to the Free Software
+%%% Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
+%%%
+%%% In addition, as a special exception, you have the permission to
+%%% link the code of this program with any library released under
+%%% the EPL license and distribute linked combinations including
+%%% the two.
+
+-module(ts_cport).
+-vc('$Id: ts_cport.erl,v 0.0 2009/03/17 10:26:56 nniclaus Exp $ ').
+-author('nniclausse@niclux.org').
+
+-behaviour(gen_server).
+
+-include("ts_profile.hrl").
+
+%% API
+-export([start_link/0, get_port/2]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-define(EPMD_PORT,4369).
+
+-record(state, {
+ max_port = 1025,
+ min_port = 65535
+ }).
+
+%%====================================================================
+%% API
+%%====================================================================
+%%--------------------------------------------------------------------
+%% Function: start_link() -> {ok,Pid} | ignore | {error,Error}
+%% Description: Starts the server
+%%--------------------------------------------------------------------
+start_link() ->
+ gen_server:start_link(?MODULE, [], []).
+
+
+get_port(CPortServer,IP)->
+ gen_server:call({global,CPortServer},{get,IP}).
+
+%%====================================================================
+%% gen_server callbacks
+%%====================================================================
+
+%%--------------------------------------------------------------------
+%% Function: init(Args) -> {ok, State} |
+%% {ok, State, Timeout} |
+%% ignore |
+%% {stop, Reason}
+%% Description: Initiates the server
+%%--------------------------------------------------------------------
+init([]) ->
+ %% registering can be long (global:sync needed), do it after the
+ %% init phase FIXME: maybe we could use phase_start of OTP ?
+ %% instead of using timeout
+
+ {Min, Max} = {?config(cport_min),?config(cport_max)},
+ ts_utils:init_seed(),
+ %% set random port for the initial value.
+ case catch Min+random:uniform(Max-Min) of
+ Val when is_integer(Val) ->
+ ?LOGF("Ok, starting with ~p value~n",[Val],?ERR),
+ {ok, #state{min_port=Min, max_port=Max},10};
+ Err ->
+ ?LOGF("ERR starting: ~p~n",[Err],?ERR),
+ {ok, #state{}}
+ end.
+
+%%--------------------------------------------------------------------
+%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
+%% {reply, Reply, State, Timeout} |
+%% {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, Reply, State} |
+%% {stop, Reason, State}
+%% Description: Handling call messages
+%%--------------------------------------------------------------------
+handle_call({get, ClientIP}, _From, State) ->
+ %% use the process dictionnary to store tha last port of each ip
+ %% should we use ets instead ?
+ Reply = case get(ClientIP) of
+ ?EPMD_PORT ->
+ ?EPMD_PORT + 1;
+ Val when Val > State#state.max_port ->
+ State#state.min_port;
+ Val ->
+ Val
+ end,
+ put(ClientIP,Reply+1),
+ ?LOGF("Give port number ~p to IP ~p~n",[Reply,ClientIP],?DEB),
+ {reply, Reply, State}.
+
+%%--------------------------------------------------------------------
+%% Function: handle_cast(Msg, State) -> {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, State}
+%% Description: Handling cast messages
+%%--------------------------------------------------------------------
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% Function: handle_info(Info, State) -> {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, State}
+%% Description: Handling all non call/cast messages
+%%--------------------------------------------------------------------
+handle_info(timeout, State) ->
+ %% several beam can be started in a host: we want a single cport
+ %% server to respond: register a name unique to the host
+ {ok, MyHostName} = ts_utils:node_to_hostname(node()),
+ Id="cport-" ++ MyHostName,
+ global:sync(),
+ global:register_name(Id,self(),{global, random_notify_name}),
+ {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% Function: terminate(Reason, State) -> void()
+%% Description: This function is called by a gen_server when it is about to
+%% terminate. It should be the opposite of Module:init/1 and do any necessary
+%% cleaning up. When it returns, the gen_server terminates with Reason.
+%% The return value is ignored.
+%%--------------------------------------------------------------------
+terminate(_Reason, _State) ->
+ ok.
+
+%%--------------------------------------------------------------------
+%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
+%% Description: Convert process state when code is changed
+%%--------------------------------------------------------------------
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%--------------------------------------------------------------------
+%%% Internal functions
+%%--------------------------------------------------------------------
+
View
6 src/tsung/ts_launcher.erl
@@ -376,10 +376,14 @@ do_launch({Intensity, MyHostName})->
%% Check if global names are synced; Annoying "feature" of R10B7 and up
check_registered() ->
case global:registered_names() of
+ ["cport"++_Tail] ->
+ ?LOG("Only cport server registered ! syncing ...~n", ?WARN),
+ global:sync();
[] ->
?LOG("No registered processes ! syncing ...~n", ?WARN),
global:sync();
- _ -> ok
+ _ ->
+ ok
end.
set_warm_timeout(StartDate)->
View
4 src/tsung/ts_sup.erl
@@ -65,11 +65,13 @@ init([]) ->
transient, 2000, worker, [ts_launcher_static]},
LauncherManager = {ts_launcher_mgr, {ts_launcher_mgr, start, []},
transient, 2000, worker, [ts_launcher_mgr]},
+ PortServer = {ts_cport, {ts_cport, start_link, []},
+ transient, 2000, worker, [ts_cport]},
SessionCache = {ts_session_cache, {ts_session_cache, start, []},
transient, 2000, worker, [ts_session_cache]},
MonCache = {ts_mon_cache, {ts_mon_cache, start, []},
transient, 2000, worker, [ts_mon_cache]},
- {ok,{{one_for_one,?retries,10}, [LauncherManager, SessionCache, MonCache,ClientsSup, StaticLauncher,Launcher ]}}.
+ {ok,{{one_for_one,?retries,10}, [PortServer,LauncherManager, SessionCache, MonCache,ClientsSup, StaticLauncher,Launcher ]}}.
%%%----------------------------------------------------------------------
%%% Internal functions
View
10 src/tsung/ts_utils.erl
@@ -306,13 +306,21 @@ erl_system_args(extended)->
" -kernel inetrc '"++ InetRcFile ++ "'" ;
_ -> " "
end,
+ ListenMin = case application:get_env(kernel,inet_dist_listen_min) of
+ undefined -> "";
+ {ok, Min} -> " -kernel inet_dist_listen_min " ++ integer_to_list(Min)++ " "
+ end,
+ ListenMax = case application:get_env(kernel,inet_dist_listen_max) of
+ undefined -> "";
+ {ok, Max} -> " -kernel inet_dist_listen_max " ++ integer_to_list(Max)++" "
+ end,
Threads= "+A "++integer_to_list(erlang:system_info(thread_pool_size))++" ",
ProcessMax="+P "++integer_to_list(erlang:system_info(process_limit))++" ",
Mea = case erlang:system_info(version) of
"5.3" ++ _Tail -> " +Mea r10b ";
_ -> " "
end,
- lists:append([BasicArgs, Shared, Hybrid, Smp, Mea, Inet, Threads,ProcessMax]).
+ lists:append([BasicArgs, Shared, Hybrid, Smp, Mea, Inet, Threads,ProcessMax,ListenMin,ListenMax]).
%%----------------------------------------------------------------------
%% setsubdir/1
View
2 src/tsung/tsung.erl
@@ -50,7 +50,7 @@ start(_Type, _StartArgs) ->
{ok, Pid} ->
{ok, Pid};
Error ->
- ?LOGF("Can't start ! ~p ~n",[Error],?ERR),
+ ?LOGF("Can't start supervisor ! ~p ~n",[Error],?ERR),
Error
end.
View
42 src/tsung_controller/ts_config_server.erl
@@ -237,7 +237,6 @@ handle_call({get_req, Id, N}, _From, State) ->
{reply, {error, Other}, State}
end;
-%%
handle_call({get_user_agents}, _From, State) ->
Config = State#state.config,
case ets:lookup(Config#config.session_tab, {http_user_agent, value}) of
@@ -248,25 +247,25 @@ handle_call({get_user_agents}, _From, State) ->
end;
%% get user parameters (static user: the session id is already known)
-handle_call({get_user_param, HostName}, _From, State=#state{users=UserId,ports=Ports}) ->
+handle_call({get_user_param, HostName}, _From, State=#state{users=UserId}) ->
Config = State#state.config,
{value, Client} = lists:keysearch(HostName, #client.host, Config#config.clients),
- {IPParam, Server, NewPorts} = get_user_param(Client,Config,Ports),
+ {IPParam, Server} = get_user_param(Client,Config),
ts_mon:newclient({static,now()}),
- {reply, {ok, { IPParam, Server, UserId}}, State#state{users=UserId+1,ports=NewPorts}};
+ {reply, {ok, { IPParam, Server, UserId}}, State#state{users=UserId+1}};
%% get a new session id and user parameters for the given node
-handle_call({get_next_session, HostName}, _From, State=#state{users=Users,ports=Ports}) ->
+handle_call({get_next_session, HostName}, _From, State=#state{users=Users}) ->
Config = State#state.config,
{value, Client} = lists:keysearch(HostName, #client.host, Config#config.clients),
?DebugF("get new session for ~p~n",[_From]),
case choose_session(Config#config.sessions) of
{ok, Session=#session{id=Id}} ->
?LOGF("Session ~p choosen~n",[Id],?INFO),
ts_mon:newclient({Id,now()}),
- {IPParam, Server, NewPorts} = get_user_param(Client,Config,Ports),
+ {IPParam, Server} = get_user_param(Client,Config),
{reply, {ok, {Session, IPParam, Server, Users}},
- State#state{users=Users+1,ports=NewPorts}};
+ State#state{users=Users+1}};
Other ->
{reply, {error, Other}, State}
end;
@@ -379,13 +378,19 @@ handle_cast({newbeam, Host, Arrivals}, State=#state{last_beam_id = NodeId}) ->
{ok, Boot, _} = regexp:gsub(BootController,"tsung_controller","tsung"),
?DebugF("Boot ~p~n", [Boot]),
Sys_Args= ts_utils:erl_system_args(),
+ Ports = case (State#state.config)#config.ports_range of
+ {Min, Max} ->
+ " -tsung cport_min " ++ integer_to_list(Min) ++ " -tsung cport_max " ++ integer_to_list(Max);
+ undefined ->
+ ""
+ end,
LogDir = encode_filename(State#state.logdir),
Args = lists:flatten([ Sys_Args," -boot ", Boot,
" -boot_var ", ?TSUNGPATH, " ",PathVar, PA,
" +K true ",
" -tsung debug_level ", integer_to_list(?config(debug_level)),
" -tsung dump ", atom_to_list(?config(dump)),
- " -tsung log_file ", LogDir
+ " -tsung log_file ", LogDir, Ports
]),
?LOGF("starting newbeam on host ~p from ~p with Args ~p~n", [Host, State#state.hostname, Args], ?INFO),
Seed=(State#state.config)#config.seed,
@@ -441,11 +446,11 @@ set_start_date(undefined)->
ts_utils:add_time(now(), ?config(warm_time));
set_start_date(Date) -> Date.
-get_user_param(Client,Config,Ports)->
+get_user_param(Client,Config)->
{ok,IP} = choose_client_ip(Client),
{ok, Server} = choose_server(Config#config.servers),
- {NewPorts,CPort} = choose_port(IP, Ports,Config#config.ports_range),
- { {IP, CPort}, Server, NewPorts}.
+ CPort = choose_port(IP, Config#config.ports_range),
+ { {IP, CPort}, Server}.
%%----------------------------------------------------------------------
%% Func: choose_client_ip/1
@@ -681,19 +686,8 @@ start_slave(Host, Name, Args, Arrivals, Seed)->
exit({slave_failure, Reason})
end.
-choose_port(_,_, undefined) ->
- {[],0};
-choose_port(Client,undefined, Range) ->
- choose_port(Client,dict:new(), Range);
-choose_port(ClientIp,Ports, {Min, Max}) ->
- case dict:find(ClientIp,Ports) of
- {ok, Val} when Val =< Max ->
- NewPorts=dict:update_counter(ClientIp,1,Ports),
- {NewPorts,Val};
- _ -> % Max Reached or new entry
- NewPorts=dict:store(ClientIp,Min+1,Ports),
- {NewPorts,Min}
- end.
+choose_port(_,undefined) -> 0;
+choose_port(_, _Range) -> -1.
%% @spec session_name_to_session(Sessions::list(), Static::list() ) -> StaticUsers::list()
%% @doc convert session name to session id in static users list
View
19 tsung.sh.in
@@ -2,7 +2,14 @@
UNAME=`uname`
case $UNAME in
- "Linux") HOST=`hostname -s`;;
+ "Linux")
+ HOST=`hostname -s 2>/dev/null`
+ RET=$?
+ if [ $RET != 0 ]; then
+ HOST=`hostname`
+ echo "WARN: hostname -s failed, use '$HOST' as hostname" > /dev/stderr
+ fi
+ ;;
"SunOS") HOST=`hostname`;;
*) HOST=`hostname -s`;;
esac
@@ -22,6 +29,7 @@ PGSQL_SERVER_PORT=5432
NAME=tsung
CONTROLLER=tsung_controller
SMP_DISABLE=true
+WARM_TIME=10
TSUNGPATH=$INSTALL_DIR/lib/tsung-$VERSION/ebin
CONTROLLERPATH=$INSTALL_DIR/lib/tsung_controller-$VERSION/ebin
@@ -30,7 +38,8 @@ CONF_OPT_FILE="$HOME/.tsung/tsung.xml"
BOOT_OPT="-boot $INSTALL_DIR/lib/tsung_controller-$VERSION/priv/tsung_controller -boot_var TSUNGPATH $INSTALL_DIR "
DEBUG_LEVEL=5
ERL_RSH=" -rsh ssh "
-ERL_OPTS=" -smp auto +P 250000 +A 256 +K true @ERL_OPTS@ "
+ERL_DIST_PORTS=" -kernel inet_dist_listen_min 64000 -kernel inet_dist_listen_max 65500 "
+ERL_OPTS=" $ERL_DIST_PORTS -smp auto +P 250000 +A 256 +K true @ERL_OPTS@ "
COOKIE='tsung'
ERTS_RUN=`$ERL -version 2>&1 | tr -cd 0123456789.`
ERTS_BOOT=`grep erts $TSUNGPATH/../priv/tsung.rel 2> /dev/null| tr -cd 0123456789.`
@@ -63,6 +72,7 @@ start() {
-pa $TSUNGPATH -pa $CONTROLLERPATH \
-tsung_controller smp_disable $SMP_DISABLE \
-tsung_controller debug_level $DEBUG_LEVEL \
+ -tsung_controller warm_time $WARM_TIME \
-tsung_controller config_file \"$CONF_OPT_FILE\" -tsung_controller $LOG_OPT
}
@@ -122,15 +132,17 @@ usage() {
echo " -f <file> set configuration file (default is ~/.tsung/tsung.xml)"
echo " -l <logfile> set log file (default is ~/.tsung/log/YYYYMMDD-HH:MM/tsung.log)"
echo " -i <id> set controller id (default is empty)"
+ echo " -i <id> set controller id (default is empty)"
echo " -r <command> set remote connector (default is ssh)"
echo " -s enable erlang smp on client nodes"
echo " -F use long names (FQDN) for erlang nodes"
+ echo " -w warmup delay (default is 10 sec)"
echo " -v print version information and exit"
echo " -h display this help and exit"
exit
}
-while getopts "vhf:l:d:r:i:Fs" Option
+while getopts "vhf:l:d:r:i:Fsw:" Option
do
case $Option in
f) CONF_OPT_FILE=$OPTARG;;
@@ -146,6 +158,7 @@ do
d) DEBUG_LEVEL=$OPTARG;;
r) ERL_RSH=" -rsh $OPTARG ";;
F) NAMETYPE="-name";;
+ w) WARM_TIME=$OPTARG;;
s) SMP_DISABLE="false";;
v) version;;
i) ID=$OPTARG

0 comments on commit c1163bd

Please sign in to comment.
Something went wrong with that request. Please try again.