Skip to content

Commit

Permalink
refactor event appeared callbacks
Browse files Browse the repository at this point in the history
  • Loading branch information
prolic committed Dec 8, 2018
1 parent b86c2bf commit ad73d73
Show file tree
Hide file tree
Showing 46 changed files with 54 additions and 105 deletions.
1 change: 0 additions & 1 deletion examples/demo-persistent-subscription-with-dns-cluster.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
use Prooph\EventStoreClient\Exception\InvalidOperationException;
use Prooph\EventStoreClient\Internal\EventStorePersistentSubscription;
use Prooph\EventStoreClient\Internal\PersistentSubscriptionCreateResult;
use Prooph\EventStoreClient\Internal\ResolvedEvent;
use Throwable;

require __DIR__ . '/../vendor/autoload.php';
Expand Down
1 change: 0 additions & 1 deletion examples/demo-persistent-subscription.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
use Prooph\EventStoreClient\Exception\InvalidOperationException;
use Prooph\EventStoreClient\Internal\EventStorePersistentSubscription;
use Prooph\EventStoreClient\Internal\PersistentSubscriptionCreateResult;
use Prooph\EventStoreClient\Internal\ResolvedEvent;
use Throwable;

require __DIR__ . '/../vendor/autoload.php';
Expand Down
1 change: 0 additions & 1 deletion examples/demo-subscribe-to-all-from.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
use Amp\Promise;
use Amp\Success;
use Prooph\EventStoreClient\Internal\EventStoreCatchUpSubscription;
use Prooph\EventStoreClient\Internal\ResolvedEvent;
use Throwable;

require __DIR__ . '/../vendor/autoload.php';
Expand Down
1 change: 0 additions & 1 deletion examples/demo-subscribe-to-all.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
use Amp\Loop;
use Amp\Promise;
use Amp\Success;
use Prooph\EventStoreClient\Internal\ResolvedEvent;
use Prooph\EventStoreClient\Internal\VolatileEventStoreSubscription;
use Throwable;

Expand Down
1 change: 0 additions & 1 deletion examples/demo-subscribe-to-stream-from-with-logger.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
use Amp\Promise;
use Amp\Success;
use Prooph\EventStoreClient\Internal\EventStoreCatchUpSubscription;
use Prooph\EventStoreClient\Internal\ResolvedEvent;
use Throwable;

require __DIR__ . '/../vendor/autoload.php';
Expand Down
1 change: 0 additions & 1 deletion examples/demo-subscribe-to-stream-from.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
use Amp\Promise;
use Amp\Success;
use Prooph\EventStoreClient\Internal\EventStoreCatchUpSubscription;
use Prooph\EventStoreClient\Internal\ResolvedEvent;
use Throwable;

require __DIR__ . '/../vendor/autoload.php';
Expand Down
1 change: 0 additions & 1 deletion examples/demo-subscribe-to-stream.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
use Amp\Loop;
use Amp\Promise;
use Amp\Success;
use Prooph\EventStoreClient\Internal\ResolvedEvent;
use Prooph\EventStoreClient\Internal\VolatileEventStoreSubscription;
use Throwable;

