Skip to content

Commit

Permalink
add test subscribe_to_all_catching_up_should::read_all_existing_event…
Browse files Browse the repository at this point in the history
…s_and_keep_listening_to_new_ones

bugfixes
  • Loading branch information
prolic committed Oct 24, 2018
1 parent b45ed09 commit 018c944
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 13 deletions.
2 changes: 1 addition & 1 deletion src/ClientOperations/ReadAllEventsBackwardOperation.php
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ protected function transformResponse(ProtobufMessage $response): AllEventsSlice
$link = EventMessageConverter::convertEventRecordMessageToEventRecord($link);
}

$resolvedEvents[] = new ResolvedEvent($event, $link, null);
$resolvedEvents[] = new ResolvedEvent($event, $link, new Position($record->getCommitPosition(), $record->getPreparePosition()));
}

return new AllEventsSlice(
Expand Down
2 changes: 1 addition & 1 deletion src/ClientOperations/ReadAllEventsForwardOperation.php
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ protected function transformResponse(ProtobufMessage $response): AllEventsSlice
$link = EventMessageConverter::convertEventRecordMessageToEventRecord($link);
}

$resolvedEvents[] = new ResolvedEvent($event, $link, new Position($response->getCommitPosition(), $response->getPreparePosition()));
$resolvedEvents[] = new ResolvedEvent($event, $link, new Position($record->getCommitPosition(), $record->getPreparePosition()));
}

return new AllEventsSlice(
Expand Down
14 changes: 4 additions & 10 deletions src/Internal/EventStoreCatchUpSubscription.php
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ private function loadHistoricalEventsAsync(): Promise
yield $this->subscribeToStreamAsync();
} catch (Throwable $ex) {
$this->dropSubscription(SubscriptionDropReason::catchUpError(), $ex);

throw $ex;
}
} else {
Expand Down Expand Up @@ -468,7 +469,6 @@ private function ensureProcessingPushQueue(): void
private function processLiveQueueAsync(): Promise
{
return call(function (): Generator {
$this->isProcessing = true;
do {
/** @var ResolvedEvent $e */
while (! $this->liveQueue->isEmpty()) {
Expand All @@ -478,9 +478,7 @@ private function processLiveQueueAsync(): Promise
$this->dropData = $this->dropData ?? new DropData(SubscriptionDropReason::unknown(), new \Exception('Drop reason not specified'));
$this->dropSubscription($this->dropData->reason(), $this->dropData->error());

if ($this->isProcessing) {
$this->isProcessing = false;
}
$this->isProcessing = false;

return null;
}
Expand All @@ -500,13 +498,9 @@ private function processLiveQueueAsync(): Promise
return null;
}
}
} while ($this->liveQueue->count() > 0);

if ($this->isProcessing) {
$this->isProcessing = false;
}
} while ($this->liveQueue->count() > 0 && ! $this->isProcessing);

$this->isProcessing = true;
$this->isProcessing = false;
});
}

Expand Down
130 changes: 129 additions & 1 deletion tests/subscribe_to_all_catching_up_should.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,21 @@
use Exception;
use Generator;
use PHPUnit\Framework\TestCase;
use Prooph\EventStoreClient\AllEventsSlice;
use Prooph\EventStoreClient\CatchUpSubscriptionDropped;
use Prooph\EventStoreClient\CatchUpSubscriptionSettings;
use Prooph\EventStoreClient\Common\SystemRoles;
use Prooph\EventStoreClient\Common\SystemStreams;
use Prooph\EventStoreClient\EventAppearedOnCatchupSubscription;
use Prooph\EventStoreClient\EventAppearedOnSubscription;
use Prooph\EventStoreClient\EventData;
use Prooph\EventStoreClient\EventStoreAsyncConnection;
use Prooph\EventStoreClient\EventStoreSubscription;
use Prooph\EventStoreClient\ExpectedVersion;
use Prooph\EventStoreClient\Internal\EventStoreAllCatchUpSubscription;
use Prooph\EventStoreClient\Internal\EventStoreCatchUpSubscription;
use Prooph\EventStoreClient\Internal\ResolvedEvent;
use Prooph\EventStoreClient\Position;
use Prooph\EventStoreClient\StreamMetadata;
use Prooph\EventStoreClient\SubscriptionDropReason;
use Prooph\EventStoreClient\UserCredentials;
Expand Down Expand Up @@ -288,5 +291,130 @@ public function __invoke(
});
}

