diff --git a/examples/demo-subscription-with-logger.php b/examples/demo-subscription-with-logger.php index 72955baa..e4bec59c 100644 --- a/examples/demo-subscription-with-logger.php +++ b/examples/demo-subscription-with-logger.php @@ -42,7 +42,7 @@ yield $connection->connectAsync(); - $connection->subscribeToStreamFrom( + yield $connection->subscribeToStreamFromAsync( 'foo-bar', null, CatchUpSubscriptionSettings::default(), diff --git a/examples/demo-subscription1.php b/examples/demo-subscription1.php index e93f91d6..374796f9 100644 --- a/examples/demo-subscription1.php +++ b/examples/demo-subscription1.php @@ -37,7 +37,7 @@ yield $connection->connectAsync(); - $connection->subscribeToStreamFrom( + yield $connection->subscribeToStreamFromAsync( 'foo-bar', null, CatchUpSubscriptionSettings::default(), diff --git a/examples/demo-subscription2.php b/examples/demo-subscription2.php index 24e3833b..66dd7d5d 100644 --- a/examples/demo-subscription2.php +++ b/examples/demo-subscription2.php @@ -37,7 +37,7 @@ yield $connection->connectAsync(); - $connection->subscribeToAllFrom( + yield $connection->subscribeToAllFromAsync( null, CatchUpSubscriptionSettings::default(), new class() implements EventAppearedOnCatchupSubscription { diff --git a/src/ClientOperations/ReadAllEventsBackwardOperation.php b/src/ClientOperations/ReadAllEventsBackwardOperation.php index df13a22d..795ff92c 100644 --- a/src/ClientOperations/ReadAllEventsBackwardOperation.php +++ b/src/ClientOperations/ReadAllEventsBackwardOperation.php @@ -111,8 +111,9 @@ protected function transformResponse(ProtobufMessage $response): AllEventsSlice foreach ($records as $record) { /** @var ResolvedIndexedEvent $record */ - $event = EventMessageConverter::convertEventRecordMessageToEventRecord($record->getEvent()); - $link = null; + if ($event = $record->getEvent()) { + $event = EventMessageConverter::convertEventRecordMessageToEventRecord($record->getEvent()); + } if ($link = $record->getLink()) { $link = EventMessageConverter::convertEventRecordMessageToEventRecord($link); diff --git a/src/ClientOperations/ReadAllEventsForwardOperation.php b/src/ClientOperations/ReadAllEventsForwardOperation.php index 00d42eec..93c7f582 100644 --- a/src/ClientOperations/ReadAllEventsForwardOperation.php +++ b/src/ClientOperations/ReadAllEventsForwardOperation.php @@ -110,8 +110,9 @@ protected function transformResponse(ProtobufMessage $response): AllEventsSlice foreach ($records as $record) { /** @var ResolvedIndexedEvent $record */ - $event = EventMessageConverter::convertEventRecordMessageToEventRecord($record->getEvent()); - $link = null; + if ($event = $record->getEvent()) { + $event = EventMessageConverter::convertEventRecordMessageToEventRecord($record->getEvent()); + } if ($link = $record->getLink()) { $link = EventMessageConverter::convertEventRecordMessageToEventRecord($link); diff --git a/src/ClientOperations/ReadStreamEventsForwardOperation.php b/src/ClientOperations/ReadStreamEventsForwardOperation.php index 7075d189..b701b353 100644 --- a/src/ClientOperations/ReadStreamEventsForwardOperation.php +++ b/src/ClientOperations/ReadStreamEventsForwardOperation.php @@ -122,10 +122,7 @@ protected function transformResponse(ProtobufMessage $response): StreamEventsSli foreach ($records as $record) { /** @var ResolvedIndexedEvent $record */ - $event = null; - $link = null; - - if ($record->getEvent()) { + if ($event = $record->getEvent()) { $event = EventMessageConverter::convertEventRecordMessageToEventRecord($record->getEvent()); } diff --git a/src/EventStoreAsyncConnection.php b/src/EventStoreAsyncConnection.php index e8fa01e6..9515322f 100644 --- a/src/EventStoreAsyncConnection.php +++ b/src/EventStoreAsyncConnection.php @@ -179,7 +179,10 @@ public function subscribeToStreamAsync( ?UserCredentials $userCredentials = null ): Promise; - public function subscribeToStreamFrom( + /** + * @return Promise + */ + public function subscribeToStreamFromAsync( string $stream, ?int $lastCheckpoint, ?CatchUpSubscriptionSettings $settings, @@ -187,7 +190,7 @@ public function subscribeToStreamFrom( ?LiveProcessingStarted $liveProcessingStarted = null, ?CatchUpSubscriptionDropped $subscriptionDropped = null, ?UserCredentials $userCredentials = null - ): EventStoreStreamCatchUpSubscription; + ): Promise; /** * @return Promise @@ -199,14 +202,17 @@ public function subscribeToAllAsync( ?UserCredentials $userCredentials = null ): Promise; - public function subscribeToAllFrom( + /** + * @return Promise + */ + public function subscribeToAllFromAsync( ?Position $lastCheckpoint, ?CatchUpSubscriptionSettings $settings, EventAppearedOnCatchupSubscription $eventAppeared, ?LiveProcessingStarted $liveProcessingStarted = null, ?CatchUpSubscriptionDropped $subscriptionDropped = null, ?UserCredentials $userCredentials = null - ): EventStoreAllCatchUpSubscription; + ): Promise; /** * @return Promise diff --git a/src/Internal/EventStoreAllCatchUpSubscription.php b/src/Internal/EventStoreAllCatchUpSubscription.php index d12d6e61..f3dcebac 100644 --- a/src/Internal/EventStoreAllCatchUpSubscription.php +++ b/src/Internal/EventStoreAllCatchUpSubscription.php @@ -158,8 +158,6 @@ protected function tryProcessAsync(ResolvedEvent $e): Promise yield ($this->eventAppeared)($this, $e); } catch (Throwable $ex) { $this->dropSubscription(SubscriptionDropReason::eventHandlerException(), $ex); - - throw $ex; } $this->lastProcessedPosition = $e->originalPosition(); diff --git a/src/Internal/EventStoreAsyncNodeConnection.php b/src/Internal/EventStoreAsyncNodeConnection.php index 87271ea8..3ed93f1c 100644 --- a/src/Internal/EventStoreAsyncNodeConnection.php +++ b/src/Internal/EventStoreAsyncNodeConnection.php @@ -659,7 +659,7 @@ public function subscribeToStreamAsync( } /** {@inheritdoc} */ - public function subscribeToStreamFrom( + public function subscribeToStreamFromAsync( string $stream, ?int $lastCheckpoint, ?CatchUpSubscriptionSettings $settings, @@ -667,7 +667,7 @@ public function subscribeToStreamFrom( ?LiveProcessingStarted $liveProcessingStarted = null, ?CatchUpSubscriptionDropped $subscriptionDropped = null, ?UserCredentials $userCredentials = null - ): EventStoreStreamCatchUpSubscription { + ): Promise { if (empty($stream)) { throw new InvalidArgumentException('Stream cannot be empty'); } @@ -680,7 +680,7 @@ public function subscribeToStreamFrom( $settings->verboseLogging(); } - $catchUpSubscription = new EventStoreStreamCatchUpSubscription( + return (new EventStoreStreamCatchUpSubscription( $this, $this->settings->log(), $stream, @@ -690,11 +690,7 @@ public function subscribeToStreamFrom( $liveProcessingStarted, $subscriptionDropped, $settings - ); - - $catchUpSubscription->startAsync(); - - return $catchUpSubscription; + ))->startAsync(); } /** {@inheritdoc} */ @@ -720,14 +716,14 @@ public function subscribeToAllAsync( return $deferred->promise(); } - public function subscribeToAllFrom( + public function subscribeToAllFromAsync( ?Position $lastCheckpoint, ?CatchUpSubscriptionSettings $settings, EventAppearedOnCatchupSubscription $eventAppeared, ?LiveProcessingStarted $liveProcessingStarted = null, ?CatchUpSubscriptionDropped $subscriptionDropped = null, ?UserCredentials $userCredentials = null - ): EventStoreAllCatchUpSubscription { + ): Promise { if (null === $settings) { $settings = CatchUpSubscriptionSettings::default(); } @@ -736,7 +732,7 @@ public function subscribeToAllFrom( $settings->verboseLogging(); } - $catchUpSubscription = new EventStoreAllCatchUpSubscription( + return (new EventStoreAllCatchUpSubscription( $this, $this->settings->log(), $lastCheckpoint, @@ -745,11 +741,7 @@ public function subscribeToAllFrom( $liveProcessingStarted, $subscriptionDropped, $settings - ); - - $catchUpSubscription->startAsync(); - - return $catchUpSubscription; + ))->startAsync(); } /** {@inheritdoc} */ diff --git a/src/Internal/EventStoreCatchUpSubscription.php b/src/Internal/EventStoreCatchUpSubscription.php index 473bdebb..26f7c12b 100644 --- a/src/Internal/EventStoreCatchUpSubscription.php +++ b/src/Internal/EventStoreCatchUpSubscription.php @@ -41,7 +41,7 @@ abstract class EventStoreCatchUpSubscription { /** @var ResolvedEvent */ - private static $dropSubscriptionEvent; + private static $dropSubscriptionEvent; // @todo: this is stateful - here must be a bug !! /** @var bool */ private $isSubscribedToAll; @@ -265,7 +265,7 @@ private function loadHistoricalEventsAsync(): Promise $this->dropSubscription(SubscriptionDropReason::userInitiated(), null); } - return new Success(true); + return new Success($this); }); } diff --git a/src/Internal/EventStoreStreamCatchUpSubscription.php b/src/Internal/EventStoreStreamCatchUpSubscription.php index 75ed6548..bdcea419 100644 --- a/src/Internal/EventStoreStreamCatchUpSubscription.php +++ b/src/Internal/EventStoreStreamCatchUpSubscription.php @@ -177,8 +177,6 @@ protected function tryProcessAsync(ResolvedEvent $e): Promise yield ($this->eventAppeared)($this, $e); } catch (Throwable $ex) { $this->dropSubscription(SubscriptionDropReason::eventHandlerException(), $ex); - - throw $ex; } $this->lastProcessedEventNumber = $e->originalEventNumber(); diff --git a/tests/FakeEventStoreConnection.php b/tests/FakeEventStoreConnection.php index 0b28f061..575e77c3 100644 --- a/tests/FakeEventStoreConnection.php +++ b/tests/FakeEventStoreConnection.php @@ -30,9 +30,7 @@ use Prooph\EventStoreClient\EventStoreAsyncConnection; use Prooph\EventStoreClient\EventStoreAsyncTransaction; use Prooph\EventStoreClient\Internal\EventHandler; -use Prooph\EventStoreClient\Internal\EventStoreAllCatchUpSubscription; use Prooph\EventStoreClient\Internal\EventStorePersistentSubscription; -use Prooph\EventStoreClient\Internal\EventStoreStreamCatchUpSubscription; use Prooph\EventStoreClient\Internal\ListenerHandler; use Prooph\EventStoreClient\LiveProcessingStarted; use Prooph\EventStoreClient\PersistentSubscriptionDropped; @@ -265,7 +263,7 @@ public function subscribeToStreamAsync( ); } - public function subscribeToStreamFrom( + public function subscribeToStreamFromAsync( string $stream, ?int $lastCheckpoint, ?CatchUpSubscriptionSettings $settings, @@ -273,7 +271,7 @@ public function subscribeToStreamFrom( ?LiveProcessingStarted $liveProcessingStarted = null, ?CatchUpSubscriptionDropped $subscriptionDropped = null, ?UserCredentials $userCredentials = null - ): EventStoreStreamCatchUpSubscription { + ): Promise { throw new \RuntimeException('Not implemented'); } @@ -291,14 +289,14 @@ public function subscribeToAllAsync( ); } - public function subscribeToAllFrom( + public function subscribeToAllFromAsync( ?Position $lastCheckpoint, ?CatchUpSubscriptionSettings $settings, EventAppearedOnCatchupSubscription $eventAppeared, ?LiveProcessingStarted $liveProcessingStarted = null, ?CatchUpSubscriptionDropped $subscriptionDropped = null, ?UserCredentials $userCredentials = null - ): EventStoreAllCatchUpSubscription { + ): Promise { throw new \RuntimeException('Not implemented'); } diff --git a/tests/Helper/TestEvent.php b/tests/Helper/TestEvent.php index 949f4c3d..e373ecd6 100644 --- a/tests/Helper/TestEvent.php +++ b/tests/Helper/TestEvent.php @@ -19,6 +19,10 @@ /** @internal */ class TestEvent { + private function __construct() + { + } + public static function new(?EventId $eventId = null, ?string $data = null, ?string $metadata = null): EventData { if (null === $eventId) { diff --git a/tests/catch_up_subscription_handles_errors.php b/tests/catch_up_subscription_handles_errors.php index 113ceea0..9790db21 100644 --- a/tests/catch_up_subscription_handles_errors.php +++ b/tests/catch_up_subscription_handles_errors.php @@ -394,20 +394,13 @@ function ($stream, $raise, $drop): Promise { $finalEvent->resolve(true); - $promise = Promise\timeout($promise, self::$timeoutMs); - - $promise->onResolve(function ($ex, $result): void { - $this->assertTrue($result); - }); - - yield new Success(); + $this->assertNotNull(yield Promise\timeout($promise, self::$timeoutMs)); })); } /** * @test * @throws Throwable - * @group by */ public function when_live_processing_and_disconnected_reconnect_keeps_events_ordered(): void { @@ -444,10 +437,7 @@ function ($stream, $raise, $drop) use (&$innerSubscriptionDrop, &$volatileEventS } ); - $result = yield Promise\timeout($this->subscription->startAsync(), self::$timeoutMs); - - $this->assertTrue($result); - + $this->assertNotNull(yield Promise\timeout($this->subscription->startAsync(), self::$timeoutMs)); $this->assertCount(0, $this->raisedEvents); $this->assertNotNull($innerSubscriptionDrop); diff --git a/tests/catchup_subscription_handles_small_batch_sizes.php b/tests/catchup_subscription_handles_small_batch_sizes.php index 4efe8c15..30cbfbd3 100644 --- a/tests/catchup_subscription_handles_small_batch_sizes.php +++ b/tests/catchup_subscription_handles_small_batch_sizes.php @@ -35,6 +35,8 @@ class catchup_subscription_handles_small_batch_sizes extends TestCase { + private const TIMEOUT = 10000; + /** @var string */ private $streamName = 'TestStream'; /** @var CatchUpSubscriptionSettings */ @@ -89,7 +91,7 @@ public function catchupSubscriptionToAllHandlesManyEventsWithSmallBatchSize(): v $deferred = new Deferred(); - $this->connection->subscribeToAllFrom( + yield $this->connection->subscribeToAllFromAsync( null, $this->settings, $this->eventAppearedResolver(), @@ -99,8 +101,7 @@ public function catchupSubscriptionToAllHandlesManyEventsWithSmallBatchSize(): v ); try { - // we wait maximum 5 minutes - $result = yield timeout($deferred->promise(), 5 * 60 * 1000); + $result = yield timeout($deferred->promise(), self::TIMEOUT); $this->assertTrue($result); } catch (TimeoutException $e) { @@ -122,7 +123,7 @@ public function catchupSubscriptionToStreamHandlesManyEventsWithSmallBatchSize() $deferred = new Deferred(); - $this->connection->subscribeToStreamFrom( + yield $this->connection->subscribeToStreamFromAsync( $this->streamName, null, $this->settings, @@ -133,8 +134,7 @@ public function catchupSubscriptionToStreamHandlesManyEventsWithSmallBatchSize() ); try { - // we wait maximum 5 minutes - $result = yield timeout($deferred->promise(), 5 * 60 * 1000); + $result = yield timeout($deferred->promise(), self::TIMEOUT); $this->assertTrue($result); } catch (TimeoutException $e) { diff --git a/tests/read_all_events_forward_with_linkto_to_deleted_event.php b/tests/read_all_events_forward_with_linkto_to_deleted_event.php new file mode 100644 index 00000000..c436b4d9 --- /dev/null +++ b/tests/read_all_events_forward_with_linkto_to_deleted_event.php @@ -0,0 +1,89 @@ + + * (c) 2018-2018 Sascha-Oliver Prolic + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +declare(strict_types=1); + +namespace ProophTest\EventStoreClient; + +use Amp\Success; +use Generator; +use PHPUnit\Framework\TestCase; +use Prooph\EventStoreClient\StreamEventsSlice; +use Throwable; + +class read_all_events_forward_with_linkto_to_deleted_event extends TestCase +{ + use SpecificationWithLinkToToDeletedEvents; + + /** @var StreamEventsSlice */ + private $read; + + protected function when(): Generator + { + $this->read = yield $this->conn->readStreamEventsForwardAsync( + $this->linkedStreamName, + 0, + 1 + ); + } + + /** + * @test + * @throws Throwable + */ + public function one_event_is_read(): void + { + $this->execute(function () { + $this->assertCount(1, $this->read->events()); + + yield new Success(); + }); + } + + /** + * @test + * @throws Throwable + */ + public function the_linked_event_is_not_resolved(): void + { + $this->execute(function () { + $this->assertNull($this->read->events()[0]->event()); + + yield new Success(); + }); + } + + /** + * @test + * @throws Throwable + */ + public function the_link_event_is_included(): void + { + $this->execute(function () { + $this->assertNotNull($this->read->events()[0]->originalEvent()); + + yield new Success(); + }); + } + + /** + * @test + * @throws Throwable + */ + public function the_event_is_not_resolved(): void + { + $this->execute(function () { + $this->assertFalse($this->read->events()[0]->isResolved()); + + yield new Success(); + }); + } +} diff --git a/tests/subscribe_to_all_catching_up_should.php b/tests/subscribe_to_all_catching_up_should.php new file mode 100644 index 00000000..c58d6b7d --- /dev/null +++ b/tests/subscribe_to_all_catching_up_should.php @@ -0,0 +1,292 @@ + + * (c) 2018-2018 Sascha-Oliver Prolic + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +declare(strict_types=1); + +namespace ProophTest\EventStoreClient; + +use Amp\Deferred; +use Amp\Delayed; +use Amp\Promise; +use Amp\Success; +use Amp\TimeoutException; +use Exception; +use Generator; +use PHPUnit\Framework\TestCase; +use Prooph\EventStoreClient\CatchUpSubscriptionDropped; +use Prooph\EventStoreClient\CatchUpSubscriptionSettings; +use Prooph\EventStoreClient\Common\SystemRoles; +use Prooph\EventStoreClient\Common\SystemStreams; +use Prooph\EventStoreClient\EventAppearedOnCatchupSubscription; +use Prooph\EventStoreClient\EventAppearedOnSubscription; +use Prooph\EventStoreClient\EventStoreAsyncConnection; +use Prooph\EventStoreClient\EventStoreSubscription; +use Prooph\EventStoreClient\ExpectedVersion; +use Prooph\EventStoreClient\Internal\EventStoreAllCatchUpSubscription; +use Prooph\EventStoreClient\Internal\EventStoreCatchUpSubscription; +use Prooph\EventStoreClient\Internal\ResolvedEvent; +use Prooph\EventStoreClient\StreamMetadata; +use Prooph\EventStoreClient\SubscriptionDropReason; +use Prooph\EventStoreClient\UserCredentials; +use ProophTest\EventStoreClient\Helper\TestConnection; +use ProophTest\EventStoreClient\Helper\TestEvent; +use Throwable; +use function Amp\call; + +class subscribe_to_all_catching_up_should extends TestCase +{ + private const TIMEOUT = 10000; + + /** @var EventStoreAsyncConnection */ + private $conn; + + /** + * @throws Throwable + */ + private function execute(callable $function): void + { + Promise\wait(call(function () use ($function): Generator { + $this->conn = TestConnection::createAsync(); + + yield $this->conn->connectAsync(); + + yield $this->conn->setStreamMetadataAsync( + '$all', + ExpectedVersion::ANY, + StreamMetadata::build()->setReadRoles(SystemRoles::ALL)->build(), + new UserCredentials(SystemUsers::ADMIN, SystemUsers::DEFAULT_ADMIN_PASSWORD) + ); + + yield from $function(); + + yield $this->conn->setStreamMetadataAsync( + '$all', + ExpectedVersion::ANY, + StreamMetadata::build()->build(), + new UserCredentials(SystemUsers::ADMIN, SystemUsers::DEFAULT_ADMIN_PASSWORD) + ); + + $this->conn->close(); + })); + } + + /** + * @test + * @throws Throwable + */ + public function call_dropped_callback_after_stop_method_call(): void + { + $this->execute(function () { + $store = TestConnection::createAsync(); + + yield $store->connectAsync(); + + $dropped = new Deferred(); + + /** @var EventStoreAllCatchUpSubscription $subscription */ + $subscription = yield $store->subscribeToAllFromAsync( + null, + CatchUpSubscriptionSettings::default(), + new class() implements EventAppearedOnCatchupSubscription { + public function __invoke( + EventStoreCatchUpSubscription $subscription, + ResolvedEvent $resolvedEvent + ): Promise { + return new Success(); + } + }, + null, + new class($dropped) implements CatchUpSubscriptionDropped { + /** @var Deferred */ + private $dropped; + + public function __construct(Deferred $dropped) + { + $this->dropped = $dropped; + } + + public function __invoke( + EventStoreCatchUpSubscription $subscription, + SubscriptionDropReason $reason, + ?Throwable $exception = null + ): void { + $this->dropped->resolve(true); + } + } + ); + + $subscription->stopWithTimeout(self::TIMEOUT); + + $this->assertTrue(yield Promise\timeout($dropped->promise(), self::TIMEOUT)); + }); + } + + /** + * @test + * @throws Throwable + */ + public function call_dropped_callback_when_an_error_occurs_while_processing_an_event(): void + { + $this->execute(function () { + $stream = 'all_call_dropped_callback_when_an_error_occurs_while_processing_an_event'; + + $store = TestConnection::createAsync(); + + yield $store->connectAsync(); + + yield $store->appendToStreamAsync( + $stream, + ExpectedVersion::ANY, + [TestEvent::new()] + ); + + $dropped = new Deferred(); + + /** @var EventStoreAllCatchUpSubscription $subscription */ + $subscription = yield $store->subscribeToAllFromAsync( + null, + CatchUpSubscriptionSettings::default(), + new class() implements EventAppearedOnCatchupSubscription { + public function __invoke( + EventStoreCatchUpSubscription $subscription, + ResolvedEvent $resolvedEvent + ): Promise { + throw new Exception('Error'); + } + }, + null, + new class($dropped) implements CatchUpSubscriptionDropped { + /** @var Deferred */ + private $dropped; + + public function __construct(Deferred $dropped) + { + $this->dropped = $dropped; + } + + public function __invoke( + EventStoreCatchUpSubscription $subscription, + SubscriptionDropReason $reason, + ?Throwable $exception = null + ): void { + $this->dropped->resolve(true); + } + } + ); + + $subscription->stopWithTimeout(self::TIMEOUT); + + $this->assertTrue(yield Promise\timeout($dropped->promise(), self::TIMEOUT)); + }); + } + + /** + * @test + * @throws Throwable + */ + public function be_able_to_subscribe_to_empty_db(): void + { + $this->execute(function () { + $store = TestConnection::createAsync(); + + yield $store->connectAsync(); + + $appeared = new Deferred(); + $dropped = new Deferred(); + + /** @var EventStoreAllCatchUpSubscription $subscription */ + $subscription = yield $store->subscribeToAllFromAsync( + null, + CatchUpSubscriptionSettings::default(), + new class($appeared) implements EventAppearedOnCatchupSubscription { + /** @var Deferred */ + private $appeared; + + public function __construct(Deferred $appeared) + { + $this->appeared = $appeared; + } + + public function __invoke( + EventStoreCatchUpSubscription $subscription, + ResolvedEvent $resolvedEvent + ): Promise { + if (! SystemStreams::isSystemStream($resolvedEvent->originalEvent()->eventStreamId())) { + $this->appeared->resolve(true); + } + + return new Success(); + } + }, + null, + new class($dropped) implements CatchUpSubscriptionDropped { + /** @var Deferred */ + private $dropped; + + public function __construct(Deferred $dropped) + { + $this->dropped = $dropped; + } + + public function __invoke( + EventStoreCatchUpSubscription $subscription, + SubscriptionDropReason $reason, + ?Throwable $exception = null + ): void { + $this->dropped->resolve(true); + } + } + ); + + yield new Delayed(5000); // give time for first pull phase + + yield $store->subscribeToAllAsync( + false, + new class($appeared) implements EventAppearedOnSubscription { + /** @var Deferred */ + private $appeared; + + public function __construct(Deferred $appeared) + { + $this->appeared = $appeared; + } + + public function __invoke( + EventStoreSubscription $subscription, + ResolvedEvent $resolvedEvent + ): Promise { + return new Success(); + } + } + ); + + yield new Delayed(5000); + + try { + yield Promise\timeout($appeared->promise(), 0); + } catch (TimeoutException $e) { + $this->fail('Some event appeared'); + } + + try { + yield Promise\timeout($dropped->promise(), 0); + } catch (TimeoutException $e) { + $this->fail('Subscription was dropped prematurely'); + } + + $subscription->stopWithTimeout(self::TIMEOUT); + + $this->assertTrue(yield Promise\timeout($dropped->promise(), self::TIMEOUT)); + }); + } + + // @todo: 3 tests missing +}