Skip to content

Commit

Permalink
Merge 04bf979 into 018c944
Browse files Browse the repository at this point in the history
  • Loading branch information
prolic committed Oct 27, 2018
2 parents 018c944 + 04bf979 commit cdfd46e
Show file tree
Hide file tree
Showing 17 changed files with 853 additions and 161 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Expand Up @@ -45,7 +45,7 @@ before_script:
- composer update --prefer-dist $DEPENDENCIES

script:
- if [[ $TEST_COVERAGE == 'true' ]]; then php -dzend_extension=xdebug.so ./vendor/bin/phpunit --exclude-group longrunning --coverage-text --coverage-clover ./build/logs/clover.xml; else ./vendor/bin/phpunit; fi
- if [[ $TEST_COVERAGE == 'true' ]]; then php -dzend_extension=xdebug.so ./vendor/bin/phpunit --exclude-group=ignore --coverage-text --coverage-clover ./build/logs/clover.xml; else ./vendor/bin/phpunit --exclude-group=ignore; fi
- if [[ $EXECUTE_CS_CHECK == 'true' ]]; then ./vendor/bin/php-cs-fixer fix -v --diff --dry-run; fi

after_script:
Expand Down
8 changes: 8 additions & 0 deletions README.md
Expand Up @@ -52,6 +52,14 @@ Run the server with memory database
./run-node.sh --run-projections=all --mem-db
```

You need to ignore the `ignore` group

```console
./vendor/bin/phpunit --exclude-group=ignore
```

Those are tests that only work against an empty database and can only be run manually.

Before next run, restart the server. This way you can always start with a clean server.

## Documentation
Expand Down
2 changes: 0 additions & 2 deletions docker/unittest/phpunit/phpunit.xml
Expand Up @@ -21,8 +21,6 @@
processIsolation="false"
stopOnFailure="false"
bootstrap="./../vendor/autoload.php"
timeoutForSmallTests="2"
enforceTimeLimit="true"
failOnWarning="true"
failOnRisky="true"
>
Expand Down
23 changes: 11 additions & 12 deletions src/Internal/AbstractEventStorePersistentSubscription.php
Expand Up @@ -25,7 +25,6 @@
use Prooph\EventStoreClient\EventId;
use Prooph\EventStoreClient\EventStoreSubscription;
use Prooph\EventStoreClient\Exception\RuntimeException;
use Prooph\EventStoreClient\Exception\TimeoutException;
use Prooph\EventStoreClient\Internal\ResolvedEvent as InternalResolvedEvent;
use Prooph\EventStoreClient\PersistentSubscriptionDropped;
use Prooph\EventStoreClient\PersistentSubscriptionNakEventAction;
Expand Down Expand Up @@ -77,11 +76,10 @@ abstract class AbstractEventStorePersistentSubscription

/** @var int */
private $isDropped;
//private readonly ManualResetEventSlim _stopped = new ManualResetEventSlim(true);
/** @var int */
private $bufferSize;
/** @var bool */
private $stopped = true;
/** @var ManualResetEventSlim */
private $stopped;

/**
* @internal
Expand Down Expand Up @@ -113,6 +111,7 @@ public function __construct(
$this->bufferSize = $bufferSize;
$this->autoAck = $autoAck;
$this->queue = new SplQueue();
$this->stopped = new ManualResetEventSlim(true);
}

/**
Expand All @@ -122,7 +121,7 @@ public function __construct(
*/
public function start(): Promise
{
$this->stopped = false;
$this->stopped->reset();

$eventAppearedCallback = function (
EventStoreSubscription $subscription,
Expand Down Expand Up @@ -297,7 +296,7 @@ function (ResolvedEvent $event): EventId {
$this->subscription->notifyEventsFailed($ids, $action, $reason);
}

public function stop(int $timeout): void
public function stop(?int $timeout = null): Promise
{
if ($this->verbose) {
$this->log->debug(\sprintf(
Expand All @@ -308,11 +307,11 @@ public function stop(int $timeout): void

$this->enqueueSubscriptionDropNotification(SubscriptionDropReason::userInitiated(), null);

Loop::delay($timeout, function (): void {
if (! $this->stopped) {
throw new TimeoutException('Could not stop subscription in time');
}
});
if (null === $timeout) {
return new Success();
}

return $this->stopped->wait($timeout);
}

private function enqueueSubscriptionDropNotification(SubscriptionDropReason $reason, ?Throwable $error): void
Expand Down Expand Up @@ -436,7 +435,7 @@ private function dropSubscription(SubscriptionDropReason $reason, ?Throwable $er
($this->subscriptionDropped)($this, $reason, $error);
}

$this->stopped = true;
$this->stopped->set();
}
}
}
41 changes: 18 additions & 23 deletions src/Internal/EventStoreCatchUpSubscription.php
Expand Up @@ -25,7 +25,6 @@
use Prooph\EventStoreClient\EventAppearedOnSubscription;
use Prooph\EventStoreClient\EventStoreAsyncConnection;
use Prooph\EventStoreClient\EventStoreSubscription;
use Prooph\EventStoreClient\Exception\TimeoutException;
use Prooph\EventStoreClient\Internal\ResolvedEvent as InternalResolvedEvent;
use Prooph\EventStoreClient\LiveProcessingStarted;
use Prooph\EventStoreClient\ResolvedEvent;
Expand Down Expand Up @@ -89,7 +88,7 @@ abstract class EventStoreCatchUpSubscription
protected $shouldStop;
/** @var bool */
private $isDropped;
/** @var bool */
/** @var ManualResetEventSlim */
private $stopped;

/** @var ListenerHandler */
Expand Down Expand Up @@ -125,6 +124,7 @@ public function __construct(
$this->subscriptionName = $settings->subscriptionName() ?? '';
$this->connectListener = function (): void {
};
$this->stopped = new ManualResetEventSlim(true);
}

public function isSubscribedToAll(): bool
Expand Down Expand Up @@ -166,25 +166,7 @@ public function startAsync(): Promise
return $this->runSubscriptionAsync();
}

public function stopWithTimeout(int $timeout): void
{
$this->stop();

if ($this->verbose) {
$this->log->debug(\sprintf(
'Waiting on subscription %s to stop',
$this->subscriptionName
));
}

Loop::delay($timeout, function (): void {
if (! $this->stopped) {
throw new TimeoutException('Could not stop in time');
}
});
}

public function stop(): void
public function stop(?int $timeout = null): Promise
{
if ($this->verbose) {
$this->log->debug(\sprintf(
Expand All @@ -202,6 +184,19 @@ public function stop(): void
$this->connection->detach($this->connectListener);
$this->shouldStop = true;
$this->enqueueSubscriptionDropNotification(SubscriptionDropReason::userInitiated(), null);

if (null === $timeout) {
return new Success();
}

if ($this->verbose) {
$this->log->debug(\sprintf(
'Waiting on subscription %s to stop',
$this->subscriptionName
));
}

return $this->stopped->wait($timeout);
}

private function onReconnect(ClientConnectionEventArgs $clientConnectionEventArgs): void
Expand Down Expand Up @@ -241,7 +236,7 @@ private function loadHistoricalEventsAsync(): Promise
));
}

$this->stopped = false;
$this->stopped->reset();
$this->allowProcessing = false;

return call(function (): Generator {
Expand Down Expand Up @@ -527,7 +522,7 @@ public function dropSubscription(SubscriptionDropReason $reason, ?Throwable $err
($this->subscriptionDropped)($this, $reason, $error);
}

$this->stopped = true;
$this->stopped->set();
}
}
}
66 changes: 66 additions & 0 deletions src/Internal/ManualResetEventSlim.php
@@ -0,0 +1,66 @@
<?php

/**
* This file is part of `prooph/event-store-client`.
* (c) 2018-2018 prooph software GmbH <contact@prooph.de>
* (c) 2018-2018 Sascha-Oliver Prolic <saschaprolic@googlemail.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

declare(strict_types=1);

namespace Prooph\EventStoreClient\Internal;

use Amp\Deferred;
use Amp\Promise;

/** @internal */
class ManualResetEventSlim
{
/** @var bool */
private $isSet;
/** @var Deferred */
private $deferred;

public function __construct(bool $isSet = false)
{
$this->isSet = $isSet;
$this->deferred = new Deferred();

if ($isSet) {
$this->deferred->resolve(true);
}
}

public function set(): void
{
$this->isSet = true;
$this->deferred->resolve(true);
}

public function reset(): void
{
$this->isSet = false;
$this->deferred = new Deferred();
}

public function wait(int $timeout): Promise
{
$promise = Promise\timeout($this->deferred->promise(), $timeout);

$deferred = new Deferred();
$newPromise = $deferred->promise();

$promise->onResolve(function (?\Throwable $exception = null, $result) use ($deferred) {
if ($exception) {
$deferred->resolve(false);
} else {
$deferred->resolve(true);
}
});

return $newPromise;
}
}
69 changes: 69 additions & 0 deletions tests/CountdownEvent.php
@@ -0,0 +1,69 @@
<?php

/**
* This file is part of `prooph/event-store-client`.
* (c) 2018-2018 prooph software GmbH <contact@prooph.de>
* (c) 2018-2018 Sascha-Oliver Prolic <saschaprolic@googlemail.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

declare(strict_types=1);

namespace ProophTest\EventStoreClient;

use Amp\Deferred;
use Amp\Promise;
use InvalidArgumentException;
use Prooph\EventStoreClient\Exception\RuntimeException;

/** @internal */
class CountdownEvent
{
/** @var int */
private $counter;
/** @var Deferred */
private $deferred;

public function __construct(int $counter)
{
if ($counter < 1) {
throw new InvalidArgumentException('Counter must be positive');
}

$this->counter = $counter;
$this->deferred = new Deferred();
}

public function signal(): void
{
if (0 === $this->counter) {
throw new RuntimeException('CountdownEvent already resolved');
}

--$this->counter;

if (0 === $this->counter) {
$this->deferred->resolve(true);
}
}

public function wait(int $timeout): Promise
{
$promise = Promise\timeout($this->deferred->promise(), $timeout);

$deferred = new Deferred();
$newPromise = $deferred->promise();

$promise->onResolve(function (?\Throwable $exception = null, $result) use ($deferred) {
if ($exception) {
$deferred->resolve(false);
} else {
$deferred->resolve(true);
}
});

return $newPromise;
}
}
4 changes: 2 additions & 2 deletions tests/Helper/TestEvent.php
Expand Up @@ -23,7 +23,7 @@ private function __construct()
{
}

public static function new(?EventId $eventId = null, ?string $data = null, ?string $metadata = null): EventData
public static function newTestEvent(?EventId $eventId = null, ?string $data = null, ?string $metadata = null): EventData
{
if (null === $eventId) {
$eventId = EventId::generate();
Expand All @@ -38,7 +38,7 @@ public static function newAmount(int $amount): array
$events = [];

for ($i = 0; $i < $amount; $i++) {
$events[] = self::new();
$events[] = self::newTestEvent();
}

return $events;
Expand Down

0 comments on commit cdfd46e

Please sign in to comment.