Expand Down
10 changes: 4 additions & 6 deletions src/ClientOperations/AbstractSubscriptionOperation.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
use Amp\Success;
use Generator;
use Prooph\EventStoreClient\EndPoint;
use Prooph\EventStoreClient\EventAppearedOnSubscription;
use Prooph\EventStoreClient\EventStoreSubscription;
use Prooph\EventStoreClient\Exception\AccessDeniedException;
use Prooph\EventStoreClient\Exception\ConnectionClosedException;
Expand All @@ -34,7 +33,6 @@
use Prooph\EventStoreClient\Messages\ClientMessages\SubscriptionDropped as SubscriptionDroppedMessage;
use Prooph\EventStoreClient\Messages\ClientMessages\SubscriptionDropped_SubscriptionDropReason as SubscriptionDropReasonMessage;
use Prooph\EventStoreClient\Messages\ClientMessages\UnsubscribeFromStream;
use Prooph\EventStoreClient\SubscriptionDropped;
use Prooph\EventStoreClient\SubscriptionDropReason;
use Prooph\EventStoreClient\SystemData\InspectionDecision;
use Prooph\EventStoreClient\SystemData\InspectionResult;
Expand All @@ -61,9 +59,9 @@ abstract class AbstractSubscriptionOperation implements SubscriptionOperation
protected $resolveLinkTos;
/** @var UserCredentials|null */
protected $userCredentials;
/** @var EventAppearedOnSubscription */
/** @var callable */
protected $eventAppeared;
/** @var SubscriptionDropped|null */
/** @var callable|null */
private $subscriptionDropped;
/** @var bool */
private $verboseLogging;
Expand All @@ -86,8 +84,8 @@ public function __construct(
string $streamId,
bool $resolveLinkTos,
?UserCredentials $userCredentials,
EventAppearedOnSubscription $eventAppeared,
?SubscriptionDropped $subscriptionDropped,
callable $eventAppeared,
?callable $subscriptionDropped,
bool $verboseLogging,
callable $getConnection
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
namespace Prooph\EventStoreClient\ClientOperations;

use Amp\Deferred;
use Prooph\EventStoreClient\EventAppearedOnSubscription;
use Prooph\EventStoreClient\EventId;
use Prooph\EventStoreClient\EventStoreSubscription;
use Prooph\EventStoreClient\Exception\AccessDeniedException;
Expand Down Expand Up @@ -60,8 +59,8 @@ public function __construct(
int $bufferSize,
string $streamId,
?UserCredentials $userCredentials,
EventAppearedOnSubscription $eventAppeared,
?SubscriptionDropped $subscriptionDropped,
callable $eventAppeared,
?callable $subscriptionDropped,
bool $verboseLogging,
callable $getConnection
) {
Expand Down
1 change: 0 additions & 1 deletion src/EventAppearedOnCatchupSubscription.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

use Amp\Promise;
use Prooph\EventStoreClient\Internal\EventStoreCatchUpSubscription;
use Prooph\EventStoreClient\Internal\ResolvedEvent;

interface EventAppearedOnCatchupSubscription
{
Expand Down
1 change: 0 additions & 1 deletion src/EventAppearedOnPersistentSubscription.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

use Amp\Promise;
use Prooph\EventStoreClient\Internal\EventStorePersistentSubscription;
use Prooph\EventStoreClient\Internal\ResolvedEvent;

interface EventAppearedOnPersistentSubscription
{
Expand Down
1 change: 0 additions & 1 deletion src/EventAppearedOnSubscription.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
namespace Prooph\EventStoreClient;

use Amp\Promise;
use Prooph\EventStoreClient\Internal\ResolvedEvent;

interface EventAppearedOnSubscription
{
Expand Down
3 changes: 1 addition & 2 deletions src/Internal/EventStoreCatchUpSubscription.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
use Prooph\EventStoreClient\EventAppearedOnSubscription;
use Prooph\EventStoreClient\EventStoreConnection;
use Prooph\EventStoreClient\EventStoreSubscription;
use Prooph\EventStoreClient\Internal\ResolvedEvent as InternalResolvedEvent;
use Prooph\EventStoreClient\LiveProcessingStarted;
use Prooph\EventStoreClient\ResolvedEvent;
use Prooph\EventStoreClient\SubscriptionDropped;
Expand Down Expand Up @@ -287,7 +286,7 @@ public function __construct(callable $callback)

public function __invoke(
EventStoreSubscription $subscription,
InternalResolvedEvent $resolvedEvent
ResolvedEvent $resolvedEvent
): Promise {
return ($this->callback)($subscription, $resolvedEvent);
}
Expand Down
53 changes: 9 additions & 44 deletions src/Internal/EventStorePersistentSubscription.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,14 @@
use Generator;
use Prooph\EventStoreClient\ConnectionSettings;
use Prooph\EventStoreClient\EventAppearedOnPersistentSubscription;
use Prooph\EventStoreClient\EventAppearedOnSubscription;
use Prooph\EventStoreClient\EventId;
use Prooph\EventStoreClient\EventStoreSubscription;
use Prooph\EventStoreClient\Exception\RuntimeException;
use Prooph\EventStoreClient\Internal\Message\StartPersistentSubscriptionMessage;
use Prooph\EventStoreClient\Internal\ResolvedEvent as InternalResolvedEvent;
use Prooph\EventStoreClient\PersistentSubscriptionDropped;
use Prooph\EventStoreClient\PersistentSubscriptionNakEventAction;
use Prooph\EventStoreClient\PersistentSubscriptionResolvedEvent;
use Prooph\EventStoreClient\ResolvedEvent;
use Prooph\EventStoreClient\SubscriptionDropped;
use Prooph\EventStoreClient\SubscriptionDropReason;
use Prooph\EventStoreClient\UserCredentials;
use Psr\Log\LoggerInterface as Logger;
Expand All @@ -53,9 +50,9 @@ class EventStorePersistentSubscription
private $subscriptionId;
/** @var string */
private $streamId;
/** @var EventAppearedOnPersistentSubscription */
/** @var callable */
private $eventAppeared;
/** @var PersistentSubscriptionDropped|null */
/** @var callable|null */
private $subscriptionDropped;
/** @var UserCredentials|null */
private $userCredentials;
Expand Down Expand Up @@ -117,13 +114,14 @@ public function __construct(
$this->handler = $handler;
}

/** @internal */
public function startSubscription(
string $subscriptionId,
string $streamId,
int $bufferSize,
?UserCredentials $userCredentials,
EventAppearedOnSubscription $onEventAppeared,
?SubscriptionDropped $onSubscriptionDropped,
callable $onEventAppeared,
?callable $onSubscriptionDropped,
ConnectionSettings $settings
): Promise {
$deferred = new Deferred();
Expand Down Expand Up @@ -152,54 +150,21 @@ public function start(): Promise
{
$this->stopped->reset();

$eventAppearedCallback = function (
EventStoreSubscription $subscription,
$eventAppeared = function (
PersistentEventStoreSubscription $subscription,
PersistentSubscriptionResolvedEvent $resolvedEvent
): Promise {
return $this->onEventAppeared($resolvedEvent);
};

$subscriptionDroppedCallback = function (
EventStoreSubscription $subscription,
$subscriptionDropped = function (
PersistentEventStoreSubscription $subscription,
SubscriptionDropReason $reason,
?Throwable $exception
): void {
$this->onSubscriptionDropped($reason, $exception);
};

$eventAppeared = new class($eventAppearedCallback) implements EventAppearedOnSubscription {
private $callback;

public function __construct(callable $callback)
{
$this->callback = $callback;
}

public function __invoke(
EventStoreSubscription $subscription,
InternalResolvedEvent $resolvedEvent
): Promise {
return ($this->callback)($subscription, $resolvedEvent);
}
};

$subscriptionDropped = new class($subscriptionDroppedCallback) implements SubscriptionDropped {
private $callback;

public function __construct(callable $callback)
{
$this->callback = $callback;
}

public function __invoke(
EventStoreSubscription $subscription,
SubscriptionDropReason $reason,
?Throwable $exception = null
): void {
($this->callback)($subscription, $reason, $exception);
}
};

$promise = $this->startSubscription(
$this->subscriptionId,
$this->streamId,
Expand Down
14 changes: 6 additions & 8 deletions src/Internal/Message/StartPersistentSubscriptionMessage.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
namespace Prooph\EventStoreClient\Internal\Message;

use Amp\Deferred;
use Prooph\EventStoreClient\EventAppearedOnSubscription;
use Prooph\EventStoreClient\SubscriptionDropped;
use Prooph\EventStoreClient\UserCredentials;

/** @internal */
Expand All @@ -31,9 +29,9 @@ class StartPersistentSubscriptionMessage implements Message
private $bufferSize;
/** @var UserCredentials|null */
private $userCredentials;
/** @var EventAppearedOnSubscription */
/** @var callable */
private $eventAppeared;
/** @var SubscriptionDropped|null */
/** @var callable|null */
private $subscriptionDropped;
/** @var int */
private $maxRetries;
Expand All @@ -46,8 +44,8 @@ public function __construct(
string $streamId,
int $bufferSize,
?UserCredentials $userCredentials,
EventAppearedOnSubscription $eventAppeared,
?SubscriptionDropped $subscriptionDropped,
callable $eventAppeared,
?callable $subscriptionDropped,
int $maxRetries,
int $timeout
) {
Expand Down Expand Up @@ -87,12 +85,12 @@ public function userCredentials(): ?UserCredentials
return $this->userCredentials;
}

public function eventAppeared(): EventAppearedOnSubscription
public function eventAppeared(): callable
{
return $this->eventAppeared;
}

public function subscriptionDropped(): ?SubscriptionDropped
public function subscriptionDropped(): ?callable
{
return $this->subscriptionDropped;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@
use Prooph\EventStoreClient\Exception\InvalidArgumentException;
use Prooph\EventStoreClient\ExpectedVersion;
use Prooph\EventStoreClient\Internal\EventStorePersistentSubscription;
use Prooph\EventStoreClient\Internal\ResolvedEvent;
use Prooph\EventStoreClient\PersistentSubscriptionNakEventAction;
use Prooph\EventStoreClient\PersistentSubscriptions\PersistentSubscriptionDetails;
use Prooph\EventStoreClient\PersistentSubscriptions\PersistentSubscriptionsManager;
use Prooph\EventStoreClient\PersistentSubscriptionSettings;
use Prooph\EventStoreClient\ResolvedEvent;
use Prooph\EventStoreClient\Util\Guid;
use ProophTest\EventStoreClient\CountdownEvent;
use ProophTest\EventStoreClient\DefaultData;
Expand Down Expand Up @@ -259,7 +259,7 @@ public function __invoke(

$this->sub->stop();

$this->manager->replayParkedMessages($this->stream, 'existing');
yield $this->manager->replayParkedMessages($this->stream, 'existing', DefaultData::adminCredentials());

$event = new CountdownEvent(2);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@
use Prooph\EventStoreClient\EventId;
use Prooph\EventStoreClient\ExpectedVersion;
use Prooph\EventStoreClient\Internal\EventStorePersistentSubscription;
use Prooph\EventStoreClient\Internal\ResolvedEvent;
use Prooph\EventStoreClient\PersistentSubscriptionDropped;
use Prooph\EventStoreClient\PersistentSubscriptionSettings;
use Prooph\EventStoreClient\ResolvedEvent;
use Prooph\EventStoreClient\SubscriptionDropReason;
use Prooph\EventStoreClient\Util\Guid;
use Throwable;
Expand Down
2 changes: 1 addition & 1 deletion tests/catch_up_subscription_handles_errors.php
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@
use Prooph\EventStoreClient\EventStoreSubscription;
use Prooph\EventStoreClient\Internal\EventStoreCatchUpSubscription;
use Prooph\EventStoreClient\Internal\EventStoreStreamCatchUpSubscription;
use Prooph\EventStoreClient\Internal\ResolvedEvent;
use Prooph\EventStoreClient\Internal\VolatileEventStoreSubscription;
use Prooph\EventStoreClient\LiveProcessingStarted;
use Prooph\EventStoreClient\ReadDirection;
use Prooph\EventStoreClient\RecordedEvent;
use Prooph\EventStoreClient\ResolvedEvent;
use Prooph\EventStoreClient\SliceReadStatus;
use Prooph\EventStoreClient\StreamEventsSlice;
use Prooph\EventStoreClient\SubscriptionDropped;
Expand Down
2 changes: 1 addition & 1 deletion tests/catchup_subscription_handles_small_batch_sizes.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
use Prooph\EventStoreClient\EventStoreConnection;
use Prooph\EventStoreClient\ExpectedVersion;
use Prooph\EventStoreClient\Internal\EventStoreCatchUpSubscription;
use Prooph\EventStoreClient\Internal\ResolvedEvent;
use Prooph\EventStoreClient\LiveProcessingStarted;
use Prooph\EventStoreClient\ResolvedEvent;
use ProophTest\EventStoreClient\Helper\TestConnection;
use Throwable;
use function Amp\call;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
use Prooph\EventStoreClient\EventAppearedOnPersistentSubscription;
use Prooph\EventStoreClient\Exception\MaximumSubscribersReachedException;
use Prooph\EventStoreClient\Internal\EventStorePersistentSubscription;
use Prooph\EventStoreClient\Internal\ResolvedEvent;
use Prooph\EventStoreClient\PersistentSubscriptionSettings;
use Prooph\EventStoreClient\ResolvedEvent;
use Prooph\EventStoreClient\Util\Guid;
use Throwable;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
use PHPUnit\Framework\TestCase;
use Prooph\EventStoreClient\EventAppearedOnPersistentSubscription;
use Prooph\EventStoreClient\Internal\EventStorePersistentSubscription;
use Prooph\EventStoreClient\Internal\ResolvedEvent;
use Prooph\EventStoreClient\PersistentSubscriptionSettings;
use Prooph\EventStoreClient\ResolvedEvent;
use Prooph\EventStoreClient\Util\Guid;
use Throwable;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
use Prooph\EventStoreClient\EventId;
use Prooph\EventStoreClient\ExpectedVersion;
use Prooph\EventStoreClient\Internal\EventStorePersistentSubscription;
use Prooph\EventStoreClient\Internal\ResolvedEvent;
use Prooph\EventStoreClient\PersistentSubscriptionSettings;
use Prooph\EventStoreClient\ResolvedEvent;
use Prooph\EventStoreClient\Util\Guid;
use Throwable;
use function Amp\call;
Expand Down

0 comments on commit ad73d73

Please sign in to comment.