Skip to content

Commit

Permalink
Got rid of some 'Unhandled promise rejections'
Browse files Browse the repository at this point in the history
  • Loading branch information
raoul committed Nov 29, 2023
1 parent 492fad5 commit e689694
Show file tree
Hide file tree
Showing 16 changed files with 452 additions and 289 deletions.
96 changes: 52 additions & 44 deletions src/AsyncConnectionManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public function connect(): PromiseInterface
if ($this->isConnected() && !$this->isDisconnecting()) {
$this->logger->debug('Connected');

return resolve(new AsyncConnectionResult($this->writer, false));
return resolve(AsyncConnectionResult::success($this->writer, false));
}

if ($this->isConnecting()) {
Expand All @@ -50,44 +50,48 @@ public function connect(): PromiseInterface
$waitUntilDisconnectEnds = $this->disconnectionPromise->promise();

} else {
$waitUntilDisconnectEnds = resolve(true);
$waitUntilDisconnectEnds = resolve(AsyncDisconnectionResult::success());
}

$this->connectionPromise = new Deferred();

$doAfterFailedDisconnect = function (Throwable $e) {
$this->logger->debug('Disconnection failed. No need to reconnect now.');
$this->connectionPromise->resolve(null);
$this->connectionPromise = null;
return $waitUntilDisconnectEnds->then(function (AsyncDisconnectionResult $result) {
if (!$result->isDisconnected()) {
$this->logger->debug('Disconnection failed. No need to reconnect now.');
$result = AsyncConnectionResult::success($this->writer, false);
$this->connectionPromise->resolve($result);

return resolve(new AsyncConnectionResult($this->writer, false));
};
return resolve($result);
}

return $waitUntilDisconnectEnds->then(function () {
$this->logger->debug('Connecting...');

return $this->asyncConnector->connect()->then(
function (AsyncConnectionWriter $writer) {
$this->logger->debug('Connecting succeeded');
$this->writer = $writer;
$this->connectionPromise->resolve(null);
$this->connectionPromise = null;

return resolve(new AsyncConnectionResult($writer, true));
},
function (Throwable $e): void {
return $this->asyncConnector->connect()
->then(
function (AsyncConnectionWriter $writer) {
$this->logger->debug('Connecting succeeded');
$this->writer = $writer;
$result = AsyncConnectionResult::success($writer, true);
$this->connectionPromise->resolve($result);

return resolve($result);
},
)
->catch(function (Throwable $e): PromiseInterface {
$this->logger->error('Connecting failed');
$this->connectionPromise->reject($e);
$this->connectionPromise = null;
$this->connectionPromise->resolve(AsyncConnectionResult::failure($e));

if ($e instanceof TimeoutException) {
throw new AsyncConnectionTimeoutException($e->getMessage(), $e->getCode(), $e);
$exception = new AsyncConnectionTimeoutException($e->getMessage(), $e->getCode(), $e);

return resolve(AsyncConnectionResult::failure($exception));
}

throw $e;
},
);
}, $doAfterFailedDisconnect);
return resolve(AsyncConnectionResult::failure($e));
});
})->finally(function (): void {
$this->connectionPromise = null;
});
}

public function disconnect(): PromiseInterface
Expand All @@ -101,43 +105,47 @@ public function disconnect(): PromiseInterface
if (!$this->isConnected() && !$this->isConnecting()) {
$this->logger->debug('Not connected');

return resolve('Not connected.');
return resolve(AsyncDisconnectionResult::success());
}

if ($this->isConnecting()) {
$this->logger->debug('Connection in progress. Waiting ...');
$waitUntilFinished = $this->connectionPromise->promise();

} else {
$waitUntilFinished = resolve(true);
} elseif ($this->isConnected()) {
$waitUntilFinished = resolve(AsyncConnectionResult::success($this->writer));
}

$this->disconnectionPromise = new Deferred();

