Permalink
Browse files

Avoiding race condition on roster collection

Workers must not call save() when in the available=true state, because they would overwrite an assignment made by the Recruiter. Operations to be performed in any case like stillHere() have to execute an atomic query instead of storing a potentially stale full document.

Moreover, signals have to be managed immediately before starting to talk
with the database; in this way we can deal with very quick restarts by
using the graceful shutdown.
  • Loading branch information...
1 parent be13df0 commit 0c950595a475f36778641f49c8b369135a329f8a Giorgio Sironi committed Mar 9, 2016
Showing with 34 additions and 21 deletions.
  1. +10 −8 bin/recruiter
  2. +4 −3 bin/worker
  3. +1 −1 spec/Recruiter/Acceptance/AssignmentTest.php
  4. +11 −9 src/Recruiter/Worker.php
  5. +8 −0 src/Recruiter/Worker/Repository.php
View
@@ -19,6 +19,13 @@ $recruiter = $cli->get('recruiter');
$waitStrategy = $cli->get('wait-strategy');
$memoryLimit = $cli->get('memory-limit');
+$askedToStop = false;
+foreach ([SIGTERM, SIGQUIT, SIGINT] as $signal) {
+ pcntl_signal($signal, function($signal) use(&$askedToStop) {
+ $askedToStop = true;
+ });
+}
+
$recruiter->ensureIsTheOnlyOne(
$waitStrategy->timeToWaitAtMost(),
$otherwise = function() {
@@ -30,11 +37,6 @@ $recruiter->ensureIsTheOnlyOne(
}
);
-foreach ([SIGTERM, SIGQUIT, SIGINT] as $signal) {
- pcntl_signal($signal, function($signal) use(&$askedToStop) {
- $askedToStop = true;
- });
-}
printf(
'[RECRUITER][%d][%s] ready to recruit!' . PHP_EOL,
@@ -69,9 +71,9 @@ while (!askedToStop()) {
'[RECRUITER][%d][%s] unlocked %d jobs due to dead workers' . PHP_EOL,
posix_getpid(), date('c'), $unlockedJobs
);
- (count($assignment) === 0) ?
- $waitStrategy->wait()->backOff() :
- $waitStrategy->reset();
+ (count($assignment) > 0) ?
+ $waitStrategy->reset() :
+ $waitStrategy->wait()->backOff();
}
$recruiter->bye();
printf(
View
@@ -23,22 +23,23 @@ $waitStrategy = $cli->get('wait-strategy');
$filterJobToWorkOn = $cli->get('work-on');
$askedToStop = false;
-$worker = $filterJobToWorkOn->applyTo($recruiter->hire());
foreach ([SIGTERM, SIGQUIT, SIGINT] as $signal) {
pcntl_signal($signal, function($signal) use(&$askedToStop) {
$askedToStop = true;
});
}
+$worker = $filterJobToWorkOn->applyTo($recruiter->hire());
+
printf('[WORKER][%d][%s] worker %s ready to work!' . PHP_EOL, posix_getpid(), date('c'), $worker->id());
while (!askedToStop()) {
$doneSomeWork = $worker->work();
$memoryLimit->ensure(memory_get_usage());
if ($doneSomeWork) {
printf(
- '[WORKER][%d][%s] executed job' . PHP_EOL,
- posix_getpid(), date('c')
+ '[WORKER][%d][%s] executed job %s' . PHP_EOL,
+ posix_getpid(), date('c'), $doneSomeWork
);
}
($doneSomeWork) ?
@@ -15,6 +15,6 @@ public function testAJobCanBeAssignedAndExecuted()
$worker = $this->recruiter->hire();
$assignments = $this->recruiter->assignJobsToWorkers();
$this->assertEquals(1, count($assignments));
- $this->assertTrue($worker->work());
+ $this->assertTrue((bool) $worker->work());
}
}
@@ -53,14 +53,15 @@ public function work()
$this->refresh();
if ($this->hasBeenAssignedToDoSomething()) {
$this->workOn(
- $this->recruiter->scheduledJob(
+ $job = $this->recruiter->scheduledJob(
$this->status['assigned_to'][(string)$this->status['_id']]
)
);
- return true;
+ return (string) $job->id();
+ } else {
+ $this->stillHere();
+ return false;
}
- $this->stillHere();
- return false;
}
public function export()
@@ -89,8 +90,9 @@ public function retire()
private function stillHere()
{
- $this->status['last_seen_at'] = T\MongoDate::now();
- $this->save();
+ $lastSeenAt = T\MongoDate::now();
+ $this->status['last_seen_at'] = $lastSeenAt;
+ $this->repository->atomicUpdate($this, ['last_seen_at' => $lastSeenAt]);
}
private function workOn($job)
@@ -192,9 +194,9 @@ public static function assignJobsToWorkers(MongoCollection $collection, $jobs, $
Onebip\array_map($workers, function($id) {return (string)$id;}),
$jobs
);
- $collection->update(
- ['_id' => ['$in' => array_values($workers)]],
- ['$set' => [
+ $result = $collection->update(
+ $where = ['_id' => ['$in' => array_values($workers)]],
+ $update = ['$set' => [
'available' => false,
'assigned_to' => $assignment,
'assigned_since' => T\MongoDate::now()
@@ -25,6 +25,14 @@ public function save($worker)
$this->roster->save($worker->export());
}
+ public function atomicUpdate($worker, array $changeSet)
+ {
+ $this->roster->update(
+ ['_id' => $worker->id()],
+ ['$set' => $changeSet]
+ );
+ }
+
public function refresh($worker)
{
$worker->updateWith(

0 comments on commit 0c95059

Please sign in to comment.