Skip to content

Commit

Permalink
hacking the existing node/cluster relationship apart - state=broken
Browse files Browse the repository at this point in the history
  • Loading branch information
Tim Watson committed May 31, 2012
1 parent 7d655f2 commit ddb1454
Show file tree
Hide file tree
Showing 7 changed files with 395 additions and 111 deletions.
14 changes: 14 additions & 0 deletions resources/supervision.config
@@ -0,0 +1,14 @@

%% Configuration term keyed by 'systest_supervision'. It holds two sections:
%% 'cluster' and 'startup'.
{systest_supervision, [
%% cluster: one entry per host atom, each paired with a list of atoms
%% (presumably the names of the nodes to run on that host - TODO confirm).
{cluster, [
{host1, [red, blue]},
{host2, [green, yellow]}
]},
%% startup: options applied when starting nodes. NOTE(review): the exact
%% meaning of each option is defined by the code that consumes this file.
{startup, [
%% module named as the handler for this configuration
{handler, systest_stub},
{link_to_parent, true},
{detached, true},
%% 'default' rather than an explicit true/false
{rpc_enabled, default}
]}
]}.

147 changes: 61 additions & 86 deletions src/systest_cli.erl
Expand Up @@ -43,8 +43,8 @@
handle_info/2, terminate/2, code_change/3]).

%% private record for tracking...
-record(sh, {command, node, state, log, rpc_enabled, pid,
port, shutdown, detached, shutdown_port}).
-record(sh, {command, args, env, node, state, log, rpc_enabled,
pid, port, shutdown, detached, shutdown_port}).

-include("systest.hrl").

Expand Down Expand Up @@ -84,16 +84,26 @@ interact(#'systest.node_info'{owner=Server}, Data) ->
gen_server:call(Server, {command, Data}).

%%
%% OTP gen_server API
%% systest_node API
%%

init([Node, Cmd, Args, Extra]) ->
process_flag(trap_exit, true),
start(Node=#'systest.node_info'{config=Config, host=Host, name=Name}) ->
%% TODO: provide a 'get_multi' version that avoids traversing repeatedly
Cmd = systest_config:eval("flags.start.program", Config,
[{callback, {node, fun systest_node:get_node_info/2}},
{return, value}]),
Env = systest_config:eval("flags.start.environment", Config,
[{callback, {node, fun systest_node:get_node_info/2}},
{return, value}]),
Args = case ?ENCONFIG("flags.start.args", Config) of
not_found -> [];
Argv -> Argv
end,
Extra = [{env, Env}|?CONFIG(on_start, Config, [])],

Scope = systest_node:get_node_info(scope, Node),
Id = systest_node:get_node_info(id, Node),
Config = systest_node:get_node_info(config, Node),
% ScratchDir = ?CONFIG(scratch_dir, Config, term),
Flags = systest_node:get_node_info(flags, Node),

Startup = ?CONFIG(startup, Config, []),
Expand Down Expand Up @@ -122,41 +132,28 @@ init([Node, Cmd, Args, Extra]) ->
end,

on_startup(Scope, Id, Port, Detached, RpcEnabled, Config,
fun(Port2, Pid) ->
net_kernel:monitor_nodes(true),
LogFd =
case LogEnabled of
true ->
LogFile = log_file("-stdio.log", Scope,
Id, Env, Config),
ct:pal("~p logging stdio to ~s~n", [Id, LogFile]),

{ok, Fd2} = file:open(LogFile, [write]),
Fd2;
false ->
user
fun(Port2, Pid, LogFd) ->
%% NB: as not all kinds of nodes can be contacted
%% via rpc, we have to do this manually here....
if RpcEnabled =:= true -> monitor_node({Id, true});
true -> ok
end,
%% TODO: it would probably be better to store all the cli
%% related stuff in the 'private' field if at all...
N2 = Node#'systest.node_info'{
os_pid=Pid,
owner=self(),
private=[
{command, ExecutableCommand},
{args, Args}|Env]},

N2 = Node#'systest.node_info'{os_pid=Pid},
Sh = #sh{pid=Pid,
port=Port2,
detached=Detached,
log=LogFd,
rpc_enabled=RpcEnabled,
shutdown=Shutdown,
command=ExecutableCommand,
state=running,
node=N2},
args=Args,
env=Env
state=running},
ct:pal(info,
"External Process Handler ~p::~p"
" Started at ~p~n", [Scope, Id, self()]),
{ok, Sh}
{ok, N2, Sh}
end);
StopError ->
StopError
Expand All @@ -166,26 +163,28 @@ on_startup(Scope, Id, Port, Detached, RpcEnabled, Config, StartFun) ->
%% we do the initial receive stuff up-front
%% just to avoid any initial ordering problems...

StartupLog = log_to("-port.startup.log",
Scope, Id, default_log_dir(Config)),
LogEnabled = ?CONFIG(log_enabled, Startup, true),
{LogName, LogFd} = case LogEnabled of
true ->
LogFile = log_file("-stdio.log", Scope,
Id, Env, Config),
{ok, Fd2} = file:open(LogFile, [write]),
{LogFile, Fd2};
false ->
{"console", user}
end,

