Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

t: Make process handling more robust with IPC::Run #3123

Merged
merged 1 commit into from May 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
107 changes: 49 additions & 58 deletions t/05-scheduler-full.t
Expand Up @@ -24,6 +24,7 @@ BEGIN {
$ENV{FULLSTACK} = 1 if $ENV{SCHEDULER_FULLSTACK};
}

use IPC::Run qw(start);
use FindBin;
use lib "$FindBin::Bin/lib";
use OpenQA::Constants qw(WORKERS_CHECKER_THRESHOLD DB_TIMESTAMP_ACCURACY);
Expand Down Expand Up @@ -57,8 +58,9 @@ my $api_secret = $api_credentials->secret;

# create web UI and websocket server
my $mojoport = $ENV{OPENQA_BASE_PORT} = Mojo::IOLoop::Server->generate_port();
my $wspid = create_websocket_server($mojoport + 1, 0, 1, 1);
my $ws = create_websocket_server($mojoport + 1, 0, 1, 1);
my $webapi = create_webapi($mojoport, sub { });
my @workers;

# setup share and result dir
my $sharedir = setup_share_dir($ENV{OPENQA_BASEDIR});
Expand All @@ -67,26 +69,33 @@ ok -d $resultdir, "results directory created under $resultdir";

# Starts a standard openQA worker process and returns the result of
# IPC::Run's start() for it.
#
# Parameters:
#   $apikey, $apisecret - API credentials, passed as --apikey / --apisecret
#   $host               - web UI host, passed as --host
#   $instance           - worker instance number, passed as --instance
#   $log                - optional; if true, stdout and stderr of the worker
#                         are both redirected ('>&') to this destination
#
# Side effect: sets $ENV{OPENQA_WORKER_CONNECT_RETRIES} to 1 for the current
# process (and therefore for every process started afterwards).
sub create_worker {
    my ($apikey, $apisecret, $host, $instance, $log) = @_;

    # every option is its own list element of the command handed to start()
    my @connect_args = ("--instance=${instance}", "--apikey=${apikey}", "--apisecret=${apisecret}", "--host=${host}");
    note("Starting standard worker. Instance: $instance for host $host");

    # save testing time as we do not test a webUI host being down for
    # multiple minutes
    $ENV{OPENQA_WORKER_CONNECT_RETRIES} = 1;

    my @cmd = qw(perl ./script/worker --isotovideo=../os-autoinst/isotovideo --verbose);
    push @cmd, @connect_args;

    # only redirect the worker's output if the caller supplied a destination
    return $log ? start \@cmd, \undef, '>&', $log : start \@cmd;
}

my $workerpid = fork();
if ($workerpid == 0) {
exec("perl ./script/worker $connect_args --isotovideo=../os-autoinst/isotovideo --verbose"
. (defined $log ? " 2>&1 > $log" : ""));
die "FAILED TO START WORKER";
}
return defined $log ? `pgrep -P $workerpid` : $workerpid;
# Calls stop_service($worker, 1) for each entry currently held in the
# file-level @workers list.
sub stop_workers {
    foreach my $worker (@workers) {
        stop_service($worker, 1);
    }
}

# Back-dates the t_updated column of every row in the "Workers" result set to
# (now - WORKERS_CHECKER_THRESHOLD - DB_TIMESTAMP_ACCURACY).
# NOTE(review): presumably this makes all known workers look stale/offline to
# the scheduler - confirm against the code evaluating t_updated.
#
# Parameters:
#   $schema - object providing resultset("Workers")
sub dead_workers {
    my ($schema) = @_;
    foreach my $worker ($schema->resultset("Workers")->all()) {
        # the timestamp is computed per row, relative to the current time
        my $stale_epoch = time - WORKERS_CHECKER_THRESHOLD - DB_TIMESTAMP_ACCURACY;
        $worker->update({t_updated => DateTime->from_epoch(epoch => $stale_epoch)});
    }
}

# Runs schedule() on the OpenQA::Scheduler::Model::Jobs singleton and hands
# its result back to the caller (in the caller's context).
sub scheduler_step {
    return OpenQA::Scheduler::Model::Jobs->singleton->schedule();
}

