From 576f057c0c298b7b84218f95d4cf31163c52bd12 Mon Sep 17 00:00:00 2001 From: aigarszuika Date: Tue, 25 May 2021 17:59:46 +0300 Subject: [PATCH 1/4] Changed: Clear Doctrine cache before calculate Root Job Status Before determine Root Job Status Doctrine cache for Job entity must be cleared, otherwise if this root job was processed in previous request by the same enqueue, then it will falsely return old status, which will cause infinite running root jobs --- pkg/job-queue/CalculateRootJobStatusProcessor.php | 1 + pkg/job-queue/Doctrine/JobStorage.php | 5 +++++ 2 files changed, 6 insertions(+) diff --git a/pkg/job-queue/CalculateRootJobStatusProcessor.php b/pkg/job-queue/CalculateRootJobStatusProcessor.php index b9b0c4dfb..bbb40bf3b 100644 --- a/pkg/job-queue/CalculateRootJobStatusProcessor.php +++ b/pkg/job-queue/CalculateRootJobStatusProcessor.php @@ -48,6 +48,7 @@ public function __construct( public function process(Message $message, Context $context) { + $this->jobStorage->clearJobCache(); $data = JSON::decode($message->getBody()); if (!isset($data['jobId'])) { diff --git a/pkg/job-queue/Doctrine/JobStorage.php b/pkg/job-queue/Doctrine/JobStorage.php index 385ee4d38..8268dd791 100644 --- a/pkg/job-queue/Doctrine/JobStorage.php +++ b/pkg/job-queue/Doctrine/JobStorage.php @@ -179,6 +179,11 @@ public function saveJob(Job $job, \Closure $lockCallback = null) } } + public function clearJobCache() + { + $this->getEntityManager()->clear($this->entityClass); + } + /** * @return EntityRepository */ From c4da15e78a7c5b73f1266b2c7a5a9a64ef73befd Mon Sep 17 00:00:00 2001 From: aigarszuika Date: Wed, 26 May 2021 10:09:37 +0300 Subject: [PATCH 2/4] Changed: Refresh child entity when checking root job status --- pkg/job-queue/CalculateRootJobStatusProcessor.php | 1 - pkg/job-queue/CalculateRootJobStatusService.php | 1 + pkg/job-queue/Doctrine/JobStorage.php | 7 +++++-- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/pkg/job-queue/CalculateRootJobStatusProcessor.php b/pkg/job-queue/CalculateRootJobStatusProcessor.php index bbb40bf3b..b9b0c4dfb 100644 --- a/pkg/job-queue/CalculateRootJobStatusProcessor.php +++ b/pkg/job-queue/CalculateRootJobStatusProcessor.php @@ -48,7 +48,6 @@ public function __construct( public function process(Message $message, Context $context) { - $this->jobStorage->clearJobCache(); $data = JSON::decode($message->getBody()); if (!isset($data['jobId'])) { diff --git a/pkg/job-queue/CalculateRootJobStatusService.php b/pkg/job-queue/CalculateRootJobStatusService.php index 4268158b1..a9d62e3b1 100644 --- a/pkg/job-queue/CalculateRootJobStatusService.php +++ b/pkg/job-queue/CalculateRootJobStatusService.php @@ -69,6 +69,7 @@ protected function calculateRootJobStatus(array $jobs) $success = 0; foreach ($jobs as $job) { + $this->jobStorage->refreshedJobEntity($job); switch ($job->getStatus()) { case Job::STATUS_NEW: $new++; diff --git a/pkg/job-queue/Doctrine/JobStorage.php b/pkg/job-queue/Doctrine/JobStorage.php index 8268dd791..6d20da46f 100644 --- a/pkg/job-queue/Doctrine/JobStorage.php +++ b/pkg/job-queue/Doctrine/JobStorage.php @@ -179,9 +179,12 @@ public function saveJob(Job $job, \Closure $lockCallback = null) } } - public function clearJobCache() + /** + * @param Job $job + */ + public function refreshedJobEntity($job) { - $this->getEntityManager()->clear($this->entityClass); + $this->getEntityManager()->refresh($job); } /** From ca6bd302e1125e1801ba164706ef0cc8965d50f2 Mon Sep 17 00:00:00 2001 From: aigarszuika Date: Wed, 26 May 2021 10:15:15 +0300 Subject: [PATCH 3/4] Changed: Procedure rename --- pkg/job-queue/CalculateRootJobStatusService.php | 2 +- pkg/job-queue/Doctrine/JobStorage.php | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/job-queue/CalculateRootJobStatusService.php b/pkg/job-queue/CalculateRootJobStatusService.php index a9d62e3b1..41dd350b9 100644 --- a/pkg/job-queue/CalculateRootJobStatusService.php +++ b/pkg/job-queue/CalculateRootJobStatusService.php @@ -69,7 +69,7 @@ protected function calculateRootJobStatus(array $jobs) $success = 0; foreach ($jobs as $job) { - $this->jobStorage->refreshedJobEntity($job); + $this->jobStorage->refreshJobEntity($job); switch ($job->getStatus()) { case Job::STATUS_NEW: $new++; diff --git a/pkg/job-queue/Doctrine/JobStorage.php b/pkg/job-queue/Doctrine/JobStorage.php index 6d20da46f..b584e4022 100644 --- a/pkg/job-queue/Doctrine/JobStorage.php +++ b/pkg/job-queue/Doctrine/JobStorage.php @@ -182,7 +182,7 @@ public function saveJob(Job $job, \Closure $lockCallback = null) /** * @param Job $job */ - public function refreshedJobEntity($job) + public function refreshJobEntity($job) { $this->getEntityManager()->refresh($job); } From 29e0260ca0ec4b0a147cdca5e183f1591f76ddad Mon Sep 17 00:00:00 2001 From: aigarszuika Date: Wed, 26 May 2021 16:40:11 +0300 Subject: [PATCH 4/4] Changed: Fix findJobById return cached value. This issue can affect dependent job schedule if it is done by other enqueue job --- pkg/job-queue/Doctrine/JobStorage.php | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/pkg/job-queue/Doctrine/JobStorage.php b/pkg/job-queue/Doctrine/JobStorage.php index b584e4022..0ee459e50 100644 --- a/pkg/job-queue/Doctrine/JobStorage.php +++ b/pkg/job-queue/Doctrine/JobStorage.php @@ -58,13 +58,18 @@ public function findJobById($id) { $qb = $this->getEntityRepository()->createQueryBuilder('job'); - return $qb + $job = $qb ->addSelect('rootJob') ->leftJoin('job.rootJob', 'rootJob') ->where('job = :id') ->setParameter('id', $id) ->getQuery()->getOneOrNullResult() ; + if ($job) { + $this->refreshJobEntity($job); + } + + return $job; } /**