Permalink
Browse files

no longer start cport on all nodes: start only one cport instance per

node, before the launchers.
  • Loading branch information...
1 parent 5c180b2 commit 14e863c1a93f10f26790d76af9a8b6af09c7bcae @nniclausse nniclausse committed Aug 6, 2010
@@ -113,6 +113,18 @@ read_config_maxusers_test() ->
{ok,{[{_,Max4},{_,_}],_,_}}=ts_config_server:get_client_config("client4"),
?assert(Max1+Max2+Max3+Max4 =< MaxNumber).
+
+cport_list_node_test() ->
+ List=['tsung1@toto',
+ 'tsung3@titi',
+ 'tsung2@toto',
+ 'tsung7@titi',
+ 'tsung6@toto',
+ 'tsung4@tutu'],
+ Rep = ts_config_server:get_one_node_per_host(List),
+ ?assertEqual(['tsung1@toto', 'tsung3@titi', 'tsung4@tutu'], lists:sort(Rep)).
+
+
myset_env()->
myset_env(0).
myset_env(Level)->
View
@@ -32,7 +32,7 @@
-include("ts_profile.hrl").
%% API
--export([start_link/0, get_port/2]).
+-export([start_link/1, get_port/2]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
@@ -52,8 +52,8 @@
%% Function: start_link() -> {ok,Pid} | ignore | {error,Error}
%% Description: Starts the server
%%--------------------------------------------------------------------
-start_link() ->
- gen_server:start_link(?MODULE, [], []).
+start_link(Name) ->
+ gen_server:start_link({global, Name}, ?MODULE, [], []).
get_port(CPortServer,IP)->
@@ -72,16 +72,14 @@ get_port(CPortServer,IP)->
%%--------------------------------------------------------------------
init([]) ->
%% registering can be long (global:sync needed), do it after the
- %% init phase FIXME: maybe we could use phase_start of OTP ?
- %% instead of using timeout
-
+ %% init phase (the config_server will send us a message)
{Min, Max} = {?config(cport_min),?config(cport_max)},
ts_utils:init_seed(),
%% set random port for the initial value.
case catch Min+random:uniform(Max-Min) of
Val when is_integer(Val) ->
?LOGF("Ok, starting with ~p value~n",[Val],?NOTICE),
- {ok, #state{min_port=Min, max_port=Max},10};
+ {ok, #state{min_port=Min, max_port=Max}};
Err ->
?LOGF("ERR starting: ~p~n",[Err],?ERR),
{ok, #state{}}
@@ -126,13 +124,7 @@ handle_cast(_Msg, State) ->
%% {stop, Reason, State}
%% Description: Handling all non call/cast messages
%%--------------------------------------------------------------------
-handle_info(timeout, State) ->
- %% several beam can be started on a host: we want a single cport
- %% server to respond: register a name unique to the host
- {ok, MyHostName} = ts_utils:node_to_hostname(node()),
- Id="cport-" ++ MyHostName,
- global:sync(),
- global:register_name(Id,self(),{global, random_notify_name}),
+handle_info(_Msg, State) ->
{noreply, State}.
%%--------------------------------------------------------------------
@@ -96,8 +96,9 @@ handle_call({check_registered}, _From,State=#state{synced=undefined}) ->
global:sync();
_ ->
ok
- end.
+ end,
{reply, ok, State#state{synced=yes}};
+
handle_call({check_registered}, _From,State=#state{synced=yes}) ->
?LOG("syncing already done, skip~n", ?INFO),
{reply, ok, State#state{synced=yes}};
View
@@ -32,7 +32,7 @@
-behaviour(supervisor).
%% External exports
--export([start_link/0]).
+-export([start_link/0, start_cport/1]).
%% supervisor callbacks
-export([init/1]).
@@ -44,6 +44,13 @@ start_link() ->
?LOG("starting supervisor ...~n",?INFO),
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+start_cport({Node, CPortName}) ->
+ ?LOGF("starting cport server ~p on node ~p ~n",[CPortName, Node],?INFO),
+ PortServer = {CPortName, {ts_cport, start_link, [CPortName]},
+ transient, 2000, worker, [ts_cport]},
+ supervisor:start_child({?MODULE, Node}, PortServer).
+
+
%%%----------------------------------------------------------------------
%%% Callback functions from supervisor
%%%----------------------------------------------------------------------
@@ -65,13 +72,11 @@ init([]) ->
transient, 2000, worker, [ts_launcher_static]},
LauncherManager = {ts_launcher_mgr, {ts_launcher_mgr, start, []},
transient, 2000, worker, [ts_launcher_mgr]},
- PortServer = {ts_cport, {ts_cport, start_link, []},
- transient, 2000, worker, [ts_cport]},
SessionCache = {ts_session_cache, {ts_session_cache, start, []},
transient, 2000, worker, [ts_session_cache]},
MonCache = {ts_mon_cache, {ts_mon_cache, start, []},
transient, 2000, worker, [ts_mon_cache]},
- {ok,{{one_for_one,?retries,10}, [PortServer,LauncherManager, SessionCache, MonCache,ClientsSup, StaticLauncher,Launcher ]}}.
+ {ok,{{one_for_one,?retries,10}, [LauncherManager, SessionCache, MonCache,ClientsSup, StaticLauncher,Launcher ]}}.
%%%----------------------------------------------------------------------
%%% Internal functions
@@ -352,8 +352,22 @@ handle_cast({newbeams, HostList}, State=#state{logdir = LogDir,
ts_launcher_static:launch({Node,[]}),
ts_launcher:launch({Node, [], Seed})
end,
+ case Config#config.ports_range of
+ undefined ->
+ ok;
+ _ ->
+ ?LOG("Start client port server on remote nodes ~n",?NOTICE),
+ %% first, get a single erlang node per host, and start the cport gen_server on this node
+ UNodes = get_one_node_per_host(RemoteNodes),
+ SetParams = fun(Node) ->
+ {ok, MyHostName} =ts_utils:node_to_hostname(Node),
+ {Node, "cport-" ++ MyHostName}
+ end,
+ CPorts = lists:map(SetParams, UNodes),
+ ?LOGF("Will run start_cport with arg:~p ~n",[CPorts],?DEB),
+ lists:foreach(fun ts_sup:start_cport/1 ,CPorts)
+ end,
lists:foreach(StartLaunchers, RemoteNodes),
- %% FIXME: active cport if needed
{noreply, State#state{last_beam_id = LastId}}
end;
@@ -748,7 +762,7 @@ set_remote_args(LogDir,PortsRange)->
?DebugF("Boot ~p~n", [Boot]),
Sys_Args= ts_utils:erl_system_args(),
LogDirEnc = encode_filename(LogDir),
- Ports = case of
+ Ports = case PortsRange of
{Min, Max} ->
" -tsung cport_min " ++ integer_to_list(Min) ++ " -tsung cport_max " ++ integer_to_list(Max);
undefined ->
@@ -761,3 +775,25 @@ set_remote_args(LogDir,PortsRange)->
" -tsung dump ", atom_to_list(?config(dump)),
" -tsung log_file ", LogDirEnc, Ports
]).
+
+
+%% @spec get_one_node_per_host(RemoteNodes::list()) -> Nodes::list()
+%% @doc From a list if erlang nodenames, return a list with only a
+%% single node per host
+%% @end
+
+get_one_node_per_host(RemoteNodes) ->
+ get_one_node_per_host(RemoteNodes,dict:new()) .
+
+get_one_node_per_host([], Dict) ->
+ {_,Nodes} = lists:unzip(dict:to_list(Dict)),
+ Nodes;
+get_one_node_per_host([Node | Nodes], Dict) ->
+ Host = ts_utils:node_to_hostname(Node),
+ case dict:is_key(Host, Dict) of
+ true ->
+ get_one_node_per_host(Nodes,Dict);
+ false ->
+ NewDict = dict:store(Host, Node, Dict),
+ get_one_node_per_host(Nodes,NewDict)
+ end.

0 comments on commit 14e863c

Please sign in to comment.