subtest 'Scheduler worker job allocation' => sub {
note('try to allocate to previous worker (supposed to fail)');
my $allocated = scheduler_step();
is @$allocated, 0;

note('starting two workers');
my $w1_pid = create_worker($api_key, $api_secret, "http://localhost:$mojoport", 1);
my $w2_pid = create_worker($api_key, $api_secret, "http://localhost:$mojoport", 2);
@workers = map { create_worker($api_key, $api_secret, "http://localhost:$mojoport", $_) } (1, 2);
wait_for_worker($schema, 3);
wait_for_worker($schema, 4);

Expand All @@ -103,7 +112,7 @@ subtest 'Scheduler worker job allocation' => sub {
($allocated) = scheduler_step();
is @$allocated, 0;

stop_service($_, 1) for ($w1_pid, $w2_pid);
stop_workers;
dead_workers($schema);
};

Expand All @@ -121,15 +130,15 @@ subtest 're-scheduling and incompletion of jobs when worker rejects jobs or goes
is(@$allocated, 0, 'no jobs allocated');

# simulate a worker in broken state; it will register itself but declare itself as broken
my $broken_w_pid = broken_worker($api_key, $api_secret, "http://localhost:$mojoport", 3, 'out of order');
@workers = broken_worker($api_key, $api_secret, "http://localhost:$mojoport", 3, 'out of order');
wait_for_worker($schema, 5);
$allocated = scheduler_step();
is(@$allocated, 0, 'scheduler does not consider broken worker for allocating job');
stop_service($broken_w_pid, 1);
stop_workers;
dead_workers($schema);

# simulate a worker in idle state that rejects all jobs assigned to it
my $rejective_w_pid = rejective_worker($api_key, $api_secret, "http://localhost:$mojoport", 3, 'rejection reason');
@workers = rejective_worker($api_key, $api_secret, "http://localhost:$mojoport", 3, 'rejection reason');
wait_for_worker($schema, 5);

note('waiting for job to be assigned and set back to re-scheduled');
Expand All @@ -152,12 +161,12 @@ subtest 're-scheduling and incompletion of jobs when worker rejects jobs or goes
sleep .2;
}
ok($job_scheduled, 'assigned job set back to scheduled if worker reports back again but has abandoned the job');
stop_service($rejective_w_pid, 1);
stop_workers;
dead_workers($schema);

# start an unstable worker; it will register itself but ignore any job assignment (also not explicitly reject
# assignments)
my $unstable_w_pid = unstable_worker($api_key, $api_secret, "http://localhost:$mojoport", 3, -1);
@workers = unstable_worker($api_key, $api_secret, "http://localhost:$mojoport", 3, -1);
wait_for_worker($schema, 5);

($allocated) = scheduler_step();
Expand All @@ -166,10 +175,10 @@ subtest 're-scheduling and incompletion of jobs when worker rejects jobs or goes
and is(@{$allocated}[0]->{worker}, 5, 'job allocated to expected worker');

# kill the worker but assume the job has been actually started and is running
stop_service($unstable_w_pid, 1);
stop_workers;
$jobs->find(99982)->update({state => OpenQA::Jobs::Constants::RUNNING});

$unstable_w_pid = unstable_worker($api_key, $api_secret, "http://localhost:$mojoport", 3, -1);
@workers = unstable_worker($api_key, $api_secret, "http://localhost:$mojoport", 3, -1);
wait_for_worker($schema, 5);

note('waiting for job to be incompleted');
Expand All @@ -185,13 +194,11 @@ subtest 're-scheduling and incompletion of jobs when worker rejects jobs or goes
'running job incompleted if its worker re-connects claiming not to work on it anymore';
like $job->reason, qr/abandoned: associated worker .+:\d+ re-connected but abandoned the job/, 'reason is set';

stop_service($unstable_w_pid, 1);
stop_workers;
dead_workers($schema);
};

subtest 'Simulation of heavy unstable load' => sub {
my $allocated;
my @workers;
dead_workers($schema);
my @duplicated;

Expand All @@ -201,11 +208,11 @@ subtest 'Simulation of heavy unstable load' => sub {
push(@duplicated, $duplicate) if defined $duplicate;
}

push(@workers, unresponsive_worker($api_key, $api_secret, "http://localhost:$mojoport", $_)) for (1 .. 50);
@workers = map { unresponsive_worker($api_key, $api_secret, "http://localhost:$mojoport", $_) } (1 .. 50);
my $i = 4;
wait_for_worker($schema, ++$i) for 1 .. 10;

($allocated) = scheduler_step(); # Will try to allocate to previous worker and fail!
my $allocated = scheduler_step(); # Will try to allocate to previous worker and fail!
is(@$allocated, 10, "Allocated maximum number of jobs that could have been allocated") or die;
my %jobs;
my %w;
Expand All @@ -223,16 +230,14 @@ subtest 'Simulation of heavy unstable load' => sub {
}
is $dup->state, OpenQA::Jobs::Constants::SCHEDULED, "Job(" . $dup->id . ") back in scheduled state";
}
stop_service($_, 1) for @workers;
stop_workers;
dead_workers($schema);

