From 56b36b5be1882f091af5e2e1c110bde470773b60 Mon Sep 17 00:00:00 2001 From: M0ses Date: Tue, 1 Dec 2015 15:22:02 +0100 Subject: [PATCH] [backend] Split Remote handling code into BSSched/Remote.pm --- src/backend/BSSched/Remote.pm | 416 ++++++++++++++++++++++++++++++++++ src/backend/bs_sched | 372 +++--------------------------- 2 files changed, 445 insertions(+), 343 deletions(-) create mode 100644 src/backend/BSSched/Remote.pm diff --git a/src/backend/BSSched/Remote.pm b/src/backend/BSSched/Remote.pm new file mode 100644 index 00000000000..027a122fd54 --- /dev/null +++ b/src/backend/BSSched/Remote.pm @@ -0,0 +1,416 @@ +# Copyright (c) 2015 SUSE LLC +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program (see the file COPYING); if not, write to the +# Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA +# +package BSSched::Remote; + +use strict; +use warnings; + +use BSRPC; +use BSSched::RPC; + +=head2 beginwatchcollection - TODO: add summary + + TODO: add description + +=cut + +sub beginwatchcollection { + my ($gctx) = @_; + %{$gctx->{'watchremote'}} = (); + $gctx->{'watchremoteprojs'} = {}; # tmp +} + +=head2 endwatchcollection - TODO: add summary + + TODO: add description + +=cut + +sub endwatchcollection { + my ($gctx) = @_; + updateremoteprojs($gctx); + delete $gctx->{'watchremoteprojs'}; # clean up +} + +=head2 addwatchremote - register for a possibly remote resource + + input: $type: type of resource (project/package/repository) + $projid: local name of the project + $watch: extra data to match +=cut + +sub addwatchremote { + my ($gctx, $type, $projid, $watch) = @_; + + my $projpacks = $gctx->{'projpacks'}; + return undef if $projpacks->{$projid} && !$projpacks->{$projid}->{'remoteurl'}; + my $proj = remoteprojid($gctx, $projid); + my $watchremoteprojs = $gctx->{'watchremoteprojs'} || {}; + $watchremoteprojs->{$projid} = $proj; + return undef unless $proj; + my $watchremote = $gctx->{'watchremote'}; + if ($proj->{'partition'}) { + $watchremote->{$BSConfig::srcserver}->{"$type/$proj->{'remoteproject'}$watch"} = $projid; + } else { + $watchremote->{$proj->{'remoteurl'}}->{"$type/$proj->{'remoteproject'}$watch"} = $projid; + } + return $proj; +} + +=head2 updateremoteprojs - TODO: add summary + + TODO: add description + +=cut + +sub updateremoteprojs { + my ($gctx) = @_; + + my $remoteprojs = $gctx->{'remoteprojs'}; + my $watchremoteprojs = $gctx->{'watchremoteprojs'}; + for my $projid (keys %$remoteprojs) { + my $r = $watchremoteprojs->{$projid}; + if (!$r) { + delete $remoteprojs->{$projid}; + next; + } + my $or = $remoteprojs->{$projid}; + next if $or && $or->{'partition'}; # XXX how do we update them? + next if $or && $or->{'remoteurl'} eq $r->{'remoteurl'} && $or->{'remoteproject'} eq $r->{'remoteproject'}; + delete $remoteprojs->{$projid}; + } + for my $projid (sort keys %$watchremoteprojs) { + my $r = $watchremoteprojs->{$projid}; + fetchremoteproj($gctx, $r, $projid) if $r; + } +} + +=head2 remoteprojid - TODO: add summary + + TODO: add description + +=cut + +sub remoteprojid { + my ($gctx, $projid) = @_; + my $rsuf = ''; + my $origprojid = $projid; + + my $projpacks = $gctx->{'projpacks'}; + my $remoteprojs = $gctx->{'remoteprojs'}; + return $remoteprojs->{$projid} if $remoteprojs->{$projid} && $remoteprojs->{$projid}->{'partition'}; + my $proj = $projpacks->{$projid}; + if ($proj) { + return undef unless $proj->{'remoteurl'}; + if (!$proj->{'remoteproject'}) { + $proj = { %$proj }; + delete $proj->{'remoteurl'}; + return $proj; + } + return { + 'name' => $projid, + 'root' => $projid, + 'remoteroot' => $proj->{'remoteproject'}, + 'remoteurl' => $proj->{'remoteurl'}, + 'remoteproject' => $proj->{'remoteproject'}, + }; + } + while ($projid =~ /^(.*)(:.*?)$/) { + $projid = $1; + $rsuf = "$2$rsuf"; + $proj = $projpacks->{$projid}; + if ($proj) { + return undef unless $proj->{'remoteurl'}; + if ($proj->{'remoteproject'}) { + $rsuf = "$proj->{'remoteproject'}$rsuf"; + } else { + $rsuf =~ s/^://; + } + return { + 'name' => $origprojid, + 'root' => $projid, + 'remoteroot' => $proj->{'remoteproject'}, + 'remoteurl' => $proj->{'remoteurl'}, + 'remoteproject' => $rsuf, + }; + } + } + return undef; +} + +=head2 maptoremote - TODO: add summary + + TODO: add description + +=cut + +sub maptoremote { + my ($proj, $projid) = @_; + return $projid if $proj->{'partition'}; + return "$proj->{'root'}:$projid" unless $proj->{'remoteroot'}; + return $proj->{'root'} if $projid eq $proj->{'remoteroot'}; + return '_unavailable' if $projid !~ /^\Q$proj->{'remoteroot'}\E:(.*)$/; + return "$proj->{'root'}:$1"; +} + +=head2 fetchremoteproj - TODO: add summary + + TODO: add description + +=cut + +sub fetchremoteproj { + my ($gctx, $proj, $projid) = @_; + return undef unless $proj && $proj->{'remoteurl'} && $proj->{'remoteproject'}; + $projid ||= $proj->{'name'}; + my $remoteprojs = $gctx->{'remoteprojs'}; + return $remoteprojs->{$projid} if exists $remoteprojs->{$projid}; + print "fetching remote project data for $projid\n"; + my $rproj; + my $param = { + 'uri' => "$proj->{'remoteurl'}/source/$proj->{'remoteproject'}/_meta", + 'timeout' => 30, + 'proxy' => $gctx->{'proxy'}, + }; + eval { + $rproj = BSRPC::rpc($param, $BSXML::proj); + }; + if ($@) { + warn($@); + my $error = $@; + $error =~ s/\n$//s; + $rproj = {'error' => $error}; + main::addretryevent($gctx, {'type' => 'project', 'project' => $projid}) if BSSched::RPC::is_transient_error($error); + } + return undef unless $rproj; + for (qw{name root remoteroot remoteurl remoteproject}) { + $rproj->{$_} = $proj->{$_}; + } + # map remote project names to local names + for my $repo (@{$rproj->{'repository'} || []}) { + for my $pathel (@{$repo->{'path'} || []}) { + $pathel->{'project'} = maptoremote($proj, $pathel->{'project'}); + } + for my $pathel (@{$repo->{'releasetarget'} || []}) { + $pathel->{'project'} = maptoremote($proj, $pathel->{'project'}); + } + } + for my $link (@{$rproj->{'link'} || []}) { + $link->{'project'} = maptoremote($proj, $link->{'project'}); + } + $remoteprojs->{$projid} = $rproj; + return $rproj; +} + +=head2 fetchremoteconfig - TODO: add summary + + TODO: add description + +=cut + +sub fetchremoteconfig { + my ($gctx, $projid) = @_; + + my $remoteprojs = $gctx->{'remoteprojs'}; + my $proj = $remoteprojs->{$projid}; + return undef if !$proj || $proj->{'error'}; + return $proj->{'config'} if exists $proj->{'config'}; + return '' if $proj->{'partition'}; + print " fetching remote project config for $projid\n"; + my $c; + my $param = { + 'uri' => "$proj->{'remoteurl'}/source/$proj->{'remoteproject'}/_config", + 'timeout' => 30, + 'proxy' => $gctx->{'proxy'}, + }; + eval { + $c = BSRPC::rpc($param); + }; + if ($@) { + warn($@); + $proj->{'error'} = $@; + $proj->{'error'} =~ s/\n$//s; + main::addretryevent($gctx, {'type' => 'project', 'project' => $projid}) if BSSched::RPC::is_transient_error($proj->{'error'}); + return undef; + } + $proj->{'config'} = $c; + return $c; +} + +=head2 remotemap2remoteprojs - update remoteprojs with the remotemap data + + TODO: add description + +=cut + +sub remotemap2remoteprojs { + my ($gctx, $remotemap) = @_; + + my $remoteprojs = $gctx->{'remoteprojs'}; + for my $proj (@{$remotemap || []}) { + my $projid = delete $proj->{'project'}; + if (!$proj->{'remoteurl'} && !$proj->{'error'}) { + # remote project is gone (partition case) + delete $remoteprojs->{$projid}; + next; + } + my $oproj = $remoteprojs->{$projid}; + undef $oproj if $oproj && ($oproj->{'remoteurl'} ne $proj->{'remoteurl'} || $oproj->{'remoteproject'} ne $proj->{'remoteproject'}); + my $c = $proj->{'config'}; + $c = $oproj->{'config'} if !defined($c) && $oproj; + my $error = $proj->{'error'}; + delete $proj->{'error'}; + $proj = $oproj if $proj->{'proto'} && $oproj && !$oproj->{'proto'}; + delete $proj->{'config'}; + $proj->{'config'} = $c if defined $c; + if ($error) { + $proj->{'error'} = $error; + main::addretryevent($gctx, {'type' => 'project', 'project' => $projid}) if $error =~ /interconnect error:/; + } + $remoteprojs->{$projid} = $proj; + } +} + +=head2 setupremotewatcher - TODO: add summary + + TODO: add description + +=cut + +sub setupremotewatcher { + my ($gctx, $remoteurl, $watchremote, $start) = @_; + my $myarch = $gctx->{'arch'}; + if ($start) { + print "setting up watcher for $remoteurl, start=$start\n"; + } else { + print "setting up watcher for $remoteurl\n"; + } + # collaps filter list, watch complete project if more than 3 packages are watched + my @filter; + my %filterpackage; + for (sort keys %$watchremote) { + if (substr($_, 0, 8) eq 'package/') { + my @s = split('/', $_); + if (!defined($s[2])) { + unshift @{$filterpackage{$s[1]}}, undef; + } else { + push @{$filterpackage{$s[1]}}, $_; + } + } else { + push @filter, $_; + } + } + for (sort keys %filterpackage) { + if (!defined($filterpackage{$_}->[0]) || @{$filterpackage{$_}} > 3) { + push @filter, "package/$_"; + } else { + push @filter, @{$filterpackage{$_}}; + } + } + my $param = { + 'uri' => "$remoteurl/lastevents", + 'async' => 1, + 'request' => 'POST', + 'headers' => [ 'Content-Type: application/x-www-form-urlencoded' ], + 'proxy' => $gctx->{'remoteproxy'}, + }; + my @args; + my $obsname = $gctx->{'obsname'}; + push @args, "obsname=$obsname/$myarch" if $obsname; + push @args, map {"filter=$_"} @filter; + push @args, "start=$start" if $start; + my $ret; + eval { + $ret = BSRPC::rpc($param, $BSXML::events, @args); + }; + if ($@) { + warn($@); + print "retrying in 60 seconds\n"; + $ret = {'retry' => time() + 60}; + } + $ret->{'remoteurl'} = $remoteurl; + return $ret; +} + +=head2 setupremotewatcher - TODO: add summary + + TODO: add description + +=cut + +sub getremoteevents { + my ($gctx, $watcher, $watchremote, $starthash) = @_; + + my $myarch = $gctx->{'arch'}; + my $remoteurl = $watcher->{'remoteurl'}; + my $start = $starthash->{$remoteurl}; + print "response from watcher for $remoteurl\n"; + my $ret; + eval { + $ret = BSRPC::rpc($watcher); + }; + if ($@) { + warn $@; + close($watcher->{'socket'}) if defined $watcher->{'socket'}; + delete $watcher->{'socket'}; + $watcher->{'retry'} = time() + 60; + print "retrying in 60 seconds\n"; + return (); + } + my @remoteevents; + if ($ret->{'sync'} && $ret->{'sync'} eq 'lost') { + # ok to lose sync on call with no start (actually not, FIXME) + if ($start) { + print "lost sync with server, was at $start\n"; + print "next: $ret->{'next'}\n" if $ret->{'next'}; + # synthesize all events we watch + for my $watch (sort keys %$watchremote) { + my $projid = $watchremote->{$watch}; + next unless defined $projid; + my @s = split('/', $watch); + if ($s[0] eq 'project') { + push @remoteevents, {'type' => 'project', 'project' => $projid}; + } elsif ($s[0] eq 'package') { + push @remoteevents, {'type' => 'package', 'project' => $projid, 'package' => $s[2]}; + } elsif ($s[0] eq 'repository' || $s[0] eq 'repoinfo') { + push @remoteevents, {'type' => $s[0], 'project' => $projid, 'repository' => $s[2], 'arch' => $s[3]}; + } + } + } + } + for my $ev (@{$ret->{'event'} || []}) { + next unless $ev->{'project'}; + my $watch; + if ($ev->{'type'} eq 'project') { + $watch = "project/$ev->{'project'}"; + } elsif ($ev->{'type'} eq 'package') { + $watch = "package/$ev->{'project'}/$ev->{'package'}"; + $watch = "package/$ev->{'project'}" unless defined $watchremote->{$watch}; + } elsif ($ev->{'type'} eq 'repository' || $ev->{'type'} eq 'repoinfo') { + $watch = "$ev->{'type'}/$ev->{'project'}/$ev->{'repository'}/$myarch"; + } else { + next; + } + my $projid = $watchremote->{$watch}; + next unless defined $projid; + push @remoteevents, {%$ev, 'project' => $projid}; + } + $starthash->{$remoteurl} = $ret->{'next'} if $ret->{'next'}; + return @remoteevents; +} + +1; diff --git a/src/backend/bs_sched b/src/backend/bs_sched index b7156ce3913..9ac8d577103 100755 --- a/src/backend/bs_sched +++ b/src/backend/bs_sched @@ -50,6 +50,7 @@ use BSSolv; use BSCando; use BSSched::RPC; +use BSSched::Remote; if ($BSConfig::enable_download_on_demand) { require BSDoD; @@ -458,9 +459,17 @@ sub getconfig { for my $prp (reverse @$path) { my ($p, $r) = split('/', $prp, 2); my $c; - if ($remoteprojs->{$p}) { - $c = fetchremoteconfig($gctx, $p); - return undef unless defined $c; + my $rproj = $remoteprojs->{$p}; + if ($rproj) { + return undef if $rproj->{'error'}; + if (exists($rproj->{'config'})) { + $c = $rproj->{'config'}; + } elsif ($rproj->{'partition'}) { + $c = ''; + } else { + $c = BSSched::Remote::fetchremoteconfig($gctx, $p); + return undef unless defined $c; + } } elsif ($projpacks->{$p}) { $c = $projpacks->{$p}->{'config'}; } @@ -3080,9 +3089,6 @@ sub addjobhist { #my %prpnoleaf; # is this prp referenced by another prp? #my @projpacks_linked; # data of all linked sources -my %_watchremote; # remote_url => { eventdescr => projid } -my %_watchremote_start; # remote_url => lasteventno - my %_repounchanged; my %_prpfinished; my %_prpnotready; # maps prp => { packid => 1, ... } @@ -3471,7 +3477,7 @@ sub update_projpacks { delete $remoteprojs->{$projid}; } } - remotemap2remoteprojs($gctx, $projpacksin->{'remotemap'}); + BSSched::Remote::remotemap2remoteprojs($gctx, $projpacksin->{'remotemap'}); } # like update_projpacks, but leaves the packages untouched @@ -3495,37 +3501,9 @@ sub update_projpacks_meta { if (!grep {$_->{'project'} eq $projid} @{$projpacksin->{'remotemap'} || []}) { delete $remoteprojs->{$projid}; } - remotemap2remoteprojs($gctx, $projpacksin->{'remotemap'}); + BSSched::Remote::remotemap2remoteprojs($gctx, $projpacksin->{'remotemap'}); } -# update remoteprojs with the remotemap data -sub remotemap2remoteprojs { - my ($gctx, $remotemap) = @_; - - my $remoteprojs = $gctx->{'remoteprojs'}; - for my $proj (@{$remotemap || []}) { - my $projid = delete $proj->{'project'}; - if (!$proj->{'remoteurl'} && !$proj->{'error'}) { - # remote project is gone (partition case) - delete $remoteprojs->{$projid}; - next; - } - my $oproj = $remoteprojs->{$projid}; - undef $oproj if $oproj && ($oproj->{'remoteurl'} ne $proj->{'remoteurl'} || $oproj->{'remoteproject'} ne $proj->{'remoteproject'}); - my $c = $proj->{'config'}; - $c = $oproj->{'config'} if !defined($c) && $oproj; - my $error = $proj->{'error'}; - delete $proj->{'error'}; - $proj = $oproj if $proj->{'proto'} && $oproj && !$oproj->{'proto'}; - delete $proj->{'config'}; - $proj->{'config'} = $c if defined $c; - if ($error) { - $proj->{'error'} = $error; - addretryevent($gctx, {'type' => 'project', 'project' => $projid}) if $error =~ /interconnect error:/; - } - $remoteprojs->{$projid} = $proj; - } -} # -> BSUtil sub identical { @@ -3708,40 +3686,14 @@ sub postprocess_needed_check { # sub get_projpacks_postprocess { my ($gctx) = @_; - %{$gctx->{'watchremote'}} = (); - $gctx->{'watchremoteprojs'} = {}; # tmp + + BSSched::Remote::beginwatchcollection($gctx); #print Dumper($projpacks); calc_projpacks_linked($gctx); # modifies watchremote/watchremoteprojs calc_prps($gctx); # modifies watchremote/watchremoteprojs - updateremoteprojs($gctx); - delete $gctx->{'watchremoteprojs'}; # clean up -} - -# -# addwatchremote: register for a possibly remote resource -# -# input: $type: type of resource (project/package/repository) -# $projid: local name of the project -# $watch: extra data to match -# -sub addwatchremote { - my ($gctx, $type, $projid, $watch) = @_; - - my $projpacks = $gctx->{'projpacks'}; - return undef if $projpacks->{$projid} && !$projpacks->{$projid}->{'remoteurl'}; - my $proj = remoteprojid($gctx, $projid); - my $watchremoteprojs = $gctx->{'watchremoteprojs'} || {}; - $watchremoteprojs->{$projid} = $proj; - return undef unless $proj; - my $watchremote = $gctx->{'watchremote'}; - if ($proj->{'partition'}) { - $watchremote->{$BSConfig::srcserver}->{"$type/$proj->{'remoteproject'}$watch"} = $projid; - } else { - $watchremote->{$proj->{'remoteurl'}}->{"$type/$proj->{'remoteproject'}$watch"} = $projid; - } - return $proj; + BSSched::Remote::endwatchcollection($gctx); } sub addretryevent { @@ -3789,7 +3741,7 @@ sub calc_projpacks_linked { my @li = @{$pack->{'linked'}}; for my $li (@li) { $li = { %$li }; # clone so that we don't change projpack - addwatchremote($gctx, 'package', $li->{'project'}, "/$li->{'package'}"); + BSSched::Remote::addwatchremote($gctx, 'package', $li->{'project'}, "/$li->{'package'}"); $li->{'myproject'} = $projid; $li->{'mypackage'} = $mypackid; } @@ -3798,7 +3750,7 @@ sub calc_projpacks_linked { if ($proj->{'link'}) { my @li = expandprojlink($gctx, $projid); for my $li (@li) { - addwatchremote($gctx, 'package', $li->{'project'}, ''); # watch all packages + BSSched::Remote::addwatchremote($gctx, 'package', $li->{'project'}, ''); # watch all packages $li->{'package'} = ':*'; $li->{'myproject'} = $projid; } @@ -3864,13 +3816,13 @@ sub expandsearchpath { my $prp = "$t->{'project'}/$t->{'repository'}"; push @ret, $t unless $done{$prp}; $done{$prp} = 1; - addwatchremote($gctx, 'repository', $t->{'project'}, "/$t->{'repository'}/$myarch") unless $t->{'repository'} eq '_unavailable'; + BSSched::Remote::addwatchremote($gctx, 'repository', $t->{'project'}, "/$t->{'repository'}/$myarch") unless $t->{'repository'} eq '_unavailable'; if (!@path) { last if $done{"/$prp"}; my ($pid, $rid) = ($t->{'project'}, $t->{'repository'}); - my $proj = addwatchremote($gctx, 'project', $pid, ''); + my $proj = BSSched::Remote::addwatchremote($gctx, 'project', $pid, ''); if ($proj) { - $proj = fetchremoteproj($gctx, $proj, $pid); + $proj = BSSched::Remote::fetchremoteproj($gctx, $proj, $pid); } else { $proj = $projpacks->{$pid}; } @@ -3896,9 +3848,9 @@ sub expandprojlink { next if $seen{$lprojid}; push @ret, {'project' => $lprojid}; $seen{$lprojid} = 1; - my $lproj = addwatchremote($gctx, 'project', $lprojid, ''); + my $lproj = BSSched::Remote::addwatchremote($gctx, 'project', $lprojid, ''); if ($lproj) { - $lproj = fetchremoteproj($gctx, $lproj, $lprojid); + $lproj = BSSched::Remote::fetchremoteproj($gctx, $lproj, $lprojid); } else { $lproj = $projpacks->{$lprojid}; } @@ -4019,7 +3971,7 @@ sub calc_prps { my ($aprojid, $arepoid) = split('/', $xsp, 2); # we just watch the repository as it costs too much to # watch every single package - addwatchremote($gctx, 'repository', $aprojid, "/$arepoid/$myarch"); + BSSched::Remote::addwatchremote($gctx, 'repository', $aprojid, "/$arepoid/$myarch"); } my %xsp = map {$_ => 1} (@sp, @xsp); delete $xsp{$prp}; @@ -4048,155 +4000,6 @@ sub calc_prps { #################################################################### -sub updateremoteprojs { - my ($gctx) = @_; - - my $remoteprojs = $gctx->{'remoteprojs'}; - my $watchremoteprojs = $gctx->{'watchremoteprojs'}; - for my $projid (keys %$remoteprojs) { - my $r = $watchremoteprojs->{$projid}; - if (!$r) { - delete $remoteprojs->{$projid}; - next; - } - my $or = $remoteprojs->{$projid}; - next if $or && $or->{'partition'}; # XXX how do we update them? - next if $or && $or->{'remoteurl'} eq $r->{'remoteurl'} && $or->{'remoteproject'} eq $r->{'remoteproject'}; - delete $remoteprojs->{$projid}; - } - for my $projid (sort keys %$watchremoteprojs) { - my $r = $watchremoteprojs->{$projid}; - fetchremoteproj($gctx, $r, $projid) if $r; - } -} - -sub remoteprojid { - my ($gctx, $projid) = @_; - my $rsuf = ''; - my $origprojid = $projid; - - my $projpacks = $gctx->{'projpacks'}; - my $remoteprojs = $gctx->{'remoteprojs'}; - return $remoteprojs->{$projid} if $remoteprojs->{$projid} && $remoteprojs->{$projid}->{'partition'}; - my $proj = $projpacks->{$projid}; - if ($proj) { - return undef unless $proj->{'remoteurl'}; - if (!$proj->{'remoteproject'}) { - $proj = { %$proj }; - delete $proj->{'remoteurl'}; - return $proj; - } - return { - 'name' => $projid, - 'root' => $projid, - 'remoteroot' => $proj->{'remoteproject'}, - 'remoteurl' => $proj->{'remoteurl'}, - 'remoteproject' => $proj->{'remoteproject'}, - }; - } - while ($projid =~ /^(.*)(:.*?)$/) { - $projid = $1; - $rsuf = "$2$rsuf"; - $proj = $projpacks->{$projid}; - if ($proj) { - return undef unless $proj->{'remoteurl'}; - if ($proj->{'remoteproject'}) { - $rsuf = "$proj->{'remoteproject'}$rsuf"; - } else { - $rsuf =~ s/^://; - } - return { - 'name' => $origprojid, - 'root' => $projid, - 'remoteroot' => $proj->{'remoteproject'}, - 'remoteurl' => $proj->{'remoteurl'}, - 'remoteproject' => $rsuf, - }; - } - } - return undef; -} - -sub maptoremote { - my ($proj, $projid) = @_; - return $projid if $proj->{'partition'}; - return "$proj->{'root'}:$projid" unless $proj->{'remoteroot'}; - return $proj->{'root'} if $projid eq $proj->{'remoteroot'}; - return '_unavailable' if $projid !~ /^\Q$proj->{'remoteroot'}\E:(.*)$/; - return "$proj->{'root'}:$1"; -} - -sub fetchremoteproj { - my ($gctx, $proj, $projid) = @_; - return undef unless $proj && $proj->{'remoteurl'} && $proj->{'remoteproject'}; - $projid ||= $proj->{'name'}; - my $remoteprojs = $gctx->{'remoteprojs'}; - return $remoteprojs->{$projid} if exists $remoteprojs->{$projid}; - print "fetching remote project data for $projid\n"; - my $rproj; - my $param = { - 'uri' => "$proj->{'remoteurl'}/source/$proj->{'remoteproject'}/_meta", - 'timeout' => 30, - 'proxy' => $gctx->{'remoteproxy'}, - }; - eval { - $rproj = BSRPC::rpc($param, $BSXML::proj); - }; - if ($@) { - warn($@); - my $error = $@; - $error =~ s/\n$//s; - $rproj = {'error' => $error}; - addretryevent($gctx, {'type' => 'project', 'project' => $projid}) if BSSched::RPC::is_transient_error($error); - } - return undef unless $rproj; - for (qw{name root remoteroot remoteurl remoteproject}) { - $rproj->{$_} = $proj->{$_}; - } - # map remote project names to local names - for my $repo (@{$rproj->{'repository'} || []}) { - for my $pathel (@{$repo->{'path'} || []}) { - $pathel->{'project'} = maptoremote($proj, $pathel->{'project'}); - } - for my $pathel (@{$repo->{'releasetarget'} || []}) { - $pathel->{'project'} = maptoremote($proj, $pathel->{'project'}); - } - } - for my $link (@{$rproj->{'link'} || []}) { - $link->{'project'} = maptoremote($proj, $link->{'project'}); - } - $remoteprojs->{$projid} = $rproj; - return $rproj; -} - -sub fetchremoteconfig { - my ($gctx, $projid) = @_; - - my $remoteprojs = $gctx->{'remoteprojs'}; - my $proj = $remoteprojs->{$projid}; - return undef if !$proj || $proj->{'error'}; - return $proj->{'config'} if exists $proj->{'config'}; - return '' if $proj->{'partition'}; - print " fetching remote project config for $projid\n"; - my $c; - my $param = { - 'uri' => "$proj->{'remoteurl'}/source/$proj->{'remoteproject'}/_config", - 'timeout' => 30, - 'proxy' => $gctx->{'remoteproxy'}, - }; - eval { - $c = BSRPC::rpc($param); - }; - if ($@) { - warn($@); - $proj->{'error'} = $@; - $proj->{'error'} =~ s/\n$//s; - addretryevent($gctx, {'type' => 'project', 'project' => $projid}) if BSSched::RPC::is_transient_error($proj->{'error'}); - return undef; - } - $proj->{'config'} = $c; - return $c; -} ### remote repository handling @@ -7356,123 +7159,6 @@ sub updaterelsyncmax { return $changed; } - -sub setupremotewatcher { - my ($gctx, $remoteurl, $watchremote, $start) = @_; - my $myarch = $gctx->{'arch'}; - if ($start) { - print "setting up watcher for $remoteurl, start=$start\n"; - } else { - print "setting up watcher for $remoteurl\n"; - } - # collaps filter list, watch complete project if more than 3 packages are watched - my @filter; - my %filterpackage; - for (sort keys %$watchremote) { - if (substr($_, 0, 8) eq 'package/') { - my @s = split('/', $_); - if (!defined($s[2])) { - unshift @{$filterpackage{$s[1]}}, undef; - } else { - push @{$filterpackage{$s[1]}}, $_; - } - } else { - push @filter, $_; - } - } - for (sort keys %filterpackage) { - if (!defined($filterpackage{$_}->[0]) || @{$filterpackage{$_}} > 3) { - push @filter, "package/$_"; - } else { - push @filter, @{$filterpackage{$_}}; - } - } - my $param = { - 'uri' => "$remoteurl/lastevents", - 'async' => 1, - 'request' => 'POST', - 'headers' => [ 'Content-Type: application/x-www-form-urlencoded' ], - 'proxy' => $gctx->{'remoteproxy'}, - }; - my @args; - my $obsname = $gctx->{'obsname'}; - push @args, "obsname=$obsname/$myarch" if $obsname; - push @args, map {"filter=$_"} @filter; - push @args, "start=$start" if $start; - my $ret; - eval { - $ret = BSRPC::rpc($param, $BSXML::events, @args); - }; - if ($@) { - warn($@); - print "retrying in 60 seconds\n"; - $ret = {'retry' => time() + 60}; - } - $ret->{'remoteurl'} = $remoteurl; - return $ret; -} - -sub getremoteevents { - my ($gctx, $watcher, $watchremote, $starthash) = @_; - - my $myarch = $gctx->{'arch'}; - my $remoteurl = $watcher->{'remoteurl'}; - my $start = $starthash->{$remoteurl}; - print "response from watcher for $remoteurl\n"; - my $ret; - eval { - $ret = BSRPC::rpc($watcher); - }; - if ($@) { - warn $@; - close($watcher->{'socket'}) if defined $watcher->{'socket'}; - delete $watcher->{'socket'}; - $watcher->{'retry'} = time() + 60; - print "retrying in 60 seconds\n"; - return (); - } - my @remoteevents; - if ($ret->{'sync'} && $ret->{'sync'} eq 'lost') { - # ok to lose sync on call with no start (actually not, FIXME) - if ($start) { - print "lost sync with server, was at $start\n"; - print "next: $ret->{'next'}\n" if $ret->{'next'}; - # synthesize all events we watch - for my $watch (sort keys %$watchremote) { - my $projid = $watchremote->{$watch}; - next unless defined $projid; - my @s = split('/', $watch); - if ($s[0] eq 'project') { - push @remoteevents, {'type' => 'project', 'project' => $projid}; - } elsif ($s[0] eq 'package') { - push @remoteevents, {'type' => 'package', 'project' => $projid, 'package' => $s[2]}; - } elsif ($s[0] eq 'repository' || $s[0] eq 'repoinfo') { - push @remoteevents, {'type' => $s[0], 'project' => $projid, 'repository' => $s[2], 'arch' => $s[3]}; - } - } - } - } - for my $ev (@{$ret->{'event'} || []}) { - next unless $ev->{'project'}; - my $watch; - if ($ev->{'type'} eq 'project') { - $watch = "project/$ev->{'project'}"; - } elsif ($ev->{'type'} eq 'package') { - $watch = "package/$ev->{'project'}/$ev->{'package'}"; - $watch = "package/$ev->{'project'}" unless defined $watchremote->{$watch}; - } elsif ($ev->{'type'} eq 'repository' || $ev->{'type'} eq 'repoinfo') { - $watch = "$ev->{'type'}/$ev->{'project'}/$ev->{'repository'}/$myarch"; - } else { - next; - } - my $projid = $watchremote->{$watch}; - next unless defined $projid; - push @remoteevents, {%$ev, 'project' => $projid}; - } - $starthash->{$remoteurl} = $ret->{'next'} if $ret->{'next'}; - return @remoteevents; -} - sub no_expander { return 1, splice(@_, 2); } @@ -8280,8 +7966,8 @@ my $gctx = { 'prpnotready' => \%_prpnotready, # remote watchers - 'watchremote' => \%_watchremote, - 'watchremote_start' => \%_watchremote_start, + 'watchremote' => {}, # remote_url => { eventdescr => projid } + 'watchremote_start' => {}, # remote_url => lasteventno 'changed_low' => \%changed_low, 'changed_med' => \%changed_med, @@ -8597,7 +8283,7 @@ NEXTPRP: # create watchers for my $remoteurl (sort keys %$watchremote) { if (!$remotewatchers{$remoteurl}) { - my $watcher = setupremotewatcher($gctx, $remoteurl, $watchremote->{$remoteurl}, $watchremote_start->{$remoteurl}); + my $watcher = BSSched::Remote::setupremotewatcher($gctx, $remoteurl, $watchremote->{$remoteurl}, $watchremote_start->{$remoteurl}); $watcher->{'watchlist'} = join("\0", sort keys %{$watchremote->{$remoteurl}}); $remotewatchers{$remoteurl} = $watcher; } @@ -8637,7 +8323,7 @@ NEXTPRP: print "retrying watcher for $remoteurl\n"; delete $remotewatchers{$remoteurl}; } else { - push @events, getremoteevents($gctx, $watcher, $watchremote->{$remoteurl}, $watchremote_start); + push @events, BSSched::Remote::getremoteevents($gctx, $watcher, $watchremote->{$remoteurl}, $watchremote_start); delete $remotewatchers{$remoteurl} unless $watcher->{'retry'}; } }