diff --git a/Domains/Jobs/Job.php b/Domains/Jobs/Job.php
index 043b3e4dea6..c229da28e95 100644
--- a/Domains/Jobs/Job.php
+++ b/Domains/Jobs/Job.php
@@ -19,17 +19,14 @@
use Illuminate\Database\Eloquent\Builder;
use Illuminate\Support\InteractsWithTime;
-
use PKP\config\Config;
use PKP\Domains\Jobs\Traits\Attributes;
-use PKP\Domains\Jobs\Traits\Worker;
use PKP\Support\Database\Model;
class Job extends Model
{
use Attributes;
use InteractsWithTime;
- use Worker;
protected const DEFAULT_MAX_ATTEMPTS = 3;
@@ -182,6 +179,11 @@ public function scopeNotQueue(Builder $query, string $queue): Builder
return $query->where('queue', '!=', $queue);
}
+ public function scopeOnQueue(Builder $query, string $queue): Builder
+ {
+ return $query->where('queue', '=', $queue);
+ }
+
public function scopeNonReserved(Builder $query): Builder
{
return $query->whereNull('reserved_at');
diff --git a/Domains/Jobs/JobRunner.php b/Domains/Jobs/JobRunner.php
new file mode 100644
index 00000000000..b1faf366bd7
--- /dev/null
+++ b/Domains/Jobs/JobRunner.php
@@ -0,0 +1,368 @@
+jobQueue = $jobQueue ?? app('pkpJobQueue');
+ }
+
+ /**
+ * Set the max job number running constrain
+ *
+ * @return self
+ */
+ public function withMaxJobsConstrain(): self
+ {
+ $this->hasMaxJobsConstrain = true;
+
+ if (! $this->getMaxJobsToProcess()) {
+ $this->setMaxJobsToProcess(Config::getVar('queues', 'job_runner_max_jobs', 30));
+ }
+
+ return $this;
+ }
+
+ /**
+ * Set the max allowed jobs number to process
+ *
+ * @param int $maxJobsToProcess
+ * @return self
+ */
+ public function setMaxJobsToProcess(int $maxJobsToProcess): self
+ {
+ $this->maxJobsToProcess = $maxJobsToProcess;
+
+ return $this;
+ }
+
+ /**
+ * Get the max allowed jobs number to process
+ *
+ * @return int|null
+ */
+ public function getMaxJobsToProcess(): ?int
+ {
+ return $this->maxJobsToProcess;
+ }
+
+ /**
+ * Set the max run time constrain when processing jobs
+ *
+ * @return self
+ */
+ public function withMaxExecutionTimeConstrain(): self
+ {
+ $this->hasMaxExecutionTimeConstrain = true;
+
+ if (! $this->getMaxTimeToProcessJobs()) {
+ $this->setMaxTimeToProcessJobs($this->deduceSafeMaxExecutionTime());
+ }
+
+ return $this;
+ }
+
+ /**
+ * Set the max allowed run time to process jobs in SECONDS
+ *
+ * @param int $maxTimeToProcessJobs
+ * @return self
+ */
+ public function setMaxTimeToProcessJobs(int $maxTimeToProcessJobs): self
+ {
+ $this->maxTimeToProcessJobs = $maxTimeToProcessJobs;
+
+ return $this;
+ }
+
+ /**
+ * Get the max allowed run time to process jobs in SECONDS
+ *
+ * @return int|null
+ */
+ public function getMaxTimeToProcessJobs(): ?int
+ {
+ return $this->maxTimeToProcessJobs;
+ }
+
+ /**
+ * Set the max consumable memory constrain when processing jobs
+ *
+ * @return self
+ */
+ public function withMaxMemoryConstrain(): self
+ {
+ $this->hasMaxMemoryConstrain = true;
+
+ if (! $this->getMaxMemoryToConsumed()) {
+ $this->setMaxMemoryToConsumed($this->deduceSafeMaxAllowedMemory());
+ }
+
+ return $this;
+ }
+
+ /**
+ * Set the max allowed consumable memory to process jobs in BYTES
+ *
+ * @param int $maxMemoryToConsumed
+ * @return self
+ */
+ public function setMaxMemoryToConsumed(int $maxMemoryToConsumed): self
+ {
+ $this->maxMemoryToConsumed = $maxMemoryToConsumed;
+
+ return $this;
+ }
+
+ /**
+ * get the max allowed consumable memory to process jobs in BYTES
+ *
+ * @return int|null
+ */
+ public function getMaxMemoryToConsumed(): ?int
+ {
+ return $this->maxMemoryToConsumed;
+ }
+
+ /**
+ * Set constrain to estimate next job possible processing time to allow next job to be processed/run
+ *
+ * @return self
+ */
+ public function withEstimatedTimeToProcessNextJobConstrain(): self
+ {
+ $this->hasEstimatedTimeToProcessNextJobConstrain = true;
+
+ return $this;
+ }
+
+ /**
+ * Process/Run/Execute jobs off CLI
+ *
+ * @param EloquentBuilder|null $jobBuilder
+ * @return bool
+ */
+ public function processJobs(EloquentBuilder $jobBuilder = null): bool
+ {
+ $jobBuilder ??= $this->jobQueue->getJobModelBuilder();
+
+ $jobProcessedCount = 1;
+ $jobProcessingStartTime = time();
+
+ while ($jobBuilder->count()) {
+ if ($this->exceededJobLimit($jobProcessedCount)) {
+ return true;
+ }
+
+ if ($this->exceededTimeLimit($jobProcessingStartTime)) {
+ return true;
+ }
+
+ if ($this->exceededMemoryLimit()) {
+ return true;
+ }
+
+ if ($this->mayExceedMemoryLimitAtNextJob($jobProcessedCount, $jobProcessingStartTime)) {
+ return true;
+ }
+
+ $this->jobQueue->runJobInQueue();
+
+ $jobProcessedCount = $jobProcessedCount + 1;
+ }
+
+ return true;
+ }
+
+ /**
+ * Check if max job has processed or not
+ *
+ * @param int $jobProcessedCount The number of jobs that has processed so far
+ * @return bool
+ */
+ protected function exceededJobLimit(int $jobProcessedCount): bool
+ {
+ if (! $this->hasMaxJobsConstrain) {
+ return false;
+ }
+
+ return $jobProcessedCount >= $this->getMaxJobsToProcess();
+ }
+
+ /**
+ * Check if max run time to process jobs has exceed or not
+ *
+ * @param int $jobProcessingStartTime The start time since job processing has started in seconds
+ * @return bool
+ */
+ protected function exceededTimeLimit(int $jobProcessingStartTime): bool
+ {
+ if (! $this->hasMaxExecutionTimeConstrain) {
+ return false;
+ }
+
+ return (time() - $jobProcessingStartTime) >= $this->getMaxTimeToProcessJobs();
+ }
+
+ /**
+ * Check if memory consumed since job processing started has exceed defined max memory
+ *
+ * @return bool
+ */
+ protected function exceededMemoryLimit(): bool
+ {
+ if (! $this->hasMaxMemoryConstrain) {
+ return false;
+ }
+
+ return memory_get_usage(true) >= $this->getMaxMemoryToConsumed();
+ }
+
+ /**
+ * Estimate if next job processing time will likely exceed defined max processing run time
+ *
+ * @param int $jobProcessedCount The number of jobs that has processed so far
+ * @param int $jobProcessingStartTime The start time since job processing has started in seconds
+ *
+ * @return bool
+ */
+ protected function mayExceedMemoryLimitAtNextJob(int $jobProcessedCount, int $jobProcessingStartTime): bool
+ {
+ if (! $this->hasEstimatedTimeToProcessNextJobConstrain) {
+ return false;
+ }
+
+ // if no max run time constrain set, no point in estimating the likely time exceed
+ if (!$this->hasMaxExecutionTimeConstrain) {
+ return false;
+ }
+
+ $currentTotalExecutionTime = time() - $jobProcessingStartTime;
+ $timePerJob = (int)($currentTotalExecutionTime / $jobProcessedCount);
+ $totalTimeByNextJobComplete = $currentTotalExecutionTime + ($timePerJob * 3);
+
+ return $totalTimeByNextJobComplete > $this->getMaxTimeToProcessJobs();
+ }
+
+ /**
+ * Dedcue possible safe max job processing run time in SECONDS
+ *
+ * It will consider both what defined in the ini file and application config file
+ * and will take a minimum one based on those two values
+ *
+ * @return int
+ */
+ protected function deduceSafeMaxExecutionTime(): int
+ {
+ $maxExecutionTimeSetToINI = ini_get('max_execution_time');
+ $maxExecutionTimeSetToConfig = Config::getVar('queues', 'job_runner_max_execution_time', 20);
+
+ return $maxExecutionTimeSetToINI <= 0
+ ? $maxExecutionTimeSetToConfig
+ : min($maxExecutionTimeSetToINI, $maxExecutionTimeSetToConfig);
+ }
+
+ /**
+ * Dedcue possible safe max consumable memory for job processing in BYTES
+ *
+ * It will consider both of what defined in the ini file and the application config file
+ * and will take a minimum one based on those two values
+ *
+ * In the application config file, max memory can be defined as INT or STRING value in following manner
+ *
+ * If defined as INT (e.g 90), it will be calculated as percentage value of
+ * what currently defined in the ini file.
+ *
+ * If defined as STRING (e.g 128M), it will try to calculate it as memory defined in megabytes
+ * but if failed, will try to cast to INT to apply percentage rule
+ *
+ * @return int
+ */
+ protected function deduceSafeMaxAllowedMemory(): int
+ {
+ $maxMemoryLimitSetToINI = function_exists('ini_get') ? ini_get('memory_limit') : '128M';
+
+ if (!$maxMemoryLimitSetToINI || -1 === $maxMemoryLimitSetToINI || '-1' === $maxMemoryLimitSetToINI) {
+ $maxMemoryLimitSetToINI = '128M'; // Unlimited, set to 128M.
+ }
+
+ $maxMemoryLimitSetToINIInBytes = convertHrToBytes($maxMemoryLimitSetToINI);
+
+ $maxMemoryLimitSetToConfig = Config::getVar('queues', 'job_runner_max_memory', 80);
+
+ $maxMemoryLimitSetToConfigInBytes = in_array(strtolower(substr((string)$maxMemoryLimitSetToConfig, -1)), ['k', 'm', 'g'], true)
+ ? convertHrToBytes($maxMemoryLimitSetToConfig)
+ : (int)($maxMemoryLimitSetToINIInBytes * ((float)((int)$maxMemoryLimitSetToConfig / 100)));
+
+ return min($maxMemoryLimitSetToConfigInBytes, $maxMemoryLimitSetToINIInBytes);
+ }
+}
diff --git a/Domains/Jobs/Providers/JobServiceProvider.php b/Domains/Jobs/Providers/JobServiceProvider.php
deleted file mode 100644
index 255b9add3ee..00000000000
--- a/Domains/Jobs/Providers/JobServiceProvider.php
+++ /dev/null
@@ -1,34 +0,0 @@
-app->bind(
- JobRepositoryInterface::class,
- Repository::class
- );
- }
-}
diff --git a/Domains/Jobs/Traits/Attributes.php b/Domains/Jobs/Traits/Attributes.php
index 321244d9bf3..6600821efcc 100644
--- a/Domains/Jobs/Traits/Attributes.php
+++ b/Domains/Jobs/Traits/Attributes.php
@@ -52,16 +52,16 @@ public function getMaxTriesAttribute(): ?string
}
/**
- * Return the job's delay value
+ * Return the job's backoff value
*
*/
- public function getDelayAttribute(): ?string
+ public function getBackoffAttribute(): ?string
{
- if (!$this->payload['delay']) {
+ if (!$this->payload['backoff']) {
return null;
}
- return $this->payload['delay'];
+ return $this->payload['backoff'];
}
/**
diff --git a/Domains/Jobs/Traits/Worker.php b/Domains/Jobs/Traits/Worker.php
deleted file mode 100644
index c39523a20bf..00000000000
--- a/Domains/Jobs/Traits/Worker.php
+++ /dev/null
@@ -1,171 +0,0 @@
-delay = $value;
-
- return $this;
- }
-
- /**
- * Get the Job's delay value
- */
- public function getDelay(): int
- {
- return $this->delay;
- }
-
- /**
- * The maximum amount of RAM the worker may consume.
- */
- public function setAllowedMemory(int $value): self
- {
- $this->allowedMemory = $value;
-
- return $this;
- }
-
- /**
- * Get Job's allowed memory value
- */
- public function getAllowedMemory(): int
- {
- return $this->allowedMemory;
- }
-
- /**
- * The maximum number of seconds a child worker may run.
- */
- public function setTimeout(int $value): self
- {
- $this->timeout = $value;
-
- return $this;
- }
-
- /**
- * Get Job's timeout value
- */
- public function getTimeout(): int
- {
- return $this->timeout;
- }
-
- /**
- * The number of seconds to wait in between polling the queue.
- */
- public function setSleep(int $value): self
- {
- $this->sleep = $value;
-
- return $this;
- }
-
- /**
- * Get Job's sleep value
- */
- public function getSleep(): int
- {
- return $this->sleep;
- }
-
- /**
- * Indicates if the worker should run in maintenance mode.
- */
- public function setForceFlag(bool $value = false): self
- {
- $this->forceFlag = $value;
-
- return $this;
- }
-
- /**
- * Get Job's force flag value
- */
- public function getForceFlag(): bool
- {
- return $this->forceFlag;
- }
-
- /**
- * Indicates if the worker should stop when queue is empty.
- */
- public function setStopWhenEmptyFlag(bool $value = false): self
- {
- $this->stopWhenEmptyFlag = $value;
-
- return $this;
- }
-
- /**
- * Get Job's stop when empty flag value
- */
- public function getStopWhenEmptyFlag(): bool
- {
- return $this->stopWhenEmptyFlag;
- }
-}
diff --git a/Domains/Jobs/WorkerConfiguration.php b/Domains/Jobs/WorkerConfiguration.php
new file mode 100644
index 00000000000..bde791b5bc2
--- /dev/null
+++ b/Domains/Jobs/WorkerConfiguration.php
@@ -0,0 +1,340 @@
+each(fn($value, $option) => method_exists($self, 'set' . ucfirst($option))
+ ? $self->{'set' . ucfirst($option)}($value)
+ : throw new Exception(sprintf('Unknown option "%s"', $option))
+ );
+
+ return $self;
+ }
+
+ /**
+ * Get the worker options
+ *
+ * @return array
+ */
+ public function getWorkerOptions(): array
+ {
+ return [
+ 'name' => $this->getName(),
+ 'backoff' => $this->getBackoff(),
+ 'memory' => $this->getMemory(),
+ 'timeout' => $this->getTimeout(),
+ 'sleep' => $this->getSleep(),
+ 'maxTries' => $this->getMaxTries(),
+ 'force' => $this->getForce(),
+ 'stopWhenEmpty' => $this->getStopWhenEmpty(),
+ 'maxJobs' => $this->getMaxJobs(),
+ 'maxTime' => $this->getMaxTime(),
+ 'rest' => $this->getRest(),
+ ];
+ }
+
+ /**
+ * Set the worker name
+ */
+ public function setName(string $name): self
+ {
+ $this->name = $name;
+
+ return $this;
+ }
+
+ /**
+ * Get the worker name
+ */
+ public function getName(): string
+ {
+ return $this->name;
+ }
+
+ /**
+ * The number of seconds to wait before retrying the job
+ */
+ public function setBackoff(int $value): self
+ {
+ $this->backoff = $value;
+
+ return $this;
+ }
+
+ /**
+ * Get the number of seconds to wait before retrying the job
+ */
+ public function getBackoff(): int
+ {
+ return $this->backoff;
+ }
+
+ /**
+ * The maximum amount of RAM(in megabytes) the worker may consume.
+ */
+ public function setMemory(int $value): self
+ {
+ $this->memory = $value;
+
+ return $this;
+ }
+
+ /**
+ * Get Job's allowed memory value(in megabytes)
+ */
+ public function getMemory(): int
+ {
+ return $this->memory;
+ }
+
+ /**
+ * The maximum number of seconds a child worker may run.
+ */
+ public function setTimeout(int $value): self
+ {
+ $this->timeout = $value;
+
+ return $this;
+ }
+
+ /**
+ * Get Job's timeout value
+ */
+ public function getTimeout(): int
+ {
+ return $this->timeout;
+ }
+
+ /**
+ * The number of seconds to wait in between polling the queue.
+ */
+ public function setSleep(int $value): self
+ {
+ $this->sleep = $value;
+
+ return $this;
+ }
+
+ /**
+ * Get Job's sleep value
+ */
+ public function getSleep(): int
+ {
+ return $this->sleep;
+ }
+
+ /**
+ * Set the Job's max attempts
+ */
+ public function setMaxTries(int $maxTries): self
+ {
+ $this->maxTries = $maxTries;
+
+ return $this;
+ }
+
+ /**
+ * Get the Job's max attempts
+ */
+ public function getMaxTries(): int
+ {
+ return $this->maxTries;
+ }
+
+ /**
+ * Indicates if the worker should run in maintenance mode.
+ */
+ public function setForce(bool $value = false): self
+ {
+ $this->force = $value;
+
+ return $this;
+ }
+
+ /**
+ * Get Job's force flag value
+ */
+ public function getForce(): bool
+ {
+ return $this->force;
+ }
+
+ /**
+ * Indicates if the worker should stop when queue is empty.
+ */
+ public function setStopWhenEmpty(bool $value = false): self
+ {
+ $this->stopWhenEmpty = $value;
+
+ return $this;
+ }
+
+ /**
+ * Get Job's stop when empty settings value
+ */
+ public function getStopWhenEmpty(): bool
+ {
+ return $this->stopWhenEmpty;
+ }
+
+ /**
+ * Set max number of jobs to process before stopping
+ */
+ public function setMaxJobs(int $maxJobs): self
+ {
+ $this->maxJobs = $maxJobs;
+
+ return $this;
+ }
+
+ /**
+ * Get max number of jobs to process before stopping
+ */
+ public function getMaxJobs(): int
+ {
+ return $this->maxJobs;
+ }
+
+ /**
+ * Set maximum number of seconds the worker should run
+ */
+ public function setMaxTime(int $maxTime): self
+ {
+ $this->maxTime = $maxTime;
+
+ return $this;
+ }
+
+ /**
+ * Get maximum number of seconds the worker should run
+ */
+ public function getMaxTime(): int
+ {
+ return $this->maxTime;
+ }
+
+ /**
+ * Set number of seconds to rest between jobs
+ */
+ public function setRest(int $rest): self
+ {
+ $this->rest = $rest;
+
+ return $this;
+ }
+
+ /**
+ * Get number of seconds to rest between jobs
+ */
+ public function getRest(): int
+ {
+ return $this->rest;
+ }
+}
\ No newline at end of file
diff --git a/Jobs/Doi/DepositContext.php b/Jobs/Doi/DepositContext.php
index be0ce7a61c0..5cca9fa63cf 100644
--- a/Jobs/Doi/DepositContext.php
+++ b/Jobs/Doi/DepositContext.php
@@ -52,9 +52,7 @@ public function handle()
$context = $contextDao->getById($this->contextId);
if (!$context) {
- $this->failed(new JobException(JobException::INVALID_PAYLOAD));
-
- return;
+ throw new JobException(JobException::INVALID_PAYLOAD);
}
// NB: Only run at context level if automatic deposit is enabled. Otherwise, automatic deposit will always run,
diff --git a/Jobs/Doi/DepositSubmission.php b/Jobs/Doi/DepositSubmission.php
index a3c7fabfe62..307116f9a68 100644
--- a/Jobs/Doi/DepositSubmission.php
+++ b/Jobs/Doi/DepositSubmission.php
@@ -56,10 +56,9 @@ public function handle()
$submission = Repo::submission()->get($this->submissionId);
if (!$submission || !$this->agency) {
- $this->failed(new JobException(JobException::INVALID_PAYLOAD));
-
- return;
+ throw new JobException(JobException::INVALID_PAYLOAD);
}
- $retResults = $this->agency->depositSubmissions([$submission], $this->context);
+
+ $this->agency->depositSubmissions([$submission], $this->context);
}
}
diff --git a/Jobs/Metadata/BatchMetadataChangedJob.php b/Jobs/Metadata/BatchMetadataChangedJob.php
index 967018e0bd4..d3c874f52da 100644
--- a/Jobs/Metadata/BatchMetadataChangedJob.php
+++ b/Jobs/Metadata/BatchMetadataChangedJob.php
@@ -63,9 +63,7 @@ public function handle(): void
}
if (!$successful) {
- $this->failed(new JobException(JobException::INVALID_PAYLOAD));
-
- return;
+ throw new JobException(JobException::INVALID_PAYLOAD);
}
$submissionSearchIndex->submissionChangesFinished();
diff --git a/Jobs/Metadata/MetadataChangedJob.php b/Jobs/Metadata/MetadataChangedJob.php
index 1b64c877d43..28293fcd084 100644
--- a/Jobs/Metadata/MetadataChangedJob.php
+++ b/Jobs/Metadata/MetadataChangedJob.php
@@ -50,9 +50,7 @@ public function handle(): void
$submission = Repo::submission()->get($this->submissionId);
if (!$submission) {
- $this->failed(new JobException(JobException::INVALID_PAYLOAD));
-
- return;
+ throw new JobException(JobException::INVALID_PAYLOAD);
}
$submissionSearchIndex = Application::getSubmissionSearchIndex();
diff --git a/Jobs/Notifications/NewAnnouncementMailUsers.php b/Jobs/Notifications/NewAnnouncementMailUsers.php
index 37c10945b46..a5973fbbbbb 100644
--- a/Jobs/Notifications/NewAnnouncementMailUsers.php
+++ b/Jobs/Notifications/NewAnnouncementMailUsers.php
@@ -54,7 +54,7 @@ public function handle()
$announcement = Repo::announcement()->get($this->announcementId);
// Announcement was removed
if (!$announcement) {
- $this->failed(new JobException(JobException::INVALID_PAYLOAD));
+ throw new JobException(JobException::INVALID_PAYLOAD);
return;
}
diff --git a/Jobs/Notifications/NewAnnouncementNotifyUsers.php b/Jobs/Notifications/NewAnnouncementNotifyUsers.php
index f1c3c19dfe1..53bbc9f3106 100644
--- a/Jobs/Notifications/NewAnnouncementNotifyUsers.php
+++ b/Jobs/Notifications/NewAnnouncementNotifyUsers.php
@@ -45,8 +45,7 @@ public function handle()
$announcement = Repo::announcement()->get($this->announcementId);
// Announcement was removed
if (!$announcement) {
- $this->failed(new JobException(JobException::INVALID_PAYLOAD));
- return;
+ throw new JobException(JobException::INVALID_PAYLOAD);
}
$announcementNotificationManager = new AnnouncementNotificationManager(Notification::NOTIFICATION_TYPE_NEW_ANNOUNCEMENT);
diff --git a/Jobs/Submissions/RemoveSubmissionFileFromSearchIndexJob.php b/Jobs/Submissions/RemoveSubmissionFileFromSearchIndexJob.php
index 1ac711be34f..dd901a01cca 100644
--- a/Jobs/Submissions/RemoveSubmissionFileFromSearchIndexJob.php
+++ b/Jobs/Submissions/RemoveSubmissionFileFromSearchIndexJob.php
@@ -51,9 +51,7 @@ public function handle(): void
$submissionFile = Repo::submissionFile()->get($this->submissionFileId);
if (!$submissionFile) {
- $this->failed(new JobException(JobException::INVALID_PAYLOAD));
-
- return;
+ throw new JobException(JobException::INVALID_PAYLOAD);
}
$submissionSearchIndex = Application::getSubmissionSearchIndex();
diff --git a/Jobs/Submissions/RemoveSubmissionFromSearchIndexJob.php b/Jobs/Submissions/RemoveSubmissionFromSearchIndexJob.php
index cd4e9073d7b..bb8fef5696d 100644
--- a/Jobs/Submissions/RemoveSubmissionFromSearchIndexJob.php
+++ b/Jobs/Submissions/RemoveSubmissionFromSearchIndexJob.php
@@ -50,9 +50,7 @@ public function handle(): void
$submission = Repo::submission()->get($this->submissionId);
if (!$submission) {
- $this->failed(new JobException(JobException::INVALID_PAYLOAD));
-
- return;
+ throw new JobException(JobException::INVALID_PAYLOAD);
}
$submissionSearchIndex = Application::getSubmissionSearchIndex();
diff --git a/Jobs/Submissions/UpdateSubmissionSearchJob.php b/Jobs/Submissions/UpdateSubmissionSearchJob.php
index 124c7a4d941..368fbaa88d2 100644
--- a/Jobs/Submissions/UpdateSubmissionSearchJob.php
+++ b/Jobs/Submissions/UpdateSubmissionSearchJob.php
@@ -51,9 +51,7 @@ public function handle(): void
$submission = Repo::submission()->get($this->submissionId);
if (!$submission) {
- $this->failed(new JobException(JobException::INVALID_PAYLOAD));
-
- return;
+ throw new JobException(JobException::INVALID_PAYLOAD);
}
if ($submission->getData('status') !== PKPSubmission::STATUS_PUBLISHED) {
diff --git a/Support/Jobs/BaseJob.php b/Support/Jobs/BaseJob.php
index 1574a8af070..b84b80db739 100644
--- a/Support/Jobs/BaseJob.php
+++ b/Support/Jobs/BaseJob.php
@@ -19,15 +19,12 @@
use APP\core\Application;
use Illuminate\Bus\Queueable;
-
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use PKP\config\Config;
-use Throwable;
-
abstract class BaseJob implements ShouldQueue
{
use Dispatchable;
@@ -65,20 +62,4 @@ protected function defaultConnection(): string
}
abstract public function handle();
-
- public function failed(Throwable $e)
- {
- $jobArr = '';
-
- if ($this->job) {
- $jobArr = (string) $this->job->getRawBody();
- }
-
- app('queue.failer')->log(
- $this->connection,
- $this->queue,
- $jobArr,
- $e
- );
- }
}
diff --git a/Support/Jobs/Entities/TestJob.php b/Support/Jobs/Entities/TestJobFailure.php
similarity index 55%
rename from Support/Jobs/Entities/TestJob.php
rename to Support/Jobs/Entities/TestJobFailure.php
index ab2e51d1cbd..55d395616d1 100644
--- a/Support/Jobs/Entities/TestJob.php
+++ b/Support/Jobs/Entities/TestJobFailure.php
@@ -3,27 +3,31 @@
declare(strict_types=1);
/**
- * @file Support/Jobs/Entities/TestJob.php
+ * @file Support/Jobs/Entities/TestJobFailure.php
*
- * Copyright (c) 2014-2021 Simon Fraser University
- * Copyright (c) 2000-2021 John Willinsky
+ * Copyright (c) 2014-2022 Simon Fraser University
+ * Copyright (c) 2000-2022 John Willinsky
* Distributed under the GNU GPL v3. For full terms see the file docs/COPYING.
*
- * @class TestJob
+ * @class TestJobFailure
* @ingroup support_jobs_entities
*
- * @brief Example TestJob with a valid FQN (@see https://www.php.net/manual/pt_BR/language.namespaces.rules.php)
+ * @brief Example failed TestJob with a valid FQN (@see https://www.php.net/manual/pt_BR/language.namespaces.rules.php)
*/
namespace PKP\Support\Jobs\Entities;
use Exception;
-
+use Illuminate\Bus\Batchable;
use PKP\Domains\Jobs\Job;
use PKP\Support\Jobs\BaseJob;
-class TestJob extends BaseJob
+class TestJobFailure extends BaseJob
{
+ use Batchable;
+
+ public $tries = 1;
+
public function __construct()
{
$this->connection = config('queue.default');
diff --git a/Support/Jobs/Entities/TestJobSuccess.php b/Support/Jobs/Entities/TestJobSuccess.php
new file mode 100644
index 00000000000..a4470a2303b
--- /dev/null
+++ b/Support/Jobs/Entities/TestJobSuccess.php
@@ -0,0 +1,40 @@
+connection = config('queue.default');
+ $this->queue = Job::TESTING_QUEUE;
+ }
+
+ public function handle(): void
+ {
+
+ }
+}
diff --git a/classes/core/PKPContainer.php b/classes/core/PKPContainer.php
index 6fe8d03b529..373fd61d8f3 100644
--- a/classes/core/PKPContainer.php
+++ b/classes/core/PKPContainer.php
@@ -16,10 +16,8 @@
namespace PKP\core;
use APP\core\AppServiceProvider;
-
use Exception;
use Illuminate\Config\Repository;
-
use Illuminate\Container\Container;
use Illuminate\Contracts\Console\Kernel as KernelContract;
use Illuminate\Contracts\Debug\ExceptionHandler;
@@ -28,7 +26,6 @@
use Illuminate\Queue\Failed\DatabaseFailedJobProvider;
use Illuminate\Support\Facades\Facade;
use PKP\config\Config;
-use PKP\Domains\Jobs\Providers\JobServiceProvider;
use PKP\i18n\LocaleServiceProvider;
use PKP\Support\ProxyParser;
@@ -91,6 +88,10 @@ public function renderForConsole($output, Throwable $e)
Kernel::class
);
+ $this->singleton('pkpJobQueue', function ($app) {
+ return new PKPQueueProvider($app);
+ });
+
$this->singleton(
'queue.failer',
function ($app) {
@@ -112,16 +113,13 @@ public function registerConfiguredProviders()
{
// Load main settings, this should be done before registering services, e.g., it's used by Database Service
$this->loadConfiguration();
-
$this->register(new PKPEventServiceProvider($this));
$this->register(new LogServiceProvider($this));
$this->register(new \Illuminate\Database\DatabaseServiceProvider($this));
$this->register(new \Illuminate\Bus\BusServiceProvider($this));
- $this->register(new \Illuminate\Queue\QueueServiceProvider($this));
- $this->register(new PKPQueueProvider());
+ $this->register(new PKPQueueProvider($this));
$this->register(new MailServiceProvider($this));
$this->register(new AppServiceProvider($this));
- $this->register(new JobServiceProvider($this));
$this->register(new \Illuminate\Cache\CacheServiceProvider($this));
$this->register(new \Illuminate\Filesystem\FilesystemServiceProvider($this));
$this->register(new \ElcoBvg\Opcache\ServiceProvider($this));
@@ -347,6 +345,16 @@ public function runningUnitTests()
{
return false;
}
+
+ /**
+ * Determine if the application is currently down for maintenance.
+ *
+ * @return bool
+ */
+ public function isDownForMaintenance()
+ {
+ return PKPApplication::isUnderMaintenance();
+ }
}
if (!PKP_STRICT_MODE) {
diff --git a/classes/core/PKPQueueProvider.php b/classes/core/PKPQueueProvider.php
index 63e71c7071f..3252c5825fc 100644
--- a/classes/core/PKPQueueProvider.php
+++ b/classes/core/PKPQueueProvider.php
@@ -17,54 +17,162 @@
namespace PKP\core;
-use APP\core\Application;
+use Illuminate\Support\Facades\Facade;
+use Illuminate\Queue\Events\JobFailed;
+use Illuminate\Support\Facades\Queue;
use Illuminate\Queue\WorkerOptions;
-
-use PKP\config\Config;
+use Illuminate\Database\Eloquent\Builder;
+use Illuminate\Queue\QueueServiceProvider as IlluminateQueueServiceProvider;
+use Illuminate\Contracts\Debug\ExceptionHandler;
+use Illuminate\Queue\Worker;
use PKP\Domains\Jobs\Job as PKPJobModel;
+use PKP\Domains\Jobs\Interfaces\JobRepositoryInterface;
+use PKP\Domains\Jobs\Repositories\Job as JobRepository;
+use PKP\Domains\Jobs\WorkerConfiguration;
-class PKPQueueProvider
-{
- public function runJobsAtShutdown(): void
- {
- $disableRun = Config::getVar('queues', 'disable_jobs_run_at_shutdown', false);
+class PKPQueueProvider extends IlluminateQueueServiceProvider
+{
+ /**
+ * Specific queue to target to run the associated jobs
+ */
+ protected ?string $queue = null;
- if ($disableRun || Application::isUnderMaintenance()) {
- return;
- }
+ /**
+ * Set a specific queue to target to run the associated jobs
+ */
+ public function forQueue(string $queue) : self
+ {
+ $this->queue = $queue;
+
+ return $this;
+ }
- $job = PKPJobModel::isAvailable()
- ->notExceededAttempts()
+ /**
+ * Get a job model builder instance to query the jobs table
+ */
+ public function getJobModelBuilder() : Builder
+ {
+ return PKPJobModel::isAvailable()
->nonEmptyQueue()
- ->notQueue(PKPJobModel::TESTING_QUEUE)
- ->limit(1)
- ->first();
+ ->when($this->queue, fn($query) => $query->onQueue($this->queue))
+ ->when(is_null($this->queue), fn($query) => $query->notQueue(PKPJobModel::TESTING_QUEUE))
+ ->notExceededAttempts();
+ }
+
+ /**
+ * Get the worker options object
+ */
+ public function getWorkerOptions(array $options = []): WorkerOptions
+ {
+ return new WorkerOptions(
+ ...array_values(WorkerConfiguration::withOptions($options)->getWorkerOptions())
+ );
+ }
+
+ /**
+ * Run the queue worker via an infinite loop daemon
+ */
+ public function runJobsViaDaemon(string $connection, string $queue, array $workerOptions = []): void
+ {
+ $laravelContainer = PKPContainer::getInstance();
+
+ $laravelContainer['queue.worker']->daemon(
+ $connection,
+ $queue,
+ $this->getWorkerOptions($workerOptions)
+ );
+ }
+
+ /**
+ * Run the queue worker to process queue the jobs
+ */
+ public function runJobInQueue(): void
+ {
+ $job = $this->getJobModelBuilder()->limit(1)->first();
if ($job === null) {
return;
}
$laravelContainer = PKPContainer::getInstance();
- $options = new WorkerOptions(
- 'default',
- $job->getDelay(),
- $job->getAllowedMemory(),
- $job->getTimeout(),
- $job->getSleep(),
- $job->getMaxAttempts(),
- $job->getForceFlag(),
- $job->getStopWhenEmptyFlag(),
- );
$laravelContainer['queue.worker']->runNextJob(
'database',
$job->queue,
- $options
+ $this->getWorkerOptions()
);
}
+ /**
+ * Bootstrap any application services.
+ *
+ * @return void
+ */
+ public function boot()
+ {
+ Queue::failing(function (JobFailed $event) {
+
+ error_log($event->exception->__toString());
+
+ app('queue.failer')->log(
+ $event->connectionName,
+ $event->job->getQueue(),
+ $event->job->getRawBody(),
+ $event->exception
+ );
+ });
+ }
+
+ /**
+ * Register the service provider.
+ *
+ * @return void
+ */
public function register()
{
- register_shutdown_function([$this, 'runJobsAtShutdown']);
+ parent::register();
+
+ $this->registerDatabaseConnector(app()->get(\Illuminate\Queue\QueueManager::class));
+
+ $this->app->bind(
+ JobRepositoryInterface::class,
+ JobRepository::class
+ );
+ }
+
+ /**
+ * Register the queue worker.
+ *
+ * @return void
+ */
+ protected function registerWorker()
+ {
+ $this->app->singleton('queue.worker', function ($app) {
+ $isDownForMaintenance = function () {
+ return $this->app->isDownForMaintenance();
+ };
+
+ $resetScope = function () use ($app) {
+ if (method_exists($app['db'], 'getConnections')) {
+ foreach ($app['db']->getConnections() as $connection) {
+ $connection->resetTotalQueryDuration();
+ $connection->allowQueryDurationHandlersToRunAgain();
+ }
+ }
+
+ $app->forgetScopedInstances();
+
+ return Facade::clearResolvedInstances();
+ };
+
+ return new Worker(
+ $app['queue'],
+ $app['events'],
+ $app[ExceptionHandler::class],
+ $isDownForMaintenance,
+ $resetScope
+ );
+ });
}
+
}
diff --git a/classes/core/PKPRequest.php b/classes/core/PKPRequest.php
index d69dcb7b621..48d6971794a 100644
--- a/classes/core/PKPRequest.php
+++ b/classes/core/PKPRequest.php
@@ -103,6 +103,13 @@ public function setDispatcher($dispatcher)
*/
public function &getDispatcher()
{
+ if ( ! $this->_dispatcher ) {
+
+ $application = Application::get();
+
+ $this->setDispatcher($application->getDispatcher());
+ }
+
return $this->_dispatcher;
}
diff --git a/classes/scheduledTask/ScheduledTaskHelper.php b/classes/scheduledTask/ScheduledTaskHelper.php
index ea43a3f6d25..df744405eea 100644
--- a/classes/scheduledTask/ScheduledTaskHelper.php
+++ b/classes/scheduledTask/ScheduledTaskHelper.php
@@ -140,6 +140,14 @@ public static function checkFrequency($className, $frequency)
}
}
+ if ($isValid) {
+ // Check second
+ $second = $frequency->getAttribute('second');
+ if (isset($second)) {
+ $isValid = self::_isInRange($second, (int)date('s'), $lastRunTime, 'sec', strtotime('-1 minute'), strtotime('-1 second'));
+ }
+ }
+
return $isValid;
}
diff --git a/classes/task/ProcessQueueJobs.php b/classes/task/ProcessQueueJobs.php
new file mode 100644
index 00000000000..dd13a74f826
--- /dev/null
+++ b/classes/task/ProcessQueueJobs.php
@@ -0,0 +1,74 @@
+getJobModelBuilder();
+
+ if ($jobBuilder->count() <= 0) {
+ return true;
+ }
+
+ // Run queue jobs on CLI
+ if( runOnCLI('runScheduledTasks.php') ) {
+
+ while ($jobBuilder->count()) {
+ $jobQueue->runJobInQueue();
+ }
+
+ return true;
+ }
+
+ // Run queue jobs off CLI
+ if ( Config::getVar('queues', 'job_runner', false) ) {
+
+ (new JobRunner($jobQueue))
+ ->withMaxExecutionTimeConstrain()
+ ->withMaxJobsConstrain()
+ ->withMaxMemoryConstrain()
+ ->withEstimatedTimeToProcessNextJobConstrain()
+ ->processJobs($jobBuilder);
+ }
+
+ return true;
+ }
+}
diff --git a/cypress/tests/integration/Jobs.spec.js b/cypress/tests/integration/Jobs.spec.js
index 2c40c61285e..4089142ad4e 100644
--- a/cypress/tests/integration/Jobs.spec.js
+++ b/cypress/tests/integration/Jobs.spec.js
@@ -11,8 +11,10 @@
it('Check if Jobs page is alive and with contents', function() {
cy.login('admin', 'admin', 'publicknowledge');
- // Add two test jobs on queue
- cy.exec('php lib/pkp/tools/jobs.php test');
+ // purge all existing jobs in any of the queues
+ cy.exec('php lib/pkp/tools/jobs.php purge --all');
+
+ // Add two test jobs[successable and failable] on queue
cy.exec('php lib/pkp/tools/jobs.php test');
cy.get('a:contains("Administration")').click();
@@ -24,6 +26,14 @@
.should('have.length', 2)
.should('be.visible');
+ // purge all existing jobs in the test queue
cy.exec('php lib/pkp/tools/jobs.php purge --queue=queuedTestJob');
+
+ cy.reload();
+ cy.waitJQuery();
+
+ cy.get('.pkpTable')
+ .find('span:contains("queuedTestJob")')
+ .should('have.length', 0);
});
})
diff --git a/includes/functions.php b/includes/functions.php
index e363c88636b..eaf9964524f 100644
--- a/includes/functions.php
+++ b/includes/functions.php
@@ -15,6 +15,18 @@
use PKP\config\Config;
+/*
+ * Constants for expressing human-readable data sizes in their respective number of bytes.
+ */
+define( 'KB_IN_BYTES', 1024 );
+define( 'MB_IN_BYTES', 1024 * KB_IN_BYTES );
+define( 'GB_IN_BYTES', 1024 * MB_IN_BYTES );
+define( 'TB_IN_BYTES', 1024 * GB_IN_BYTES );
+define( 'PB_IN_BYTES', 1024 * TB_IN_BYTES );
+define( 'EB_IN_BYTES', 1024 * PB_IN_BYTES );
+define( 'ZB_IN_BYTES', 1024 * EB_IN_BYTES );
+define( 'YB_IN_BYTES', 1024 * ZB_IN_BYTES );
+
/**
* Emulate a Java-style import statement.
* Simply includes the associated PHP file (using require_once so multiple calls to include the same file have no effect).
@@ -345,3 +357,61 @@ function __p(string $key, int $number, array $replace = [], ?string $locale = nu
{
return trans_choice($key, $number, $replace, $locale);
}
+
+/**
+ * Check if run on CLI
+ */
+if (!function_exists('runOnCLI'))
+{
+ function runOnCLI(string $scriptPath = null): bool
+ {
+ if ( php_sapi_name() && strtolower(php_sapi_name()) === 'cli') {
+ return true;
+ }
+
+ if ($scriptPath) {
+
+ $serverVars = $_SERVER;
+
+ if (isset($serverVars['SCRIPT_NAME']) && strpos(strtolower($serverVars['SCRIPT_NAME']), strtolower($scriptPath)) !== false) {
+ return true;
+ }
+
+ if (isset($serverVars['SCRIPT_FILENAME']) && strpos(strtolower($serverVars['SCRIPT_FILENAME']), strtolower($scriptPath)) !== false) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+}
+
+/**
+ * Converts a shorthand byte value to an integer byte value.
+ *
+ * @link https://secure.php.net/manual/en/function.ini-get.php
+ * @link https://secure.php.net/manual/en/faq.using.php#faq.using.shorthandbytes
+ *
+ * @param string A (PHP ini) byte value, either shorthand or ordinary.
+ * @return int An integer byte value.
+ */
+if (!function_exists('convertHrToBytes'))
+{
+ function convertHrToBytes(string $value): int
+ {
+ $value = strtolower( trim( $value ) );
+ $bytes = (int) $value;
+
+ if ( false !== strpos( $value, 'g' ) ) {
+ $bytes *= GB_IN_BYTES;
+ } elseif ( false !== strpos( $value, 'm' ) ) {
+ $bytes *= MB_IN_BYTES;
+ } elseif ( false !== strpos( $value, 'k' ) ) {
+ $bytes *= KB_IN_BYTES;
+ }
+
+ // Deal with large (float) values which run into the maximum integer size.
+ return min( $bytes, PHP_INT_MAX );
+ }
+}
+
diff --git a/locale/en_US/admin.po b/locale/en_US/admin.po
index a7e9a879f9a..91cb988956e 100644
--- a/locale/en_US/admin.po
+++ b/locale/en_US/admin.po
@@ -164,6 +164,9 @@ msgstr "Subscription expiry reminder"
msgid "admin.scheduledTask.removeUnvalidatedExpiredUsers"
msgstr "Remove unvalidated expired users"
+msgid "admin.scheduledTask.processQueueJobs"
+msgstr "Process pending queue jobs"
+
msgid "admin.scheduledTask.updateGeoDB"
msgstr "Update DB-IP city lite database"
@@ -457,6 +460,9 @@ msgstr "command [arguments]"
msgid "admin.cli.tool.available.commands"
msgstr "Available commands for the `{$namespace}` namespace:"
+msgid "admin.cli.tool.jobs.maintenance.message"
+msgstr "Can not run queue jobs because the application maintenance mode is enabled."
+
msgid "admin.cli.tool.jobs.available.options.list.description"
msgstr "List all queued jobs. If you want to paginate results, use the parameters --page= and --perPage="
@@ -464,7 +470,19 @@ msgid "admin.cli.tool.jobs.available.options.purge.description"
msgstr "Purge a specific queued job based on his Id. If you would like to purge all, pass the parameter --all. If you would like to purge all from a specific queue, pass the parameter --queue="
msgid "admin.cli.tool.jobs.available.options.test.description"
-msgstr "Add a test job into the default queue"
+msgstr "Add a test job into the test queue. use the optional parameters --only= and pass 'failed' or 'success' to dispatch failable or successful job."
+
+msgid "admin.cli.tool.jobs.available.options.work.description"
+msgstr "Run an infinite daemon worker process that will continue to process jobs. Use --help flag to see available options associated with this command."
+
+msgid "admin.cli.tool.jobs.available.options.run.description"
+msgstr "Dispatch available jobs into the queue. if you would like to dispatch a job into a specific queue, pass the parameter --queue=QUEUE_NAME. Also can just pass --test to run the test job and --once to run one job at a time"
+
+msgid "admin.cli.tool.jobs.available.options.run.completed.description"
+msgstr "Completed running {$jobCount} jobs in the queue named {$queueName}."
+
+msgid "admin.cli.tool.jobs.available.options.run.empty.description"
+msgstr "No jobs available to run in the queue named {$queueName}."
msgid "admin.cli.tool.jobs.available.options.total.description"
msgstr "Display the queued jobs quantity"
@@ -517,6 +535,15 @@ msgstr "Next"
msgid "admin.cli.tool.jobs.purge.without.id"
msgstr "You should pass at least a Job ID, '--all' or '--queue=' to use this command"
+msgid "admin.cli.tool.jobs.test.job.failed.dispatch.message"
+msgstr "Dispatched test job that is bound to failed in queue named {$queueName}"
+
+msgid "admin.cli.tool.jobs.test.job.success.dispatch.message"
+msgstr "Dispatched test job that is bound to success in queue named {$queueName}"
+
+msgid "admin.cli.tool.jobs.test.invalid.option"
+msgstr "Invalid test job option. It can only take 'failed' or 'success' for '--only=' option".
+
msgid "admin.cli.tool.jobs.purge.invalid.id"
msgstr "Invalid job ID"
@@ -648,3 +675,54 @@ msgstr ""
msgid "admin.settings.statistics.sushiPlatform.sushiPlatformID.required"
msgstr "A platform ID must be required when the site will be identified as the SUSHI platform."
+
+msgid "admin.cli.tool.jobs.work.options.title"
+msgstr "work command options"
+
+msgid "admin.cli.tool.jobs.work.options.usage"
+msgstr "usage : [--options1 --options2= ...]"
+
+msgid "admin.cli.tool.jobs.work.options.description"
+msgstr "Available options for work command"
+
+msgid "admin.cli.tool.jobs.work.option.connection.description"
+msgstr "The name of the queue connection to work[default : '{$default}']"
+
+msgid "admin.cli.tool.jobs.work.option.queue.description"
+msgstr "The name of the queue to work[default : '{$default}']"
+
+msgid "admin.cli.tool.jobs.work.option.name.description"
+msgstr "The name of the worker[default : '{$default}']"
+
+msgid "admin.cli.tool.jobs.work.option.backoff.description"
+msgstr "The number of seconds to wait before retrying a job that encountered an uncaught exception[default : '{$default}']"
+
+msgid "admin.cli.tool.jobs.work.option.memory.description"
+msgstr "The memory limit in megabytes[default : '{$default}']"
+
+msgid "admin.cli.tool.jobs.work.option.timeout.description"
+msgstr "The number of seconds a child process can run[default : '{$default}']"
+
+msgid "admin.cli.tool.jobs.work.option.sleep.description"
+msgstr "Number of seconds to sleep when no job is available[default : '{$default}']"
+
+msgid "admin.cli.tool.jobs.work.option.tries.description"
+msgstr "Number of times to attempt a job before logging it failed[default : '{$default}']"
+
+msgid "admin.cli.tool.jobs.work.option.force.description"
+msgstr "Force the worker to run even in maintenance mode[default : {$default}]"
+
+msgid "admin.cli.tool.jobs.work.option.stopWhenEmpty.description"
+msgstr "Stop when the queue is empty[default : {$default}]"
+
+msgid "admin.cli.tool.jobs.work.option.maxJobs.description"
+msgstr "The number of jobs to process before stopping[default : '{$default}']"
+
+msgid "admin.cli.tool.jobs.work.option.maxTime.description"
+msgstr "The maximum number of seconds the worker should run[default : '{$default}']"
+
+msgid "admin.cli.tool.jobs.work.option.rest.description"
+msgstr "Number of seconds to rest between jobs[default : '{$default}']"
+
+msgid "admin.cli.tool.jobs.work.option.test.description"
+msgstr "Run the worker deamon for test queue"
diff --git a/tests/classes/core/PKPJobTest.php b/tests/classes/core/PKPJobTest.php
new file mode 100644
index 00000000000..8e6dc273916
--- /dev/null
+++ b/tests/classes/core/PKPJobTest.php
@@ -0,0 +1,131 @@
+originalErrorLog = ini_get('error_log');
+ $this->tmpErrorLog = tmpfile();
+
+ ini_set('error_log', stream_get_meta_data($this->tmpErrorLog)['uri']);
+ }
+
+ /**
+ * @see PKPTestCase::tearDown()
+ */
+ protected function tearDown(): void
+ {
+ ini_set('error_log', $this->originalErrorLog);
+
+ parent::tearDown();
+ }
+
+ /**
+ * @covers Job exception handling
+ */
+ public function testJobExceptionOnSync()
+ {
+ $this->expectException(Exception::class);
+
+ TestJobFailure::dispatchSync();
+ }
+
+ /**
+ * @covers Job dispatching
+ */
+ public function testJobDispatch()
+ {
+ Bus::fake();
+
+ TestJobFailure::dispatch();
+ TestJobSuccess::dispatch();
+
+ Bus::assertDispatched(TestJobFailure::class);
+ Bus::assertDispatched(TestJobSuccess::class);
+ }
+
+ /**
+ * @covers Job dispatching in chain
+ */
+ public function testJobDispatchInChain()
+ {
+ Bus::fake();
+
+ Bus::chain([
+ new TestJobFailure(),
+ new TestJobSuccess(),
+ ])->dispatch();
+
+ Bus::assertChained([
+ TestJobFailure::class,
+ TestJobSuccess::class,
+ ]);
+ }
+
+ /**
+ * @covers Job dispatching in batch
+ */
+ public function testJobDispatchInBatch()
+ {
+ Bus::fake();
+
+ Bus::batch([
+ new TestJobSuccess(),
+ new TestJobSuccess(),
+ new TestJobFailure(),
+ new TestJobFailure(),
+ ])->name('test-jobs')->dispatch();
+
+ Bus::assertBatched(function(PendingBatch $batch) {
+ return $batch->name === 'test-jobs' && $batch->jobs->count() === 4;
+ });
+ }
+
+ /**
+ * @covers Queue Worker
+ */
+ public function testPuttingJobsAtQueue()
+ {
+ Queue::fake();
+
+ $queue = Config::getVar('queues', 'default_queue', 'php-unit');
+
+ $jobContent = 'exampleContent';
+
+ Queue::push($jobContent, [], $queue);
+
+ Queue::assertPushedOn($queue, $jobContent);
+ }
+}
\ No newline at end of file
diff --git a/tests/classes/queues/QueueTest.php b/tests/classes/queues/QueueTest.php
deleted file mode 100644
index c08ef0389fb..00000000000
--- a/tests/classes/queues/QueueTest.php
+++ /dev/null
@@ -1,77 +0,0 @@
-configData = Config::getData();
-
- if ($this->configData['queues']['disable_jobs_run_at_shutdown']) {
- $this->markTestSkipped("Cannot test queues with the config [queues].disable_jobs_run_at_shutdown enabled.");
- }
-
- $this->originalErrorLog = ini_get('error_log');
- $this->tmpErrorLog = tmpfile();
- ini_set(
- 'error_log',
- stream_get_meta_data($this->tmpErrorLog)['uri']
- );
- }
-
- /**
- * @see PKPTestCase::tearDown()
- */
- protected function tearDown(): void
- {
- ini_set(
- 'error_log',
- $this->originalErrorLog
- );
- parent::tearDown();
- }
-
- /**
- * @covers Queue Worker
- */
- public function testPuttingJobsAtQueue()
- {
- Queue::fake();
-
- $queue = $this->configData['queues']['default_queue'] ?? 'php-unit';
-
- $jobContent = 'exampleContent';
-
- Queue::push($jobContent, [], $queue);
-
- Queue::assertPushedOn($queue, $jobContent);
- }
-}
diff --git a/tools/jobs.php b/tools/jobs.php
index 4ff3da40f55..46b5ccaa181 100644
--- a/tools/jobs.php
+++ b/tools/jobs.php
@@ -17,11 +17,21 @@
namespace PKP\tools;
+use APP\core\Application;
use APP\facades\Repo;
+use Carbon\Carbon;
+use Illuminate\Queue\Events\JobFailed;
+use Illuminate\Queue\Events\JobProcessed;
+use Illuminate\Queue\Events\JobProcessing;
+use Illuminate\Contracts\Queue\Job;
use Illuminate\Console\Concerns\InteractsWithIO;
use Illuminate\Console\OutputStyle;
use PKP\cliTool\CommandLineTool;
-use PKP\Support\Jobs\Entities\TestJob;
+use PKP\config\Config;
+use PKP\Domains\Jobs\Job as PKPJobModel;
+use PKP\Domains\Jobs\WorkerConfiguration;
+use PKP\Support\Jobs\Entities\TestJobFailure;
+use PKP\Support\Jobs\Entities\TestJobSuccess;
use Symfony\Component\Console\Exception\CommandNotFoundException;
use Symfony\Component\Console\Exception\InvalidArgumentException as CommandInvalidArgumentException;
use Symfony\Component\Console\Exception\LogicException;
@@ -70,6 +80,8 @@ class commandJobs extends CommandLineTool
'test' => 'admin.cli.tool.jobs.available.options.test.description',
'total' => 'admin.cli.tool.jobs.available.options.total.description',
'help' => 'admin.cli.tool.jobs.available.options.help.description',
+ 'run' => 'admin.cli.tool.jobs.available.options.run.description',
+ 'work' => 'admin.cli.tool.jobs.available.options.work.description',
'usage' => 'admin.cli.tool.jobs.available.options.usage.description',
];
@@ -155,14 +167,23 @@ public function setParameterList(array $items): self
/**
* Get the parameter list passed on CLI
- *
+ *
+ * @return array|null
*/
public function getParameterList(): ?array
{
return $this->parameterList;
}
- protected function getParameterValue(string $parameter, string $default = null): ?string
+ /**
+ * Get the value of a specific parameter
+ *
+ * @param string $parameter
+ * @param mixed $default
+ *
+ * @return mixed
+ */
+ protected function getParameterValue(string $parameter, mixed $default = null): mixed
{
if (!isset($this->getParameterList()[$parameter])) {
return $default;
@@ -180,19 +201,7 @@ public function usage()
$this->getCommandInterface()->line(__('admin.cli.tool.usage.parameters') . PHP_EOL);
$this->getCommandInterface()->line('' . __('admin.cli.tool.available.commands', ['namespace' => 'jobs']) . '');
- $width = $this->getColumnWidth(array_keys(self::AVAILABLE_OPTIONS));
-
- foreach (self::AVAILABLE_OPTIONS as $commandName => $description) {
- $spacingWidth = $width - Helper::width($commandName);
- $this->getCommandInterface()->line(
- sprintf(
- ' %s%s%s',
- $commandName,
- str_repeat(' ', $spacingWidth),
- __($description)
- )
- );
- }
+ $this->printUsage(self::AVAILABLE_OPTIONS);
}
/**
@@ -292,6 +301,95 @@ protected function list(): void
);
}
+ /**
+ * Run daemon worker process to continue handle jobs
+ */
+ protected function work(): void
+ {
+ $parameterList = $this->getParameterList();
+
+ if (in_array('--help', $parameterList)) {
+ $this->workerOptionsHelp();
+ return;
+ }
+
+ if ( Application::isUnderMaintenance() ) {
+ $this->getCommandInterface()->getOutput()->error(__('admin.cli.tool.jobs.maintenance.message'));
+ return;
+ }
+
+ $connection = $parameterList['connection'] ?? Config::getVar('queues', 'default_connection', 'database');
+ $queue = $parameterList['queue'] ?? Config::getVar('queues', 'default_queue', 'queue');
+
+ if (in_array('--test', $parameterList)) {
+ $queue = PKPJobModel::TESTING_QUEUE;
+ }
+
+ $this->listenForEvents();
+
+ app('pkpJobQueue')->runJobsViaDaemon(
+ $connection,
+ $queue,
+ $this->gatherWorkerOptions($parameterList)
+ );
+ }
+
+ /**
+ * Dispatch jobs into the queue
+ */
+ protected function run(): void
+ {
+ if ( Application::isUnderMaintenance() ) {
+ $this->getCommandInterface()->getOutput()->error(__('admin.cli.tool.jobs.maintenance.message'));
+ return;
+ }
+
+ $parameterList = $this->getParameterList();
+
+ $queue = $parameterList['queue'] ?? Config::getVar('queues', 'default_queue', 'queue');
+
+ if (in_array('--test', $parameterList)) {
+ $queue = PKPJobModel::TESTING_QUEUE;
+ }
+
+ $jobQueue = app('pkpJobQueue');
+
+ if ($queue && is_string($queue)) {
+ $jobQueue = $jobQueue->forQueue($queue);
+ }
+
+ $jobBuilder = $jobQueue->getJobModelBuilder();
+
+ if (($jobCount = $jobBuilder->count()) <= 0) {
+ $this->getCommandInterface()->getOutput()->info(
+ __(
+ 'admin.cli.tool.jobs.available.options.run.empty.description',
+ ['queueName' => $queue,]
+ )
+ );
+
+ return;
+ }
+
+ $this->listenForEvents();
+
+ while ($jobBuilder->count()) {
+ $jobQueue->runJobInQueue();
+
+ if ( in_array('--once', $parameterList) ) {
+ $jobCount = 1;
+ break;
+ }
+ }
+
+ $this->getCommandInterface()->getOutput()->success(
+ __(
+ 'admin.cli.tool.jobs.available.options.run.completed.description',
+ ['jobCount' => $jobCount, 'queueName' => $queue,]
+ )
+ );
+ }
+
/**
* Purge queued jobs
*/
@@ -356,9 +454,157 @@ protected function purgeAllJobsFromQueue(string $queue): void
*/
protected function test(): void
{
- dispatch(new TestJob());
+ $queue = PKPJobModel::TESTING_QUEUE;
+ $runnableJob = $this->getParameterList()['only'] ?? null;
+
+ if ($runnableJob && !in_array($runnableJob, ['failed', 'success'])) {
+ throw new CommandInvalidArgumentException(__('admin.cli.tool.jobs.test.invalid.option'));
+ }
+
+ if (!$runnableJob || $runnableJob === 'failed') {
+ dispatch(new TestJobFailure());
+
+ $this->getCommandInterface()->getOutput()->success(__('admin.cli.tool.jobs.test.job.failed.dispatch.message', ['queueName' => $queue]));
+ }
+
+ if (!$runnableJob || $runnableJob === 'success') {
+ dispatch(new TestJobSuccess());
+
+ $this->getCommandInterface()->getOutput()->success(__('admin.cli.tool.jobs.test.job.success.dispatch.message', ['queueName' => $queue]));
+ }
+ }
+
+ /**
+ * Gather worker daemon options
+ *
+ * @param array $parameters
+ * @return array
+ */
+ protected function gatherWorkerOptions(array $parameters = []): array
+ {
+ $workerConfig = new WorkerConfiguration;
+
+ return [
+ 'name' => $this->getParameterValue('--name', $workerConfig->getName()),
+ 'backoff' => $this->getParameterValue('--backoff', $workerConfig->getBackoff()),
+ 'memory' => $this->getParameterValue('--memory', $workerConfig->getMemory()),
+ 'timeout' => $this->getParameterValue('--timeout', $workerConfig->getTimeout()),
+ 'sleep' => $this->getParameterValue('--sleep', $workerConfig->getSleep()),
+ 'maxTries' => $this->getParameterValue('--tries', $workerConfig->getMaxTries()),
+ 'force' => $this->getParameterValue('--force', in_array('--force', $parameters) ? true : $workerConfig->getForce()),
+ 'stopWhenEmpty' => $this->getParameterValue('--stop-when-empty', in_array('--stop-when-empty', $parameters) ? true : $workerConfig->getStopWhenEmpty()),
+ 'maxJobs' => $this->getParameterValue('--max-jobs', $workerConfig->getMaxJobs()),
+ 'maxTime' => $this->getParameterValue('--max-time', $workerConfig->getMaxTime()),
+ 'rest' => $this->getParameterValue('--rest', $workerConfig->getRest()),
+ ];
+ }
+
+ /**
+ * Listen for the queue events in order to update the console output.
+ *
+ * @return void
+ */
+ protected function listenForEvents(): void
+ {
+ $events = app()['events'];
+
+ $events->listen(JobProcessing::class, function ($event) {
+ $this->writeOutput($event->job, 'starting');
+ });
+
+ $events->listen(JobProcessed::class, function ($event) {
+ $this->writeOutput($event->job, 'success');
+ });
+
+ $events->listen(JobFailed::class, function ($event) {
+ $this->writeOutput($event->job, 'failed');
+ });
+ }
+
+ /**
+ * Write the status output for the queue worker.
+ *
+ * @param \Illuminate\Contracts\Queue\Job $job
+ * @param string $status
+ *
+ * @return void
+ */
+ protected function writeOutput(Job $job, $status): void
+ {
+ match($status) {
+ 'starting' => $this->writeStatus($job, 'Processing', 'comment'),
+ 'success' => $this->writeStatus($job, 'Processed', 'info'),
+ 'failed' => $this->writeStatus($job, 'Failed', 'error'),
+ };
+ }
+
+ /**
+ * Format the status output for the queue worker.
+ *
+ * @param \Illuminate\Contracts\Queue\Job $job
+ * @param string $status
+ * @param string $type
+ * @return void
+ */
+ protected function writeStatus(Job $job, $status, $type): void
+ {
+ $this->getCommandInterface()->getOutput()->writeln(sprintf(
+ "<{$type}>[%s][%s] %s{$type}> %s",
+ Carbon::now()->format('Y-m-d H:i:s'),
+ $job->getJobId(),
+ str_pad("{$status}:", 11), $job->resolveName()
+ ));
+ }
+
+ /**
+ * Print work command options information.
+ */
+ protected function workerOptionsHelp(): void
+ {
+ $this->getCommandInterface()->line('' . __('admin.cli.tool.jobs.work.options.title') . '');
+ $this->getCommandInterface()->line(__('admin.cli.tool.jobs.work.options.usage') . PHP_EOL);
+ $this->getCommandInterface()->line('' . __('admin.cli.tool.jobs.work.options.description') . '');
+
+ $workerConfig = new WorkerConfiguration;
+
+ $options = [
+ '--connection[=CONNECTION]' => __('admin.cli.tool.jobs.work.option.connection.description', ['default' => Config::getVar('queue', 'default_connection', 'database')]),
+ '--queue[=QUEUE]' => __('admin.cli.tool.jobs.work.option.queue.description', ['default' => Config::getVar('queue', 'default_queue', 'queue')]),
+ '--name[=NAME]' => __('admin.cli.tool.jobs.work.option.name.description', ['default' => $workerConfig->getName()]),
+ '--backoff[=BACKOFF]' => __('admin.cli.tool.jobs.work.option.backoff.description', ['default' => $workerConfig->getBackoff()]),
+ '--memory[=MEMORY]' => __('admin.cli.tool.jobs.work.option.memory.description', ['default' => $workerConfig->getMemory()]),
+ '--timeout[=TIMEOUT]' => __('admin.cli.tool.jobs.work.option.timeout.description', ['default' => $workerConfig->getTimeout()]),
+ '--sleep[=SLEEP]' => __('admin.cli.tool.jobs.work.option.sleep.description', ['default' => $workerConfig->getSleep()]),
+ '--tries[=TRIES]' => __('admin.cli.tool.jobs.work.option.tries.description', ['default' => $workerConfig->getMaxTries()]),
+ '--force' => __('admin.cli.tool.jobs.work.option.force.description', ['default' => $workerConfig->getForce() ? 'true' : 'false']),
+ '--stop-when-empty' => __('admin.cli.tool.jobs.work.option.stopWhenEmpty.description', ['default' => $workerConfig->getStopWhenEmpty() ? 'true' : 'false']),
+ '--max-jobs[=MAX-JOBS]' => __('admin.cli.tool.jobs.work.option.maxJobs.description', ['default' => $workerConfig->getMaxJobs()]),
+ '--max-time[=MAX-TIME]' => __('admin.cli.tool.jobs.work.option.maxTime.description', ['default' => $workerConfig->getMaxTime()]),
+ '--rest[=REST]' => __('admin.cli.tool.jobs.work.option.rest.description', ['default' => $workerConfig->getRest()]),
+ '--test' => __('admin.cli.tool.jobs.work.option.test.description'),
+ ];
+
+ $this->printUsage($options, false);
+ }
+
+ /**
+ * Print given options in a pretty way.
+ */
+ protected function printUsage(array $options, bool $shouldTranslate = true): void
+ {
+ $width = $this->getColumnWidth(array_keys($options));
- $this->getCommandInterface()->getOutput()->success('Dispatched job!');
+ foreach ($options as $commandName => $description) {
+ $spacingWidth = $width - Helper::width($commandName);
+ $this->getCommandInterface()->line(
+ sprintf(
+ ' %s%s%s',
+ $commandName,
+ str_repeat(' ', $spacingWidth),
+ $shouldTranslate ? __($description) : $description
+ )
+ );
+ }
}
/**