// @todo: 3 tests missing
/**
* @test
* @throws Throwable
*/
public function read_all_existing_events_and_keep_listening_to_new_ones(): void
{
$this->execute(function () {
$store = TestConnection::createAsync();

yield $store->connectAsync();

/** @var AllEventsSlice $result */
$result = yield $this->conn->readAllEventsBackwardAsync(Position::end(), 1, false);
$position = $result->nextPosition();

$events = [];
$appearedCounter = 0;
$maxAppeared = 20;
$lastAppeared = new Deferred();
$dropped = new Deferred();

for ($i = 0; $i < 10; $i++) {
yield $store->appendToStreamAsync(
'all_read_all_existing_events_and_keep_listening_to_new_ones-' . $i,
-1,
[new EventData(null, 'et-' . $i, false)]
);
}

/** @var EventStoreAllCatchUpSubscription $subscription */
$subscription = yield $store->subscribeToAllFromAsync(
$position,
CatchUpSubscriptionSettings::default(),
new class($events, $appearedCounter, $maxAppeared, $lastAppeared) implements EventAppearedOnCatchupSubscription {
/** @var array */
private $events;
/** @var int */
private $appearedCounter;
/** @var Deferred */
private $maxAppeared;
/** @var Deferred */
private $lastAppeared;

public function __construct(array &$events, int &$appearedCounter, int $maxAppeared, Deferred $lastAppeared)
{
$this->events = &$events;
$this->appearedCounter = &$appearedCounter;
$this->maxAppeared = $maxAppeared;
$this->lastAppeared = $lastAppeared;
}

public function __invoke(
EventStoreCatchUpSubscription $subscription,
ResolvedEvent $resolvedEvent
): Promise {
if (! SystemStreams::isSystemStream($resolvedEvent->originalEvent()->eventStreamId())) {
$this->events[] = $resolvedEvent;
++$this->appearedCounter;

if ($this->appearedCounter === $this->maxAppeared) {
$this->lastAppeared->resolve(true);
}
}

return new Success();
}
},
null,
new class($dropped) implements CatchUpSubscriptionDropped {
/** @var Deferred */
private $dropped;

public function __construct(Deferred $dropped)
{
$this->dropped = $dropped;
}

public function __invoke(
EventStoreCatchUpSubscription $subscription,
SubscriptionDropReason $reason,
?Throwable $exception = null
): void {
$this->dropped->resolve(true);
}
}
);

for ($i = 10; $i < 20; $i++) {
yield $store->appendToStreamAsync(
'all_read_all_existing_events_and_keep_listening_to_new_ones-' . $i,
-1,
[new EventData(null, 'et-' . $i, false)]
);
}

try {
yield Promise\timeout($lastAppeared->promise(), self::TIMEOUT);
} catch (TimeoutException $e) {
$this->fail('Could not wait for all events.');

return;
}

$this->assertCount(20, $events);

for ($i = 0; $i < 20; $i++) {
$this->assertSame('et-' . $i, $events[$i]->originalEvent()->eventType());
}

$wasDropped = true;

try {
yield Promise\timeout($dropped->promise(), 0);
} catch (TimeoutException $e) {
$wasDropped = false;
}

$this->assertFalse($wasDropped);

$subscription->stopWithTimeout(self::TIMEOUT);

$this->assertTrue(yield Promise\timeout($dropped->promise(), 0));
});
}

// @todo: 2 tests missing
}

0 comments on commit 018c944

Please sign in to comment.