ct:pal("Reading OS process id for ~p from ~p~n"
"RPC Enabled: ~p~n"
"Startup Log: ~s~n",
[Id, Port, RpcEnabled, StartupLog]),
{ok, Fd} = file:open(StartupLog, [write]),

try
case read_pid(Id, Port, Detached, RpcEnabled, Fd) of
{error, {stopped, Rc}} ->
{stop, {launch_failure, Rc}};
{error, Reason} ->
{stop, {launch_failure, Reason}};
{Port2, Pid} ->
StartFun(Port2, Pid)
end
after
file:close(Fd)
"StdIO Log: ~s~n",
[Id, Port, RpcEnabled, LogName]),
case read_pid(Id, Port, Detached, RpcEnabled, LogFd) of
{error, {stopped, Rc}} ->
{stop, {launch_failure, Rc}};
{error, Reason} ->
{stop, {launch_failure, Reason}};
{Port2, Pid, LogFd} ->
StartFun(Port2, Pid, LogFd)
end.

log_file(Suffix, Scope, Id, Env, Config) ->
Expand Down Expand Up @@ -272,26 +271,28 @@ handle_cast(stop, Sh=#sh{node=Node, shutdown=Shutdown, rpc_enabled=true}) ->
handle_cast(_Msg, Sh) ->
{noreply, Sh}.

handle_info({nodedown, NodeId},
Sh=#sh{state=State, node=#'systest.node_info'{id=NodeId}}) ->
%% TODO: can we handle this one in the generic server???
handle_msg({nodedown, NodeId}, node=#'systest.node_info'{id=NodeId},
Sh=#sh{state=State}) ->
ShutdownType = case State of
killed -> normal;
stopped -> normal;
_ -> nodedown
end,
{stop, ShutdownType, Sh};
handle_info({nodedown, NodeId},

handle_msg({nodedown, NodeId},
Sh=#sh{state=_, node=#'systest.node_info'{id=NodeId}}) ->
{stop, nodedown, Sh};
handle_info({Port, {data, {_, Line}}},
handle_msg({Port, {data, {_, Line}}},
Sh=#sh{port=Port, log=LogFd}) ->
io:format(LogFd, "~s~n", [Line]),
{noreply, Sh};
handle_info({Port, {exit_status, 0}},
handle_msg({Port, {exit_status, 0}},
Sh=#sh{port=Port, command=Cmd}) ->
ct:pal("Program ~s exited normally (status 0)~n", [Cmd]),
{stop, normal, Sh#sh{state=stopped}};
handle_info({Port, {exit_status, Exit}=Rc},
handle_msg({Port, {exit_status, Exit}=Rc},
Sh=#sh{port=Port, state=State, node=Node}) ->
ct:pal("Node ~p shut down with error/status code ~p~n",
[Node#'systest.node_info'.id, Exit]),
Expand All @@ -300,7 +301,7 @@ handle_info({Port, {exit_status, Exit}=Rc},
_ -> Rc
end,
{stop, ShutdownType, Sh#sh{state=stopped}};
handle_info({Port, closed}, Sh=#sh{port=Port, node=Node,
handle_msg({Port, closed}, Sh=#sh{port=Port, node=Node,
state=killed, detached=false}) ->
ct:pal("~p (attached) closed~n", [Port]),
case Sh#sh.rpc_enabled of
Expand All @@ -316,28 +317,28 @@ handle_info({Port, closed}, Sh=#sh{port=Port, node=Node,
end,
%% ct:pal("Node Status: ~p~n", [systest_node:status_check(Id)]),
{stop, normal, Sh};
handle_info({Port, closed}, Sh=#sh{port=Port}) ->
handle_msg({Port, closed}, Sh=#sh{port=Port}) ->
ct:pal("~p closed~n", [Port]),
{stop, {port_closed, Port}, Sh};
handle_info({'EXIT', Pid, ok}, Sh=#sh{shutdown_port=Pid, state=killed,
handle_msg({'EXIT', Pid, ok}, Sh=#sh{shutdown_port=Pid, state=killed,
detached=Detached}) ->
ct:pal("Termination Port completed ok~n"),
case Detached of
true -> {stop, normal, Sh};
false -> {noreply, Sh}
end;
handle_info({'EXIT', Pid, {error, Rc}}, Sh=#sh{shutdown_port=Pid}) ->
handle_msg({'EXIT', Pid, {error, Rc}}, Sh=#sh{shutdown_port=Pid}) ->
ct:pal("Termination Port stopped abnormally (status ~p)~n", [Rc]),
{stop, termination_port_error, Sh};
handle_info(Info, Sh=#sh{state=St, port=P, shutdown_port=SP}) ->
handle_msg(Info, Sh=#sh{state=St, port=P, shutdown_port=SP}) ->
ct:log("Ignoring Info Message: ~p~n"
"State: ~p~n"
"Port: ~p~n"
"Termination Port: ~p~n",
[Info, St, P, SP]),
{noreply, Sh}.

terminate(Reason, #sh{port=Port, log=Fd}) ->
terminate(Reason, _Node, #sh{port=Port, log=Fd}) ->
ct:pal("Terminating due to ~p~n", [Reason]),
%% TODO: verify that we're not *leaking* ports if we fail to close them
case Fd of
Expand All @@ -350,38 +351,11 @@ terminate(Reason, #sh{port=Port, log=Fd}) ->
ok
end.

code_change(_OldVsn, State, _Extra) ->
{ok, State}.

%%
%% Private API
%%

start_it(NI=#'systest.node_info'{config=BaseConfig, host=Host,
name=Name}, StartType) ->
Id = list_to_atom(atom_to_list(Name) ++ "@" ++ atom_to_list(Host)),
NI2 = systest_node:set_node_info([{id, Id}], NI),

Config = [{node, NI2}|BaseConfig],

%% TODO: provide a 'get_multi' version that avoids traversing repeatedly
Prog = systest_config:eval("flags.start.program", Config,
[{callback, {node, fun systest_node:get_node_info/2}},
{return, value}]),
Env = systest_config:eval("flags.start.environment", Config,
[{callback, {node, fun systest_node:get_node_info/2}},
{return, value}]),
Args = case ?ENCONFIG("flags.start.args", Config) of
not_found -> [];
Argv -> Argv
end,
Extra = [{env, Env}|?CONFIG(on_start, Config, [])],

case apply(gen_server, StartType,
[?MODULE, [NI2, Prog, Args, Extra], []]) of
{ok, Pid} -> gen_server:call(Pid, inspect);
Error -> Error
end.

run_shutdown_hook(Sh, Prog, Args, Env) ->
ct:pal("Spawning executable~n"
Expand Down Expand Up @@ -545,3 +519,4 @@ default_log_dir(Config) ->

%% Builds the base log file name "<Scope>-<Id>" from two atoms.
%% Both arguments must be atoms; anything else fails in atom_to_list/1.
logfile(Scope, Id) ->
    ScopePart = atom_to_list(Scope),
    IdPart = atom_to_list(Id),
    ScopePart ++ [$- | IdPart].

54 changes: 54 additions & 0 deletions src/systest_cth.erl
@@ -0,0 +1,54 @@
%% -*- tab-width: 4;erlang-indent-level: 4;indent-tabs-mode: nil -*-
%% ex: ts=4 sw=4 et
%% ----------------------------------------------------------------------------
%%
%% Copyright (c) 2005 - 2012 Nebularis.
%%
%% Permission is hereby granted, free of charge, to any person obtaining a copy
%% of this software and associated documentation files (the "Software"), deal
%% in the Software without restriction, including without limitation the rights
%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%% copies of the Software, and to permit persons to whom the Software is
%% furnished to do so, subject to the following conditions:
%%
%% The above copyright notice and this permission notice shall be included in
%% all copies or substantial portions of the Software.
%%
%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
%% FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
%% IN THE SOFTWARE.
%% ----------------------------------------------------------------------------
-module(systest_cth).

%% Hook callbacks implemented by this module. They have to be exported,
%% otherwise they cannot be called from outside the module and the compiler
%% reports every one of them as unused.
-export([id/1, init/2, pre_init_per_suite/3]).

-include("systest.hrl").
-include_lib("common_test/include/ct.hrl").

%% State threaded through the hook callbacks.
%%   auto_start - flag derived from the systest application environment
%%   data       - dictionary, empty by default
%% NOTE(review): from OTP 17 onwards the dict type is spelled dict:dict();
%% the built-in dict() spelling is kept here for the OTP release in use.
-record(state, {
    auto_start = false :: boolean(),
    data = dict:new() :: dict()
}).

%% @doc Identifier under which this hook is registered.
%% The options are ignored; the id is always the atom `systest'.
id(_Opts) -> systest.

%% @doc Always called before any other callback function. Makes sure the
%% systest application is running and builds the initial hook state.
%%
%% If the application is already running it is reset via systest:reset/0
%% (whose result is ignored). Any other start failure is returned as
%% `{error, Reason}' instead of being silently discarded.
%%
%% Returns `{ok, #state{}}' on success. `auto_start' is `true' only when the
%% `auto_start' application environment variable is set to `true'; it is
%% `false' when the variable is unset or holds any other value.
init(systest, _Opts) ->
    Started = case application:start(systest, permanent) of
                  ok ->
                      ok;
                  {error, {already_started, systest}} ->
                      systest:reset(),
                      ok;
                  {error, _Reason}=Err ->
                      Err
              end,
    case Started of
        ok ->
            %% application:get_env/2 yields {ok, Value} | undefined, so it
            %% must be unwrapped before it can fill a boolean() field.
            AutoStart = case application:get_env(systest, auto_start) of
                            {ok, true} -> true;
                            _          -> false
                        end,
            {ok, #state{auto_start=AutoStart}};
        StartError ->
            StartError
    end.

%% @doc Called before init_per_suite is called. Intended as the place where
%% a cluster configured for the suite could be started; for now it is a
%% pass-through that hands back the given config and hook state untouched.
pre_init_per_suite(_Suite, Config, State) ->
    Unchanged = {Config, State},
    Unchanged.


0 comments on commit ddb1454

Please sign in to comment.