Skip to content

Commit

Permalink
Merge pull request #124 from prooph/remove_manual_reset_event_slim
Browse files Browse the repository at this point in the history
remove ManualResetEventSlim with amp's timeoutWithDefault function
  • Loading branch information
prolic committed Apr 26, 2020
2 parents 049f487 + 5765120 commit 2a2eb09
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 100 deletions.
6 changes: 5 additions & 1 deletion .php_cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
<?php

$config = new Prooph\CS\Config\Prooph();
$config->getFinder()->in(__DIR__);
$config
->getFinder()
->in(__DIR__)
->exclude('GPBMetadata')
->exclude('src/Messages/ClientMessages');

$cacheDir = getenv('TRAVIS') ? getenv('HOME') . '/.php-cs-fixer' : __DIR__;

Expand Down
12 changes: 7 additions & 5 deletions src/Internal/EventStoreCatchUpSubscription.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
namespace Prooph\EventStoreClient\Internal;

use function Amp\call;
use Amp\Deferred;
use Amp\Loop;
use Amp\Promise;
use Amp\Success;
Expand Down Expand Up @@ -63,7 +64,7 @@ abstract class EventStoreCatchUpSubscription implements AsyncEventStoreCatchUpSu
private bool $isProcessing = false;
protected bool $shouldStop = false;
private bool $isDropped = false;
private ManualResetEventSlim $stopped;
private Deferred $stopped;
private ListenerHandler $connectListener;

/** @internal */
Expand Down Expand Up @@ -97,7 +98,8 @@ public function __construct(
$this->subscriptionName = $settings->subscriptionName() ?? '';
$this->connectListener = new ListenerHandler(function (): void {
});
$this->stopped = new ManualResetEventSlim(true);
$this->stopped = new Deferred();
$this->stopped->resolve(true);
}

public function isSubscribedToAll(): bool
Expand Down Expand Up @@ -169,7 +171,7 @@ public function stop(?int $timeout = null): Promise
));
}

return $this->stopped->wait($timeout);
return Promise\timeoutWithDefault($this->stopped->promise(), $timeout, false);
}

private function onReconnect(ClientConnectionEventArgs $clientConnectionEventArgs): void
Expand Down Expand Up @@ -209,7 +211,7 @@ private function loadHistoricalEventsAsync(): Promise
));
}

$this->stopped->reset();
$this->stopped = new Deferred();
$this->allowProcessing = false;

return call(function (): Generator {
Expand Down Expand Up @@ -495,7 +497,7 @@ public function dropSubscription(SubscriptionDropReason $reason, ?Throwable $err
($this->subscriptionDropped)($this, $reason, $error);
}

$this->stopped->set();
$this->stopped->resolve(true);
}
}
}
11 changes: 6 additions & 5 deletions src/Internal/EventStorePersistentSubscription.php
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class EventStorePersistentSubscription implements AsyncEventStorePersistentSubsc
private ?DropData $dropData = null;
private bool $isDropped = false;
private int $bufferSize;
private ManualResetEventSlim $stopped;
private Deferred $stopped;

/** @internal */
public function __construct(
Expand Down Expand Up @@ -91,7 +91,8 @@ public function __construct(
$this->bufferSize = $bufferSize;
$this->autoAck = $autoAck;
$this->queue = new SplQueue();
$this->stopped = new ManualResetEventSlim(true);
$this->stopped = new Deferred();
$this->stopped->resolve(true);
$this->handler = $handler;
}

Expand Down Expand Up @@ -129,7 +130,7 @@ public function startSubscription(
*/
public function start(): Promise
{
$this->stopped->reset();
$this->stopped = new Deferred();

$eventAppeared = fn (PersistentEventStoreSubscription $subscription, PersistentSubscriptionResolvedEvent $resolvedEvent): Promise => $this->onEventAppeared($resolvedEvent);

Expand Down Expand Up @@ -284,7 +285,7 @@ public function stop(?int $timeout = null): Promise
return new Success();
}

return $this->stopped->wait($timeout);
return Promise\timeoutWithDefault($this->stopped->promise(), $timeout, false);
}

private function enqueueSubscriptionDropNotification(
Expand Down Expand Up @@ -413,7 +414,7 @@ private function dropSubscription(SubscriptionDropReason $reason, ?Throwable $er
($this->subscriptionDropped)($this, $reason, $error);
}

$this->stopped->set();
$this->stopped->resolve(true);
}
}
}
64 changes: 0 additions & 64 deletions src/Internal/ManualResetEventSlim.php