return $waitUntilFinished->then(function () {
return $waitUntilFinished->then(function (AsyncConnectionResult $connectionResult) {
if (!$connectionResult->isConnected()) {
$this->logger->error('Connection failed. No need to disconnect now.');

return resolve(AsyncDisconnectionResult::success());
}
$this->logger->debug('Disconnection started');

return $this->asyncConnector->disconnect($this->writer)
->then(function ($value) {
->then(function () {
$this->logger->debug('Disconnection succeeded');
$this->disconnectionPromise->resolve(null);
$this->disconnectionPromise = null;
$this->writer = null;

return resolve($value);
}, function (Throwable $e): void {
$result = AsyncDisconnectionResult::success();
$this->disconnectionPromise->resolve($result);

return resolve($result);
})
->catch(function (Throwable $e) {
$this->logger->debug('Disconnection failed');
$this->disconnectionPromise->reject($e);
$this->disconnectionPromise = null;

throw $e;
});
}, function (Throwable $e) {
$this->logger->error('Connection failed. No need to disconnect now.');
$this->disconnectionPromise->resolve(null);
$this->disconnectionPromise = null;
$result = AsyncDisconnectionResult::failure($e);
$this->disconnectionPromise->resolve($result);

return resolve(true);
return resolve($result);
})->finally(function (): void {
$this->disconnectionPromise = null;
});
});
}

Expand Down
37 changes: 29 additions & 8 deletions src/AsyncConnectionResult.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,48 @@

namespace AsyncConnection;

use Throwable;

class AsyncConnectionResult
{

private AsyncConnectionWriter $asyncConnectionWriter;
private function __construct(
private bool $isSuccess,
private ?AsyncConnectionWriter $asyncConnectionWriter = null,
private ?bool $connectionRequest = null, // false = already existing connection returned
private ?Throwable $error = null
)
{
}

private bool $connectionRequest; // false = already existing connection returned
public static function success(AsyncConnectionWriter $writer, ?bool $connectionRequest = null): self
{
return new self(true, $writer, $connectionRequest);
}

public function __construct(AsyncConnectionWriter $asyncConnectionWriter, bool $connectionRequest)
public static function failure(Throwable $error): self
{
$this->asyncConnectionWriter = $asyncConnectionWriter;
$this->connectionRequest = $connectionRequest;
return new self(false, null, null, $error);
}

public function getWriter(): AsyncConnectionWriter
public function isConnected(): bool
{
return $this->isSuccess;
}

public function getWriter(): ?AsyncConnectionWriter
{
return $this->asyncConnectionWriter;
}

public function hasConnectedToServer(): bool
public function newServerRequestWasSent(): ?bool
{
return $this->connectionRequest === true;
}

public function getError(): ?Throwable
{
return $this->connectionRequest;
return $this->error;
}

}
37 changes: 37 additions & 0 deletions src/AsyncDisconnectionResult.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
<?php declare(strict_types = 1);

namespace AsyncConnection;

use Throwable;

class AsyncDisconnectionResult
{

private function __construct(
private bool $isSuccess,
private ?Throwable $error = null,
)
{
}

public static function success(): self
{
return new self(true);
}

public static function failure(Throwable $error): self
{
return new self(false, $error);
}

public function isDisconnected(): bool
{
return $this->isSuccess;
}

public function getError(): ?Throwable
{
return $this->error;
}

}
21 changes: 12 additions & 9 deletions src/AsyncMessageQueueManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use function array_slice;
use function array_values;
use function count;
use function React\Promise\reject;
use function React\Promise\resolve;
use function sprintf;
use function time;
Expand Down Expand Up @@ -121,21 +122,23 @@ function () use ($requestsCounter) {
},
)->then(
function (AsyncConnectionResult $result) use ($requestsCounter) {
$this->log('connected', $requestsCounter);
if ($result->isConnected()) {
$this->log('connected', $requestsCounter);

return $this->minIntervalPromise->then(static fn () => resolve($result));
},
function (Throwable $exception) use ($requestsCounter): void {
return $this->minIntervalPromise->then(static fn () => resolve($result));
}

$exception = $result->getError();
$this->log('connection failed', $requestsCounter);
$this->finishWithError($exception, $requestsCounter);

throw $exception;
return reject($exception);
},
)->then(
fn (AsyncConnectionResult $result) => $this->asyncMessageSender->sendMessage($result->getWriter(), $message)->then(static fn () => resolve($result)),
)->then(
function (AsyncConnectionResult $result) use ($requestsCounter): void {
if ($result->hasConnectedToServer()) {
if ($result->newServerRequestWasSent()) {
self::$sentMessagesCount = 1;
} else {
self::$sentMessagesCount++;
Expand All @@ -144,14 +147,14 @@ function (AsyncConnectionResult $result) use ($requestsCounter): void {
$this->lastSentMessageTime = time();
$this->finishWithSuccess($requestsCounter);
},
function (Throwable $exception) use ($requestsCounter): void {
function (Throwable $exception) use ($requestsCounter): PromiseInterface {
$this->log('sending failed', $requestsCounter);
$this->forceReconnect = true;
$this->finishWithError($exception, $requestsCounter);

throw $exception;
return reject($exception);
},
);
)->catch(static fn (Throwable $e) => reject($e));
}

public function getQueuedMessagesCount(): int
Expand Down
42 changes: 42 additions & 0 deletions src/AsyncResult.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
<?php declare(strict_types = 1);

namespace AsyncConnection;

use Throwable;

class AsyncResult
{

private function __construct(
private bool $isSuccess,
private ?Throwable $error = null
)
{
}

public static function success(): self
{
return new self(true);
}

public static function failure(Throwable $error): self
{
return new self(false, $error);
}

public function getError(): ?Throwable
{
return $this->error;
}

public function isSuccess(): bool
{
return $this->isSuccess;
}

public function isFailure(): bool
{
return !$this->isSuccess();
}

}
Loading

0 comments on commit e689694

Please sign in to comment.