Skip to content

Commit

Permalink
[backend] access eventdir and myeventdir through gctx
Browse files Browse the repository at this point in the history
  • Loading branch information
M0ses authored and mlschroe committed Nov 20, 2015
1 parent 518c6dc commit ec11d45
Showing 1 changed file with 40 additions and 37 deletions.
77 changes: 40 additions & 37 deletions src/backend/bs_sched
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ BSUtil::set_fdatasync_before_rename() unless $BSConfig::disable_data_sync || $BS

my $_reporoot = "$bsdir/build";
my $_jobsdir = "$bsdir/jobs";
my $eventdir = "$bsdir/events";
my $_eventdir = "$bsdir/events";
my $extrepodir = "$bsdir/repos";
my $dodsdir = "$bsdir/dods";
my $rundir = $BSConfig::rundir || "$bsdir/run";
Expand All @@ -112,8 +112,6 @@ if (!$BSCando::knownarch{$myarch}) {
die("Architecture '$myarch' is unknown, please adapt BSCando.pm\n");
}

my $myeventdir = "$eventdir/$myarch";

my $historylay = [qw{versrel bcnt srcmd5 rev time duration}];

my %remoteprojs; # remote project cache
Expand All @@ -130,8 +128,9 @@ sub unify {
}

sub sendevent {
my ($ev, $arch, $evname) = @_;
my ($gctx, $ev, $arch, $evname) = @_;

my $eventdir = $gctx->{'eventdir'};
mkdir_p("$eventdir/$arch");
$evname = "$ev->{'type'}:::".Digest::MD5::md5_hex($evname) if length($evname) > 200;
writexml("$eventdir/$arch/.$evname$$", "$eventdir/$arch/$evname", $ev, $BSXML::event);
Expand Down Expand Up @@ -1040,15 +1039,15 @@ sub checkprpaccess {
# input: $prp - prp to be published
#
sub sendpublishevent {
my ($prp) = @_;
my ($gctx, $prp) = @_;

my ($projid, $repoid) = split('/', $prp, 2);
my $ev = {
'type' => 'publish',
'project' => $projid,
'repository' => $repoid,
};
sendevent($ev, 'publish', "${projid}::$repoid");
sendevent($gctx, $ev, 'publish', "${projid}::$repoid");
}

#
Expand All @@ -1059,7 +1058,7 @@ sub sendpublishevent {
# disk and the dispatcher will pick it up and send it for us.
#
sub sendrepochangeevent {
my ($prp, $type) = @_;
my ($gctx, $prp, $type) = @_;

my ($projid, $repoid) = split('/', $prp, 2);
my $ev = {
Expand All @@ -1068,22 +1067,22 @@ sub sendrepochangeevent {
'repository' => $repoid,
'arch' => $myarch,
};
sendevent($ev, 'repository', "${projid}::${repoid}::${myarch}");
sendevent($gctx, $ev, 'repository', "${projid}::${repoid}::${myarch}");
}

#
# sendunblockedevent - send an unblocked event to another scheduler
#
sub sendunblockedevent {
my ($prp, $arch) = @_;
my ($gctx, $prp, $arch) = @_;

my ($projid, $repoid) = split('/', $prp, 2);
my $ev = {
'type' => 'unblocked',
'project' => $projid,
'repository' => $repoid,
};
sendevent($ev, $arch, "unblocked::${projid}::${repoid}");
sendevent($gctx, $ev, $arch, "unblocked::${projid}::${repoid}");
}

sub set_repo_state {
Expand Down Expand Up @@ -1493,7 +1492,7 @@ sub prpfinished {
}
# release lock
close(F);
sendpublishevent($prp);
sendpublishevent($ctx->{'gctx'}, $prp);
return '';
}

Expand Down Expand Up @@ -1655,7 +1654,7 @@ sub prpfinished {

# release lock and ping publisher
close(F);
sendpublishevent($prp);
sendpublishevent($ctx->{'gctx'}, $prp);
return '';
}

Expand Down Expand Up @@ -1724,7 +1723,7 @@ sub createexportjob {
'type' => 'import',
'job' => $job,
};
sendevent($ev, $arch, "import.$job");
sendevent($gctx, $ev, $arch, "import.$job");
}


Expand Down Expand Up @@ -4954,9 +4953,9 @@ sub fakejobfinished {
close(F);
my $ev = {'type' => 'built', 'arch' => $myarch, 'job' => $job};
if ($needsign) {
sendevent($ev, 'signer', "finished:$myarch:$job");
sendevent($gctx, $ev, 'signer', "finished:$myarch:$job");
} else {
sendevent($ev, $myarch, "finished:$job");
sendevent($gctx, $ev, $myarch, "finished:$job");
}
$ourjobs{$job} = 1;
}
Expand Down Expand Up @@ -5502,10 +5501,10 @@ sub checkkiwiproduct {
$gbininfo = read_gbininfo("$reporoot/$aprp/$arch", $arch eq $myarch ? 0 : 1);
}
next if $delayed_errors;
if (!$gbininfo && $arch ne $myarch && -d "$eventdir/$arch") {
if (!$gbininfo && $arch ne $myarch && -d "$gctx->{'eventdir'}/$arch") {
# mis-use unblocked to tell other scheduler that it is missing
print " requesting :repoinfo for $aprp/$arch\n";
sendunblockedevent($aprp, $arch);
sendunblockedevent($gctx, $aprp, $arch);
}
@apackids = unify(@apackids, sort keys %$gbininfo) if $gbininfo;

Expand Down Expand Up @@ -5607,7 +5606,7 @@ sub checkkiwiproduct {
# looks good from our side. tell master arch to check it
if (-e "$markerdir/.waiting_for_$myarch") {
unlink("$markerdir/.waiting_for_$myarch");
sendunblockedevent($prp, $buildarch);
sendunblockedevent($gctx, $prp, $buildarch);
print " - $packid (kiwi-product)\n";
print " unblocked\n";
}
Expand Down Expand Up @@ -5872,7 +5871,7 @@ sub checkpatchinfo {
if (!$blocked) {
if (-e "$markerdir/.waiting_for_$myarch") {
unlink("$markerdir/.waiting_for_$myarch");
sendunblockedevent("$projid/$repoid", $buildarch);
sendunblockedevent($gctx, "$projid/$repoid", $buildarch);
print " - $packid (patchinfo)\n";
print " unblocked\n";
}
Expand All @@ -5881,7 +5880,7 @@ sub checkpatchinfo {
# hmm, we should be blocked. trigger build arch check
if (!-e "$markerdir/.waiting_for_$myarch") {
BSUtil::touch("$reporoot/$projid/$repoid/$buildarch/:schedulerstate.dirty") if -d "$reporoot/$projid/$repoid/$buildarch";
sendunblockedevent("$projid/$repoid", $buildarch);
sendunblockedevent($gctx, "$projid/$repoid", $buildarch);
print " - $packid (patchinfo)\n";
print " blocked\n";
}
Expand Down Expand Up @@ -8112,8 +8111,8 @@ if ($testmode && ($testmode eq 'exit' || $testmode eq 'restart')) {
'type' => $testmode eq 'restart' ? 'restart' : 'exitcomplete',
};
my $evname = "$ev->{'type'}::";
sendevent($ev, $myarch, $evname);
BSUtil::waituntilgone("$myeventdir/$evname");
sendevent({'eventdir' => $_eventdir}, $ev, $myarch, $evname);
BSUtil::waituntilgone("$_eventdir/$myarch/$evname");
if ($testmode eq 'exit') {
# scheduler saw the event, wait until the process is gone
local *F;
Expand All @@ -8132,17 +8131,18 @@ if (!$testprojid) {
utime undef, undef, "$rundir/bs_sched.$myarch.lock";
}

# setup event mechanism
for my $d ($eventdir, $myeventdir, "$_jobsdir/$myarch", $infodir) {
for my $d ("$_eventdir/$myarch", "$_jobsdir/$myarch", $infodir) {
next if -d $d;
mkdir($d) || die("$d: $!\n");
}
if (!-p "$myeventdir/.ping") {
POSIX::mkfifo("$myeventdir/.ping", 0666) || die("$myeventdir/.ping: $!");
chmod(0666, "$myeventdir/.ping");
# setup event mechanism
my $_myeventdir = "$_eventdir/$myarch";
if (!-p "$_myeventdir/.ping") {
POSIX::mkfifo("$_myeventdir/.ping", 0666) || die("$_myeventdir/.ping: $!");
chmod(0666, "$_myeventdir/.ping");
}

sysopen(PING, "$myeventdir/.ping", POSIX::O_RDWR) || die("$myeventdir/.ping: $!");
sysopen(PING, "$_myeventdir/.ping", POSIX::O_RDWR) || die("$_myeventdir/.ping: $!");
#fcntl(PING,F_SETFL,POSIX::O_NONBLOCK);


Expand All @@ -8167,8 +8167,11 @@ my %nextmed;
my $gctx = {
'arch' => $myarch,
'reporoot' => $_reporoot,

'jobsdir' => $_jobsdir,
'myjobsdir' => "$_jobsdir/$myarch",
'eventdir' => $_eventdir,
'myeventdir' => "$_myeventdir",

'changed_low' => \%changed_low,
'changed_med' => \%changed_med,
Expand Down Expand Up @@ -8523,20 +8526,20 @@ NEXTPRP:

my @events = @remoteevents;
# check eventdir for new events
for my $evfilename (sort(ls($myeventdir))) {
for my $evfilename (sort(ls($_myeventdir))) {
next if $evfilename =~ /^\./;
my $ev;
if ($evfilename =~ /^finished:(.*)/) {
$ev = {'type' => 'built', 'job' => $1};
} else {
$ev = readxml("$myeventdir/$evfilename", $BSXML::event, 1);
$ev = readxml("$_myeventdir/$evfilename", $BSXML::event, 1);
if (!$ev) {
print "$evfilename: bad xml\n";
unlink("$myeventdir/$evfilename");
unlink("$_myeventdir/$evfilename");
next;
}
}
$ev->{'evfilename'} = $evfilename;
$ev->{'evfilename'} = "$_myeventdir/$evfilename";
push @events, $ev;
}

Expand Down Expand Up @@ -8583,7 +8586,7 @@ NEXTPRP:
print "$estr\n";
}

unlink("$myeventdir/$ev->{'evfilename'}") if $ev->{'evfilename'};
unlink($ev->{'evfilename'}) if $ev->{'evfilename'};
delete $ev->{'evfilename'};

if ($ev->{'type'} ne 'built' && $ectx->{'fullcache'}) {
Expand Down Expand Up @@ -8773,7 +8776,7 @@ NEXTPRP:
}
}
$changed_med{$prp} = 2;
sendrepochangeevent($prp);
sendrepochangeevent($gctx, $prp);
killbuilding($gctx, $prp);
my $ctx = {'gctx' => $gctx, 'prp' => $prp};
prpfinished($ctx);
Expand Down Expand Up @@ -9002,7 +9005,7 @@ NEXTPRP:
my $importarch = $packs->{$packid} && @ifiles ? '' : undef;
update_dst_full($gctx, $prp, $packid, undef, undef, $useforbuildenabled, $prpsearchpath{$prp}, undef, $importarch);
$changed_med{$prp} = 2;
sendrepochangeevent($prp);
sendrepochangeevent($gctx, $prp);
# delete other files
unlink("$gdst/:logfiles.success/$packid");
unlink("$gdst/:logfiles.fail/$packid");
Expand Down Expand Up @@ -9531,10 +9534,10 @@ NEXTPRP:
# we alse send it if we finish a prp to give linked aggregates a
# chance to work
if (!$repounchanged{$prp} || (!%unfinished && !$prpfinished{$prp})) {
sendrepochangeevent($prp);
sendrepochangeevent($gctx, $prp);
$repounchanged{$prp} = 1;
} elsif ($repounchanged{$prp} == 2) {
sendrepochangeevent($prp, 'repoinfo');
sendrepochangeevent($gctx, $prp, 'repoinfo');
$repounchanged{$prp} = 1;
}

Expand Down

0 comments on commit ec11d45

Please sign in to comment.