Permalink
Browse files

Rolling back jobs locked but not assigned after crash

  • Loading branch information...
1 parent 70a8a27 commit ef490f4acde1b7d00d9b40aee5b2f02257f67d70 Giorgio Sironi committed Mar 8, 2016
View
@@ -40,6 +40,7 @@ printf(
'[RECRUITER][%d][%s] ready to recruit!' . PHP_EOL,
posix_getpid(), date('c')
);
+$recruiter->init();
while (!askedToStop()) {
$memoryUsage = ByteUnits\bytes(memory_get_usage());
$pickStartAt = microtime(true);
@@ -9,7 +9,9 @@ public function testRecruiterCrashAfterLockingJobsBeforeAssignmentAndIsRestarted
$this->enqueueJob();
$worker = $this->recruiter->hire();
- $assignments = $this->recruiter->assignJobsToWorkers1();
+ $assignments = $this->recruiter->bookJobsForWorkers();
+
+ $this->recruiter->rollbackLockedJobs();
$assignments = $this->recruiter->assignJobsToWorkers();
$this->assertEquals(1, $assignments);
}
View
@@ -175,7 +175,7 @@ public static function pickReadyJobsForWorkers(MongoCollection $collection, $wor
$jobs = Onebip\array_pluck(
iterator_to_array(
$collection
- ->find($query =
+ ->find(
(Worker::canWorkOnAnyJobs($worksOn) ?
[ 'scheduled_at' => ['$lt' => T\MongoDate::now()],
'active' => true,
@@ -200,6 +200,24 @@ public static function pickReadyJobsForWorkers(MongoCollection $collection, $wor
}
}
+ public static function rollbackLockedNotIn(MongoCollection $collection, array $excluded)
+ {
+ $collection->update(
+ [
+ 'locked' => true,
+ '_id' => ['$nin' => $excluded],
+ ],
+ [
+ '$set' => [
+ 'locked' => false,
+ ]
+ ],
+ [
+ 'multiple' => true,
+ ]
+ );
+ }
+
public static function lockAll(MongoCollection $collection, $jobs)
{
$collection->update(
@@ -50,24 +50,47 @@ public function ensureIsTheOnlyOne(Interval $timeToWaitAtMost, $otherwise)
}
}
+ public function init()
+ {
+ $this->rollbackLockedJobs();
+ }
+
+ /**
+ * @step
+ */
+ public function rollbackLockedJobs()
+ {
+ $assignedJobs = Worker::assignedJobs($this->db->selectCollection('roster'));
+ Job::rollbackLockedNotIn($this->db->selectCollection('scheduled'), $assignedJobs);
+ }
+
+ /**
+ * @step
+ */
public function stillHere(Interval $timeToWaitAtMost)
{
$this->lock->refresh($timeToWaitAtMost->seconds() * self::LOCK_FACTOR);
}
+ /**
+ * @step
+ */
public function bye()
{
$this->lock->release();
}
public function assignJobsToWorkers()
{
- $bookedJobs = $this->assignJobsToWorkers1();
+ $bookedJobs = $this->bookJobsForWorkers();
- return $this->assignJobsToWorkers2($bookedJobs);
+ return $this->assignLockedJobsToWorkers($bookedJobs);
}
- public function assignJobsToWorkers1()
+ /**
+ * @step
+ */
+ public function bookJobsForWorkers()
{
$roster = $this->db->selectCollection('roster');
$scheduled = $this->db->selectCollection('scheduled');
@@ -89,7 +112,10 @@ public function assignJobsToWorkers1()
return $bookedJobs;
}
- public function assignJobsToWorkers2($bookedJobs)
+ /**
+ * @step
+ */
+ public function assignLockedJobsToWorkers($bookedJobs)
{
$numberOfWorkersWithJobs = 0;
$roster = $this->db->selectCollection('roster');
@@ -106,6 +132,9 @@ public function scheduledJob($id)
return $this->jobs->scheduled($id);
}
+ /**
+ * @step
+ */
public function retireDeadWorkers(Clock $clock)
{
$this->jobs->releaseAll(
@@ -202,6 +202,18 @@ public static function assignJobsToWorkers(MongoCollection $collection, $jobs, $
);
}
+ public static function assignedJobs(MongoCollection $collection)
+ {
+ $cursor = $collection->find([], ['assigned_to' => 1]);
+ $jobs = [];
+ foreach ($cursor as $document) {
+ if (array_key_exists('assigned_to', $document)) {
+ $jobs = array_merge($jobs, array_values($document['assigned_to']));
+ }
+ }
+ return array_unique($jobs);
+ }
+
public static function retireDeadWorkers(Repository $roster, Clock $clock)
{
$now = $clock->current();

0 comments on commit ef490f4

Please sign in to comment.