Skip to content

Commit

Permalink
Merge 73ccf80 into 006f853
Browse files Browse the repository at this point in the history
  • Loading branch information
prolic committed Oct 23, 2018
2 parents 006f853 + 73ccf80 commit 0f1ffea
Show file tree
Hide file tree
Showing 17 changed files with 427 additions and 61 deletions.
2 changes: 1 addition & 1 deletion examples/demo-subscription-with-logger.php
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@

yield $connection->connectAsync();

$connection->subscribeToStreamFrom(
yield $connection->subscribeToStreamFromAsync(
'foo-bar',
null,
CatchUpSubscriptionSettings::default(),
Expand Down
2 changes: 1 addition & 1 deletion examples/demo-subscription1.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@

yield $connection->connectAsync();

$connection->subscribeToStreamFrom(
yield $connection->subscribeToStreamFromAsync(
'foo-bar',
null,
CatchUpSubscriptionSettings::default(),
Expand Down
2 changes: 1 addition & 1 deletion examples/demo-subscription2.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@

yield $connection->connectAsync();

$connection->subscribeToAllFrom(
yield $connection->subscribeToAllFromAsync(
null,
CatchUpSubscriptionSettings::default(),
new class() implements EventAppearedOnCatchupSubscription {
Expand Down
5 changes: 3 additions & 2 deletions src/ClientOperations/ReadAllEventsBackwardOperation.php
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,9 @@ protected function transformResponse(ProtobufMessage $response): AllEventsSlice

foreach ($records as $record) {
/** @var ResolvedIndexedEvent $record */
$event = EventMessageConverter::convertEventRecordMessageToEventRecord($record->getEvent());
$link = null;
if ($event = $record->getEvent()) {
$event = EventMessageConverter::convertEventRecordMessageToEventRecord($record->getEvent());
}

if ($link = $record->getLink()) {
$link = EventMessageConverter::convertEventRecordMessageToEventRecord($link);
Expand Down
5 changes: 3 additions & 2 deletions src/ClientOperations/ReadAllEventsForwardOperation.php
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,9 @@ protected function transformResponse(ProtobufMessage $response): AllEventsSlice

foreach ($records as $record) {
/** @var ResolvedIndexedEvent $record */
$event = EventMessageConverter::convertEventRecordMessageToEventRecord($record->getEvent());
$link = null;
if ($event = $record->getEvent()) {
$event = EventMessageConverter::convertEventRecordMessageToEventRecord($record->getEvent());
}

if ($link = $record->getLink()) {
$link = EventMessageConverter::convertEventRecordMessageToEventRecord($link);
Expand Down
5 changes: 1 addition & 4 deletions src/ClientOperations/ReadStreamEventsForwardOperation.php
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,7 @@ protected function transformResponse(ProtobufMessage $response): StreamEventsSli

foreach ($records as $record) {
/** @var ResolvedIndexedEvent $record */
$event = null;
$link = null;

if ($record->getEvent()) {
if ($event = $record->getEvent()) {
$event = EventMessageConverter::convertEventRecordMessageToEventRecord($record->getEvent());
}

Expand Down
14 changes: 10 additions & 4 deletions src/EventStoreAsyncConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -179,15 +179,18 @@ public function subscribeToStreamAsync(
?UserCredentials $userCredentials = null
): Promise;

public function subscribeToStreamFrom(
/**
* @return Promise<EventStoreStreamCatchUpSubscription>
*/
public function subscribeToStreamFromAsync(
string $stream,
?int $lastCheckpoint,
?CatchUpSubscriptionSettings $settings,
EventAppearedOnCatchupSubscription $eventAppeared,
?LiveProcessingStarted $liveProcessingStarted = null,
?CatchUpSubscriptionDropped $subscriptionDropped = null,
?UserCredentials $userCredentials = null
): EventStoreStreamCatchUpSubscription;
): Promise;

/**
* @return Promise<EventStoreSubscription>
Expand All @@ -199,14 +202,17 @@ public function subscribeToAllAsync(
?UserCredentials $userCredentials = null
): Promise;

public function subscribeToAllFrom(
/**
* @return Promise<EventStoreAllCatchUpSubscription>
*/
public function subscribeToAllFromAsync(
?Position $lastCheckpoint,
?CatchUpSubscriptionSettings $settings,
EventAppearedOnCatchupSubscription $eventAppeared,
?LiveProcessingStarted $liveProcessingStarted = null,
?CatchUpSubscriptionDropped $subscriptionDropped = null,
?UserCredentials $userCredentials = null
): EventStoreAllCatchUpSubscription;
): Promise;

/**
* @return Promise<AbstractEventStorePersistentSubscription>
Expand Down
2 changes: 0 additions & 2 deletions src/Internal/EventStoreAllCatchUpSubscription.php
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,6 @@ protected function tryProcessAsync(ResolvedEvent $e): Promise
yield ($this->eventAppeared)($this, $e);
} catch (Throwable $ex) {
$this->dropSubscription(SubscriptionDropReason::eventHandlerException(), $ex);

throw $ex;
}

$this->lastProcessedPosition = $e->originalPosition();
Expand Down
24 changes: 8 additions & 16 deletions src/Internal/EventStoreAsyncNodeConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -659,15 +659,15 @@ public function subscribeToStreamAsync(
}

/** {@inheritdoc} */
public function subscribeToStreamFrom(
public function subscribeToStreamFromAsync(
string $stream,
?int $lastCheckpoint,
?CatchUpSubscriptionSettings $settings,
EventAppearedOnCatchupSubscription $eventAppeared,
?LiveProcessingStarted $liveProcessingStarted = null,
?CatchUpSubscriptionDropped $subscriptionDropped = null,
?UserCredentials $userCredentials = null
): EventStoreStreamCatchUpSubscription {
): Promise {
if (empty($stream)) {
throw new InvalidArgumentException('Stream cannot be empty');
}
Expand All @@ -680,7 +680,7 @@ public function subscribeToStreamFrom(
$settings->verboseLogging();
}

$catchUpSubscription = new EventStoreStreamCatchUpSubscription(
return (new EventStoreStreamCatchUpSubscription(
$this,
$this->settings->log(),
$stream,
Expand All @@ -690,11 +690,7 @@ public function subscribeToStreamFrom(
$liveProcessingStarted,
$subscriptionDropped,
$settings
);

$catchUpSubscription->startAsync();

return $catchUpSubscription;
))->startAsync();
}

/** {@inheritdoc} */
Expand All @@ -720,14 +716,14 @@ public function subscribeToAllAsync(
return $deferred->promise();
}

public function subscribeToAllFrom(
public function subscribeToAllFromAsync(
?Position $lastCheckpoint,
?CatchUpSubscriptionSettings $settings,
EventAppearedOnCatchupSubscription $eventAppeared,
?LiveProcessingStarted $liveProcessingStarted = null,
?CatchUpSubscriptionDropped $subscriptionDropped = null,
?UserCredentials $userCredentials = null
): EventStoreAllCatchUpSubscription {
): Promise {
if (null === $settings) {
$settings = CatchUpSubscriptionSettings::default();
}
Expand All @@ -736,7 +732,7 @@ public function subscribeToAllFrom(
$settings->verboseLogging();
}

$catchUpSubscription = new EventStoreAllCatchUpSubscription(
return (new EventStoreAllCatchUpSubscription(
$this,
$this->settings->log(),
$lastCheckpoint,
Expand All @@ -745,11 +741,7 @@ public function subscribeToAllFrom(
$liveProcessingStarted,
$subscriptionDropped,
$settings
);

$catchUpSubscription->startAsync();

return $catchUpSubscription;
))->startAsync();
}

/** {@inheritdoc} */
Expand Down
4 changes: 2 additions & 2 deletions src/Internal/EventStoreCatchUpSubscription.php
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
abstract class EventStoreCatchUpSubscription
{
/** @var ResolvedEvent */
private static $dropSubscriptionEvent;
private static $dropSubscriptionEvent; // @todo: this is stateful - here must be a bug !!

/** @var bool */
private $isSubscribedToAll;
Expand Down Expand Up @@ -265,7 +265,7 @@ private function loadHistoricalEventsAsync(): Promise
$this->dropSubscription(SubscriptionDropReason::userInitiated(), null);
}

return new Success(true);
return new Success($this);
});
}

Expand Down
2 changes: 0 additions & 2 deletions src/Internal/EventStoreStreamCatchUpSubscription.php
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,6 @@ protected function tryProcessAsync(ResolvedEvent $e): Promise
yield ($this->eventAppeared)($this, $e);
} catch (Throwable $ex) {
$this->dropSubscription(SubscriptionDropReason::eventHandlerException(), $ex);

throw $ex;
}

$this->lastProcessedEventNumber = $e->originalEventNumber();
Expand Down
10 changes: 4 additions & 6 deletions tests/FakeEventStoreConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,7 @@
use Prooph\EventStoreClient\EventStoreAsyncConnection;
use Prooph\EventStoreClient\EventStoreAsyncTransaction;
use Prooph\EventStoreClient\Internal\EventHandler;
use Prooph\EventStoreClient\Internal\EventStoreAllCatchUpSubscription;
use Prooph\EventStoreClient\Internal\EventStorePersistentSubscription;
use Prooph\EventStoreClient\Internal\EventStoreStreamCatchUpSubscription;
use Prooph\EventStoreClient\Internal\ListenerHandler;
use Prooph\EventStoreClient\LiveProcessingStarted;
use Prooph\EventStoreClient\PersistentSubscriptionDropped;
Expand Down Expand Up @@ -265,15 +263,15 @@ public function subscribeToStreamAsync(
);
}

public function subscribeToStreamFrom(
public function subscribeToStreamFromAsync(
string $stream,
?int $lastCheckpoint,
?CatchUpSubscriptionSettings $settings,
EventAppearedOnCatchupSubscription $eventAppeared,
?LiveProcessingStarted $liveProcessingStarted = null,
?CatchUpSubscriptionDropped $subscriptionDropped = null,
?UserCredentials $userCredentials = null
): EventStoreStreamCatchUpSubscription {
): Promise {
throw new \RuntimeException('Not implemented');
}

Expand All @@ -291,14 +289,14 @@ public function subscribeToAllAsync(
);
}

public function subscribeToAllFrom(
public function subscribeToAllFromAsync(
?Position $lastCheckpoint,
?CatchUpSubscriptionSettings $settings,
EventAppearedOnCatchupSubscription $eventAppeared,
?LiveProcessingStarted $liveProcessingStarted = null,
?CatchUpSubscriptionDropped $subscriptionDropped = null,
?UserCredentials $userCredentials = null
): EventStoreAllCatchUpSubscription {
): Promise {
throw new \RuntimeException('Not implemented');
}

Expand Down
4 changes: 4 additions & 0 deletions tests/Helper/TestEvent.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@
/** @internal */
class TestEvent
{
private function __construct()
{
}

public static function new(?EventId $eventId = null, ?string $data = null, ?string $metadata = null): EventData
{
if (null === $eventId) {
Expand Down
14 changes: 2 additions & 12 deletions tests/catch_up_subscription_handles_errors.php
Original file line number Diff line number Diff line change
Expand Up @@ -394,20 +394,13 @@ function ($stream, $raise, $drop): Promise {

$finalEvent->resolve(true);

$promise = Promise\timeout($promise, self::$timeoutMs);

$promise->onResolve(function ($ex, $result): void {
$this->assertTrue($result);
});

yield new Success();
$this->assertNotNull(yield Promise\timeout($promise, self::$timeoutMs));
}));
}

/**
* @test
* @throws Throwable
* @group by
*/
public function when_live_processing_and_disconnected_reconnect_keeps_events_ordered(): void
{
Expand Down Expand Up @@ -444,10 +437,7 @@ function ($stream, $raise, $drop) use (&$innerSubscriptionDrop, &$volatileEventS
}
);

$result = yield Promise\timeout($this->subscription->startAsync(), self::$timeoutMs);

$this->assertTrue($result);

$this->assertNotNull(yield Promise\timeout($this->subscription->startAsync(), self::$timeoutMs));
$this->assertCount(0, $this->raisedEvents);
$this->assertNotNull($innerSubscriptionDrop);

Expand Down
12 changes: 6 additions & 6 deletions tests/catchup_subscription_handles_small_batch_sizes.php
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@

class catchup_subscription_handles_small_batch_sizes extends TestCase
{
private const TIMEOUT = 10000;

/** @var string */
private $streamName = 'TestStream';
/** @var CatchUpSubscriptionSettings */
Expand Down Expand Up @@ -89,7 +91,7 @@ public function catchupSubscriptionToAllHandlesManyEventsWithSmallBatchSize(): v

$deferred = new Deferred();

$this->connection->subscribeToAllFrom(
yield $this->connection->subscribeToAllFromAsync(
null,
$this->settings,
$this->eventAppearedResolver(),
Expand All @@ -99,8 +101,7 @@ public function catchupSubscriptionToAllHandlesManyEventsWithSmallBatchSize(): v
);

try {
// we wait maximum 5 minutes
$result = yield timeout($deferred->promise(), 5 * 60 * 1000);
$result = yield timeout($deferred->promise(), self::TIMEOUT);

$this->assertTrue($result);
} catch (TimeoutException $e) {
Expand All @@ -122,7 +123,7 @@ public function catchupSubscriptionToStreamHandlesManyEventsWithSmallBatchSize()

$deferred = new Deferred();

$this->connection->subscribeToStreamFrom(
yield $this->connection->subscribeToStreamFromAsync(
$this->streamName,
null,
$this->settings,
Expand All @@ -133,8 +134,7 @@ public function catchupSubscriptionToStreamHandlesManyEventsWithSmallBatchSize()
);

try {
// we wait maximum 5 minutes
$result = yield timeout($deferred->promise(), 5 * 60 * 1000);
$result = yield timeout($deferred->promise(), self::TIMEOUT);

$this->assertTrue($result);
} catch (TimeoutException $e) {
Expand Down

0 comments on commit 0f1ffea

Please sign in to comment.