Skip to content

Commit

Permalink
Initial Version (#6)
Browse files Browse the repository at this point in the history
  • Loading branch information
aphraoh committed Mar 26, 2024
1 parent d652948 commit d11aa31
Show file tree
Hide file tree
Showing 3 changed files with 237 additions and 20 deletions.
27 changes: 15 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,23 @@
<a href="https://github.com/yiisoft" target="_blank">
<img src="https://yiisoft.github.io/docs/images/yii_logo.svg" height="100px">
</a>
<h1 align="center">Yii _____</h1>
<h1 align="center">Yii Queue Database Adapter</h1>
<br>
</p>

[![Latest Stable Version](https://poser.pugx.org/yiisoft/_____/v/stable.png)](https://packagist.org/packages/yiisoft/_____)
[![Total Downloads](https://poser.pugx.org/yiisoft/_____/downloads.png)](https://packagist.org/packages/yiisoft/_____)
[![Build status](https://github.com/yiisoft/_____/workflows/build/badge.svg)](https://github.com/yiisoft/_____/actions?query=workflow%3Abuild)
[![Code Coverage](https://codecov.io/gh/yiisoft/_____/branch/master/graph/badge.svg)](https://codecov.io/gh/yiisoft/_____)
[![Mutation testing badge](https://img.shields.io/endpoint?style=flat&url=https%3A%2F%2Fbadge-api.stryker-mutator.io%2Fgithub.com%2Fyiisoft%2F_____%2Fmaster)](https://dashboard.stryker-mutator.io/reports/github.com/yiisoft/_____/master)
[![static analysis](https://github.com/yiisoft/_____/workflows/static%20analysis/badge.svg)](https://github.com/yiisoft/_____/actions?query=workflow%3A%22static+analysis%22)
[![type-coverage](https://shepherd.dev/github/yiisoft/_____/coverage.svg)](https://shepherd.dev/github/yiisoft/_____)
[![psalm-level](https://shepherd.dev/github/yiisoft/_____/level.svg)](https://shepherd.dev/github/yiisoft/_____)
[![Latest Stable Version](https://poser.pugx.org/yiisoft/queue-db/v/stable.png)](https://packagist.org/packages/yiisoft/queue-db)
[![Total Downloads](https://poser.pugx.org/yiisoft/queue-db/downloads.png)](https://packagist.org/packages/yiisoft/queue-db)
[![Build status](https://github.com/yiisoft/queue-db/workflows/build/badge.svg)](https://github.com/yiisoft/queue-db/actions?query=workflow%3Abuild)
[![Code Coverage](https://codecov.io/gh/yiisoft/queue-db/branch/master/graph/badge.svg)](https://codecov.io/gh/yiisoft/queue-db)
[![Mutation testing badge](https://img.shields.io/endpoint?style=flat&url=https%3A%2F%2Fbadge-api.stryker-mutator.io%2Fgithub.com%2Fyiisoft%2Fqueue-db%2Fmaster)](https://dashboard.stryker-mutator.io/reports/github.com/yiisoft/queue-db/master)
[![static analysis](https://github.com/yiisoft/queue-db/workflows/static%20analysis/badge.svg)](https://github.com/yiisoft/queue-db/actions?query=workflow%3A%22static+analysis%22)
[![type-coverage](https://shepherd.dev/github/yiisoft/queue-db/coverage.svg)](https://shepherd.dev/github/yiisoft/queue-db)
[![psalm-level](https://shepherd.dev/github/yiisoft/queue-db/level.svg)](https://shepherd.dev/github/yiisoft/queue-db)

Yii Queue Database Adapter implemention for [Yii Queue](https://github.com/yiisoft/queue).
Works with databases implemented for [Yii Database](https://github.com/yiisoft/queue).


The package ...

## Requirements

Expand All @@ -26,7 +29,7 @@ The package ...
The package could be installed with composer:

```shell
composer require yiisoft/_____
composer require yiisoft/queue-db
```

## General usage
Expand Down Expand Up @@ -74,7 +77,7 @@ Use [ComposerRequireChecker](https://github.com/maglnet/ComposerRequireChecker)

## License

The Yii _____ is free software. It is released under the terms of the BSD License.
The Yii Queue Database Adapter is free software. It is released under the terms of the BSD License.
Please see [`LICENSE`](./LICENSE.md) for more information.

Maintained by [Yii Software](https://www.yiiframework.com/).
Expand Down
33 changes: 25 additions & 8 deletions composer.json
Original file line number Diff line number Diff line change
@@ -1,19 +1,23 @@
{
"name": "yiisoft/_____",
"name": "yiisoft/queue-db",
"type": "library",
"description": "_____",
"description": "Yii Queue DB adapter using yiisoft/db",
"keywords": [
"_____"
"yii",
"queue",
"sql",
"database",
"async"
],
"homepage": "https://www.yiiframework.com/",
"license": "BSD-3-Clause",
"support": {
"issues": "https://github.com/yiisoft/_____/issues?state=open",
"issues": "https://github.com/yiisoft/queue-db/issues?state=open",
"forum": "https://www.yiiframework.com/forum/",
"wiki": "https://www.yiiframework.com/wiki/",
"irc": "ircs://irc.libera.chat:6697/yii",
"chat": "https://t.me/yii3en",
"source": "https://github.com/yiisoft/_____"
"source": "https://github.com/yiisoft/queue-db"
},
"funding": [
{
Expand All @@ -28,7 +32,9 @@
"minimum-stability": "dev",
"prefer-stable": true,
"require": {
"php": "^8.1"
"php": "^8.1",
"yiisoft/db": "^1.0",
"yiisoft/queue": "dev-master"
},
"require-dev": {
"maglnet/composer-require-checker": "^4.7",
Expand All @@ -40,12 +46,23 @@
},
"autoload": {
"psr-4": {
"Yiisoft\\_____\\": "src"
"Yiisoft\\Queue\\Db\\": "src"
}
},
"autoload-dev": {
"psr-4": {
"Yiisoft\\_____\\Tests\\": "tests"
"Yiisoft\\Queue\\Db\\Tests\\": "tests"
}
},
"extra": {
"branch-alias": {
"dev-master": "3.0.x-dev"
},
"config-plugin-options": {
"source-directory": "config"
},
"config-plugin": {
"di": "di.php"
}
},
"config": {
Expand Down
197 changes: 197 additions & 0 deletions src/Adapter.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
<?php

declare(strict_types=1);

namespace Yiisoft\Queue\Db;

use InvalidArgumentException;
use Yiisoft\Queue\Adapter\AdapterInterface;
use Yiisoft\Queue\Enum\JobStatus;
use Yiisoft\Queue\Message\MessageInterface;
use Yiisoft\Queue\QueueFactory;
use Yiisoft\Queue\Message\IdEnvelope;
use Yiisoft\Db\Connection\ConnectionInterface;
use Yiisoft\Db\Query\Query;

final class Adapter implements AdapterInterface
{
/**
* @var int timeout
*/
public $mutexTimeout = 3;
/**
* @var string table name
*/
public $tableName = '{{%queue}}';
/**
* @var bool ability to delete released messages from table
*/
public $deleteReleased = true;


public function __construct(
private ConnectionInterface $db,
private string $channel = QueueFactory::DEFAULT_CHANNEL_NAME,
) {
}

public function runExisting(callable $handlerCallback): void
{
$result = true;
while (($payload = $this->reserve()) && ($result === true)) {
if ($result = $handlerCallback(\unserialize($payload['job']))) {
$this->release($payload);
}
}
}

public function status(string|int $id): JobStatus
{
$id = (int) $id;

$payload = (new Query($this->db))
->from($this->tableName)
->where(['id' => $id])
->one();

if (!$payload) {
if ($this->deleteReleased) {
return JobStatus::done();
}

throw new InvalidArgumentException("Unknown message ID: $id.");
}

if (!$payload['reserved_at']) {
return JobStatus::waiting();
}

if (!$payload['done_at']) {
return JobStatus::reserved();
}

return JobStatus::done();
}

public function push(MessageInterface $message): MessageInterface
{
$metadata = $message->getMetadata();
$this->db->createCommand()->insert($this->tableName, [
'channel' => $this->channel,
'job' => \serialize($message),
'pushed_at' => time(),
'ttr' => $metadata['ttr'] ?? 300,
'delay' => $metadata['delay'] ?? 0,
'priority' => $metadata['priority'] ?? 1024,
])->execute();
$tableSchema = $this->db->getTableSchema($this->tableName);
$key = $tableSchema ? $this->db->getLastInsertID($tableSchema->getSequenceName()) : $tableSchema;

return new IdEnvelope($message, $key);
}

public function subscribe(callable $handlerCallback): void
{
$this->runExisting($handlerCallback);
}

public function withChannel(string $channel): self
{
if ($channel === $this->channel) {
return $this;
}

$new = clone $this;
$new->channel = $channel;

return $new;
}

/**
* Takes one message from waiting list and reserves it for handling.
*
* @return array|null payload
* @throws \Exception in case it hasn't waited the lock
*/
protected function reserve(): array|null
{
// TWK TODO ??? return $this->db->useMaster(function () {
// TWK TODO ??? if (!$this->mutex->acquire(__CLASS__ . $this->channel, $this->mutexTimeout)) {
// TWK TODO ??? throw new \Exception('Has not waited the lock.');
// TWK TODO ??? }

try {
$this->moveExpired();

// Reserve one message
$payload = (new Query($this->db))
->from($this->tableName)
->andWhere(['channel' => $this->channel, 'reserved_at' => null])
->andWhere('[[pushed_at]] <= :time - [[delay]]', [':time' => time()])
->orderBy(['priority' => SORT_ASC, 'id' => SORT_ASC])
->limit(1)
->one();
if (is_array($payload)) {
$payload['reserved_at'] = time();
$payload['attempt'] = (int) $payload['attempt'] + 1;
$this->db->createCommand()->update($this->tableName, [
'reserved_at' => $payload['reserved_at'],
'attempt' => $payload['attempt'],
], [
'id' => $payload['id'],
])->execute();

// pgsql
if (is_resource($payload['job'])) {
$payload['job'] = stream_get_contents($payload['job']);
}
}
} finally {
// TWK TODO ??? $this->mutex->release(__CLASS__ . $this->channel);
}

return $payload;
// TWK TODO ??? });
}

/**
* @param array $payload
*/
protected function release($payload): void
{
if ($this->deleteReleased) {
$this->db->createCommand()->delete(
$this->tableName,
['id' => $payload['id']]
)->execute();
} else {
$this->db->createCommand()->update(
$this->tableName,
['done_at' => time()],
['id' => $payload['id']]
)->execute();
}
}

/**
* Moves expired messages into waiting list.
*/
private function moveExpired(): void
{
if ($this->reserveTime !== time()) {
$this->reserveTime = time();
$this->db->createCommand()->update(
$this->tableName,
['reserved_at' => null],
'[[reserved_at]] < :time - [[ttr]] and [[reserved_at]] is not null and [[done_at]] is null',
[':time' => $this->reserveTime]
)->execute();
}
}

/**
* @var int reserve time
*/
private $reserveTime = 0;

}

0 comments on commit d11aa31

Please sign in to comment.