Add new job state (#848)
* Avoid dead job detection for jobs uploading large files

When the job finishes, we set the job state to 'uploading' so the webui
can differentiate between those jobs and jobs that are supposed to update
their status every couple of seconds. This way we can have different
timeouts for them.
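
For illustration, a standalone Perl sketch (made-up host, port and job id, and
without the API key signing the real client does) of what flagging a job as
uploading via the status route looks like; the actual change below routes this
through the worker's api_call() helper, and update_status() in
lib/OpenQA/Schema/Result/Jobs.pm then flips the state:

# sketch only - not the worker code itself
use strict;
use warnings;
use Mojo::UserAgent;

my $ua     = Mojo::UserAgent->new;
my $job_id = 4242;                      # hypothetical job id
my $status = {uploading => 1};          # the only payload the webui needs here

# POST jobs/<id>/status with just the 'uploading' flag
# (the real worker signs this request with its API key/secret)
my $tx = $ua->post("http://localhost:9526/api/v1/jobs/$job_id/status" => json => {status => $status});
warn "status update failed\n" if $tx->error;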

* Give uploading jobs more time before considered dead

For this I reworked the dead worker detection to no longer run on timers,
but to query every couple of minutes (I went from 20 minutes to 2 minutes
so this is no longer such an unlikely event) whether the worker was updated
in time. The timeout depends on the state of the job: 10 seconds for
running jobs, 1000 seconds for uploading jobs.
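
A minimal sketch of that per-state timeout idea, using the values from this
commit message; the committed logic is _get_dead_worker_jobs() and
_is_job_considered_dead() in lib/OpenQA/WebSockets/Server.pm below:

use strict;
use warnings;

sub dead_job_timeout {
    my ($state) = @_;
    return 1000 if $state eq 'uploading';    # uploads of large files may take a while
    return 10;                               # other execution states must check in every few seconds
}

# example: a hypothetical worker that last checked in 30 seconds ago
my $job_state          = 'uploading';
my $last_worker_update = time() - 30;
my $is_dead = (time() - $last_worker_update) > dead_job_timeout($job_state);
print $is_dead ? "dead\n" : "alive\n";       # prints "alive"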

* Prevent timeouts during uploading of files

by disabling keep-alive for our webapi user agent
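
A small sketch of that user agent setup, assuming nothing beyond
Mojo::UserAgent; the committed change does the same in api_init() in
lib/OpenQA/Worker/Common.pm below:

use Mojo::UserAgent;

# while the worker spends minutes uploading, an idle kept-alive connection
# can be closed underneath it and the next request then times out;
# disabling connection reuse sidesteps that at the cost of reconnecting
my $ua = Mojo::UserAgent->new;
$ua->max_connections(0);    # never keep idle connections around for reuse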

* Adapt the test case for dead worker detection

There is no timer trickery required anymore

* Send KILL to worker process group when not reacting to TERM
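
A rough sketch of the TERM-then-KILL escalation with a hypothetical child pid;
the committed version is _kill_worker() in lib/OpenQA/Worker/Jobs.pm below, and
the negative pid only reaches the whole group because the child calls setpgrp()
(see lib/OpenQA/Worker/Engines/isotovideo.pm):

use strict;
use warnings;
use POSIX ':sys_wait_h';

my $child_pid = 12345;                       # hypothetical isotovideo pid
if (kill('TERM', $child_pid)) {
    my $deadline = time + 40;
    while (waitpid($child_pid, WNOHANG) == 0) {
        if (time > $deadline) {
            kill('KILL', -$child_pid);       # escalate to the whole process group
            $deadline = time + 20;
        }
        sleep 1;
    }
}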

* Use a timeout for all jobs in execution

Make UPLOADING the exception. We used to ignore WAITING jobs in the dead
worker check, but this seems unfair - these workers can die too
coolo committed Sep 8, 2016
1 parent 9cf96db commit 3fe8be9
Showing 10 changed files with 114 additions and 57 deletions.
2 changes: 1 addition & 1 deletion assets/stylesheets/openqa.scss
@@ -378,7 +378,7 @@ div.flags {
color: $color-state-cancelled;
}

.state_running.fa {
.state_running.fa, .state_waiting.fa, .state_uploading.fa {
color: $color-state-running;
}

16 changes: 12 additions & 4 deletions lib/OpenQA/Schema/Result/Jobs.pm
@@ -39,12 +39,13 @@ use constant {
CANCELLED => 'cancelled',
WAITING => 'waiting',
DONE => 'done',
UPLOADING => 'uploading',
# OBSOLETED => 'obsoleted',
};
use constant STATES => (SCHEDULED, RUNNING, CANCELLED, WAITING, DONE);
use constant PENDING_STATES => (SCHEDULED, RUNNING, WAITING);
use constant EXECUTION_STATES => (RUNNING, WAITING);
use constant FINAL_STATES => (DONE, CANCELLED);
use constant STATES => (SCHEDULED, RUNNING, CANCELLED, WAITING, DONE, UPLOADING);
use constant PENDING_STATES => (SCHEDULED, RUNNING, WAITING, UPLOADING);
use constant EXECUTION_STATES => (RUNNING, WAITING, UPLOADING);
use constant FINAL_STATES => (DONE, CANCELLED);

# Results
use constant {
@@ -1000,6 +1001,13 @@ sub update_status {

my $ret = {result => 1};

# that is a bit of an abuse as we don't have anything of the
# other payload
if ($status->{uploading}) {
$self->update({state => UPLOADING});
return $ret;
}

$self->append_log($status->{log});
# delete from the hash so it becomes dumpable for debugging
my $screen = delete $status->{screen};
2 changes: 1 addition & 1 deletion lib/OpenQA/WebAPI.pm
@@ -166,7 +166,7 @@ sub startup {
if ($ENV{OPENQA_SQL_DEBUG} // $self->config->{logging}->{sql_debug} // 'false' eq 'true') {
# avoid enabling the SQL debug unless we really want to see it
# it's rather expensive
db_profiler::enable_sql_debugging($self);
db_profiler::enable_sql_debugging($self, $self->schema);
}

unless ($ENV{MOJO_TMPDIR}) {
3 changes: 2 additions & 1 deletion lib/OpenQA/WebAPI/Controller/Main.pm
@@ -97,7 +97,8 @@ sub _group_result {
{
next; # ignore
}
if ($job->state eq OpenQA::Schema::Result::Jobs::SCHEDULED || $job->state eq OpenQA::Schema::Result::Jobs::RUNNING) {
my $state = $job->state;
if (grep { /$state/ } (OpenQA::Schema::Result::Jobs::EXECUTION_STATES)) {
$jr{inprogress}++;
next;
}
72 changes: 43 additions & 29 deletions lib/OpenQA/WebSockets/Server.pm
@@ -1,4 +1,4 @@
# Copyright (C) 2014 SUSE Linux Products GmbH
# Copyright (C) 2014-2016 SUSE LLC
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -21,6 +21,8 @@ use OpenQA::IPC;
use OpenQA::Utils qw/log_debug log_warning/;
use OpenQA::Schema;

use db_profiler;

require Exporter;
our (@ISA, @EXPORT, @EXPORT_OK);

@@ -202,36 +204,50 @@ sub _message {
sub _get_dead_worker_jobs {
my ($threshold) = @_;

my $schema = OpenQA::Schema::connect_db;

my $dtf = $schema->storage->datetime_parser;
my $dt = DateTime->from_epoch(epoch => time() - $threshold, time_zone => 'UTC');

my %cond = (
state => OpenQA::Schema::Result::Jobs::RUNNING,
'worker.t_updated' => {'<' => $threshold},
state => [OpenQA::Schema::Result::Jobs::EXECUTION_STATES],
'worker.t_updated' => {'<' => $dtf->format_datetime($dt)},
);
my %attrs = (join => 'worker',);

my $schema = OpenQA::Schema::connect_db;
return $schema->resultset("Jobs")->search(\%cond, \%attrs);
}

sub _is_job_considered_dead {
my ($job) = @_;

# much bigger timeout for uploading jobs
if ($job->state eq OpenQA::Schema::Result::Jobs::UPLOADING) {
my $delta = DateTime->now()->epoch() - $job->worker->t_updated->epoch();
return if $delta > 1000;
}

# default timeout for the rest
return 1;
}

# Running as recurring timer, each 20minutes check if worker with job has been updated in last 10s
sub _workers_checker {
my $dt = DateTime->now(time_zone => 'UTC');
my $threshold = join ' ', $dt->ymd, $dt->hms;

Mojo::IOLoop->timer(
10 => sub {
my $dead_jobs = _get_dead_worker_jobs($threshold);
my $ipc = OpenQA::IPC->ipc;
for my $job ($dead_jobs->all) {
$job->done(result => OpenQA::Schema::Result::Jobs::INCOMPLETE);
my $res = $ipc->scheduler('job_duplicate', {jobid => $job->id});
if ($res) {
log_warning(sprintf('dead job %d aborted and duplicated', $job->id));
}
else {
log_warning(sprintf('dead job %d aborted as incomplete', $job->id));
}
}
});

my $dead_jobs = _get_dead_worker_jobs(10);
my $ipc = OpenQA::IPC->ipc;
for my $job ($dead_jobs->all) {
next unless _is_job_considered_dead($job);

$job->done(result => OpenQA::Schema::Result::Jobs::INCOMPLETE);
my $res = $ipc->scheduler('job_duplicate', {jobid => $job->id});
if ($res) {
log_warning(sprintf('dead job %d aborted and duplicated', $job->id));
}
else {
log_warning(sprintf('dead job %d aborted as incomplete', $job->id));
}
}
}

# Mojolicious startup
@@ -247,11 +263,9 @@ sub setup {
# if ($logfile && $self->config->{logging}->{level}) {
# $self->log->level($self->config->{logging}->{level});
# }
# if ($ENV{OPENQA_SQL_DEBUG} // $self->config->{logging}->{sql_debug} // 'false' eq 'true') {
# # avoid enabling the SQL debug unless we really want to see it
# # it's rather expensive
# db_profiler::enable_sql_debugging($self);
# }

# db_profiler::enable_sql_debugging(app, OpenQA::Schema::connect_db);
# app->log->level('debug');

# use port one higher than WebAPI
my $port = 9527;
@@ -265,8 +279,8 @@
# no cookies for worker, no secrets to protect
app->secrets(['nosecretshere']);

# start worker checker - check workers each 20minutes
Mojo::IOLoop->recurring(1200 => \&_workers_checker);
# start worker checker - check workers each 2 minutes
Mojo::IOLoop->recurring(120 => \&_workers_checker);

return Mojo::Server::Daemon->new(app => app, listen => ["http://localhost:$port"]);
}
4 changes: 4 additions & 0 deletions lib/OpenQA/Worker/Common.pm
@@ -165,6 +165,10 @@ sub api_init {
apikey => $apikey,
apisecret => $apisecret
);
# disable keep alive to avoid time outs in strange places - we only reach the
# webapi once in a while so take the price of reopening the connection every time
# we do
$ua->max_connections(0);

unless ($ua->apikey && $ua->apisecret) {
unless ($apikey && $apisecret) {
4 changes: 3 additions & 1 deletion lib/OpenQA/Worker/Engines/isotovideo.pm
@@ -132,6 +132,8 @@ sub engine_workit($) {
die "failed to fork: $!\n" unless defined $child;

unless ($child) {
# create new process group
setpgrp(0, 0);
$ENV{TMPDIR} = $tmpdir;
printf "$$: WORKING %d\n", $job->{id};
if (open(my $log, '>', "autoinst-log.txt")) {
@@ -182,7 +184,7 @@ sub engine_check {
# check if the worker is still running
my $pid = waitpid($workerpid, WNOHANG);
if ($verbose) {
printf "waitpid %d returned %d\n", $workerpid, $pid;
printf "waitpid %d returned %d with status $?\n", $workerpid, $pid;
}
if ($pid == -1 && $!{ECHILD}) {
warn "we lost our child\n";
50 changes: 39 additions & 11 deletions lib/OpenQA/Worker/Jobs.pm
@@ -30,6 +30,8 @@ use Fcntl;
use MIME::Base64;
use File::Basename qw/basename/;

use POSIX ':sys_wait_h';

use base qw/Exporter/;
our @EXPORT = qw/start_job stop_job check_job backend_running/;

@@ -52,15 +54,30 @@ sub _kill_worker($) {

return unless $worker->{pid};

warn "killing $worker->{pid}\n";
kill(SIGTERM, $worker->{pid});

# don't leave here before the worker is dead
my $pid = waitpid($worker->{pid}, 0);
if ($pid == -1) {
warn "waitpid returned error: $!\n";
if (kill('TERM', $worker->{pid})) {
warn "killed $worker->{pid}\n";
my $deadline = time + 40;
# don't leave here before the worker is dead
while ($worker) {
my $pid = waitpid($worker->{pid}, WNOHANG);
if ($pid == -1) {
warn "waitpid returned error: $!\n";
}
elsif ($pid == 0) {
sleep(.5);
if (time > $deadline) {
# if still running after the deadline, try harder
# to kill the worker
kill('KILL', -$worker->{pid});
# now loop again
$deadline = time + 20;
}
}
else {
last;
}
}
}

$worker = undef;
}

@@ -105,8 +122,6 @@ sub stop_job($;$) {
remove_timer('check_backend');
remove_timer('job_timeout');

_kill_worker($worker);

# XXX: we need to wait if there is an update_status in progress.
# we should have an event emitter that subscribes to update_status done
my $stop_job_check_status;
@@ -196,7 +211,20 @@ sub upload {
sub _stop_job($;$) {
my ($aborted, $job_id) = @_;

print "stop_job 2nd half\n" if $verbose;
# now tell the webui that we're about to finish, but the following
# process of killing the backend process and checksums uploads and
# checksums again can take a long while, so the webui needs to know
print "stop_job 2nd part\n" if $verbose;

# the update_status timers and such are gone by now (1st part), so we're
# basically "single threaded" and can block

my $status = {uploading => 1};
api_call('post', "jobs/$job_id/status", undef, {status => $status});

_kill_worker($worker);

print "stop_job 3rd part\n" if $verbose;

my $name = $job->{settings}->{NAME};
$aborted ||= 'done';
6 changes: 3 additions & 3 deletions lib/db_profiler.pm
@@ -29,9 +29,9 @@ sub query_end {
}


sub enable_sql_debugging($) {
my ($app) = @_;
my $storage = $app->schema->storage;
sub enable_sql_debugging {
my ($app, $schema) = @_;
my $storage = $schema->storage;
$storage->debugobj(new db_profiler());
$storage->debugfh(MojoDebugHandle->new($app));
$storage->debug(1);
12 changes: 6 additions & 6 deletions t/20-workers-ws.t
@@ -19,6 +19,7 @@ BEGIN {

use strict;
use warnings;
use DateTime;
use Test::More;
use Test::Warnings;
use OpenQA::IPC;
@@ -50,13 +51,12 @@ sub _check_job_incomplete {

subtest 'worker with job and not updated in last 10s is considered dead' => sub {
_check_job_running($_) for (99961, 99963);
# wait a second to be sure we fit into 10s check
sleep 1;
# now we can start the workers check
OpenQA::WebSockets::Server::_workers_checker();
# and wait for timer to kick in
Mojo::IOLoop->one_tick;
# move the updated timestamp of the workers to avoid sleeping
my $dtf = $schema->storage->datetime_parser;
my $dt = DateTime->from_epoch(epoch => time() - 20, time_zone => 'UTC');

$schema->resultset('Workers')->update_all({t_updated => $dtf->format_datetime($dt)});
OpenQA::WebSockets::Server::_workers_checker();
_check_job_incomplete($_) for (99961, 99963);
};

