Skip to content

Commit

Permalink
check event appeared callback returns a promise
Browse files Browse the repository at this point in the history
resolves #3
  • Loading branch information
prolic committed Aug 1, 2018
1 parent 3960a7c commit 5ce9963
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 3 deletions.
8 changes: 7 additions & 1 deletion src/Internal/AbstractEventStorePersistentSubscription.php
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,13 @@ private function processQueue(): Promise
}

try {
yield ($this->eventAppeared)($this, $e->event(), $e->retryCount());
$promise = ($this->eventAppeared)($this, $e->event(), $e->retryCount());

if (! $promise instanceof Promise) {
throw new RuntimeException('Event appeared callback needs to return an ' . Promise::class);
}

yield $promise;

if ($this->autoAck) {
$this->subscription->notifyEventsProcessed([$e->originalEvent()->eventId()]);
Expand Down
9 changes: 8 additions & 1 deletion src/Internal/EventStoreAllCatchUpSubscription.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
use Prooph\EventStoreClient\AllEventsSlice;
use Prooph\EventStoreClient\CatchUpSubscriptionSettings;
use Prooph\EventStoreClient\EventStoreAsyncConnection;
use Prooph\EventStoreClient\Exception\RuntimeException;
use Prooph\EventStoreClient\Position;
use Prooph\EventStoreClient\ResolvedEvent;
use Prooph\EventStoreClient\SubscriptionDropReason;
Expand Down Expand Up @@ -162,7 +163,13 @@ protected function tryProcessAsync(ResolvedEvent $e): Promise

if ($e->originalPosition()->greater($this->lastProcessedPosition)) {
try {
yield ($this->eventAppeared)($this, $e);
$promise = ($this->eventAppeared)($this, $e);

if (! $promise instanceof Promise) {
throw new RuntimeException('Event appeared callback needs to return an ' . Promise::class);
}

yield $promise;
} catch (\Throwable $ex) {
$this->dropSubscription(SubscriptionDropReason::eventHandlerException(), $ex);

Expand Down
9 changes: 8 additions & 1 deletion src/Internal/EventStoreStreamCatchUpSubscription.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
use Generator;
use Prooph\EventStoreClient\CatchUpSubscriptionSettings;
use Prooph\EventStoreClient\EventStoreAsyncConnection;
use Prooph\EventStoreClient\Exception\RuntimeException;
use Prooph\EventStoreClient\Exception\StreamDeletedException;
use Prooph\EventStoreClient\ResolvedEvent;
use Prooph\EventStoreClient\SliceReadStatus;
Expand Down Expand Up @@ -173,7 +174,13 @@ protected function tryProcessAsync(ResolvedEvent $e): Promise

if ($e->originalEventNumber() > $this->lastProcessedEventNumber) {
try {
yield ($this->eventAppeared)($this, $e);
$promise = ($this->eventAppeared)($this, $e);

if (! $promise instanceof Promise) {
throw new RuntimeException('Event appeared callback needs to return an ' . Promise::class);
}

yield $promise;
} catch (\Throwable $ex) {
$this->dropSubscription(SubscriptionDropReason::eventHandlerException(), $ex);

Expand Down

0 comments on commit 5ce9963

Please sign in to comment.