@workers = ();

push(@workers, unstable_worker($api_key, $api_secret, "http://localhost:$mojoport", $_, 3)) for (1 .. 30);
$i = 5;
@workers = map { unstable_worker($api_key, $api_secret, "http://localhost:$mojoport", $_, 3) } (1 .. 30);
$i = 5;
wait_for_worker($schema, ++$i) for 0 .. 12;

($allocated) = scheduler_step(); # Will try to allocate to previous worker and fail!
$allocated = scheduler_step(); # Will try to allocate to previous worker and fail!
is @$allocated, 0, "All failed allocation on second step - workers were killed";
for my $dup (@duplicated) {
for (0 .. 100) {
Expand All @@ -242,50 +247,36 @@ subtest 'Simulation of heavy unstable load' => sub {
is $dup->state, OpenQA::Jobs::Constants::SCHEDULED, "Job(" . $dup->id . ") is still in scheduled state";
}

stop_service($_, 1) for @workers;
stop_workers;
};

subtest 'Websocket server - close connection test' => sub {
stop_service($wspid);
stop_service($ws);

local $ENV{OPENQA_LOGFILE};
local $ENV{MOJO_LOG_LEVEL};

my $log_file = tempfile;
my $unstable_ws_pid = create_websocket_server($mojoport + 1, 1, 0);
my $w2_pid = create_worker($api_key, $api_secret, "http://localhost:$mojoport", 2, $log_file);
my $log;
# create unstable ws
$ws = create_websocket_server($mojoport + 1, 1, 0);
@workers = create_worker($api_key, $api_secret, "http://localhost:$mojoport", 2, \$log);

my $found_connection_closed_in_log = 0;
my $log_file_content = '';
for (my $attempt = 0; $attempt < 300; ++$attempt) {
$log_file_content = $log_file->slurp;
if ($log_file_content =~ qr/.*Websocket connection to .* finished by remote side with code 1008.*/) {
for my $attempt (0 .. 300) {
$log = '';
$workers[0]->pump;
note "worker out: $log";
if ($log =~ qr/.*Websocket connection to .* finished by remote side with code 1008.*/) {
$found_connection_closed_in_log = 1;
last;
}
sleep 1;
}

is($found_connection_closed_in_log, 1, 'closed ws connection logged by worker');
stop_service($_) for ($unstable_ws_pid, $w2_pid);
dead_workers($schema);

if (!$found_connection_closed_in_log) {
note('worker log file contained:');
note($log_file_content);
}
is $found_connection_closed_in_log, 1, 'closed ws connection logged by worker';
};

END {
stop_service($_) for ($wspid, $webapi);
stop_workers;
stop_service($_, 1) for ($ws, $webapi);
}

sub dead_workers {
my $schema = shift;
$_->update({t_updated => DateTime->from_epoch(epoch => time - WORKERS_CHECKER_THRESHOLD - DB_TIMESTAMP_ACCURACY)})
for $schema->resultset("Workers")->all();
}

sub scheduler_step { OpenQA::Scheduler::Model::Jobs->singleton->schedule() }

done_testing;
5 changes: 3 additions & 2 deletions t/14-grutasks.t
Expand Up @@ -95,7 +95,7 @@ my $t = Test::Mojo->new('OpenQA::WebAPI');

# launch an additional app to serve some file for testing blocking downloads
my $mojo_port = Mojo::IOLoop::Server->generate_port;
my $pid = OpenQA::Test::Utils::create_webapi($mojo_port, sub { });
my $webapi = OpenQA::Test::Utils::create_webapi($mojo_port, sub { });

# define a fixed asset_size_limit configuration so this test is independent of the default value,
# which we might want to adjust without going into the details of this test
Expand Down Expand Up @@ -608,7 +608,8 @@ subtest 'download assets with correct permissions' => sub {
ok -f $assetpath, 'asset downloaded';
};

kill TERM => $pid;
$webapi->signal('TERM');
$webapi->finish;

done_testing();

Expand Down
18 changes: 9 additions & 9 deletions t/33-developer_mode.t
Expand Up @@ -56,12 +56,12 @@ unless (can_load(modules => {'Selenium::Remote::WDKeys' => undef})) {
exit(0);
}

my $workerpid;
my $wspid;
my $livehandlerpid;
my $schedulerpid;
my $worker;
my $ws;
my $livehandler;
my $scheduler;
sub turn_down_stack {
stop_service($_) for ($workerpid, $wspid, $livehandlerpid, $schedulerpid);
stop_service($_) for ($worker, $ws, $livehandler, $scheduler);
}

# skip if appropriate modules aren't available
Expand Down Expand Up @@ -94,9 +94,9 @@ ok(Mojolicious::Commands->start_app('OpenQA::WebAPI', 'eval', '1+0'));
# start Selenium test driver and other daemons
my $mojoport = Mojo::IOLoop::Server->generate_port;
my $driver = call_driver(sub { }, {mojoport => $mojoport});
$wspid = create_websocket_server($mojoport + 1, 0, 0);
$schedulerpid = create_scheduler($mojoport + 3);
$livehandlerpid = create_live_view_handler($mojoport);
$ws = create_websocket_server($mojoport + 1, 0, 0);
$scheduler = create_scheduler($mojoport + 3);
$livehandler = create_live_view_handler($mojoport);

# login
$driver->title_is('openQA', 'on main page');
Expand Down Expand Up @@ -128,7 +128,7 @@ for my $ext (qw(.json .png)) {
'can rename needle ' . $ext);
}

$workerpid = start_worker(get_connect_args());
$worker = start_worker(get_connect_args());
ok wait_for_job_running($driver), 'test 1 is running';

sub wait_for_session_info {
Expand Down
4 changes: 2 additions & 2 deletions t/40-script_load_templates.t
Expand Up @@ -55,8 +55,8 @@ $ENV{MOJO_LOG_LEVEL} = 'fatal';
my $mojoport = Mojo::IOLoop::Server->generate_port;
$host = "localhost:$mojoport";
my $schema = OpenQA::Test::Database->new->create;
my $pid = OpenQA::Test::Utils::create_webapi($mojoport, sub { });
END { stop_service $pid; }
my $webapi = OpenQA::Test::Utils::create_webapi($mojoport, sub { });
END { stop_service $webapi; }
# Note: See t/fixtures/03-users.pl for test user credentials
my $apikey = 'PERCIVALKEY02';
my $apisecret = 'PERCIVALSECRET02';
Expand Down
21 changes: 9 additions & 12 deletions t/43-scheduling-and-worker-scalability.t
Expand Up @@ -23,6 +23,7 @@ use File::Path 'make_path';
use Scalar::Util 'looks_like_number';
use Mojo::File 'path';
use Mojo::Util 'dumper';
use IPC::Run qw(start);
use FindBin;
use lib "$FindBin::Bin/lib";
use OpenQA::Scheduler::Model::Jobs;
Expand Down Expand Up @@ -78,8 +79,8 @@ my $workers = $schema->resultset('Workers');
my $jobs = $schema->resultset('Jobs');

# create web UI and websocket server
my $web_socket_server_pid = create_websocket_server($ports{websocket}, 0, 1, 1);
my $webui_pid = create_webapi($ports{webui}, sub { });
my $web_socket_server = create_websocket_server($ports{websocket}, 0, 1, 1);
my $webui = create_webapi($ports{webui}, sub { });

# prepare spawning workers
my $sharedir = setup_share_dir($ENV{OPENQA_BASEDIR});
Expand All @@ -103,14 +104,10 @@ sub spawn_worker {
my ($instance) = @_;

note("Starting worker '$instance'");
my $workerpid = fork();
return $workerpid if $workerpid != 0;

exec('perl', $worker_path, "--instance=$instance", @worker_args);
die "failed to start worker $instance";
start ['perl', $worker_path, "--instance=$instance", @worker_args];
}
my %worker_ids;
my @worker_pids = map { spawn_worker($_) } (1 .. $worker_count);
my @workers = map { spawn_worker($_) } (1 .. $worker_count);

# create jobs
note("Creating $job_count jobs");
Expand Down Expand Up @@ -197,7 +194,7 @@ subtest 'assign and run jobs' => sub {
};

subtest 'stop all workers' => sub {
stop_service $_ for @worker_pids;
stop_service $_ for @workers;
my @non_offline_workers;
for my $try (1 .. $polling_tries_workers) {
@non_offline_workers = ();
Expand All @@ -214,7 +211,7 @@ subtest 'stop all workers' => sub {
done_testing;

END {
stop_service $_ for @worker_pids;
stop_service $web_socket_server_pid;
stop_service $webui_pid;
stop_service $_ for @workers;
stop_service $web_socket_server;
stop_service $webui;
}