Skip to content

Commit

Permalink
Merge pull request #22 from makise-co/feature/pq-concurrent-statement…
Browse files Browse the repository at this point in the history
…s-fix

Try again to concurrency statement handling
  • Loading branch information
codercms authored Oct 8, 2021
2 parents 814f1a2 + 7a83734 commit 1602c10
Showing 1 changed file with 16 additions and 52 deletions.
68 changes: 16 additions & 52 deletions src/Driver/Pq/PqHandle.php
Original file line number Diff line number Diff line change
Expand Up @@ -164,36 +164,34 @@ public function prepare(string $sql): Statement
$name = Handle::STATEMENT_NAME_PREFIX . sha1($modifiedSql);

// prevent returning of promised statement which is in de-allocation state
if (isset($this->statements[$name]) && !$this->statements[$name]->isDeallocating) {
while (isset($this->statements[$name])) {
$storage = $this->statements[$name];

++$storage->refCount;

// if statement is in allocation state we should wait until allocation done
if ($storage->isAllocating) {
// Statement may be being allocated or deallocated. Wait to finish, then check for existence again.
if ($storage->lock->isLocked()) {
// Do not return promised prepared statement object, as the $names array may differ.
$storage->lock->wait();

--$storage->refCount;

if ($storage->error !== null) {
throw $storage->error;
}

continue;
}

return new PqStatement($this, $name, $sql, $names);
}

// wait for statement complete de-allocation, before allocating new one
if (isset($this->statements[$name]) && $this->statements[$name]->isDeallocating) {
$this->statements[$name]->lock->wait();
}

$storage = new StatementStorage();
$storage->sql = $sql;

$this->statements[$name] = $storage;

$storage->lock->lock();
$storage->isAllocating = true;

try {
$storage->statement = $this->send(
Expand All @@ -202,57 +200,18 @@ public function prepare(string $sql): Statement
$name,
$modifiedSql
);
} catch (Exception\QueryExecutionError $e) {
$sqlState = $e->getDiagnostics()['sqlstate'] ?? '';
if ($sqlState !== '42P05') {
throw $e;
}

/*
* https://github.com/php/php-src/blob/master/ext/pdo_pgsql/pgsql_statement.c#L192
* 42P05 means that the prepared statement already existed. this can happen if you use
* a connection pooling software line pgpool which doesn't close the db-connection once
* php disconnects. if php dies (no chance to run RSHUTDOWN) during execution it has no
* chance to DEALLOCATE the prepared statements it has created. so, if we hit a 42P05 we
* deallocate it and retry ONCE (thies 2005.12.15)
*/

// try to allocate statement again
try {
$this->send(
"DEALLOCATE {$name}",
[$this->handle, "execAsync"],
"DEALLOCATE {$name}"
);

$storage->statement = $this->send(
$sql,
[$this->handle, "prepareAsync"],
$name,
$modifiedSql
);
} catch (Throwable $e) {
unset($this->statements[$name]);

$storage->error = $e;
$storage->isAllocating = false;
$storage->lock->unlock();

throw $e;
}
} catch (Throwable $exception) {
unset($this->statements[$name]);

$storage->error = $exception;
$storage->isAllocating = false;
$storage->lock->unlock();

throw $exception;
} finally {
$storage->isAllocating = false;
$storage->isDeallocating = false;
$storage->lock->unlock();
}

$storage->isAllocating = false;
$storage->lock->unlock();

return new PqStatement($this, $name, $sql, $names);
}

Expand Down Expand Up @@ -605,6 +564,11 @@ private function fetch()
*/
private function release(): void
{
// https://github.com/amphp/postgres/pull/44
while ($this->handle->busy && $this->handle->getResult()) {
// nothing
}

$this->deferred->unlock();
}

Expand Down

0 comments on commit 1602c10

Please sign in to comment.