Skip to content

Commit

Permalink
Merge pull request #23 from makise-co/feature/pq-concurrent-statement…
Browse files Browse the repository at this point in the history
…s-fix

Another promise implementation for PqHandle statements
  • Loading branch information
codercms authored Oct 8, 2021
2 parents 1602c10 + 834a694 commit 2a1ea53
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 32 deletions.
57 changes: 25 additions & 32 deletions src/Driver/Pq/PqHandle.php
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class PqHandle implements Handle
private bool $isConcurrent = true;

/**
* @var StatementStorage[]
* @var PqStatementStorage[]
*/
private array $statements = [];

Expand Down Expand Up @@ -170,46 +170,41 @@ public function prepare(string $sql): Statement
++$storage->refCount;

// Statement may be being allocated or deallocated. Wait to finish, then check for existence again.
if ($storage->lock->isLocked()) {
if ($storage->promise instanceof Promise) {
// Do not return promised prepared statement object, as the $names array may differ.
$storage->lock->wait();
$storage->promise->wait();

--$storage->refCount;

if ($storage->error !== null) {
throw $storage->error;
}

continue;
}

return new PqStatement($this, $name, $sql, $names);
}

$storage = new StatementStorage();
$storage = new PqStatementStorage();
$storage->sql = $sql;

$this->statements[$name] = $storage;

$storage->lock->lock();

try {
$storage->statement = $this->send(
$sql,
[$this->handle, "prepareAsync"],
$name,
$modifiedSql
);
$promise = new Promise(function () use ($sql, $name, $modifiedSql) {
return $this->send(
$sql,
[$this->handle, "prepareAsync"],
$name,
$modifiedSql
);
});

$storage->promise = $promise;
$storage->statement = $promise->getResult();
} catch (Throwable $exception) {
unset($this->statements[$name]);

$storage->error = $exception;

throw $exception;
} finally {
$storage->isAllocating = false;
$storage->isDeallocating = false;
$storage->lock->unlock();
$storage->promise = null;
}

return new PqStatement($this, $name, $sql, $names);
Expand Down Expand Up @@ -411,17 +406,15 @@ public function statementDeallocate(string $name): void
return;
}

$storage->lock->lock();
$storage->isDeallocating = true;

try {
$this->send(null, [$storage->statement, "deallocateAsync"]);
} finally {
unset($this->statements[$name]);

$storage->isDeallocating = false;
$storage->lock->unlock();
}
$storage->promise = new Promise(function () use ($storage, $name) {
try {
$this->send(null, [$storage->statement, "deallocateAsync"]);
} finally {
unset($this->statements[$name]);
}
});
// await resolution
// $storage->promise->getResult();
}

private function free(): void
Expand Down
30 changes: 30 additions & 0 deletions src/Driver/Pq/PqStatementStorage.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
<?php
/**
* This file is part of the Makise-Co Postgres Client
* World line: 0.571024a
*
* (c) Dmitry K. <coder1994@gmail.com>
*/

declare(strict_types=1);

namespace MakiseCo\Postgres\Driver\Pq;

use pq;

/**
* @internal
*/
final class PqStatementStorage
{
public int $refCount = 1;

/**
* Synchronization object to allocate/deallocate statement
*/
public ?Promise $promise;

public pq\Statement $statement;

public string $sql;
}
92 changes: 92 additions & 0 deletions src/Driver/Pq/Promise.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
<?php
/**
* This file is part of the Makise-Co Postgres Client
* World line: 0.571024a
*
* (c) Dmitry K. <coder1994@gmail.com>
*/

declare(strict_types=1);

namespace MakiseCo\Postgres\Driver\Pq;

use Swoole\Coroutine\Channel;
use Swoole\Coroutine;
use Swoole\Event;

class Promise
{
private Channel $chan;
private \Closure $func;

/**
* @var mixed
*/
private $result;

private bool $awaited = false;
private bool $awaiting = false;
private array $awaitors = [];

public function __construct(\Closure $func)
{
$this->chan = new Channel(1);
$this->func = $func;

Coroutine::create(function () {
try {
$result = ($this->func)();
$this->chan->push($result);
} catch (\Throwable $e) {
$this->chan->push($e);
}
});
}

/**
* Wait until promise result is resolved
*/
public function wait(): void
{
if ($this->awaited) {
return;
}

if ($this->awaiting) {
$this->awaitors[] = Coroutine::getCid();
Coroutine::yield();

return;
}

$this->awaiting = true;

$this->result = $this->chan->pop();
$this->chan->close();
$this->awaited = true;

$awaiters = $this->awaitors;
Event::defer(static function () use ($awaiters) {
foreach ($awaiters as $awaitor) {
Coroutine::resume($awaitor);
}
});
}

/**
* Wait until promise resolved and return result
*
* @return mixed success result
* @throws \Throwable error result
*/
public function getResult()
{
$this->wait();

if ($this->result instanceof \Throwable) {
throw $this->result;
}

return $this->result;
}
}

0 comments on commit 2a1ea53

Please sign in to comment.