Skip to content

Commit

Permalink
Apply fixes from StyleCI
Browse files Browse the repository at this point in the history
  • Loading branch information
StyleCIBot committed Apr 3, 2024
1 parent 707c126 commit c6b76d3
Showing 1 changed file with 55 additions and 56 deletions.
111 changes: 55 additions & 56 deletions src/Adapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@ final class Adapter implements AdapterInterface
* @var bool Ability to delete released messages from table.
*/
public $deleteReleased = true;



public function __construct(
private ConnectionInterface $db,
private MessageSerializerInterface $serializer,
Expand All @@ -48,35 +47,35 @@ public function __construct(
}

public function runExisting(callable $handlerCallback): void
{
{
$this->run($handlerCallback, false);
}

public function status(string|int $id): JobStatus
{
$id = (int) $id;

$payload = (new Query($this->db))
->from($this->tableName)
->where(['id' => $id])
->one();

if (!$payload) {
if ($this->deleteReleased) {
return JobStatus::done();
}

throw new InvalidArgumentException("Unknown message ID: $id.");
}

if (!$payload['reserved_at']) {
return JobStatus::waiting();
}

if (!$payload['done_at']) {
return JobStatus::reserved();
}

return JobStatus::done();
}

Expand All @@ -93,7 +92,7 @@ public function push(MessageInterface $message): MessageInterface
])->execute();
$tableSchema = $this->db->getTableSchema($this->tableName);
$key = $tableSchema ? $this->db->getLastInsertID($tableSchema->getSequenceName()) : $tableSchema;

return new IdEnvelope($message, $key);
}

Expand All @@ -111,57 +110,57 @@ public function withChannel(string $channel): self
$new = clone $this;
$new->channel = $channel;
$new->mutex = $this->mutexFactory->create(__CLASS__ . $this->channel);

return $new;
}

/**
* Takes one message from waiting list and reserves it for handling.
*
* @return array|null payload
* @throws \Exception in case it hasn't waited the lock
* @return array|null payload
*/
protected function reserve(): array|null
{
// TWK TODO what is useMaster in Yii3 return $this->db->useMaster(function () {
if (!$this->mutex->acquire($this->mutexTimeout)) {
throw new \Exception('Has not waited the lock.');
}

try {
$this->moveExpired();

// Reserve one message
$payload = (new Query($this->db))
->from($this->tableName)
->andWhere(['channel' => $this->channel, 'reserved_at' => null])
->andWhere('[[pushed_at]] <= :time - [[delay]]', [':time' => time()])
->orderBy(['priority' => SORT_ASC, 'id' => SORT_ASC])
->limit(1)
->one();
if (is_array($payload)) {
$payload['reserved_at'] = time();
$payload['attempt'] = (int) $payload['attempt'] + 1;
$this->db->createCommand()->update($this->tableName, [
'reserved_at' => $payload['reserved_at'],
'attempt' => $payload['attempt'],
], [
'id' => $payload['id'],
])->execute();

// pgsql
if (is_resource($payload['job'])) {
$payload['job'] = stream_get_contents($payload['job']);
}
if (!$this->mutex->acquire($this->mutexTimeout)) {
throw new \Exception('Has not waited the lock.');
}

try {
$this->moveExpired();

// Reserve one message
$payload = (new Query($this->db))
->from($this->tableName)
->andWhere(['channel' => $this->channel, 'reserved_at' => null])
->andWhere('[[pushed_at]] <= :time - [[delay]]', [':time' => time()])
->orderBy(['priority' => SORT_ASC, 'id' => SORT_ASC])
->limit(1)
->one();
if (is_array($payload)) {
$payload['reserved_at'] = time();
$payload['attempt'] = (int) $payload['attempt'] + 1;
$this->db->createCommand()->update($this->tableName, [
'reserved_at' => $payload['reserved_at'],
'attempt' => $payload['attempt'],
], [
'id' => $payload['id'],
])->execute();

// pgsql
if (is_resource($payload['job'])) {
$payload['job'] = stream_get_contents($payload['job']);
}
} finally {
$this->mutex->release();
}

return $payload;
} finally {
$this->mutex->release();
}

return $payload;
// TWK TODO ??? });
}

/**
* @param array $payload
*/
Expand All @@ -171,16 +170,16 @@ protected function release($payload): void
$this->db->createCommand()->delete(
$this->tableName,
['id' => $payload['id']]
)->execute();
)->execute();
} else {
$this->db->createCommand()->update(
$this->tableName,
['done_at' => time()],
['id' => $payload['id']]
)->execute();
)->execute();
}
}

/**
* Moves expired messages into waiting list.
*/
Expand All @@ -193,15 +192,15 @@ private function moveExpired(): void
['reserved_at' => null],
'[[reserved_at]] < :time - [[ttr]] and [[reserved_at]] is not null and [[done_at]] is null',
[':time' => $this->reserveTime]
)->execute();
)->execute();
}
}

/**
* @var int reserve time
*/
private $reserveTime = 0;

/**
* Listens queue and runs each job.
*
Expand All @@ -210,17 +209,17 @@ private function moveExpired(): void
* @param non-negative-int $timeout number of seconds to sleep before next iteration.
*/
public function run(callable $handlerCallback, bool $repeat, int $timeout = 0): void
{
{
while ($this->loop->canContinue()) {
if ($payload = $this->reserve()) {
if ($handlerCallback($this->serializer->unserialize($payload['job']))) {
$this->release($payload);
}
continue;
}
}
if (!$repeat) {
break;
}
}
if ($timeout > 0) {
sleep($timeout);
}
Expand Down

0 comments on commit c6b76d3

Please sign in to comment.