Skip to content

Commit

Permalink
Try to fix random Swoole crash
Browse files Browse the repository at this point in the history
* Swoole crash occurs after detecting that pq\Connection is busy in the `PqHandle::send` method
* Added more verbosity for all operations that call the `send` method
*
  • Loading branch information
codercms committed Oct 12, 2021
1 parent 4ad0f99 commit 7b94420
Showing 1 changed file with 32 additions and 24 deletions.
56 changes: 32 additions & 24 deletions src/Driver/Pq/PqHandle.php
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ class PqHandle implements Handle
private ?pq\Connection $handle;

/** @var Deferred|null */
private $deferred;
private ?Deferred $deferred = null;

/** @var Deferred|null */
private $busy;
private ?Deferred $busy = null;

/** @var Coroutine\Channel[] */
private $listeners = [];
Expand All @@ -46,6 +46,7 @@ class PqHandle implements Handle
private int $lastUsedAt;

private int $fd = -1;
private string $prevOp = '';
private string $prevSql = '';

/**
Expand Down Expand Up @@ -120,6 +121,7 @@ public function close(): void
}

/**
* @param string $op
* @param string|null Query SQL or null if not related.
* @param callable $method Method to execute.
* @param mixed ...$args Arguments to pass to function.
Expand All @@ -128,14 +130,11 @@ public function close(): void
*
* @throws FailureException
*/
private function send(?string $sql, callable $method, ...$args)
private function send(string $op, ?string $sql, callable $method, ...$args)
{
while ($this->busy) {
try {
$this->busy->getResult();
} catch (\Throwable $exception) {
// Ignore failure from another operation.
}
// Ignore failure from another operation.
$this->busy->wait();
}

if (!$this->handle) {
Expand All @@ -146,10 +145,18 @@ private function send(?string $sql, callable $method, ...$args)
$this->deferred = $this->busy = new Deferred;

if ($this->handle->busy) {
var_dump("WARNING: connection is busy after sql: {$this->prevSql}", $this->handle);
}
while ($this->handle->busy && $res = $this->handle->getResult()) {
var_dump('connection busy', $res);
var_dump(
"WARNING: connection fd={$this->fd} status={$this->handle->status} is busy after sql: " .
"({$this->prevOp}) {$this->prevSql}"
);

while ($this->handle->busy && ($res = $this->handle->getResult())) {
var_dump(
"connection fd={$this->fd} status={$this->handle->status} is busy, " .
"result is object: " . \get_class($res)
);
var_dump($res);
}
}

$handle = $method(...$args);
Expand All @@ -160,12 +167,12 @@ private function send(?string $sql, callable $method, ...$args)
Event::set($this->fd, null, null, SWOOLE_EVENT_READ | SWOOLE_EVENT_WRITE);
}

$this->prevSql = $sql ?? '';

$result = $this->deferred->getResult();
} catch (pq\Exception $exception) {
throw new FailureException($this->handle->errorMessage, 0, $exception);
} finally {
$this->prevOp = $op;
$this->prevSql = $sql ?? '';
$this->deferred = $this->busy = null;
}

Expand Down Expand Up @@ -281,7 +288,7 @@ public function statementExecute(string $name, array $params)

\assert($storage->statement instanceof pq\Statement, "Statement storage in invalid state");

return $this->send($storage->sql, [$storage->statement, "execAsync"], $params);
return $this->send('EXECUTE', $storage->sql, [$storage->statement, "execAsync"], $params);
}

/**
Expand Down Expand Up @@ -309,7 +316,7 @@ public function statementDeallocate(string $name): void
$storage->promise = new Promise(
function () use ($storage, $name) {
try {
return $this->send(null, [$storage->statement, "deallocateAsync"]);
return $this->send('DEALLOCATE', $storage->sql, [$storage->statement, "deallocateAsync"]);
} finally {
unset($this->statements[$name]);
}
Expand All @@ -328,7 +335,7 @@ public function query(string $sql)
throw new ConnectionException("The connection to the database has been closed");
}

return $this->send($sql, [$this->handle, "execAsync"], $sql);
return $this->send('QUERY', $sql, [$this->handle, "execAsync"], $sql);
}

/**
Expand All @@ -343,7 +350,7 @@ public function execute(string $sql, array $params = [])
$sql = Internal\parseNamedParams($sql, $names);
$params = Internal\replaceNamedParams($params, $names);

return $this->send($sql, [$this->handle, "execParamsAsync"], $sql, $params);
return $this->send('EXECUTE', $sql, [$this->handle, "execParamsAsync"], $sql, $params);
}

/**
Expand Down Expand Up @@ -383,7 +390,7 @@ public function prepare(string $sql): Statement

$storage->promise = new Promise(
function () use ($sql, $name, $modifiedSql) {
return $this->send($sql, [$this->handle, "prepareAsync"], $name, $modifiedSql);
return $this->send('PREPARE', $sql, [$this->handle, "prepareAsync"], $name, $modifiedSql);
}
);

Expand All @@ -404,7 +411,7 @@ function () use ($sql, $name, $modifiedSql) {
*/
public function notify(string $channel, string $payload = ""): CommandResult
{
$result = $this->send(null, [$this->handle, "notifyAsync"], $channel, $payload);
$result = $this->send('NOTIFY', "NOTIFY {$channel}", [$this->handle, "notifyAsync"], $channel, $payload);
if ($result instanceof CommandResult) {
return $result;
}
Expand All @@ -428,7 +435,8 @@ public function listen(string $channel): Listener

try {
$this->send(
null,
'LISTEN',
"LISTEN {$channel}",
[$this->handle, "listenAsync"],
$channel,
static function (string $channel, string $message, int $pid) use ($emitter) {
Expand Down Expand Up @@ -466,7 +474,7 @@ private function unlisten(string $channel): ?CommandResult
return null; // Connection already closed.
}

return $this->send(null, [$this->handle, "unlistenAsync"], $channel);
return $this->send('UNLISTEN', "UNLISTEN {$channel}", [$this->handle, "unlistenAsync"], $channel);
}

/**
Expand All @@ -475,7 +483,7 @@ private function unlisten(string $channel): ?CommandResult
public function quoteString(string $data): string
{
if (!$this->handle) {
throw new \Error("The connection to the database has been closed");
throw new ConnectionException("The connection to the database has been closed");
}

return $this->handle->quote($data);
Expand All @@ -487,7 +495,7 @@ public function quoteString(string $data): string
public function quoteName(string $name): string
{
if (!$this->handle) {
throw new \Error("The connection to the database has been closed");
throw new ConnectionException("The connection to the database has been closed");
}

return $this->handle->quoteName($name);
Expand Down

0 comments on commit 7b94420

Please sign in to comment.