Skip to content

Commit

Permalink
Add Config and Migration plus review changes (#13)
Browse files Browse the repository at this point in the history
  • Loading branch information
aphraoh committed Apr 3, 2024
1 parent 0a408ed commit 707c126
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 20 deletions.
4 changes: 1 addition & 3 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
"require": {
"php": "^8.1",
"yiisoft/db": "^1.0",
"yiisoft/mutex": "^1.0",
"yiisoft/queue": "dev-master"
},
"require-dev": {
Expand All @@ -60,9 +61,6 @@
},
"config-plugin-options": {
"source-directory": "config"
},
"config-plugin": {
"di": "di.php"
}
},
"config": {
Expand Down
66 changes: 49 additions & 17 deletions src/Adapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,43 +6,50 @@

use InvalidArgumentException;
use Yiisoft\Queue\Adapter\AdapterInterface;
use Yiisoft\Queue\Cli\LoopInterface;
use Yiisoft\Queue\Enum\JobStatus;
use Yiisoft\Queue\Message\MessageInterface;
use Yiisoft\Queue\Message\MessageSerializerInterface;
use Yiisoft\Queue\QueueFactory;
use Yiisoft\Queue\Message\IdEnvelope;
use Yiisoft\Db\Connection\ConnectionInterface;
use Yiisoft\Db\Query\Query;
use Yiisoft\Mutex\MutexFactoryInterface;
use Yiisoft\Mutex\MutexInterface;

final class Adapter implements AdapterInterface
{
/**
* @var int timeout
* @var MutexInterface Mutex interface.
*/
public MutexInterface $mutex;
/**
* @var int Mutex timeout.
*/
public $mutexTimeout = 3;
/**
* @var string table name
* @var string Table name.
*/
public $tableName = '{{%queue}}';
/**
* @var bool ability to delete released messages from table
* @var bool Ability to delete released messages from table.
*/
public $deleteReleased = true;


public function __construct(
private ConnectionInterface $db,
private MessageSerializerInterface $serializer,
private LoopInterface $loop,
private MutexFactoryInterface $mutexFactory,
private string $channel = QueueFactory::DEFAULT_CHANNEL_NAME,
) {
$this->mutex = $this->mutexFactory->create(__CLASS__ . $this->channel);
}

public function runExisting(callable $handlerCallback): void
{
$result = true;
while (($payload = $this->reserve()) && ($result === true)) {
if ($result = $handlerCallback(\unserialize($payload['job']))) {
$this->release($payload);
}
}
$this->run($handlerCallback, false);
}

public function status(string|int $id): JobStatus
Expand Down Expand Up @@ -78,7 +85,7 @@ public function push(MessageInterface $message): MessageInterface
$metadata = $message->getMetadata();
$this->db->createCommand()->insert($this->tableName, [
'channel' => $this->channel,
'job' => \serialize($message),
'job' => $this->serializer->serialize($message),
'pushed_at' => time(),
'ttr' => $metadata['ttr'] ?? 300,
'delay' => $metadata['delay'] ?? 0,
Expand All @@ -92,7 +99,7 @@ public function push(MessageInterface $message): MessageInterface

public function subscribe(callable $handlerCallback): void
{
$this->runExisting($handlerCallback);
$this->run($handlerCallback, true, 5); // TWK TODO timeout should not be hard coded
}

public function withChannel(string $channel): self
Expand All @@ -103,7 +110,8 @@ public function withChannel(string $channel): self

$new = clone $this;
$new->channel = $channel;

$new->mutex = $this->mutexFactory->create(__CLASS__ . $this->channel);

return $new;
}

Expand All @@ -115,10 +123,10 @@ public function withChannel(string $channel): self
*/
protected function reserve(): array|null
{
// TWK TODO ??? return $this->db->useMaster(function () {
// TWK TODO ??? if (!$this->mutex->acquire(__CLASS__ . $this->channel, $this->mutexTimeout)) {
// TWK TODO ??? throw new \Exception('Has not waited the lock.');
// TWK TODO ??? }
// TWK TODO what is useMaster in Yii3 return $this->db->useMaster(function () {
if (!$this->mutex->acquire($this->mutexTimeout)) {
throw new \Exception('Has not waited the lock.');
}

try {
$this->moveExpired();
Expand Down Expand Up @@ -147,7 +155,7 @@ protected function reserve(): array|null
}
}
} finally {
// TWK TODO ??? $this->mutex->release(__CLASS__ . $this->channel);
$this->mutex->release();
}

return $payload;
Expand Down Expand Up @@ -194,4 +202,28 @@ private function moveExpired(): void
*/
private $reserveTime = 0;

/**
* Listens queue and runs each job.
*
* @param callable(MessageInterface): bool $handlerCallback The handler which will handle messages. Returns false if it cannot continue handling messages
* @param bool $repeat whether to continue listening when queue is empty.
* @param non-negative-int $timeout number of seconds to sleep before next iteration.
*/
public function run(callable $handlerCallback, bool $repeat, int $timeout = 0): void
{
while ($this->loop->canContinue()) {
if ($payload = $this->reserve()) {
if ($handlerCallback($this->serializer->unserialize($payload['job']))) {
$this->release($payload);
}
continue;
}
if (!$repeat) {
break;
}
if ($timeout > 0) {
sleep($timeout);
}
}
}
}

0 comments on commit 707c126

Please sign in to comment.