Skip to content

Commit

Permalink
Merge 69a5f98 into bc608a0
Browse files Browse the repository at this point in the history
  • Loading branch information
enumag committed May 3, 2020
2 parents bc608a0 + 69a5f98 commit 2263bdc
Show file tree
Hide file tree
Showing 19 changed files with 210 additions and 12 deletions.
4 changes: 2 additions & 2 deletions src/Internal/EventStoreCatchUpSubscription.php
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ private function subscribeToStreamAsync(): Promise
}

$eventAppeared = new class(Closure::fromCallable([$this, 'enqueuePushedEvent'])) implements EventAppearedOnSubscription {
private $callback;
private Closure $callback;

public function __construct(Closure $callback)
{
Expand All @@ -269,7 +269,7 @@ public function __invoke(
};

$subscriptionDropped = new class(Closure::fromCallable([$this, 'serverSubscriptionDropped'])) implements SubscriptionDropped {
private $callback;
private Closure $callback;

public function __construct(Closure $callback)
{
Expand Down
30 changes: 30 additions & 0 deletions src/Internal/EventStoreConnectionLogicHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ class EventStoreConnectionLogicHandler
private bool $wasConnected = false;
private int $packageNumber = 0;
private int $lastTimeoutsTimeStamp;
private \SplObjectStorage $currentPromises;

public function __construct(EventStoreConnection $connection, ConnectionSettings $settings)
{
Expand All @@ -98,6 +99,7 @@ public function __construct(EventStoreConnection $connection, ConnectionSettings
$this->stopWatch = StopWatch::startNew();
// this allows first connection to connect quick
$this->lastTimeoutsTimeStamp = -$this->settings->operationTimeoutCheckPeriod();
$this->currentPromises = new \SplObjectStorage();

$this->handler->registerHandler(
StartConnectionMessage::class,
Expand Down Expand Up @@ -172,9 +174,32 @@ public function enqueueMessage(Message $message): void
{
$this->logDebug(\sprintf('enqueuing message %s', (string) $message));

$promise = $message->promise();
if ($promise !== null) {
$this->keepLoopAlive($promise);
}

$this->handler->handle($message);
}

private function keepLoopAlive(Promise $promise): void
{
if ($this->connection) {
$this->connection->reference();
}

$this->currentPromises->attach($promise);
$promise->onResolve(
function () use ($promise): void {
$this->currentPromises->detach($promise);

if ($this->connection && $this->currentPromises->count() === 0) {
$this->connection->unreference();
}
}
);
}

private function startConnection(Deferred $deferred, EndPointDiscoverer $endPointDiscoverer): void
{
$this->logDebug('startConnection');
Expand All @@ -184,6 +209,7 @@ private function startConnection(Deferred $deferred, EndPointDiscoverer $endPoin
$this->timerTickWatcherId = Loop::repeat(Consts::TIMER_PERIOD, function (): void {
$this->timerTick();
});
Loop::unreference($this->timerTickWatcherId);
$this->endPointDiscoverer = $endPointDiscoverer;
$this->state = ConnectionState::connecting();
$this->connectingPhase = ConnectingPhase::reconnecting();
Expand Down Expand Up @@ -329,6 +355,10 @@ function (TcpPackageConnection $connection, Throwable $exception): void {

yield $this->connection->connectAsync();

if ($this->currentPromises->count() === 0) {
$this->connection->unreference();
}

if (null !== $this->connection && ! $this->connection->isClosed()) {
$this->connection->startReceiving();
}
Expand Down
60 changes: 58 additions & 2 deletions src/Internal/EventStoreNodeConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
use Prooph\EventStore\EventData;
use Prooph\EventStore\EventReadResult;
use Prooph\EventStore\EventReadStatus;
use Prooph\EventStore\EventStoreSubscription;
use Prooph\EventStore\Exception\InvalidArgumentException;
use Prooph\EventStore\Exception\InvalidOperationException;
use Prooph\EventStore\Exception\MaxQueueSizeLimitReached;
Expand All @@ -45,6 +46,7 @@
use Prooph\EventStore\StreamMetadata;
use Prooph\EventStore\StreamMetadataResult;
use Prooph\EventStore\SubscriptionDropped;
use Prooph\EventStore\SubscriptionDropReason;
use Prooph\EventStore\SystemSettings;
use Prooph\EventStore\UserCredentials;
use Prooph\EventStore\Util\Guid;
Expand Down Expand Up @@ -647,6 +649,32 @@ public function subscribeToStreamAsync(
throw new InvalidArgumentException('Stream cannot be empty');
}

$stopped = new Deferred();

$subscriptionDropped = new class($subscriptionDropped, $stopped) implements SubscriptionDropped {
private ?SubscriptionDropped $callback;

private Deferred $stopped;

public function __construct(?SubscriptionDropped $callback, Deferred $stopped)
{
$this->callback = $callback;
$this->stopped = $stopped;
}

public function __invoke(
EventStoreSubscription $subscription,
SubscriptionDropReason $reason,
?Throwable $exception = null
): void {
if ($this->callback) {
($this->callback)($subscription, $reason, $exception);
}

$this->stopped->resolve();
}
};

$deferred = new Deferred();

$this->handler->enqueueMessage(new StartSubscriptionMessage(
Expand All @@ -657,7 +685,8 @@ public function subscribeToStreamAsync(
$eventAppeared,
$subscriptionDropped,
$this->settings->maxRetries(),
$this->settings->operationTimeout()
$this->settings->operationTimeout(),
$stopped->promise()
));

return $deferred->promise();
Expand Down Expand Up @@ -705,6 +734,32 @@ public function subscribeToAllAsync(
?SubscriptionDropped $subscriptionDropped = null,
?UserCredentials $userCredentials = null
): Promise {
$stopped = new Deferred();

$subscriptionDropped = new class($subscriptionDropped, $stopped) implements SubscriptionDropped {
private ?SubscriptionDropped $callback;

private Deferred $stopped;

public function __construct(?SubscriptionDropped $callback, Deferred $stopped)
{
$this->callback = $callback;
$this->stopped = $stopped;
}

public function __invoke(
EventStoreSubscription $subscription,
SubscriptionDropReason $reason,
?Throwable $exception = null
): void {
if ($this->callback) {
($this->callback)($subscription, $reason, $exception);
}

$this->stopped->resolve();
}
};

$deferred = new Deferred();

$this->handler->enqueueMessage(new StartSubscriptionMessage(
Expand All @@ -715,7 +770,8 @@ public function subscribeToAllAsync(
$eventAppeared,
$subscriptionDropped,
$this->settings->maxRetries(),
$this->settings->operationTimeout()
$this->settings->operationTimeout(),
$stopped->promise()
));

return $deferred->promise();
Expand Down
9 changes: 6 additions & 3 deletions src/Internal/EventStorePersistentSubscription.php
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ public function startSubscription(
?UserCredentials $userCredentials,
Closure $onEventAppeared,
?Closure $onSubscriptionDropped,
ConnectionSettings $settings
ConnectionSettings $settings,
Promise $stopped
): Promise {
$deferred = new Deferred();

Expand All @@ -117,7 +118,8 @@ public function startSubscription(
$onEventAppeared,
$onSubscriptionDropped,
$settings->maxRetries(),
$settings->operationTimeout()
$settings->operationTimeout(),
$stopped
));

return $deferred->promise();
Expand Down Expand Up @@ -149,7 +151,8 @@ public function start(): Promise
$this->userCredentials,
$eventAppeared,
$subscriptionDropped,
$this->settings
$this->settings,
$this->stopped->promise()
);

$deferred = new Deferred();
Expand Down
6 changes: 6 additions & 0 deletions src/Internal/Message/CloseConnectionMessage.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

namespace Prooph\EventStoreClient\Internal\Message;

use Amp\Promise;
use Throwable;

/** @internal */
Expand All @@ -37,6 +38,11 @@ public function exception(): ?Throwable
return $this->exception;
}

public function promise(): ?Promise
{
return null;
}

public function __toString(): string
{
return 'CloseConnectionMessage';
Expand Down
6 changes: 6 additions & 0 deletions src/Internal/Message/EstablishTcpConnectionMessage.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

namespace Prooph\EventStoreClient\Internal\Message;

use Amp\Promise;
use Prooph\EventStoreClient\Internal\NodeEndPoints;

/** @internal */
Expand All @@ -30,6 +31,11 @@ public function nodeEndPoints(): NodeEndPoints
return $this->nodeEndPoints;
}

public function promise(): ?Promise
{
return null;
}

public function __toString(): string
{
return 'EstablishTcpConnectionMessage';
Expand Down
6 changes: 6 additions & 0 deletions src/Internal/Message/HandleTcpPackageMessage.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

namespace Prooph\EventStoreClient\Internal\Message;

use Amp\Promise;
use Prooph\EventStoreClient\SystemData\TcpPackage;
use Prooph\EventStoreClient\Transport\Tcp\TcpPackageConnection;

Expand All @@ -38,6 +39,11 @@ public function tcpPackage(): TcpPackage
return $this->tcpPackage;
}

public function promise(): ?Promise
{
return null;
}

public function __toString(): string
{
return 'HandleTcpPackageMessage';
Expand Down
4 changes: 4 additions & 0 deletions src/Internal/Message/Message.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,12 @@

namespace Prooph\EventStoreClient\Internal\Message;

use Amp\Promise;

/** @internal */
interface Message
{
public function promise(): ?Promise;

public function __toString(): string;
}
6 changes: 6 additions & 0 deletions src/Internal/Message/StartConnectionMessage.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
namespace Prooph\EventStoreClient\Internal\Message;

use Amp\Deferred;
use Amp\Promise;
use Prooph\EventStoreClient\Internal\EndPointDiscoverer;

/** @internal */
Expand All @@ -38,6 +39,11 @@ public function endPointDiscoverer(): EndPointDiscoverer
return $this->endPointDiscoverer;
}

public function promise(): ?Promise
{
return $this->deferred->promise();
}

public function __toString(): string
{
return 'StartConnectionMessage';
Expand Down
6 changes: 6 additions & 0 deletions src/Internal/Message/StartOperationMessage.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

namespace Prooph\EventStoreClient\Internal\Message;

use Amp\Promise;
use Prooph\EventStoreClient\ClientOperations\ClientOperation;

/** @internal */
Expand Down Expand Up @@ -44,6 +45,11 @@ public function timeout(): int
return $this->timeout;
}

public function promise(): ?Promise
{
return $this->operation->promise();
}

public function __toString(): string
{
return 'StartOperationMessage';
Expand Down
11 changes: 10 additions & 1 deletion src/Internal/Message/StartPersistentSubscriptionMessage.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
namespace Prooph\EventStoreClient\Internal\Message;

use Amp\Deferred;
use Amp\Promise;
use Closure;
use Prooph\EventStore\UserCredentials;

Expand All @@ -29,6 +30,7 @@ class StartPersistentSubscriptionMessage implements Message
private ?Closure $subscriptionDropped;
private int $maxRetries;
private int $timeout;
private Promise $stopped;

public function __construct(
Deferred $deferred,
Expand All @@ -39,7 +41,8 @@ public function __construct(
Closure $eventAppeared,
?Closure $subscriptionDropped,
int $maxRetries,
int $timeout
int $timeout,
Promise $stopped
) {
$this->deferred = $deferred;
$this->subscriptionId = $subscriptionId;
Expand All @@ -50,6 +53,7 @@ public function __construct(
$this->subscriptionDropped = $subscriptionDropped;
$this->maxRetries = $maxRetries;
$this->timeout = $timeout;
$this->stopped = $stopped;
}

public function deferred(): Deferred
Expand Down Expand Up @@ -97,6 +101,11 @@ public function timeout(): int
return $this->timeout;
}

public function promise(): ?Promise
{
return $this->stopped;
}

public function __toString(): string
{
return 'StartPersistentSubscriptionMessage';
Expand Down

0 comments on commit 2263bdc

Please sign in to comment.