From 032da0715bac89b5ffaa93ab68d429fa15c3d695 Mon Sep 17 00:00:00 2001 From: Evgeniy Moiseenko Date: Mon, 4 Dec 2023 17:42:28 +0300 Subject: [PATCH] Get Psalm error level to 1 (#505) --- psalm.xml | 10 ++++-- src/JobEvent.php | 4 +-- src/LogBehavior.php | 18 ++++++++-- src/Queue.php | 2 +- src/cli/Action.php | 1 + src/cli/Command.php | 9 +++-- src/cli/Queue.php | 13 +++---- src/cli/SignalLoop.php | 6 ++-- src/cli/VerboseBehavior.php | 11 +++--- src/closure/Behavior.php | 3 +- src/closure/Job.php | 6 +++- src/debug/Panel.php | 14 +++++--- src/debug/views/detail.php | 4 ++- src/debug/views/summary.php | 5 ++- src/drivers/amqp_interop/Command.php | 3 +- src/drivers/amqp_interop/Queue.php | 14 ++++---- src/drivers/beanstalk/Command.php | 4 +-- src/drivers/beanstalk/InfoAction.php | 2 +- src/drivers/beanstalk/Queue.php | 2 +- src/drivers/db/Command.php | 3 +- src/drivers/db/InfoAction.php | 3 +- src/drivers/db/Queue.php | 4 ++- src/drivers/file/Command.php | 3 +- src/drivers/file/InfoAction.php | 7 ++-- src/drivers/file/Queue.php | 33 +++++++++++++++-- src/drivers/gearman/Command.php | 3 +- src/drivers/redis/Command.php | 3 +- src/drivers/redis/InfoAction.php | 13 ++++--- src/drivers/redis/Queue.php | 12 ++++++- src/drivers/sqs/Command.php | 3 +- src/drivers/sqs/Payload.php | 47 +++++++++++++++++++++++++ src/drivers/sqs/Queue.php | 38 +++++++++++--------- src/drivers/stomp/Command.php | 3 +- src/drivers/stomp/Queue.php | 6 ++-- src/drivers/sync/Queue.php | 7 +++- src/gii/Generator.php | 18 ++++++---- src/gii/default/job.php | 8 +++-- src/gii/form.php | 11 ++++-- src/serializers/IgbinarySerializer.php | 2 +- src/serializers/JsonSerializer.php | 2 +- src/serializers/PhpSerializer.php | 2 +- src/serializers/SerializerInterface.php | 2 +- 42 files changed, 253 insertions(+), 111 deletions(-) create mode 100644 src/drivers/sqs/Payload.php diff --git a/psalm.xml b/psalm.xml index c9de05bc3..d92109732 100644 --- a/psalm.xml +++ b/psalm.xml @@ -1,6 +1,6 @@ - - + + + @@ -24,4 +25,7 @@ + + + diff --git a/src/JobEvent.php b/src/JobEvent.php index e994161ef..fab622150 100644 --- a/src/JobEvent.php +++ b/src/JobEvent.php @@ -25,9 +25,9 @@ abstract class JobEvent extends Event */ public $name = ''; /** - * @var Queue|null|object + * @var Queue * @inheritdoc - * @psalm-suppress PropertyNotSetInConstructor + * @psalm-suppress PropertyNotSetInConstructor, NonInvariantDocblockPropertyType */ public $sender; /** diff --git a/src/LogBehavior.php b/src/LogBehavior.php index 5f44acf4a..08df14df8 100644 --- a/src/LogBehavior.php +++ b/src/LogBehavior.php @@ -97,9 +97,14 @@ public function afterError(ExecEvent $event): void */ public function workerStart(cli\WorkerEvent $event): void { - $title = 'Worker ' . $event->sender->getWorkerPid(); + $workerPid = $event->sender->getWorkerPid(); + if (null === $workerPid) { + $workerPid = '{PID not found}'; + } + $title = 'Worker ' . $workerPid; Yii::info("$title is started.", Queue::class); Yii::beginProfile($title, Queue::class); + if ($this->autoFlush) { Yii::getLogger()->flush(true); } @@ -111,9 +116,14 @@ public function workerStart(cli\WorkerEvent $event): void */ public function workerStop(cli\WorkerEvent $event): void { - $title = 'Worker ' . $event->sender->getWorkerPid(); + $workerPid = $event->sender->getWorkerPid(); + if (null === $workerPid) { + $workerPid = '{PID not found}'; + } + $title = 'Worker ' . $workerPid; Yii::endProfile($title, Queue::class); Yii::info("$title is stopped.", Queue::class); + if ($this->autoFlush) { Yii::getLogger()->flush(true); } @@ -139,7 +149,9 @@ protected function getExecTitle(ExecEvent $event): string { $title = $this->getJobTitle($event); $extra = "attempt: $event->attempt"; - if ($pid = $event->sender?->getWorkerPid()) { + + $pid = $event->sender->getWorkerPid(); + if (null !== $pid) { $extra .= ", PID: $pid"; } return "$title ($extra)"; diff --git a/src/Queue.php b/src/Queue.php index 93c01f744..212886fa9 100644 --- a/src/Queue.php +++ b/src/Queue.php @@ -224,7 +224,7 @@ protected function handleMessage(int|string $id, string $message, int $ttr, int return $this->handleError($event); } try { - /** @psalm-suppress PossiblyUndefinedMethod */ + /** @psalm-suppress PossiblyUndefinedMethod, MixedMethodCall */ $event->result = $event->job?->execute($this); } catch (Throwable $error) { $event->error = $error; diff --git a/src/cli/Action.php b/src/cli/Action.php index 5e90ac7b8..081a6a159 100644 --- a/src/cli/Action.php +++ b/src/cli/Action.php @@ -51,6 +51,7 @@ public function init(): void /** * @param string $string * @return string + * @psalm-suppress MixedInferredReturnType, MixedReturnStatement */ protected function format(string $string): string { diff --git a/src/cli/Command.php b/src/cli/Command.php index f5fd0e9da..af7cd5e75 100644 --- a/src/cli/Command.php +++ b/src/cli/Command.php @@ -124,8 +124,12 @@ public function beforeAction($action): bool if ($this->phpBinary === null) { $this->phpBinary = PHP_BINARY; } - /** @psalm-suppress MissingClosureReturnType */ - $this->queue->messageHandler = function (int|string|null $id, string $message, int $ttr, int $attempt) { + $this->queue->messageHandler = function ( + int|string|null $id, + string $message, + int $ttr, + int $attempt + ): bool { return $this->handleMessage($id, $message, $ttr, $attempt); }; } @@ -178,6 +182,7 @@ protected function handleMessage(int|string|null $id, string $message, ?int $ttr foreach ($this->getPassedOptions() as $name) { if (in_array($name, $this->options('exec'), true)) { + /** @psalm-suppress MixedOperand */ $cmd[] = '--' . $name . '=' . $this->$name; } } diff --git a/src/cli/Queue.php b/src/cli/Queue.php index 9dac8a5bd..c041dfff1 100644 --- a/src/cli/Queue.php +++ b/src/cli/Queue.php @@ -63,7 +63,7 @@ abstract class Queue extends BaseQueue implements BootstrapInterface * @var int|null current process ID of a worker. * @since 2.0.2 */ - private ?int $_workerPid = null; + private ?int $workerPid = null; /** * @return string command id @@ -72,7 +72,7 @@ protected function getCommandId(): string { foreach (Yii::$app->getComponents(false) as $id => $component) { if ($component === $this) { - return Inflector::camel2id($id); + return Inflector::camel2id((string)$id); } } throw new InvalidConfigException('Queue must be an application component.'); @@ -100,7 +100,7 @@ public function bootstrap($app): void */ protected function runWorker(callable $handler): ?int { - $this->_workerPid = getmypid(); + $this->workerPid = getmypid(); /** @var LoopInterface $loop */ $loop = Yii::createObject($this->loopConfig, [$this]); @@ -117,7 +117,7 @@ protected function runWorker(callable $handler): ?int }); } finally { $this->trigger(self::EVENT_WORKER_STOP, $event); - $this->_workerPid = null; + $this->workerPid = null; } return $event->exitCode; @@ -132,11 +132,12 @@ protected function runWorker(callable $handler): ?int */ public function getWorkerPid(): ?int { - return $this->_workerPid; + return $this->workerPid; } /** * @inheritdoc + * @psalm-suppress MixedReturnStatement, MixedInferredReturnType */ protected function handleMessage(int|string $id, string $message, int $ttr, int $attempt): bool { @@ -158,7 +159,7 @@ protected function handleMessage(int|string $id, string $message, int $ttr, int */ public function execute(string $id, string $message, int $ttr, int $attempt, ?int $workerPid): bool { - $this->_workerPid = $workerPid; + $this->workerPid = $workerPid; return parent::handleMessage($id, $message, $ttr, $attempt); } } diff --git a/src/cli/SignalLoop.php b/src/cli/SignalLoop.php index 46d8897ae..5165117f3 100644 --- a/src/cli/SignalLoop.php +++ b/src/cli/SignalLoop.php @@ -21,7 +21,7 @@ class SignalLoop extends BaseObject implements LoopInterface { /** - * @var array of signals to exit from listening of the queue. + * @var array of signals to exit from listening of the queue. */ public array $exitSignals = [ 15, // SIGTERM @@ -30,12 +30,12 @@ class SignalLoop extends BaseObject implements LoopInterface 1, // SIGHUP ]; /** - * @var array of signals to suspend listening of the queue. + * @var array of signals to suspend listening of the queue. * For example: SIGTSTP */ public array $suspendSignals = []; /** - * @var array of signals to resume listening of the queue. + * @var array of signals to resume listening of the queue. * For example: SIGCONT */ public array $resumeSignals = []; diff --git a/src/cli/VerboseBehavior.php b/src/cli/VerboseBehavior.php index 184930bc7..e4dd27613 100644 --- a/src/cli/VerboseBehavior.php +++ b/src/cli/VerboseBehavior.php @@ -36,13 +36,13 @@ class VerboseBehavior extends Behavior public Controller $command; /** - * @var float|null timestamp + * @var float timestamp */ - private ?float $jobStartedAt = null; + private float $jobStartedAt = 0; /** - * @var int|null timestamp + * @var int timestamp */ - private ?int $workerStartedAt = null; + private int $workerStartedAt = 0; /** * @inheritdoc @@ -121,7 +121,8 @@ protected function jobTitle(ExecEvent $event): string { $name = $event->job instanceof JobInterface ? get_class($event->job) : 'unknown job'; $extra = "attempt: $event->attempt"; - if ($pid = $event->sender?->getWorkerPid()) { + $pid = $event->sender->getWorkerPid(); + if (null !== $pid) { $extra .= ", pid: $pid"; } return " [$event->id] $name ($extra)"; diff --git a/src/closure/Behavior.php b/src/closure/Behavior.php index dd0fe3183..75b051181 100644 --- a/src/closure/Behavior.php +++ b/src/closure/Behavior.php @@ -33,8 +33,7 @@ class Behavior extends \yii\base\Behavior { /** * @var Queue - * @psalm-suppress NonInvariantDocblockPropertyType - * @psalm-suppress PropertyNotSetInConstructor + * @psalm-suppress PropertyNotSetInConstructor, NonInvariantDocblockPropertyType */ public $owner; diff --git a/src/closure/Job.php b/src/closure/Job.php index 5bcab6984..eb6897c7d 100644 --- a/src/closure/Job.php +++ b/src/closure/Job.php @@ -10,6 +10,7 @@ namespace yii\queue\closure; +use Laravel\SerializableClosure\SerializableClosure; use Laravel\SerializableClosure\Serializers\Native; use yii\queue\JobInterface; use yii\queue\Queue; @@ -32,13 +33,16 @@ class Job implements JobInterface */ public function execute(Queue $queue) { - $closure = unserialize($this->serialized)->getClosure(); + /** @var SerializableClosure $unserialize */ + $unserialize = unserialize($this->serialized); + $closure = $unserialize->getClosure(); $nativeClosure = $closure(); if ($nativeClosure instanceof Native) { return $nativeClosure(); } + /** @psalm-var JobInterface $nativeClosure */ return $nativeClosure->execute($queue); } } diff --git a/src/debug/Panel.php b/src/debug/Panel.php index bb35b86d3..2dcabeb8d 100644 --- a/src/debug/Panel.php +++ b/src/debug/Panel.php @@ -14,6 +14,7 @@ use Yii; use yii\base\NotSupportedException; use yii\base\ViewContextInterface; +use yii\debug\Panel as BasePanel; use yii\helpers\VarDumper; use yii\queue\JobInterface; use yii\queue\PushEvent; @@ -23,10 +24,11 @@ * Debug Panel. * * @author Roman Zhuravlev + * @psalm-suppress PropertyNotSetInConstructor */ -class Panel extends \yii\debug\Panel implements ViewContextInterface +class Panel extends BasePanel implements ViewContextInterface { - private array $_jobs = []; + private array $jobs = []; /** * @inheritdoc @@ -43,7 +45,7 @@ public function init(): void { parent::init(); PushEvent::on(Queue::class, Queue::EVENT_AFTER_PUSH, function (PushEvent $event) { - $this->_jobs[] = $this->getPushData($event); + $this->jobs[] = $this->getPushData($event); }); } @@ -82,7 +84,7 @@ protected function getPushData(PushEvent $event): array */ public function save() { - return ['jobs' => $this->_jobs]; + return ['jobs' => $this->jobs]; } /** @@ -98,6 +100,7 @@ public function getViewPath(): string */ public function getSummary(): string { + /** @psalm-var array{jobs: array} $this->data */ return Yii::$app->view->render('summary', [ 'url' => $this->getUrl(), 'count' => isset($this->data['jobs']) ? count($this->data['jobs']) : 0, @@ -109,12 +112,15 @@ public function getSummary(): string */ public function getDetail(): string { + /** @psalm-var array{jobs: array} $this->data */ $jobs = $this->data['jobs'] ?? []; foreach ($jobs as &$job) { + /** @psalm-var array{sender: string, id: string|int} $job */ $job['status'] = 'unknown'; /** @var Queue $queue */ if ($queue = Yii::$app->get($job['sender'], false)) { try { + /** @psalm-var Queue $queue */ if ($queue->isWaiting($job['id'])) { $job['status'] = 'waiting'; } elseif ($queue->isReserved($job['id'])) { diff --git a/src/debug/views/detail.php b/src/debug/views/detail.php index 2957a4548..fc3480854 100644 --- a/src/debug/views/detail.php +++ b/src/debug/views/detail.php @@ -3,10 +3,12 @@ declare(strict_types=1); /** - * @var \yii\web\View $this + * @var View $this * @var array $jobs */ + use yii\helpers\Html; +use yii\web\View; $styles = [ 'unknown' => 'default', diff --git a/src/debug/views/summary.php b/src/debug/views/summary.php index c87f88f44..4c25b7ffb 100644 --- a/src/debug/views/summary.php +++ b/src/debug/views/summary.php @@ -3,10 +3,13 @@ declare(strict_types=1); /** - * @var \yii\web\View $this + * @var View $this * @var string $url * @var int $count */ + +use yii\web\View; + ?>
diff --git a/src/drivers/amqp_interop/Command.php b/src/drivers/amqp_interop/Command.php index e2007fd27..1dcd2b5cd 100644 --- a/src/drivers/amqp_interop/Command.php +++ b/src/drivers/amqp_interop/Command.php @@ -23,8 +23,7 @@ class Command extends CliCommand { /** * @var Queue - * @psalm-suppress NonInvariantDocblockPropertyType - * @psalm-suppress PropertyNotSetInConstructor + * @psalm-suppress PropertyNotSetInConstructor, NonInvariantDocblockPropertyType */ public CliQueue $queue; diff --git a/src/drivers/amqp_interop/Queue.php b/src/drivers/amqp_interop/Queue.php index d38ccaaa9..603e3d1ca 100644 --- a/src/drivers/amqp_interop/Queue.php +++ b/src/drivers/amqp_interop/Queue.php @@ -24,6 +24,7 @@ use Interop\Amqp\AmqpTopic; use Interop\Amqp\Impl\AmqpBind; use Interop\Queue\Context; +use LogicException; use yii\base\Application as BaseApp; use yii\base\Event; use yii\base\NotSupportedException; @@ -328,8 +329,7 @@ public function listen(): void $queue = $this->getContext()->createQueue($this->queueName); $consumer = $this->getContext()->createConsumer($queue); - /** @psalm-suppress MissingClosureReturnType */ - $callback = function (AmqpMessage $message, AmqpConsumer $consumer) { + $callback = function (AmqpMessage $message, AmqpConsumer $consumer): callable|bool { if ($message->isRedelivered()) { $consumer->acknowledge($message); @@ -338,8 +338,8 @@ public function listen(): void return true; } - $ttr = $message->getProperty(self::TTR); - $attempt = $message->getProperty(self::ATTEMPT, 1); + $ttr = (int)$message->getProperty(self::TTR); + $attempt = (int)$message->getProperty(self::ATTEMPT, 1); $messageId = $message->getMessageId(); if ( @@ -382,7 +382,7 @@ protected function pushMessage(string $payload, int $ttr, int $delay, mixed $pri $topic = $this->getContext()->createTopic($this->exchangeName); - /** @var AmqpMessage $message */ + /** @psalm-var AmqpMessage $message */ $message = $this->getContext()->createMessage($payload); $message->setDeliveryMode(AmqpMessage::DELIVERY_MODE_PERSISTENT); $message->setMessageId(uniqid('', true)); @@ -402,6 +402,7 @@ protected function pushMessage(string $payload, int $ttr, int $delay, mixed $pri $producer->setDeliveryDelay($delay * 1000); } + /** @var int|null $priority */ if ($priority) { $message->setProperty(self::PRIORITY, $priority); $producer->setPriority($priority); @@ -438,7 +439,7 @@ protected function open(): void self::ENQUEUE_AMQP_LIB => AmqpLibConnectionFactory::class, self::ENQUEUE_AMQP_EXT => AmqpExtConnectionFactory::class, self::ENQUEUE_AMQP_BUNNY => AmqpBunnyConnectionFactory::class, - default => throw new \LogicException( + default => throw new LogicException( sprintf( 'The given driver "%s" is not supported. Drivers supported are "%s"', $this->driver, @@ -526,6 +527,7 @@ protected function close(): void protected function redeliver(AmqpMessage $message): void { + /** @var int $attempt */ $attempt = $message->getProperty(self::ATTEMPT, 1); /** @var AmqpMessage $newMessage */ diff --git a/src/drivers/beanstalk/Command.php b/src/drivers/beanstalk/Command.php index 757a50052..ebb54b4bf 100644 --- a/src/drivers/beanstalk/Command.php +++ b/src/drivers/beanstalk/Command.php @@ -23,9 +23,7 @@ class Command extends CliCommand { /** * @var Queue - * @psalm-suppress NonInvariantPropertyType - * @psalm-suppress PropertyNotSetInConstructor - * @psalm-suppress NonInvariantDocblockPropertyType + * @psalm-suppress PropertyNotSetInConstructor, NonInvariantPropertyType, NonInvariantDocblockPropertyType */ public CliQueue $queue; /** diff --git a/src/drivers/beanstalk/InfoAction.php b/src/drivers/beanstalk/InfoAction.php index 2d320d934..0445780ad 100644 --- a/src/drivers/beanstalk/InfoAction.php +++ b/src/drivers/beanstalk/InfoAction.php @@ -42,7 +42,7 @@ public function run(): void /** @psalm-suppress RawObjectIteration */ foreach ($this->queue->getStatsTube() as $key => $value) { Console::stdout($this->format("- $key: ", BaseConsole::FG_YELLOW)); - Console::output($value); + Console::output((string)$value); } } catch (Throwable) { Console::stdout( diff --git a/src/drivers/beanstalk/Queue.php b/src/drivers/beanstalk/Queue.php index e3d7ec4f4..09b76d12c 100644 --- a/src/drivers/beanstalk/Queue.php +++ b/src/drivers/beanstalk/Queue.php @@ -148,7 +148,7 @@ protected function pushMessage(string $payload, int $ttr, int $delay, mixed $pri $result = $pheanstalk ->put( $payload, - $priority ?: PheanstalkPublisherInterface::DEFAULT_PRIORITY, + (int)$priority ?: PheanstalkPublisherInterface::DEFAULT_PRIORITY, $delay, // Seconds to wait before job becomes ready $ttr // Time To Run: seconds a job can be reserved for ); diff --git a/src/drivers/db/Command.php b/src/drivers/db/Command.php index f06af1347..41ca3f434 100644 --- a/src/drivers/db/Command.php +++ b/src/drivers/db/Command.php @@ -23,8 +23,7 @@ class Command extends CliCommand { /** * @var Queue - * @psalm-suppress NonInvariantDocblockPropertyType - * @psalm-suppress PropertyNotSetInConstructor + * @psalm-suppress PropertyNotSetInConstructor, NonInvariantDocblockPropertyType */ public CliQueue $queue; /** diff --git a/src/drivers/db/InfoAction.php b/src/drivers/db/InfoAction.php index 43007526f..dbc5b4f7a 100644 --- a/src/drivers/db/InfoAction.php +++ b/src/drivers/db/InfoAction.php @@ -26,8 +26,7 @@ class InfoAction extends Action { /** * @var Queue - * @psalm-suppress NonInvariantDocblockPropertyType - * @psalm-suppress PropertyNotSetInConstructor + * @psalm-suppress PropertyNotSetInConstructor, NonInvariantDocblockPropertyType */ public CliQueue $queue; diff --git a/src/drivers/db/Queue.php b/src/drivers/db/Queue.php index 8b82364da..a98f88a4c 100644 --- a/src/drivers/db/Queue.php +++ b/src/drivers/db/Queue.php @@ -82,6 +82,7 @@ public function run(bool $repeat, int $timeout = 0) return $this->runWorker(function (callable $canContinue) use ($repeat, $timeout) { while ($canContinue()) { if ($payload = $this->reserve()) { + /** @psalm-var array{id: int|string, job:string, ttr:int|string, attempt:int|string} $payload */ if ($this->handleMessage( $payload['id'], $payload['job'], @@ -177,7 +178,8 @@ protected function pushMessage(string $payload, int $ttr, int $delay, mixed $pri /** * Takes one message from waiting list and reserves it for handling. * - * @return array|false payload + * @return array|false + * @psalm-suppress MixedInferredReturnType, MixedReturnStatement * @throws Exception in case it hasn't waited the lock */ protected function reserve(): bool|array diff --git a/src/drivers/file/Command.php b/src/drivers/file/Command.php index 222b644b0..354c373c9 100644 --- a/src/drivers/file/Command.php +++ b/src/drivers/file/Command.php @@ -23,8 +23,7 @@ class Command extends CliCommand { /** * @var Queue - * @psalm-suppress NonInvariantDocblockPropertyType - * @psalm-suppress PropertyNotSetInConstructor + * @psalm-suppress PropertyNotSetInConstructor, NonInvariantDocblockPropertyType */ public CliQueue $queue; /** diff --git a/src/drivers/file/InfoAction.php b/src/drivers/file/InfoAction.php index 8c786fdec..c91d895fd 100644 --- a/src/drivers/file/InfoAction.php +++ b/src/drivers/file/InfoAction.php @@ -24,8 +24,7 @@ class InfoAction extends Action { /** * @var Queue - * @psalm-suppress NonInvariantDocblockPropertyType - * @psalm-suppress PropertyNotSetInConstructor + * @psalm-suppress NonInvariantDocblockPropertyType, PropertyNotSetInConstructor */ public CliQueue $queue; @@ -54,6 +53,7 @@ public function run(): void */ protected function getWaitingCount(): int { + /** @var array{waiting: array} $data */ $data = $this->getIndexData(); return !empty($data['waiting']) ? count($data['waiting']) : 0; } @@ -63,6 +63,7 @@ protected function getWaitingCount(): int */ protected function getDelayedCount(): int { + /** @var array{delayed: array} $data */ $data = $this->getIndexData(); return !empty($data['delayed']) ? count($data['delayed']) : 0; } @@ -72,6 +73,7 @@ protected function getDelayedCount(): int */ protected function getReservedCount(): int { + /** @var array{reserved: array} $data */ $data = $this->getIndexData(); return !empty($data['reserved']) ? count($data['reserved']) : 0; } @@ -81,6 +83,7 @@ protected function getReservedCount(): int */ protected function getDoneCount(): int { + /** @var array{lastId: int} $data */ $data = $this->getIndexData(); $total = $data['lastId'] ?? 0; return $total - $this->getDelayedCount() - $this->getWaitingCount(); diff --git a/src/drivers/file/Queue.php b/src/drivers/file/Queue.php index b415972ad..5a7ee35b4 100644 --- a/src/drivers/file/Queue.php +++ b/src/drivers/file/Queue.php @@ -79,6 +79,12 @@ public function run(bool $repeat, int $timeout = 0): ?int while ($canContinue()) { if (($payload = $this->reserve()) !== null) { [$id, $message, $ttr, $attempt] = $payload; + /** + * @var int|string $id + * @var string $message + * @var int $ttr + * @var int $attempt + */ if ($this->handleMessage($id, $message, $ttr, $attempt)) { $this->delete($payload); } @@ -127,13 +133,16 @@ public function clear(): void * @param int $id of a job * @return bool * @since 2.0.1 + * @psalm-suppress MixedInferredReturnType, MixedReturnStatement */ public function remove(int $id): bool { $removed = false; $this->touchIndex(function (array &$data) use ($id, &$removed) { + /** @var array{waiting: array, delayed: array, reserved: array} $data */ if (!empty($data['waiting'])) { foreach ($data['waiting'] as $key => $payload) { + /** @psalm-var array $payload */ if ($payload[0] === $id) { unset($data['waiting'][$key]); $removed = true; @@ -143,6 +152,7 @@ public function remove(int $id): bool } if (!$removed && !empty($data['delayed'])) { foreach ($data['delayed'] as $key => $payload) { + /** @psalm-var array $payload */ if ($payload[0] === $id) { unset($data['delayed'][$key]); $removed = true; @@ -152,6 +162,7 @@ public function remove(int $id): bool } if (!$removed && !empty($data['reserved'])) { foreach ($data['reserved'] as $key => $payload) { + /** @psalm-var array $payload */ if ($payload[0] === $id) { unset($data['reserved'][$key]); $removed = true; @@ -178,20 +189,30 @@ protected function reserve(): ?array $ttr = null; $attempt = null; $this->touchIndex(function (array &$data) use (&$id, &$ttr, &$attempt) { + /** @var array{reserved: array, delayed: array, waiting: array} $data */ if (!empty($data['reserved'])) { foreach ($data['reserved'] as $key => $payload) { - if ($payload[1] + $payload[3] < time()) { + /** @psalm-var array $payload */ + if ((int)$payload[1] + (int)$payload[3] < time()) { + /** + * @psalm-var int $attempt + */ [$id, $ttr, $attempt, $time] = $payload; + /** @psalm-suppress MixedArrayAssignment */ $data['reserved'][$key][2] = ++$attempt; + /** @psalm-suppress MixedArrayAssignment */ $data['reserved'][$key][3] = time(); return; } } } - if (!empty($data['delayed']) && $data['delayed'][0][2] <= time()) { + /** @psalm-suppress MixedArrayAssignment, MixedArrayAccess */ + if (!empty($data['delayed']) && (int)$data['delayed'][0][2] <= time()) { + /** @psalm-suppress MixedArrayAssignment, MixedArrayAccess */ [$id, $ttr, $time] = array_shift($data['delayed']); } elseif (!empty($data['waiting'])) { + /** @psalm-suppress MixedArrayAssignment, MixedArrayAccess */ [$id, $ttr] = array_shift($data['waiting']); } if ($id) { @@ -216,7 +237,9 @@ protected function delete(array $payload): void { $id = $payload[0]; $this->touchIndex(function (array &$data) use ($id) { + /** @var array{reserved: array} $data */ foreach ($data['reserved'] as $key => $payload) { + /** @psalm-var array $payload */ if ($payload[0] === $id) { unset($data['reserved'][$key]); break; @@ -236,6 +259,7 @@ protected function pushMessage(string $payload, int $ttr, int $delay, mixed $pri } $this->touchIndex(function (array &$data) use ($payload, $ttr, $delay, &$id) { + /** @var array{lastId: int, waiting: array, delayed: array} $data */ if (!isset($data['lastId'])) { $data['lastId'] = 0; } @@ -249,7 +273,8 @@ protected function pushMessage(string $payload, int $ttr, int $delay, mixed $pri $data['waiting'][] = [$id, $ttr, 0]; } else { $data['delayed'][] = [$id, $ttr, time() + $delay]; - usort($data['delayed'], static function ($a, $b) { + /** @psalm-suppress MixedArgumentTypeCoercion */ + usort($data['delayed'], static function (array $a, array $b) { if ($a[2] < $b[2]) { return -1; } @@ -267,6 +292,7 @@ protected function pushMessage(string $payload, int $ttr, int $delay, mixed $pri } }); + /** @psalm-var int|string|null $id */ return $id; } @@ -296,6 +322,7 @@ private function touchIndex(callable $callback): void } try { $callback($data); + /** @var string $newContent */ $newContent = call_user_func($this->indexSerializer, $data); if ($newContent !== $content) { ftruncate($file, 0); diff --git a/src/drivers/gearman/Command.php b/src/drivers/gearman/Command.php index f16c47c4e..3bf256cac 100644 --- a/src/drivers/gearman/Command.php +++ b/src/drivers/gearman/Command.php @@ -22,8 +22,7 @@ class Command extends CliCommand { /** * @var Queue - * @psalm-suppress NonInvariantDocblockPropertyType - * @psalm-suppress PropertyNotSetInConstructor + * @psalm-suppress PropertyNotSetInConstructor, NonInvariantDocblockPropertyType */ public CliQueue $queue; diff --git a/src/drivers/redis/Command.php b/src/drivers/redis/Command.php index 95ed09e45..0872e319c 100644 --- a/src/drivers/redis/Command.php +++ b/src/drivers/redis/Command.php @@ -23,8 +23,7 @@ class Command extends CliCommand { /** * @var Queue - * @psalm-suppress NonInvariantDocblockPropertyType - * @psalm-suppress PropertyNotSetInConstructor + * @psalm-suppress PropertyNotSetInConstructor, NonInvariantDocblockPropertyType */ public CliQueue $queue; /** diff --git a/src/drivers/redis/InfoAction.php b/src/drivers/redis/InfoAction.php index 213e4cc28..e052feb4a 100644 --- a/src/drivers/redis/InfoAction.php +++ b/src/drivers/redis/InfoAction.php @@ -23,8 +23,7 @@ class InfoAction extends Action { /** * @var Queue - * @psalm-suppress NonInvariantDocblockPropertyType - * @psalm-suppress PropertyNotSetInConstructor + * @psalm-suppress PropertyNotSetInConstructor, NonInvariantDocblockPropertyType */ public CliQueue $queue; @@ -38,20 +37,20 @@ public function run(): void $delayed = $this->queue->redis->zcount("$prefix.delayed", '-inf', '+inf'); $reserved = $this->queue->redis->zcount("$prefix.reserved", '-inf', '+inf'); $total = $this->queue->redis->get("$prefix.message_id"); - $done = $total - $waiting - $delayed - $reserved; + $done = (int)$total - (int)$waiting - (int)$delayed - (int)$reserved; Console::output($this->format('Jobs', Console::FG_GREEN)); Console::stdout($this->format('- waiting: ', Console::FG_YELLOW)); - Console::output($waiting); + Console::output((string)$waiting); Console::stdout($this->format('- delayed: ', Console::FG_YELLOW)); - Console::output($delayed); + Console::output((string)$delayed); Console::stdout($this->format('- reserved: ', Console::FG_YELLOW)); - Console::output($reserved); + Console::output((string)$reserved); Console::stdout($this->format('- done: ', Console::FG_YELLOW)); - Console::output($done); + Console::output((string)$done); } } diff --git a/src/drivers/redis/Queue.php b/src/drivers/redis/Queue.php index b1ecc9f43..f9004c941 100644 --- a/src/drivers/redis/Queue.php +++ b/src/drivers/redis/Queue.php @@ -63,6 +63,10 @@ public function run(bool $repeat, int $timeout = 0): ?int while ($canContinue()) { if (($payload = $this->reserve($timeout)) !== null) { [$id, $message, $ttr, $attempt] = $payload; + /** + * @psalm-var int|string $id + * @psalm-var string $message + */ if ($this->handleMessage($id, $message, (int)$ttr, (int)$attempt)) { $this->delete($id); } @@ -103,6 +107,7 @@ public function clear(): void while (!$this->redis->set("$this->channel.moving_lock", true, 'NX')) { usleep(10000); } + /** @psalm-suppress MixedArgument */ $this->redis->executeCommand('DEL', $this->redis->keys("$this->channel.*")); } @@ -147,6 +152,7 @@ protected function reserve(int $timeout): ?array if (!$timeout) { $id = $this->redis->rpop("$this->channel.waiting"); } elseif ($result = $this->redis->brpop("$this->channel.waiting", $timeout)) { + /** @psalm-var array $result */ $id = $result[1]; } if (!$id) { @@ -158,7 +164,10 @@ protected function reserve(int $timeout): ?array return null; } - /** @psalm-suppress PossiblyUndefinedArrayOffset */ + /** + * @psalm-suppress PossiblyUndefinedArrayOffset + * @psalm-var string $payload + */ [$ttr, $message] = explode(';', $payload, 2); $this->redis->zadd("$this->channel.reserved", time() + (int)$ttr, $id); $attempt = $this->redis->hincrby("$this->channel.attempts", $id, 1); @@ -201,6 +210,7 @@ protected function pushMessage(string $payload, int $ttr, int $delay, mixed $pri throw new NotSupportedException('Job priority is not supported in the driver.'); } + /** @var string|int $id */ $id = $this->redis->incr("$this->channel.message_id"); $this->redis->hset("$this->channel.messages", $id, "$ttr;$payload"); if (!$delay) { diff --git a/src/drivers/sqs/Command.php b/src/drivers/sqs/Command.php index cc9998768..e5f90c490 100644 --- a/src/drivers/sqs/Command.php +++ b/src/drivers/sqs/Command.php @@ -24,8 +24,7 @@ class Command extends CliCommand { /** * @var Queue - * @psalm-suppress NonInvariantDocblockPropertyType - * @psalm-suppress PropertyNotSetInConstructor + * @psalm-suppress PropertyNotSetInConstructor, NonInvariantDocblockPropertyType */ public CliQueue $queue; diff --git a/src/drivers/sqs/Payload.php b/src/drivers/sqs/Payload.php new file mode 100644 index 000000000..d719fc9d1 --- /dev/null +++ b/src/drivers/sqs/Payload.php @@ -0,0 +1,47 @@ +messages = $response['Messages']??[]; + if (!empty($this->messages)) { + $messages = $this->messages; + /** @var array{ + * MessageAttributes: array, + * Body: string, + * MessageId: string|int, + * ReceiptHandle: mixed, + * Attributes: array + * } $message + */ + $message = reset($messages); + + $this->ttr = (int)$message['MessageAttributes']['TTR']['StringValue']; + $this->body = $message['Body']; + $this->messageId = $message['MessageId']; + $this->attempt = (int)$message['Attributes']['ApproximateReceiveCount']; + $this->receiptHandle = $message['ReceiptHandle']; + } + } +} diff --git a/src/drivers/sqs/Queue.php b/src/drivers/sqs/Queue.php index c7127db8d..47dbe4b81 100644 --- a/src/drivers/sqs/Queue.php +++ b/src/drivers/sqs/Queue.php @@ -85,11 +85,14 @@ public function run(bool $repeat, int $timeout = 0): ?int return $this->runWorker(function (callable $canContinue) use ($repeat, $timeout) { while ($canContinue()) { if (($payload = $this->reserve($timeout)) !== null) { - $id = $payload['MessageId']; - $message = $payload['Body']; - $ttr = (int) $payload['MessageAttributes']['TTR']['StringValue']; - $attempt = (int) $payload['Attributes']['ApproximateReceiveCount']; - if ($this->handleMessage($id, $message, $ttr, $attempt)) { + if ( + $this->handleMessage( + $payload->messageId, + $payload->body, + $payload->ttr, + $payload->attempt + ) + ) { $this->delete($payload); } } elseif (!$repeat) { @@ -103,10 +106,11 @@ public function run(bool $repeat, int $timeout = 0): ?int * Gets a single message from SQS queue and sets the visibility to reserve message. * * @param int $timeout number of seconds for long polling. Must be between 0 and 20. - * @return null|array payload. + * @return Payload|null */ - protected function reserve(int $timeout): ?array + protected function reserve(int $timeout): ?Payload { + /** @var array{Messages: array} $response */ $response = $this->getClient()->receiveMessage([ 'QueueUrl' => $this->url, 'AttributeNames' => ['ApproximateReceiveCount'], @@ -114,18 +118,18 @@ protected function reserve(int $timeout): ?array 'MaxNumberOfMessages' => 1, 'VisibilityTimeout' => $this->ttr, 'WaitTimeSeconds' => $timeout, - ]); - if (!$response['Messages']) { + ])->toArray(); + + $payload = new Payload($response); + if (empty($payload->messages)) { return null; } - $payload = reset($response['Messages']); - - $ttr = (int) $payload['MessageAttributes']['TTR']['StringValue']; + $ttr = $payload->ttr; if ($ttr !== $this->ttr) { $this->getClient()->changeMessageVisibility([ 'QueueUrl' => $this->url, - 'ReceiptHandle' => $payload['ReceiptHandle'], + 'ReceiptHandle' => $payload->receiptHandle, 'VisibilityTimeout' => $ttr, ]); } @@ -136,13 +140,13 @@ protected function reserve(int $timeout): ?array /** * Deletes the message after successfully handling. * - * @param array $payload + * @param Payload $payload */ - protected function delete(array $payload): void + protected function delete(Payload $payload): void { $this->getClient()->deleteMessage([ 'QueueUrl' => $this->url, - 'ReceiptHandle' => $payload['ReceiptHandle'], + 'ReceiptHandle' => $payload->receiptHandle, ]); } @@ -206,7 +210,7 @@ protected function pushMessage(string $payload, int $ttr, int $delay, mixed $pri } $response = $this->getClient()->sendMessage($request); - return $response['MessageId']; + return null === $response['MessageId']?null:(string)$response['MessageId']; } /** diff --git a/src/drivers/stomp/Command.php b/src/drivers/stomp/Command.php index 312eac87f..71bbfd9d1 100644 --- a/src/drivers/stomp/Command.php +++ b/src/drivers/stomp/Command.php @@ -24,8 +24,7 @@ class Command extends CliCommand { /** * @var Queue - * @psalm-suppress NonInvariantDocblockPropertyType - * @psalm-suppress PropertyNotSetInConstructor + * @psalm-suppress PropertyNotSetInConstructor, NonInvariantDocblockPropertyType */ public CliQueue $queue; diff --git a/src/drivers/stomp/Queue.php b/src/drivers/stomp/Queue.php index a98eaad35..16885a136 100644 --- a/src/drivers/stomp/Queue.php +++ b/src/drivers/stomp/Queue.php @@ -177,8 +177,8 @@ public function run(bool $repeat, int $timeout = 0): ?int continue; } - $ttr = $message->getProperty(self::TTR, $this->ttr); - $attempt = $message->getProperty(self::ATTEMPT, 1); + $ttr = (int)$message->getProperty(self::TTR, $this->ttr); + $attempt = (int)$message->getProperty(self::ATTEMPT, 1); $messageId = $message->getMessageId(); if (null !== $messageId && $this->handleMessage($messageId, $message->getBody(), $ttr, $attempt)) { @@ -267,7 +267,7 @@ public function status($id): int */ protected function redeliver(StompMessage $message): void { - $attempt = $message->getProperty(self::ATTEMPT, 1); + $attempt = (int)$message->getProperty(self::ATTEMPT, 1); $newMessage = $this->getContext()->createMessage( $message->getBody(), diff --git a/src/drivers/sync/Queue.php b/src/drivers/sync/Queue.php index d83d8eee5..6e4bcb6c5 100644 --- a/src/drivers/sync/Queue.php +++ b/src/drivers/sync/Queue.php @@ -27,7 +27,7 @@ class Queue extends BaseQueue */ public bool $handle = false; /** - * @var array of payloads + * @var array $payloads of payloads */ private array $payloads = []; /** @@ -64,6 +64,11 @@ public function init(): void public function run(): void { while (($payload = array_shift($this->payloads)) !== null) { + /** + * @var int $ttr + * @var string $message + * @psalm-suppress MixedArrayAccess + */ [$ttr, $message] = $payload; $this->startedId = $this->finishedId + 1; $this->handleMessage($this->startedId, $message, $ttr, 1); diff --git a/src/gii/Generator.php b/src/gii/Generator.php index 3e67cd579..1fe051564 100644 --- a/src/gii/Generator.php +++ b/src/gii/Generator.php @@ -24,11 +24,11 @@ */ class Generator extends BaseGenerator { - public $jobClass; - public $properties; - public $retryable = false; - public $ns = 'app\jobs'; - public $baseClass = BaseObject::class; + public string $jobClass = ''; + public string $properties = ''; + public bool $retryable = false; + public string $ns = 'app\jobs'; + public string $baseClass = BaseObject::class; /** * @inheritdoc @@ -128,8 +128,12 @@ public function generate(): array } $params['properties'] = array_unique(preg_split('/[\s,]+/', $this->properties, -1, PREG_SPLIT_NO_EMPTY)); + $alias = Yii::getAlias('@' . str_replace('\\', '/', $this->ns)); + if (false === $alias) { + return []; + } $jobFile = new CodeFile( - Yii::getAlias('@' . str_replace('\\', '/', $this->ns)) . '/' . $this->jobClass . '.php', + $alias . '/' . $this->jobClass . '.php', $this->render('job.php', $params) ); @@ -143,6 +147,7 @@ public function generate(): array */ public function validateJobClass(string $attribute): void { + /** @psalm-suppress MixedArgument */ if ($this->isReservedKeyword($this->$attribute)) { $this->addError($attribute, 'Class name cannot be a reserved PHP keyword.'); } @@ -155,6 +160,7 @@ public function validateJobClass(string $attribute): void */ public function validateNamespace(string $attribute): void { + /** @psalm-var string $value */ $value = $this->$attribute; $value = ltrim($value, '\\'); $path = Yii::getAlias('@' . str_replace('\\', '/', $value), false); diff --git a/src/gii/default/job.php b/src/gii/default/job.php index ece107a74..d269b4ee7 100644 --- a/src/gii/default/job.php +++ b/src/gii/default/job.php @@ -3,14 +3,18 @@ declare(strict_types=1); /** - * @var \yii\web\View $this - * @var \yii\queue\gii\Generator $generator + * @var View $this + * @var Generator $generator * @var string $jobClass * @var string $ns * @var string $baseClass * @var string[] $interfaces * @var string[] $properties */ + +use yii\web\View; +use yii\queue\gii\Generator; + if ($interfaces) { $implements = 'implements ' . implode(', ', $interfaces); } else { diff --git a/src/gii/form.php b/src/gii/form.php index a28b40b9f..03a05b710 100644 --- a/src/gii/form.php +++ b/src/gii/form.php @@ -3,10 +3,15 @@ declare(strict_types=1); /** - * @var \yii\web\View $this - * @var \yii\widgets\ActiveForm $form - * @var \yii\queue\gii\Generator $generator + * @var View $this + * @var ActiveForm $form + * @var Generator $generator */ + +use yii\queue\gii\Generator; +use yii\web\View; +use yii\widgets\ActiveForm; + ?> field($generator, 'jobClass')->textInput(['autofocus' => true]) ?> field($generator, 'properties') ?> diff --git a/src/serializers/IgbinarySerializer.php b/src/serializers/IgbinarySerializer.php index 21db11fdb..98cb58bc3 100644 --- a/src/serializers/IgbinarySerializer.php +++ b/src/serializers/IgbinarySerializer.php @@ -33,7 +33,7 @@ public function serialize($job): string /** * @inheritdoc */ - public function unserialize(string $serialized) + public function unserialize(string $serialized): mixed { return igbinary_unserialize($serialized); } diff --git a/src/serializers/JsonSerializer.php b/src/serializers/JsonSerializer.php index 0cc85a91e..0483dcbfb 100644 --- a/src/serializers/JsonSerializer.php +++ b/src/serializers/JsonSerializer.php @@ -42,7 +42,7 @@ public function serialize($job): string /** * @inheritdoc */ - public function unserialize(string $serialized) + public function unserialize(string $serialized): mixed { return $this->fromArray(Json::decode($serialized)); } diff --git a/src/serializers/PhpSerializer.php b/src/serializers/PhpSerializer.php index ac3e05791..95eb0b0ad 100644 --- a/src/serializers/PhpSerializer.php +++ b/src/serializers/PhpSerializer.php @@ -30,7 +30,7 @@ public function serialize($job): string /** * @inheritdoc */ - public function unserialize(string $serialized) + public function unserialize(string $serialized): mixed { return unserialize($serialized); } diff --git a/src/serializers/SerializerInterface.php b/src/serializers/SerializerInterface.php index d10b668a5..376502e14 100644 --- a/src/serializers/SerializerInterface.php +++ b/src/serializers/SerializerInterface.php @@ -27,7 +27,7 @@ public function serialize($job): string; /** * @param string $serialized - * @return JobInterface + * @return JobInterface|array|mixed */ public function unserialize(string $serialized); }