Skip to content

Commit

Permalink
[backend] implement load forwarding to the master dispatcher
Browse files Browse the repository at this point in the history
  • Loading branch information
mlschroe committed Dec 2, 2013
1 parent 5bbc19a commit af1af83
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 2 deletions.
87 changes: 85 additions & 2 deletions src/backend/bs_dispatch
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ my %constraintscache;

my %lastbuild; # last time a job was build in that prpa

my %masterdispatched; # we masterdispatched those, prpa => [ starttime, ... ]

sub assignjob {
my ($job, $idlename, $arch) = @_;
local *F;
Expand Down Expand Up @@ -218,7 +220,10 @@ sub assignjob {
$worker->{'job'} = $job;
$worker->{'jobid'} = $jobid;
$worker->{'arch'} = $arch;
$worker->{'reposerver'} = $info->{'reposerver'} if $info->{'masterdispatched'};
if ($info->{'masterdispatched'}) {
$worker->{'reposerver'} = $info->{'reposerver'};
push @{$masterdispatched{"$info->{'project'}/$info->{'repository'}/$info->{'arch'}"}}, $jobstatus->{'starttime'};
}
mkdir_p("$workersdir/building");
writexml("$workersdir/building/.$idlename", "$workersdir/building/$idlename", $worker, $BSXML::worker);

Expand Down Expand Up @@ -473,6 +478,7 @@ sub oracle {

my %syncedjobs;
my %lastmastersync;
my $lastmdloadsync;

#
# XXX: should not be here! Use src server instead!
Expand All @@ -498,6 +504,9 @@ sub dispatchslave {
my $synced = 0;
my @archs = grep {!/^\./} sort(ls($jobsdir));
my %projid2repocache;

my %building;
my %building_time;
for my $arch (@archs) {
next unless -d "$jobsdir/$arch";
my $now = time();
Expand Down Expand Up @@ -534,6 +543,7 @@ sub dispatchslave {
my @jobs = sort(ls($jobsdir));
my @b = grep {!/^\./} ls("$jobsdir/$arch");
my %locked = map {$_ => 1} grep {/:status$/} @b;
my %notlocked = map {$_ => 1} grep {!$locked{$_}} @b;
my %seen;
for my $job (grep {!/:(?:dir|status|new)$/} @b) {
next if $locked{"$job:status"};
Expand Down Expand Up @@ -585,6 +595,47 @@ sub dispatchslave {
delete $syncedjobs{$arch}->{$job};
}
print "$arch: added $added, deleted $deleted\n" if $added || $deleted;
# adapt the load
my $load = {};
for my $job (keys %locked) {
my $jn = $job;
$jn =~ s/:status$//;
next unless $notlocked{$jn};
$jn =~ s/-[0-9a-f]{32}$//s;
my ($projid, $repoid, $packid) = split('::', $jn);
if (!defined($packid)) {
my $info = readxml("$jobsdir/$arch/$job", $BSXML::buildinfo, 1);
next unless $info;
($projid, $repoid, $packid) = ($info->{'project'}, $info->{'repository'}, $info->{'package'});
next unless defined $packid;
}
my $prpa = "$projid/$repoid/$arch";
$building{$prpa} ||= 0;
$building{$prpa} += 1;
$building_time{$prpa} = $now;
}
}
# upload the mdload from time to time
my $now = time();
if (!$lastmdloadsync || $lastmdloadsync + 120 < $now) {
print "uploading load to master dispatcher\n";
$lastmdloadsync = $now;
my $load = BSUtil::retrieve("$jobsdir/load", 1) || {};
for my $prpa (keys %$load) {
$load->{$prpa}->[2] = $building_time{$prpa} || $now;
$load->{$prpa}->[3] = $building{$prpa} || 0;
}
eval {
BSRPC::rpc({
'uri' => "$BSConfig::masterdispatcher/jobs/_mdload",
'request' => 'POST',
'data' => "pst0".Storable::nfreeze($load),
'timeout' => 60,
}, undef);
};
if ($@) {
warn($@);
}
}
return $synced;
}
Expand Down Expand Up @@ -615,7 +666,7 @@ sub forwardevents {
} elsif ($ev->{'type'} eq 'badhost') {
print "badhost event: $ev->{'project'}/$ev->{'package'}/$ev->{'arch'}/$ev->{'job'}\n";
if ($BSConfig::masterdispatcher && $BSConfig::masterdispatcher ne $BSConfig::reposerver) {
sendeventtoserver($BSConfig::masterdispatcher, $ev);
sendeventtoserver($BSConfig::masterdispatcher, $ev) unless $ev->{'package'} eq '_deltas'; # XXX
} else {
$badhost{"$ev->{'project'}/$ev->{'package'}/$ev->{'arch'}/$ev->{'job'}"} = time();
}
Expand Down Expand Up @@ -729,6 +780,10 @@ while (1) {
}
}
close F;
my $prunetime = time() - 50 * 86400;
for (keys %$load) {
delete $load->{$_} if $load->{$_}->[0] < $prunetime;
}
BSUtil::store("$jobsdir/load.new", "$jobsdir/load", $load);
}
}
Expand Down Expand Up @@ -779,6 +834,34 @@ while (1) {
$load->{$prpa} = $ll;
$lastbuild{$prpa} = $l->[0];
}

# adapt load for masterdispatched prpas
my $mdload = BSUtil::retrieve("$jobsdir/mdload", 1) || {};
for my $prpa (sort keys %$mdload) {
my $l = $mdload->{$prpa};
my $ll = $l->[1];
$ll *= exp($decay * ($now - $l->[0])) if $now > $l->[0];
$load->{$prpa} = $ll;
$lastbuild{$prpa} = $l->[0];
if ($l->[3]) {
$load->{$prpa} += $l->[3];
$lastbuild{$prpa} = $l->[2];
}
}
if (%masterdispatched) {
for my $prpa (sort keys %masterdispatched) {
my $md = $masterdispatched{$prpa};
if ($mdload->{$prpa}) {
shift(@$md) while @$md && $md->[0] < $mdload->{$prpa}->[2];
}
if (@$md) {
$load->{$prpa} += @$md;
$lastbuild{$prpa} = $now;
} else {
delete $masterdispatched{$prpa};
}
}
}

my @idle = grep {!/^\./} ls("$workersdir/idle");
my %idlearch;
Expand Down
31 changes: 31 additions & 0 deletions src/backend/bs_repserver
Original file line number Diff line number Diff line change
Expand Up @@ -3740,6 +3740,36 @@ sub deljob {
return $BSStdServer::return_ok;
}

sub postmdload {
my ($cgi) = @_;

my $newdata = BSServer::read_data(200000000);
my $newmdload = Storable::thaw(substr($newdata, 4));
die("no data\n") unless $newmdload;
return $BSStdServer::return_ok unless %$newmdload;
local *F;
BSUtil::lockopen(\*F, '+>>', "$jobsdir/mdload");
my $oldmdload = {};
if (-s "$jobsdir/mdload") {
$oldmdload = BSUtil::retrieve("$jobsdir/mdload");
}
for (keys %$newmdload) {
if (!$oldmdload->{$_} || $oldmdload->{$_}->[0] < $newmdload->{$_}->[0]) {
$oldmdload->{$_} = $newmdload->{$_};
} elsif ($newmdload->{$_}->[3] && $oldmdload->{$_}->[3] < $newmdload->{$_}->[3]) {
($oldmdload->{$_}->[2], $oldmdload->{$_}->[3]) = ($newmdload->{$_}->[2], $newmdload->{$_}->[3]);
}
}
my $prunetime = time() - 50 * 86400;
for (keys %$oldmdload) {
my $l = $oldmdload->{$_};
delete $oldmdload->{$_} if $l->[0] < $prunetime && $l->[2] < $prunetime;
}
BSUtil::store("$jobsdir/.mdload.$$", "$jobsdir/mdload", $oldmdload);
close F;
return $BSStdServer::return_ok;
}

sub idleworkerjob {
my ($cgi, $arch, $job) = @_;
local *F;
Expand Down Expand Up @@ -3851,6 +3881,7 @@ my $dispatches = [

# jobs
'/jobs' => \&listjobarchs,
'POST:/jobs/_mdload' => \&postmdload,
'/jobs/$arch' => \&listjobs,
'PUT:/jobs/$arch/$job' => \&addjob,
'POST:/jobs/$arch/$job cmd=idleworker workerid: jobid:md5?' => \&idleworkerjob,
Expand Down

0 comments on commit af1af83

Please sign in to comment.