From 5ce99631178e35a8ff69457ff4d214248f496dc9 Mon Sep 17 00:00:00 2001 From: prolic Date: Wed, 1 Aug 2018 21:28:03 +0800 Subject: [PATCH] check event appeared callback returns a promise resolves https://github.com/prooph/event-store-client/issues/3 --- .../AbstractEventStorePersistentSubscription.php | 8 +++++++- src/Internal/EventStoreAllCatchUpSubscription.php | 9 ++++++++- src/Internal/EventStoreStreamCatchUpSubscription.php | 9 ++++++++- 3 files changed, 23 insertions(+), 3 deletions(-) diff --git a/src/Internal/AbstractEventStorePersistentSubscription.php b/src/Internal/AbstractEventStorePersistentSubscription.php index 0975e79d..2be784e1 100644 --- a/src/Internal/AbstractEventStorePersistentSubscription.php +++ b/src/Internal/AbstractEventStorePersistentSubscription.php @@ -367,7 +367,13 @@ private function processQueue(): Promise } try { - yield ($this->eventAppeared)($this, $e->event(), $e->retryCount()); + $promise = ($this->eventAppeared)($this, $e->event(), $e->retryCount()); + + if (! $promise instanceof Promise) { + throw new RuntimeException('Event appeared callback needs to return an ' . Promise::class); + } + + yield $promise; if ($this->autoAck) { $this->subscription->notifyEventsProcessed([$e->originalEvent()->eventId()]); diff --git a/src/Internal/EventStoreAllCatchUpSubscription.php b/src/Internal/EventStoreAllCatchUpSubscription.php index 9b4da1a5..a648a83f 100644 --- a/src/Internal/EventStoreAllCatchUpSubscription.php +++ b/src/Internal/EventStoreAllCatchUpSubscription.php @@ -17,6 +17,7 @@ use Prooph\EventStoreClient\AllEventsSlice; use Prooph\EventStoreClient\CatchUpSubscriptionSettings; use Prooph\EventStoreClient\EventStoreAsyncConnection; +use Prooph\EventStoreClient\Exception\RuntimeException; use Prooph\EventStoreClient\Position; use Prooph\EventStoreClient\ResolvedEvent; use Prooph\EventStoreClient\SubscriptionDropReason; @@ -162,7 +163,13 @@ protected function tryProcessAsync(ResolvedEvent $e): Promise if ($e->originalPosition()->greater($this->lastProcessedPosition)) { try { - yield ($this->eventAppeared)($this, $e); + $promise = ($this->eventAppeared)($this, $e); + + if (! $promise instanceof Promise) { + throw new RuntimeException('Event appeared callback needs to return an ' . Promise::class); + } + + yield $promise; } catch (\Throwable $ex) { $this->dropSubscription(SubscriptionDropReason::eventHandlerException(), $ex); diff --git a/src/Internal/EventStoreStreamCatchUpSubscription.php b/src/Internal/EventStoreStreamCatchUpSubscription.php index e942a645..c784cf0b 100644 --- a/src/Internal/EventStoreStreamCatchUpSubscription.php +++ b/src/Internal/EventStoreStreamCatchUpSubscription.php @@ -17,6 +17,7 @@ use Generator; use Prooph\EventStoreClient\CatchUpSubscriptionSettings; use Prooph\EventStoreClient\EventStoreAsyncConnection; +use Prooph\EventStoreClient\Exception\RuntimeException; use Prooph\EventStoreClient\Exception\StreamDeletedException; use Prooph\EventStoreClient\ResolvedEvent; use Prooph\EventStoreClient\SliceReadStatus; @@ -173,7 +174,13 @@ protected function tryProcessAsync(ResolvedEvent $e): Promise if ($e->originalEventNumber() > $this->lastProcessedEventNumber) { try { - yield ($this->eventAppeared)($this, $e); + $promise = ($this->eventAppeared)($this, $e); + + if (! $promise instanceof Promise) { + throw new RuntimeException('Event appeared callback needs to return an ' . Promise::class); + } + + yield $promise; } catch (\Throwable $ex) { $this->dropSubscription(SubscriptionDropReason::eventHandlerException(), $ex);