Skip to content

Commit

Permalink
[backend] implement aggregating of containers
Browse files Browse the repository at this point in the history
  • Loading branch information
mlschroe committed Sep 26, 2018
1 parent d71bd8e commit d6c5c39
Showing 1 changed file with 143 additions and 77 deletions.
220 changes: 143 additions & 77 deletions src/backend/BSSched/BuildJob/Aggregate.pm
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use warnings;

use Storable; # for dclone
use Digest::MD5 ();
use JSON::XS (); # for containerinfo reading/writing

use BSUtil;
use BSXML;
Expand All @@ -30,10 +31,10 @@ use BSConfiguration; # for $BSConfig::sign
use BSVerify; # for verify_nevraquery
use Build; # for query

my @binsufs = qw{rpm deb pkg.tar.gz pkg.tar.xz AppImage};
my @binsufs = qw{rpm deb pkg.tar.gz pkg.tar.xz};
my $binsufsre = join('|', map {"\Q$_\E"} @binsufs);

my @binsufs_sign = qw{rpm pkg.tar.gz pkg.tar.xz AppImage};
my @binsufs_sign = qw{rpm pkg.tar.gz pkg.tar.xz};
my $binsufsre_sign = join('|', map {"\Q$_\E"} @binsufs_sign);

=head1 NAME
Expand Down Expand Up @@ -215,21 +216,33 @@ sub check {
}
for my $arepoid (@arepoids) {
for my $apackid (@apackids) {
next if $aprojid eq $projid && $arepoid eq $repoid && $apackid eq $packid;
my $m = '';
my $havecontainer;
if ($remoteprojs->{$aprojid}) {
my $bininfo = ($gbininfos{"$aprojid/$arepoid/$myarch"} || {})->{$apackid} || {};
for my $b (sort {$a->{'filename'} cmp $b->{'filename'}} values %$bininfo) {
next unless $b->{'filename'} && ($b->{'filename'} eq 'updateinfo.xml' || $b->{'filename'} =~ /\.(?:$binsufsre)$/);
$m .= $b->{'hdrmd5'} || $b->{'md5sum'} || '';
for my $bin (sort {$a->{'filename'} cmp $b->{'filename'}} values %$bininfo) {
my $filename = $bin->{'filename'};
next unless $filename;
next unless $filename eq 'updateinfo.xml' || $filename =~ /\.(?:$binsufsre)$/ || $filename =~ /\.obsbinlnk$/;
$havecontainer = 1 if $filename =~ /\.obsbinlnk$/;
$m .= $bin->{'hdrmd5'} || $bin->{'md5sum'} || '';
}
} else {
next if $aprojid eq $projid && $arepoid eq $repoid && $apackid eq $packid;
my $d = "$reporoot/$aprojid/$arepoid/$myarch/$apackid";
my @d = grep {$_ eq 'updateinfo.xml' || /\.(?:$binsufsre)$/} ls($d);
for my $b (sort @d) {
my @s = stat("$d/$b");
next unless @s;
$m .= "$b\0$s[9]/$s[7]/$s[1]\0";
for my $filename (sort @d) {
next unless $filename eq 'updateinfo.xml' || $filename =~ /\.(?:$binsufsre)$/ || $filename =~ /\.obsbinlnk$/;
$havecontainer = 1 if $filename =~ /\.obsbinlnk$/;
my @s = stat("$d/$filename");
$m .= "$filename\0$s[9]/$s[7]/$s[1]\0" if @s;
}
}
if ($havecontainer) {
# add state of tag
my $bconf = $ctx->{'conf'};
if ($bconf->{'substitute'}->{"aggregate-container-add-tag:$packid"}) {
$m .= "\0\0aggregate-container-add-tag\0".join("\0", @{$bconf->{'substitute'}->{"aggregate-container-add-tag:$packid"}});
}
}
$m = Digest::MD5::md5_hex($m)." $aprojid/$arepoid/$myarch/$apackid";
Expand Down Expand Up @@ -295,6 +308,7 @@ sub build {
$abinfilter = { map {$_ => 1} @{$aggregate->{'binary'}} } if $aggregate->{'binary'};
for my $arepoid (reverse @arepoids) {
for my $apackid (@apackids) {
next if $aprojid eq $projid && $arepoid eq $repoid && $apackid eq $packid;
my @d;
my $cpio;
my $nosource = exists($aggregate->{'nosources'}) ? 1 : 0;
Expand Down Expand Up @@ -325,53 +339,116 @@ sub build {
$gctx->{'retryevents'}->addretryevent({'type' => 'repository', 'project' => $aprojid, 'repository' => $arepoid, 'arch' => $myarch}) if BSSched::RPC::is_transient_error($error);
last;
}
for my $bin (@{$cpio || []}) {
$updateinfo = "$jobdatadir/$bin->{'name'}" if $bin->{'name'} eq 'upload:updateinfo.xml';
push @d, "$jobdatadir/$bin->{'name'}";
}
@d = map {"$jobdatadir/$_->{'name'}"} @{$cpio || []};
$nosource = 1 if -e "$jobdatadir/upload:.nosourceaccess";
} else {
next if $aprojid eq $projid && $arepoid eq $repoid && $apackid eq $packid;
my $d = "$reporoot/$aprojid/$arepoid/$myarch/$apackid";
$updateinfo = "$d/updateinfo.xml" if -f "$d/updateinfo.xml";
@d = grep {/\.(?:$binsufsre)$/} ls($d);
@d = map {"$d/$_"} sort(@d);
$nosource = 1 if -e "$d/.nosourceaccess";
my $dir = "$reporoot/$aprojid/$arepoid/$myarch/$apackid";
@d = map {"$dir/$_"} sort(ls($dir));
$nosource = 1 if -e "$dir/.nosourceaccess";
}
my $ajobrepo = bins2repo(@d);

my $copysources;
for my $abin (sort keys %$ajobrepo) {
my $r = $ajobrepo->{$abin};
next unless $r->{'source'};
my @sources;
for my $d (@d) {
my @s = stat($d);
next unless @s;
my $filename = $d;
$filename =~ s/.*\///;
$filename =~ s/^upload:// if $cpio;
if ($filename eq 'updateinfo.xml') {
next if $abinfilter && !$abinfilter->{$filename};
next if $jobbins{$filename}; # first one wins
$jobbins{$filename} = 1;
BSUtil::cp($d, "$jobdatadir/$filename");
next;
}
if ($filename =~ /\.obsbinlnk$/) {
my $r = BSUtil::retrieve($d, 1);
next unless $r;
next if $abinfilter && !$abinfilter->{$r->{'name'}};
next if $jobbins{$filename}; # first one wins
next unless $r->{'name'} =~ /^container:/;

my $dir = $d;
$dir =~ s/\/[^\/]*$//;
my $containerinfofile = $filename;
$containerinfofile =~ s/\.obsbinlnk$/\.containerinfo/;
next if $jobbins{$containerinfofile}; # oops?
my $prefix = $cpio ? 'upload:' : '';
my $containerinfo = readcontainerinfo($dir, "$prefix$containerinfofile");
next unless $containerinfo;
for my $blobid (@{$containerinfo->{'tar_blobids'} || []}) {
if (-e "$dir/${prefix}_blob.$blobid") {
next if $jobbins{"_blob.$blobid"}; # already have that blob
link("$dir/${prefix}_blob.$blobid", "$jobdatadir/_blob.$blobid") || die("link $dir/${prefix}_blob.$blobid $jobdatadir/_blob.$blobid: $!\n");
$jobbins{"_blob.$blobid"} = 1;
}
}
my $containerfile = $containerinfo->{'file'};
# do we need to copy the container file?
if (!$containerinfo->{'tar_blobids'} || grep {!$jobbins{"_blob.$_"}} @{$containerinfo->{'tar_blobids'}}) {
if (-e "$dir/$prefix$containerfile") {
BSUtil::cp("$dir/$prefix$containerfile", "$jobdatadir/$containerfile");
$jobbins{$containerfile} = 1;
}
if (-e "$dir/$prefix$containerfile.sha256") {
BSUtil::cp("$dir/$prefix$containerfile.sha256", "$jobdatadir/$containerfile.sha256");
$jobbins{"$containerinfofile.sha256"} = 1;
}
}
# hack to add a container tag with the attribute
my $bconf = $ctx->{'conf'};
if ($bconf->{'substitute'}->{"aggregate-container-add-tag:$packid"}) {
my @regtags = @{$bconf->{'substitute'}->{"aggregate-container-add-tag:$packid"}};
for my $tag (@regtags) {
$tag = "$tag:latest" unless $tag =~ /^[^\/]+:[^\/]+$/s;
push @{$r->{'provides'}}, "container:$tag";
push @{$containerinfo->{'tags'}}, $tag;
}
}
writecontainerinfo("$jobdatadir/$containerinfofile", undef, $containerinfo);
$jobbins{$containerinfofile} = 1;
$r->{'path'} = "../$packid/$containerfile";
BSUtil::store("$jobdatadir/$filename", undef, $r);
$jobbins{$filename} = 1;
next;
}
next unless $filename =~ /\.(?:$binsufsre)$/;
$filename =~ s/^::import::.*?:://;
my $r;
eval {
$r = Build::query($d, 'evra' => 1);
BSVerify::verify_nevraquery($r) if $r;
$r->{'id'} = "$s[9]/$s[7]/$s[1]";
};
next unless $r;
next if $abinfilter && !$abinfilter->{$r->{'name'}};
if (!$r->{'source'}) {
# this is a source binary
push @sources, [ $d, $r, $filename ];
next;
}
next unless $r->{'source'};
# FIXME: How is debian handling debug packages ?
next if $nosource && ($r->{'name'} =~ /-debug(:?info|source)?$/);
my $basename = $abin;
$basename =~ s/.*\///;
$basename =~ s/^upload:// if $cpio;
$basename =~ s/^::import::.*?:://;
next if $jobbins{$basename}; # first one wins
$jobbins{$basename} = 1;
BSUtil::cp($abin, "$jobdatadir/$basename");
$jobrepo->{"$jobdatadir/$basename"} = $r;
next if $jobbins{$filename}; # first one wins
$jobbins{$filename} = 1;
BSUtil::cp($d, "$jobdatadir/$filename");
$jobrepo->{"$jobdatadir/$filename"} = $r;
$copysources = 1 unless $nosource;
}
if ($updateinfo && !($abinfilter && !$abinfilter->{'updateinfo.xml'})) {
BSUtil::cp($updateinfo, "$jobdatadir/updateinfo.xml");
}
if ($copysources) {
for my $abin (sort keys %$ajobrepo) {
my $r = $ajobrepo->{$abin};
next if $r->{'source'};
my $basename = $abin;
$basename =~ s/.*\///;
$basename =~ s/^upload:// if $cpio;
BSUtil::cp($abin, "$jobdatadir/$basename");
$jobrepo->{"$jobdatadir/$basename"} = $r;
}
}
for my $bin (@{$cpio || []}) {
unlink("$jobdatadir/$bin->{'name'}");
@sources = () unless $copysources;
for my $d (@sources) {
my $r = $d->[1];
my $filename = $d->[2];
$d = $d->[0];
next if $jobbins{$filename}; # first one wins
$jobbins{$filename} = 1;
BSUtil::cp($d, "$jobdatadir/$filename");
$jobrepo->{"$jobdatadir/$filename"} = $r;
}
# delete upload files
unlink("$jobdatadir/$_->{'name'}") for @{$cpio || []};
}
last if $error;
}
Expand All @@ -391,35 +468,6 @@ sub build {
return ('scheduled', $job);
}

=head2 bins2repo - query a list of binaries
TODO: add description
=cut

sub bins2repo {
my (@bins) = @_;

@bins = grep {/\.(?:$binsufsre)$/} @bins;
my $repobins = {};
for my $bin (@bins) {
my @s = stat($bin);
next unless @s;
my $id = "$s[9]/$s[7]/$s[1]";
my $data = Build::query($bin, 'evra' => 1); # need arch
next unless $data;
eval {
BSVerify::verify_nevraquery($data);
};
next if $@;
delete $data->{'disttag'};
$data->{'id'} = $id;
$repobins->{$bin} = $data;
}
return $repobins;
}


=head2 jobfinished - job finished event handler for aggregates
TODO: add description
Expand Down Expand Up @@ -486,4 +534,22 @@ sub jobfinished {
BSSched::BuildJob::patchpackstatus($gctx, $prp, $packid, 'succeeded');
}

sub readcontainerinfo {
my ($dir, $containerinfo) = @_;
return undef unless -e "$dir/$containerinfo";
return undef unless (-s _) < 100000;
my $m = readstr("$dir/$containerinfo");
my $d;
eval { $d = JSON::XS::decode_json($m); };
return undef unless $d && ref($d) eq 'HASH';
return $d;
}

sub writecontainerinfo {
my ($fn, $fnf, $containerinfo) = @_;
my $containerinfo_json = JSON::XS->new->utf8->canonical->pretty->encode($containerinfo);
writestr($fn, $fnf, $containerinfo_json);
}


1;

0 comments on commit d6c5c39

Please sign in to comment.