Skip to content

Commit

Permalink
Merge pull request #3636 from Martchus/single-shot-worker
Browse files Browse the repository at this point in the history
Allow configuring the worker to terminate after all jobs have been processed
  • Loading branch information
okurz committed Dec 15, 2020
2 parents c4c7e4d + 2f2eeca commit 3a581bf
Show file tree
Hide file tree
Showing 8 changed files with 57 additions and 18 deletions.
14 changes: 9 additions & 5 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,15 @@ install-generic:
for i in systemd/*.{service,target,timer}; do \
install -m 644 $$i "$(DESTDIR)"/usr/lib/systemd/system ;\
done
sed -e 's_^\(ExecStart=/usr/share/openqa/script/worker\) \(--instance %i\)$$_\1 --no-cleanup \2_' \
systemd/openqa-worker@.service \
> "$(DESTDIR)"/usr/lib/systemd/system/openqa-worker-no-cleanup@.service
sed -i '/Wants/aConflicts=openqa-worker@.service' \
"$(DESTDIR)"/usr/lib/systemd/system/openqa-worker-no-cleanup@.service
sed \
-e 's_^\(ExecStart=/usr/share/openqa/script/worker\) \(--instance %i\)$$_\1 --no-cleanup \2_' \
-e '/Wants/aConflicts=openqa-worker@.service' \
systemd/openqa-worker@.service > "$(DESTDIR)"/usr/lib/systemd/system/openqa-worker-no-cleanup@.service
sed \
-e '/Type/aEnvironment=OPENQA_WORKER_TERMINATE_AFTER_JOBS_DONE=1' \
-e 's/Restart=on-failure/Restart=always/' \
-e '/Wants/aConflicts=openqa-worker@.service' \
systemd/openqa-worker@.service > "$(DESTDIR)"/usr/lib/systemd/system/openqa-worker-auto-restart@.service
install -m 755 systemd/systemd-openqa-generator "$(DESTDIR)"/usr/lib/systemd/system-generators
install -m 644 systemd/tmpfiles-openqa.conf "$(DESTDIR)"/usr/lib/tmpfiles.d/openqa.conf
install -m 644 systemd/tmpfiles-openqa-webui.conf "$(DESTDIR)"/usr/lib/tmpfiles.d/openqa-webui.conf
Expand Down
1 change: 1 addition & 0 deletions dist/rpm/openQA.spec
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,7 @@ fi
%{_unitdir}/openqa-worker-cacheservice-minion.service
%{_unitdir}/openqa-worker-cacheservice.service
%{_unitdir}/openqa-worker-no-cleanup@.service
%{_unitdir}/openqa-worker-auto-restart@.service
%{_unitdir}/openqa-slirpvde.service
%{_unitdir}/openqa-vde_switch.service
%{_datadir}/openqa/script/openqa-slirpvde
Expand Down
7 changes: 7 additions & 0 deletions etc/openqa/workers.ini
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,13 @@
# system.
#LOCAL_UPLOAD = 0

# Whether to terminate after all assigned jobs have been processed. When
# combined with auto-restarting at the service manager level (e.g. configuring
# Restart=always in the systemd service), this helps to apply updates and config
# changes without interrupting jobs. This setting can be overridden by setting
# the environment variable OPENQA_WORKER_TERMINATE_AFTER_JOBS_DONE.
#TERMINATE_AFTER_JOBS_DONE = 0

# Enable an investigation feature to compare the current list of installed
# packages against the list of packages during the previous last good job.
# Configuring this variable provides a command line to list the packages and
Expand Down
12 changes: 7 additions & 5 deletions lib/OpenQA/Worker.pm
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,7 @@ sub _get_next_job {
}

# accepts or skips the next job in the queue of pending jobs
# returns a truthy value if accepting/skipping the next job was started successfully
sub _accept_or_skip_next_job_in_queue {
my ($self, $last_job_exit_status) = @_;

Expand Down Expand Up @@ -478,11 +479,11 @@ sub _accept_or_skip_next_job_in_queue {
$self->_prepare_job_execution($next_job);
if (my $skip_reason = $self->{_jobs_to_skip}->{$next_job_id}) {
log_info("Skipping job $next_job_id from queue (web UI sent command $skip_reason)");
$next_job->skip($skip_reason);
return $next_job->skip($skip_reason);
}
else {
log_info("Accepting job $next_job_id from queue");
$next_job->accept;
return $next_job->accept;
}
}

Expand Down Expand Up @@ -726,10 +727,11 @@ sub _handle_job_status_changed {
# update the general worker availability (e.g. we might detect here that QEMU from the last run
# hasn't been terminated yet)
# incomplete subsequent jobs in the queue if it turns out the worker is generally broken
return $self->_accept_or_skip_next_job_in_queue(WORKER_SR_BROKEN) unless $self->check_availability;

# continue with the next job in the queue (this just returns if there are no further jobs)
$self->_accept_or_skip_next_job_in_queue($reason);
if (!$self->_accept_or_skip_next_job_in_queue($self->check_availability ? $reason : WORKER_SR_BROKEN)) {
# if configured, stop when the next job cannot be accepted/skipped (e.g. because there is no further job)
$self->stop if $self->settings->global_settings->{TERMINATE_AFTER_JOBS_DONE};
}
}
# FIXME: Avoid so much elsif like in CommandHandler.pm.
}
Expand Down
2 changes: 2 additions & 0 deletions lib/OpenQA/Worker/Job.pm
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ sub accept {
sub {
$self->_set_status(accepted => {});
});
return 1;
}

sub _compute_max_job_time {
Expand Down Expand Up @@ -313,6 +314,7 @@ sub skip {
$reason //= 'skipped';
$self->_set_status(stopping => {reason => $reason});
$self->_stop_step_5_finalize($reason, {result => OpenQA::Jobs::Constants::SKIPPED});
return 1;
}

sub stop {
Expand Down
1 change: 1 addition & 0 deletions lib/OpenQA/Worker/Settings.pm
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ sub new {

# read global settings from environment variables
$global_settings{LOG_DIR} = $ENV{OPENQA_WORKER_LOGDIR} if $ENV{OPENQA_WORKER_LOGDIR};
$global_settings{TERMINATE_AFTER_JOBS_DONE} //= $ENV{OPENQA_WORKER_TERMINATE_AFTER_JOBS_DONE};

# read global settings specified via CLI arguments
$global_settings{LOG_LEVEL} = 'debug' if $cli_options->{verbose};
Expand Down
30 changes: 23 additions & 7 deletions t/24-worker-overall.t
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,13 @@ $ENV{OPENQA_LOGFILE} = undef;
my ($self) = @_;
$self->is_skipped(1);
$self->emit(status_changed => {job => $self, status => 'stopped', reason => 'skipped'});
return 1;
}
# Sets the accepted flag on the invocant, emits a "status_changed" event
# carrying the job and the status "accepted", and returns 1.
sub accept {
    my $job = shift;
    $job->is_accepted(1);
    my %event = (job => $job, status => 'accepted');
    $job->emit(status_changed => \%event);
    return 1;
}
# intentionally empty: calling start does nothing here
sub start { }
}
Expand Down Expand Up @@ -250,7 +252,7 @@ subtest 'accept or skip next job' => sub {
ok($worker->is_busy, 'worker considered busy without current job but pending ones');

# assume the last job failed: all jobs in the queue are expected to be skipped
combined_like { $worker->_accept_or_skip_next_job_in_queue(WORKER_SR_API_FAILURE) }
combined_like { is $worker->_accept_or_skip_next_job_in_queue(WORKER_SR_API_FAILURE), 1, 'jobs skipped' }
qr/Job 0.*finished.*skipped.*Job 1.*finished.*skipped.*Job 2.*finished.*skipped.*Job 3.*finished.*skipped/s,
'skipping logged';
is($_->is_accepted, 0, 'job ' . $_->id . ' not accepted') for @jobs;
Expand All @@ -259,6 +261,7 @@ subtest 'accept or skip next job' => sub {
ok(!$worker->is_busy, 'worker not considered busy anymore');
is_deeply($worker->current_job_ids, [], 'no job IDs remaining');
is_deeply($worker->{_pending_jobs}, [], 'no pending jobs left');
is($worker->_accept_or_skip_next_job_in_queue(WORKER_SR_API_FAILURE), undef, 'no more jobs to skip/accept');
};
};

Expand All @@ -268,19 +271,21 @@ subtest 'accept or skip next job' => sub {
$worker->{_pending_jobs} = [$jobs[0], [$jobs[1], $jobs[2]], $jobs[3]];

# assume the last job has been completed: accept the next job in the queue
$worker->_accept_or_skip_next_job_in_queue(WORKER_SR_DONE);
is($worker->_accept_or_skip_next_job_in_queue(WORKER_SR_DONE), 1, 'next job accepted');
is($_->is_accepted, 1, 'job ' . $_->id . ' accepted') for ($jobs[0]);
is($_->is_skipped, 0, 'job ' . $_->id . ' not skipped') for @jobs;
is_deeply($worker->{_pending_jobs}, [[$jobs[1], $jobs[2]], $jobs[3]], 'next jobs still pending');

# assume the last job has been completed: accept the next job in the queue
$worker->_accept_or_skip_next_job_in_queue(WORKER_SR_DONE);
is($worker->_accept_or_skip_next_job_in_queue(WORKER_SR_DONE), 1, 'next job accepted');
is($_->is_accepted, 1, 'job ' . $_->id . ' accepted') for ($jobs[1]);
is($_->is_skipped, 0, 'job ' . $_->id . ' not skipped') for @jobs;
is_deeply($worker->{_pending_jobs}, [[$jobs[2]], $jobs[3]], 'next jobs still pending');

# assume the last job (job 0) failed: only the current sub queue (containing job 2) is skipped
combined_like { $worker->_accept_or_skip_next_job_in_queue(WORKER_SR_API_FAILURE) }
combined_like {
is $worker->_accept_or_skip_next_job_in_queue(WORKER_SR_API_FAILURE), 1, 'jobs skipped/accepted'
}
qr/Job 2.*finished.*skipped/s, 'skipping logged';
is($_->is_accepted, 0, 'job ' . $_->id . ' not accepted') for ($jobs[2]);
is($_->is_skipped, 1, 'job ' . $_->id . ' skipped') for ($jobs[2]);
Expand Down Expand Up @@ -508,9 +513,11 @@ subtest 'handle client status changes' => sub {

subtest 'handle job status changes' => sub {
# mock cleanup
my $worker_mock = Test::MockModule->new('OpenQA::Worker');
my $cleanup_called = 0;
my $worker_mock = Test::MockModule->new('OpenQA::Worker');
my ($cleanup_called, $stop_called) = (0, 0);
$worker_mock->redefine(_clean_pool_directory => sub { $cleanup_called = 1; });
$worker_mock->redefine(stop => sub { $stop_called = 1; });


# mock accepting and starting job
my $job_mock = Test::MockModule->new('OpenQA::Worker::Job');
Expand Down Expand Up @@ -560,6 +567,7 @@ subtest 'handle job status changes' => sub {
}
qr/some error message.*Job 42 from some-host finished - reason: test/s, 'status logged';
is($cleanup_called, 0, 'pool directory not cleaned up');
is($stop_called, 0, 'worker not stopped');
is($worker->current_job, undef, 'current job unassigned');
is($worker->current_webui_host, undef, 'current web UI host unassigned');

Expand All @@ -568,14 +576,22 @@ subtest 'handle job status changes' => sub {
$worker->check_availability;
is($cleanup_called, 0, 'pool directory not cleaned up within periodic availability check');

# stop job without error message and with cleanup enabled
# stop job without error message and with cleanup and TERMINATE_AFTER_JOBS_DONE enabled
$worker->current_job($fake_job);
$worker->current_webui_host('some-host');
$worker->settings->global_settings->{TERMINATE_AFTER_JOBS_DONE} = 1;
$worker->{_pending_jobs} = [$fake_job]; # assume there's another job in the queue
combined_like {
$worker->_handle_job_status_changed($fake_job, {status => 'stopped', reason => 'another test'});
}
qr/Job 42 from some-host finished - reason: another/, 'status logged';
is($cleanup_called, 1, 'pool directory cleaned up after job finished');
is($stop_called, 0, 'worker not stopped due to the other job added to queue');
combined_like {
$worker->_handle_job_status_changed($fake_job, {status => 'stopped', reason => 'yet another test'});
}
qr/Job 42 from some-host finished - reason: yet another/, 'status of 2nd job logged';
is($stop_called, 1, 'worker stopped after no more jobs left in the queue');

subtest 'availability recomputed' => sub {
# pretend QEMU is still running
Expand Down
8 changes: 7 additions & 1 deletion t/24-worker-settings.t
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ use OpenQA::Worker::Settings;
use OpenQA::Worker::App;
use Test::MockModule;

$ENV{OPENQA_CONFIG} = "$FindBin::Bin/data/24-worker-settings";
$ENV{OPENQA_CONFIG} = "$FindBin::Bin/data/24-worker-settings";
$ENV{OPENQA_WORKER_TERMINATE_AFTER_JOBS_DONE} = 1;

my $settings = OpenQA::Worker::Settings->new;

Expand All @@ -36,6 +37,7 @@ is_deeply(
LOG_DIR => 'log/dir',
RETRY_DELAY => 5,
RETRY_DELAY_IF_WEBUI_BUSY => 60,
TERMINATE_AFTER_JOBS_DONE => 1,
},
'global settings, spaces trimmed'
) or diag explain $settings->global_settings;
Expand All @@ -59,6 +61,8 @@ is_deeply(
'web UI host specific settings'
) or diag explain $settings->webui_host_specific_settings;

delete $ENV{OPENQA_WORKER_TERMINATE_AFTER_JOBS_DONE};

subtest 'apply settings to app' => sub {
my ($setup_log_called, $setup_log_app);
my $mock = Test::MockModule->new('OpenQA::Worker::Settings');
Expand Down Expand Up @@ -87,6 +91,7 @@ subtest 'instance-specific settings' => sub {
LOG_DIR => 'log/dir',
RETRY_DELAY => 5,
RETRY_DELAY_IF_WEBUI_BUSY => 60,
TERMINATE_AFTER_JOBS_DONE => undef,
},
'global settings (instance 1)'
) or diag explain $settings1->global_settings;
Expand All @@ -102,6 +107,7 @@ subtest 'instance-specific settings' => sub {
FOO => 'bar',
RETRY_DELAY => 10,
RETRY_DELAY_IF_WEBUI_BUSY => 120,
TERMINATE_AFTER_JOBS_DONE => undef,
},
'global settings (instance 2)'
) or diag explain $settings2->global_settings;
Expand Down

0 comments on commit 3a581bf

Please sign in to comment.