diff --git a/composer.json b/composer.json
index 1150e48..41ea76b 100644
--- a/composer.json
+++ b/composer.json
@@ -4,17 +4,20 @@
"type": "cakephp-plugin",
"require": {
"php": "^7.0",
- "cakephp/cakephp": "^3.4",
+ "cakephp/cakephp": "^3.6",
"cakephp/migrations": "^1.5",
"php-amqplib/php-amqplib": "^2.5"
},
"require-dev": {
- "cakephp/app": "^3.4",
+ "cakephp/app": "^3.6",
"phpunit/phpunit": "^5.7",
"sizuhiko/cake_fabricate": "^0.2.1",
"squizlabs/php_codesniffer": "^3.0",
"cakephp/cakephp-codesniffer": "^3.0.0"
},
+ "suggest": {
+ "dereuromark/cakephp-ide-helper": "For maximum IDE support, especially around enqueue() usage."
+ },
"scripts": {
"cs-check": "phpcs --colors -p -s --extensions=php,ctp --standard=vendor/cakephp/cakephp-codesniffer/CakePHP src/",
"cs-fix": "phpcbf --colors -p -s --extensions=php,ctp --standard=vendor/cakephp/cakephp-codesniffer/CakePHP src/"
diff --git a/config/bootstrap.php b/config/bootstrap.php
deleted file mode 100644
index 6ec4ecf..0000000
--- a/config/bootstrap.php
+++ /dev/null
@@ -1,37 +0,0 @@
- [
- 'maxRetries' => 5,
- 'pulseTime' => 6 * 60 * 60,
- 'priority' => 100,
- ],
- 'default' => [
- 'maxRetries' => 5,
- ],
- 'archive' => [
- 'enabled' => false,
- 'tableName' => 'delayed_jobs_archive',
- 'recurring' => 'tomorrow 00:30',
- 'timeLimit' => '90 days',
- ]
-];
-
-$delayedJobsConfig = Configure::read('DelayedJobs');
-
-Configure::write('DelayedJobs', \Cake\Utility\Hash::merge($defaultConfig, $delayedJobsConfig));
-
-\Cake\Database\Type::map('serialize', \DelayedJobs\Database\Type\SerializeType::class);
-
-\DelayedJobs\RecurringJobBuilder::add([
- 'worker' => 'DelayedJobs.Archive',
- 'priority' => Configure::read('maximum.priority')
-]);
diff --git a/src/Broker/RabbitMqBroker.php b/src/Broker/RabbitMqBroker.php
index 45968cb..6ae42f7 100644
--- a/src/Broker/RabbitMqBroker.php
+++ b/src/Broker/RabbitMqBroker.php
@@ -71,7 +71,7 @@ public function publishJob(Job $job)
{
$delay = $job->getRunAt()->isFuture() ? Time::now()->diffInSeconds($job->getRunAt(), false) * 1000 : 0;
- $jobPriority = $this->_manager->config('maximum.priority') - $job->getPriority();
+ $jobPriority = $this->_manager->getConfig('maximum.priority') - $job->getPriority();
if ($jobPriority < 0) {
$jobPriority = 0;
} elseif ($jobPriority > 255) {
@@ -86,6 +86,7 @@ public function publishJob(Job $job)
];
$this->getDriver()->publishJob($jobData);
+ $job->setPushedToBroker(true);
}
/**
diff --git a/src/Datasource/TableDatasource.php b/src/Datasource/TableDatasource.php
index 66c62fe..2f0d3e6 100644
--- a/src/Datasource/TableDatasource.php
+++ b/src/Datasource/TableDatasource.php
@@ -26,7 +26,7 @@ class TableDatasource extends BaseDatasource
*/
protected function _table(): DatastoreInterface
{
- return $this->tableLocator()
+ return $this->getTableLocator()
->get($this->getConfig('tableName'));
}
diff --git a/src/DelayedJob/DebugKitJobManager.php b/src/DelayedJob/DebugKitJobManager.php
new file mode 100644
index 0000000..1d47866
--- /dev/null
+++ b/src/DelayedJob/DebugKitJobManager.php
@@ -0,0 +1,72 @@
+jobLog = $config['debugKitLog'];
+ unset($config['debugKitLog']);
+
+ parent::__construct($config, $datastore, $messageBroker);
+ }
+
+ /**
+ * @param \DelayedJobs\DelayedJob\Job $job The job instance
+ * @return void
+ */
+ protected function pushToLog(Job $job)
+ {
+ $jobData = [
+ 'id' => $job->getId(),
+ 'worker' => $job->getWorker(),
+ 'sequence' => $job->getSequence(),
+ 'payload' => $job->getPayload(),
+ 'priority' => $job->getPriority(),
+ 'pushedToBroker' => $job->isPushedToBroker(),
+ ];
+ $this->jobLog[] = $jobData;
+ }
+
+ /**
+ * @param \DelayedJobs\DelayedJob\Job $job Job that needs to be enqueued
+ * @param bool $skipPersist Skip the persistance step (e.g. it's already been persisted
+ * @return void
+ */
+ public function enqueue(Job $job, bool $skipPersist = false)
+ {
+ parent::enqueue($job, $skipPersist);
+
+ $this->pushToLog($job);
+ }
+
+ /**
+ * @param array $jobs
+ * @return void
+ */
+ public function enqueueBatch(array $jobs)
+ {
+ parent::enqueueBatch($jobs);
+
+ foreach ($jobs as $job) {
+ $this->pushToLog($job);
+ }
+ }
+}
diff --git a/src/DelayedJob/EnqueueTrait.php b/src/DelayedJob/EnqueueTrait.php
index 9cf3830..1290475 100644
--- a/src/DelayedJob/EnqueueTrait.php
+++ b/src/DelayedJob/EnqueueTrait.php
@@ -27,7 +27,7 @@ public function enqueue($worker, $payload = null, array $options = []): Job
->setData($options);
}
- JobManager::instance()
+ JobManager::getInstance()
->enqueue($job);
return $job;
@@ -62,7 +62,7 @@ public function enqueueBatch($worker, array $jobsToEnqueue, array $options = [])
$jobs[] = $job;
}
- JobManager::instance()
+ JobManager::getInstance()
->enqueueBatch($jobs);
return $jobs;
diff --git a/src/DelayedJob/Job.php b/src/DelayedJob/Job.php
index eb54bed..45a2960 100644
--- a/src/DelayedJob/Job.php
+++ b/src/DelayedJob/Job.php
@@ -120,6 +120,12 @@ class Job
* @var bool
*/
protected $_manualRun = false;
+ /**
+ * Indicates that this job instance was pushed to the job broker
+ *
+ * @var bool
+ */
+ protected $_pushedToBroker = false;
/**
* Job constructor.
@@ -754,4 +760,23 @@ public function setBrokerMessageBody($brokerMessageBody): Job
return $this;
}
+
+ /**
+ * @return bool
+ */
+ public function isPushedToBroker(): bool
+ {
+ return $this->_pushedToBroker;
+ }
+
+ /**
+ * @param bool $pushedToBroker
+ * @return $this
+ */
+ public function setPushedToBroker(bool $pushedToBroker): Job
+ {
+ $this->_pushedToBroker = $pushedToBroker;
+
+ return $this;
+ }
}
diff --git a/src/DelayedJob/JobManager.php b/src/DelayedJob/JobManager.php
index de8987a..bbe6087 100644
--- a/src/DelayedJob/JobManager.php
+++ b/src/DelayedJob/JobManager.php
@@ -94,18 +94,23 @@ public function __construct(
}
/**
- * Returns the globally available instance of a \DelayedJobs\DelayedJobs\DelayedJobsManager
+ * Set as the globally available instance
*
- * If called with the first parameter, it will be set as the globally available instance
+ * @param \DelayedJobs\DelayedJob\ManagerInterface|null $manager The manager interface to inject
+ * @return void
+ */
+ public static function setInstance(?ManagerInterface $manager)
+ {
+ static::$_instance = $manager;
+ }
+
+ /**
+ * Returns the globally available instance of a \DelayedJobs\DelayedJobs\JobsManager
*
- * @param \DelayedJobs\DelayedJob\ManagerInterface $manager Delayed jobs instance.
* @return \DelayedJobs\DelayedJob\ManagerInterface the global delayed jobs manager
*/
- public static function instance(ManagerInterface $manager = null): ManagerInterface
+ public static function getInstance(): ManagerInterface
{
- if ($manager instanceof ManagerInterface) {
- static::$_instance = $manager;
- }
if (empty(static::$_instance)) {
static::$_instance = new self(Configure::read('DelayedJobs'));
}
@@ -376,10 +381,10 @@ protected function _handleResult(ResultInterface $result, $duration)
protected function _dispatchWorkerEvent(JobWorkerInterface $jobWorker, $name, $data = null, $subject = null): Event
{
$event = new Event($name, $subject ?? $this, $data);
- $this->eventManager()
+ $this->getEventManager()
->dispatch($event);
if ($jobWorker instanceof EventDispatcherInterface) {
- $jobWorker->eventManager()
+ $jobWorker->getEventManager()
->dispatch($event);
}
diff --git a/src/Generator/Task/WorkerTask.php b/src/Generator/Task/WorkerTask.php
new file mode 100644
index 0000000..13902c1
--- /dev/null
+++ b/src/Generator/Task/WorkerTask.php
@@ -0,0 +1,46 @@
+collectWorkers();
+ $map = [];
+ foreach ($workers as $worker) {
+ $map[$worker] = '\\' . static::CLASS_JOB . '::class';
+ }
+
+ $result['\\' . static::CLASS_JOB . '::enqueue(0)'] = $map;
+
+ return $result;
+ }
+
+ /**
+ * @return string[]
+ */
+ protected function collectWorkers()
+ {
+ $result = [];
+ $workerFinder = new WorkerFinder();
+ $workers = $workerFinder->allAppAndPluginWorkers();
+ sort($workers);
+
+ return $workers;
+ }
+}
diff --git a/src/Model/Table/DelayedJobsTable.php b/src/Model/Table/DelayedJobsTable.php
index 3d3b48a..58b3e8a 100644
--- a/src/Model/Table/DelayedJobsTable.php
+++ b/src/Model/Table/DelayedJobsTable.php
@@ -37,9 +37,9 @@ public function initialize(array $config)
*/
protected function _initializeSchema(TableSchema $table)
{
- $table->columnType('payload', 'serialize');
- $table->columnType('options', 'serialize');
- $table->columnType('history', 'json');
+ $table->setColumnType('payload', 'serialize');
+ $table->setColumnType('options', 'serialize');
+ $table->setColumnType('history', 'json');
return parent::_initializeSchema($table);
}
@@ -144,11 +144,11 @@ public function persistJobs(array $jobs): array
$connection = $this->getConnection();
$quote = $connection
- ->driver()
- ->autoQuoting();
+ ->getDriver()
+ ->isAutoQuotingEnabled();
$connection
- ->driver()
- ->autoQuoting(true);
+ ->getDriver()
+ ->enableAutoQuoting();
$connection->transactional(function () use ($query, &$jobs) {
$statement = $query->execute();
$firstId = $statement->lastInsertId($this->getTable(), 'id');
@@ -159,8 +159,8 @@ public function persistJobs(array $jobs): array
return true;
});
- $connection->driver()
- ->autoQuoting($quote);
+ $connection->getDriver()
+ ->enableAutoQuoting($quote);
if (!$jobs) {
throw new EnqueueException('Job batch could not be persisted');
@@ -196,7 +196,6 @@ public function fetchNextSequence(Job $job)
return null;
}
- $this->getConnection()->driver()->autoQuoting(false);
$next = $this->find()
->select([
'id',
@@ -235,11 +234,11 @@ public function fetchNextSequence(Job $job)
public function isSimilarJob(Job $job): bool
{
$quoting = $this->getConnection()
- ->driver()
- ->autoQuoting();
+ ->getDriver()
+ ->isAutoQuotingEnabled();
$this->getConnection()
- ->driver()
- ->autoQuoting(true);
+ ->getDriver()
+ ->enableAutoQuoting();
$conditions = [
'worker' => $job->getWorker(),
@@ -259,8 +258,8 @@ public function isSimilarJob(Job $job): bool
$exists = $this->exists($conditions);
$this->getConnection()
- ->driver()
- ->autoQuoting($quoting);
+ ->getDriver()
+ ->enableAutoQuoting($quoting);
return $exists;
}
@@ -271,11 +270,11 @@ public function isSimilarJob(Job $job): bool
public function beforeSave()
{
$this->_quote = $this->getConnection()
- ->driver()
- ->autoQuoting();
+ ->getDriver()
+ ->isAutoQuotingEnabled();
$this->getConnection()
- ->driver()
- ->autoQuoting(true);
+ ->getDriver()
+ ->enableAutoQuoting();
}
/**
@@ -284,7 +283,7 @@ public function beforeSave()
public function afterSave()
{
$this->getConnection()
- ->driver()
- ->autoQuoting($this->_quote);
+ ->getDriver()
+ ->enableAutoQuoting($this->_quote);
}
}
diff --git a/src/Panel/JobsPanel.php b/src/Panel/JobsPanel.php
new file mode 100644
index 0000000..87ceff8
--- /dev/null
+++ b/src/Panel/JobsPanel.php
@@ -0,0 +1,63 @@
+jobLog = new \ArrayObject;
+ $jobManager = new DebugKitJobManager(Configure::read('DelayedJobs') + [
+ 'debugKitLog' => $log
+ ]);
+ JobManager::setInstance($jobManager);
+ }
+
+ /**
+ * Get the data this panel wants to store.
+ *
+ * @return array
+ */
+ public function data()
+ {
+ return [
+ 'jobs' => isset($this->jobLog) ? $this->jobLog->getArrayCopy() : []
+ ];
+ }
+
+ /**
+ * Get summary data from the queries run.
+ *
+ * @return string
+ */
+ public function summary()
+ {
+ if (empty($this->jobLog)) {
+ return '';
+ }
+
+ return (string)count($this->jobLog);
+ }
+}
diff --git a/src/Plugin.php b/src/Plugin.php
new file mode 100644
index 0000000..255f160
--- /dev/null
+++ b/src/Plugin.php
@@ -0,0 +1,56 @@
+ [
+ 'maxRetries' => 5,
+ 'pulseTime' => 6 * 60 * 60,
+ 'priority' => 100,
+ ],
+ 'default' => [
+ 'maxRetries' => 5,
+ ],
+ 'archive' => [
+ 'enabled' => false,
+ 'tableName' => 'delayed_jobs_archive',
+ 'timeLimit' => '90 days',
+ 'recurring' => 'tomorrow 00:30'
+ ]
+ ];
+
+ $delayedJobsConfig = Configure::read('DelayedJobs');
+
+ Configure::write('DelayedJobs', Hash::merge($defaultConfig, $delayedJobsConfig));
+
+ Type::map('serialize', SerializeType::class);
+
+ RecurringJobBuilder::add([
+ 'worker' => 'DelayedJobs.Archive',
+ 'priority' => Configure::read('maximum.priority')
+ ]);
+
+ // For IdeHelper plugin if in use - make sure to run `bin/cake phpstorm generate` then
+ $generatorTasks = (array)Configure::read('IdeHelper.generatorTasks');
+ $generatorTasks[] = WorkerTask::class;
+ Configure::write('IdeHelper.generatorTasks', $generatorTasks);
+ }
+}
diff --git a/src/Shell/MonitorShell.php b/src/Shell/MonitorShell.php
index d996865..00f20bd 100644
--- a/src/Shell/MonitorShell.php
+++ b/src/Shell/MonitorShell.php
@@ -281,7 +281,7 @@ protected function _basicStatsWithCharts()
protected function _rabbitStats()
{
- $rabbit_status = JobManager::instance()
+ $rabbit_status = JobManager::getInstance()
->getMessageBroker()
->queueStatus();
if (empty($rabbit_status)) {
@@ -304,7 +304,7 @@ protected function _rabbitStatsWithCharts()
$max_length = 50;
- $rabbit_status = JobManager::instance()
+ $rabbit_status = JobManager::getInstance()
->getMessageBroker()
->queueStatus();
if (empty($rabbit_status)) {
diff --git a/src/Shell/Task/ProcessManagerTask.php b/src/Shell/Task/ProcessManagerTask.php
index 40d94c6..bbea6fd 100644
--- a/src/Shell/Task/ProcessManagerTask.php
+++ b/src/Shell/Task/ProcessManagerTask.php
@@ -156,7 +156,7 @@ public function handleKillSignals()
{
$callback = function ($signo) {
$event = new Event('CLI.signal', null, compact('signo'));
- $this->eventManager()->dispatch($event);
+ $this->getEventManager()->dispatch($event);
};
$this->signal('SIGHUP', $callback);
diff --git a/src/Shell/Task/WorkerTask.php b/src/Shell/Task/WorkerTask.php
index b76c87b..04b9493 100644
--- a/src/Shell/Task/WorkerTask.php
+++ b/src/Shell/Task/WorkerTask.php
@@ -37,7 +37,7 @@ public function main()
$this->out('
= __d('debug_kit', 'ID') ?> | += __d('debug_kit', 'Worker') ?> | += __d('debug_kit', 'Sequence') ?> | += __d('debug_kit', 'Priority') ?> | += __d('debug_kit', 'Is queued?') ?> | +
---|---|---|---|---|
= $job['id'] ?> | += $job['worker'] ?> | += $job['sequence'] ?: 'None' ?> | += $job['priority'] ?> | += $job['pushedToBroker'] ? '✔' : '✕' ?> | +