This file was deleted.

12 changes: 6 additions & 6 deletions tests/subscribe_to_all_catching_up_should.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
namespace ProophTest\EventStoreClient;

use function Amp\call;
use Amp\Deferred;
use Amp\Delayed;
use Amp\Promise;
use Amp\Success;
Expand All @@ -36,7 +37,6 @@
use Prooph\EventStore\SubscriptionDropReason;
use Prooph\EventStore\UserCredentials;
use Prooph\EventStoreClient\Internal\EventStoreAllCatchUpSubscription;
use Prooph\EventStoreClient\Internal\ManualResetEventSlim;
use ProophTest\EventStoreClient\Helper\TestEvent;
use Throwable;

Expand Down Expand Up @@ -170,16 +170,16 @@ public function __invoke(
*/
public function be_able_to_subscribe_to_empty_db(): Generator
{
$appeared = new ManualResetEventSlim();
$appeared = new Deferred();
$dropped = new CountdownEvent(1);

$subscription = yield $this->connection->subscribeToAllFromAsync(
null,
CatchUpSubscriptionSettings::default(),
new class($appeared) implements EventAppearedOnCatchupSubscription {
private ManualResetEventSlim $appeared;
private Deferred $appeared;

public function __construct(ManualResetEventSlim $appeared)
public function __construct(Deferred $appeared)
{
$this->appeared = $appeared;
}
Expand All @@ -189,7 +189,7 @@ public function __invoke(
ResolvedEvent $resolvedEvent
): Promise {
if (! SystemStreams::isSystemStream($resolvedEvent->originalEvent()->eventStreamId())) {
$this->appeared->set();
$this->appeared->resolve(true);
}

return new Success();
Expand Down Expand Up @@ -231,7 +231,7 @@ public function __invoke(

yield new Delayed(5000);

$this->assertFalse(yield $appeared->wait(0), 'Some event appeared');
$this->assertFalse(yield Promise\timeoutWithDefault($appeared->promise(), 0, false), 'Some event appeared');
$this->assertFalse(yield $dropped->wait(0), 'Subscription was dropped prematurely');

yield $subscription->stop(self::TIMEOUT);
Expand Down
38 changes: 19 additions & 19 deletions tests/subscribe_to_stream_catching_up_should.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

namespace ProophTest\EventStoreClient;

use Amp\Deferred;
use Amp\Delayed;
use Amp\Promise;
use Amp\Success;
Expand All @@ -28,7 +29,6 @@
use Prooph\EventStore\ResolvedEvent;
use Prooph\EventStore\SubscriptionDropReason;
use Prooph\EventStoreClient\Internal\EventStoreStreamCatchUpSubscription;
use Prooph\EventStoreClient\Internal\ManualResetEventSlim;
use ProophTest\EventStoreClient\Helper\TestEvent;
use Throwable;

Expand All @@ -41,7 +41,7 @@ public function be_able_to_subscribe_to_non_existing_stream(): Generator
{
$stream = 'be_able_to_subscribe_to_non_existing_stream';

$appeared = new ManualResetEventSlim(false);
$appeared = new Deferred();
$dropped = new CountdownEvent(1);

$subscription = yield $this->connection->subscribeToStreamFromAsync(
Expand Down Expand Up @@ -71,7 +71,7 @@ public function __invoke(

yield new Delayed(self::TIMEOUT);

$this->assertFalse(yield $appeared->wait(0), 'Some event appeared');
$this->assertFalse(yield Promise\timeoutWithDefault($appeared->promise(), 0, false), 'Some event appeared');
$this->assertFalse(yield $dropped->wait(0), 'Subscription was dropped prematurely');

yield $subscription->stop(self::TIMEOUT);
Expand Down Expand Up @@ -121,8 +121,8 @@ public function allow_multiple_subscriptions_to_same_stream(): Generator
$stream = 'allow_multiple_subscriptions_to_same_stream';

$appeared = new CountdownEvent(2);
$dropped1 = new ManualResetEventSlim(false);
$dropped2 = new ManualResetEventSlim(false);
$dropped1 = new Deferred();
$dropped2 = new Deferred();

$sub1 = yield $this->connection->subscribeToStreamFromAsync(
$stream,
Expand Down Expand Up @@ -151,20 +151,20 @@ public function allow_multiple_subscriptions_to_same_stream(): Generator
);

if (! yield $appeared->wait(self::TIMEOUT)) {
$this->assertFalse(yield $dropped1->wait(0), 'Subscription1 was dropped prematurely');
$this->assertFalse(yield $dropped2->wait(0), 'Subscription2 was dropped prematurely');
$this->assertFalse(yield Promise\timeoutWithDefault($dropped1->promise(), 0, false), 'Subscription1 was dropped prematurely');
$this->assertFalse(yield Promise\timeoutWithDefault($dropped2->promise(), 0, false), 'Subscription2 was dropped prematurely');
$this->fail('Could not wait for all events');

return;
}

$this->assertFalse(yield $dropped1->wait(0));
$this->assertFalse(yield Promise\timeoutWithDefault($dropped1->promise(), 0, false));
yield $sub1->stop(self::TIMEOUT);
$this->assertTrue(yield $dropped1->wait(self::TIMEOUT));
$this->assertTrue(yield Promise\timeoutWithDefault($dropped1->promise(), self::TIMEOUT, false));

$this->assertFalse(yield $dropped2->wait(0));
$this->assertFalse(yield Promise\timeoutWithDefault($dropped2->promise(), 0, false));
yield $sub2->stop(self::TIMEOUT);
$this->assertTrue(yield $dropped2->wait(self::TIMEOUT));
$this->assertTrue(yield Promise\timeoutWithDefault($dropped2->promise(), self::TIMEOUT, false));
}

/** @test */
Expand Down Expand Up @@ -433,12 +433,12 @@ public function __invoke(
};
}

private function appearedWithResetEvent(ManualResetEventSlim $appeared): EventAppearedOnCatchupSubscription
private function appearedWithResetEvent(Deferred $appeared): EventAppearedOnCatchupSubscription
{
return new class($appeared) implements EventAppearedOnCatchupSubscription {
private ManualResetEventSlim $appeared;
private Deferred $appeared;

public function __construct(ManualResetEventSlim $appeared)
public function __construct(Deferred $appeared)
{
$this->appeared = $appeared;
}
Expand All @@ -447,7 +447,7 @@ public function __invoke(
EventStoreCatchUpSubscription $subscription,
ResolvedEvent $resolvedEvent
): Promise {
$this->appeared->set();
$this->appeared->resolve(true);

return new Success();
}
Expand All @@ -474,12 +474,12 @@ public function __invoke(
};
}

private function droppedWithResetEvent(ManualResetEventSlim $dropped): CatchUpSubscriptionDropped
private function droppedWithResetEvent(Deferred $dropped): CatchUpSubscriptionDropped
{
return new class($dropped) implements CatchUpSubscriptionDropped {
private ManualResetEventSlim $dropped;
private Deferred $dropped;

public function __construct(ManualResetEventSlim $dropped)
public function __construct(Deferred $dropped)
{
$this->dropped = $dropped;
}
Expand All @@ -489,7 +489,7 @@ public function __invoke(
SubscriptionDropReason $reason,
?Throwable $exception = null
): void {
$this->dropped->set();
$this->dropped->resolve(true);
}
};
}
Expand Down

0 comments on commit 2a2eb09

Please sign in to comment.