Skip to content

Commit

Permalink
fix amphp problems
Browse files Browse the repository at this point in the history
  • Loading branch information
prolic committed Sep 25, 2018
1 parent deee1b5 commit ad991dd
Show file tree
Hide file tree
Showing 38 changed files with 122 additions and 1,227 deletions.
6 changes: 1 addition & 5 deletions examples/demo-subscription5.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
use Amp\Success;
use Prooph\EventStoreClient\Exception\InvalidOperationException;
use Prooph\EventStoreClient\Internal\AbstractEventStorePersistentSubscription;
use Prooph\EventStoreClient\Internal\EventStorePersistentSubscription;
use Prooph\EventStoreClient\Internal\ResolvedEvent;
use Prooph\EventStoreClient\Messages\ClientMessages\CreatePersistentSubscription;
use Throwable;
Expand Down Expand Up @@ -60,7 +59,7 @@

\var_dump($result);

$subscription = $connection->connectToPersistentSubscription(
$connection->connectToPersistentSubscriptionAsync(
'foo-bar',
'test-persistent-subscription',
new class() implements EventAppearedOnPersistentSubscription {
Expand Down Expand Up @@ -92,7 +91,4 @@ public function __invoke(
true,
new UserCredentials('admin', 'changeit')
);

/** @var EventStorePersistentSubscription $subscription */
$subscription = yield $subscription->start();
});
94 changes: 0 additions & 94 deletions examples/demo-subscription6.php

This file was deleted.

5 changes: 4 additions & 1 deletion src/ClientOperations/AbstractSubscriptionOperation.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
namespace Prooph\EventStoreClient\ClientOperations;

use Amp\Deferred;
use Amp\Loop;
use Amp\Promise;
use Amp\Success;
use Generator;
Expand Down Expand Up @@ -371,7 +372,9 @@ private function executeActionAsync(callable $action): void
$this->dropSubscription(SubscriptionDropReason::userInitiated(), new \Exception('client buffer too big'));
}

Promise\rethrow($this->executeActions());
Loop::defer(function (): Generator {
yield $this->executeActions();
});
}

/** @return Promise<void> */
Expand Down
11 changes: 0 additions & 11 deletions src/EventStoreAsyncConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

use Amp\Promise;
use Prooph\EventStoreClient\Internal\EventStoreAllCatchUpSubscription;
use Prooph\EventStoreClient\Internal\EventStorePersistentSubscription;
use Prooph\EventStoreClient\Internal\EventStoreStreamCatchUpSubscription;
use Prooph\EventStoreClient\Internal\ListenerHandler;

Expand Down Expand Up @@ -197,16 +196,6 @@ public function subscribeToAllFrom(
?UserCredentials $userCredentials = null
): EventStoreAllCatchUpSubscription;

public function connectToPersistentSubscription(
string $stream,
string $groupName,
EventAppearedOnPersistentSubscription $eventAppeared,
?PersistentSubscriptionDropped $subscriptionDropped = null,
int $bufferSize = 10,
bool $autoAck = true,
?UserCredentials $userCredentials = null
): EventStorePersistentSubscription;

/**
* @return Promise<AbstractEventStorePersistentSubscription>
*/
Expand Down
6 changes: 1 addition & 5 deletions src/Internal/AbstractEventStorePersistentSubscription.php
Original file line number Diff line number Diff line change
Expand Up @@ -349,11 +349,7 @@ private function enqueue(PersistentSubscriptionResolvedEvent $resolvedEvent): vo
$this->isProcessing = true;

Loop::defer(function (): Generator {
$promise = $this->processQueue();

Promise\rethrow($promise);

yield $promise;
yield $this->processQueue();
});
}
}
Expand Down
40 changes: 2 additions & 38 deletions src/Internal/EventStoreAsyncNodeConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -616,7 +616,7 @@ public function subscribeToStreamFrom(
$settings
);

Promise\rethrow($catchUpSubscription->startAsync());
$catchUpSubscription->startAsync();

return $catchUpSubscription;
}
Expand Down Expand Up @@ -671,47 +671,11 @@ public function subscribeToAllFrom(
$settings
);

Promise\rethrow($catchUpSubscription->startAsync());
$catchUpSubscription->startAsync();

return $catchUpSubscription;
}

public function connectToPersistentSubscription(
string $stream,
string $groupName,
EventAppearedOnPersistentSubscription $eventAppeared,
?PersistentSubscriptionDropped $subscriptionDropped = null,
int $bufferSize = 10,
bool $autoAck = true,
?UserCredentials $userCredentials = null
): EventStorePersistentSubscription {
if (empty($stream)) {
throw new InvalidArgumentException('Stream cannot be empty');
}

if (empty($groupName)) {
throw new InvalidArgumentException('Group cannot be empty');
}

$subscription = new EventStorePersistentSubscription(
$groupName,
$stream,
$eventAppeared,
$subscriptionDropped,
$userCredentials,
$this->settings->log(),
$this->settings->verboseLogging(),
$this->settings,
$this->handler,
$bufferSize,
$autoAck
);

Promise\rethrow($subscription->start());

return $subscription;
}

/** {@inheritdoc} */
public function connectToPersistentSubscriptionAsync(
string $stream,
Expand Down
10 changes: 2 additions & 8 deletions src/Internal/EventStoreCatchUpSubscription.php
Original file line number Diff line number Diff line change
Expand Up @@ -221,10 +221,7 @@ private function onReconnect(ClientConnectionEventArgs $clientConnectionEventArg
$this->connection->detach($this->connectListener);

Loop::defer(function (): Generator {
$promise = $this->runSubscriptionAsync();
Promise\rethrow($promise);

yield $promise;
yield $this->runSubscriptionAsync();
});
}

Expand Down Expand Up @@ -462,10 +459,7 @@ private function ensureProcessingPushQueue(): void
$this->isProcessing = true;

Loop::defer(function (): Generator {
$promise = $this->processLiveQueueAsync();
Promise\rethrow($promise);

yield $promise;
yield $this->processLiveQueueAsync();
});
}
}
Expand Down
8 changes: 1 addition & 7 deletions src/Internal/EventStoreConnectionLogicHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
use Prooph\EventStoreClient\SystemData\TcpPackage;
use Prooph\EventStoreClient\Transport\Tcp\TcpPackageConnection;
use Throwable;
use function Amp\Promise\rethrow;

/** @internal */
class EventStoreConnectionLogicHandler
Expand Down Expand Up @@ -250,8 +249,6 @@ private function discoverEndPoint(?Deferred $deferred): void
$deferred->resolve(null);
}
});

rethrow($promise);
}

/** @throws \Exception */
Expand Down Expand Up @@ -334,10 +331,7 @@ function (TcpPackageConnection $connection, Throwable $exception): void {
);

Loop::defer(function (): Generator {
$promise = $this->connection->connectAsync();
rethrow($promise);

yield $promise;
yield $this->connection->connectAsync();

if (! $this->connection->isClosed()) {
$this->connection->startReceiving();
Expand Down
23 changes: 0 additions & 23 deletions src/Internal/EventStoreSyncNodeConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
use Prooph\EventStoreClient\ClusterSettings;
use Prooph\EventStoreClient\ConditionalWriteResult;
use Prooph\EventStoreClient\ConnectionSettings;
use Prooph\EventStoreClient\EventAppearedOnSubscription;
use Prooph\EventStoreClient\EventReadResult;
use Prooph\EventStoreClient\EventStoreAsyncConnection;
use Prooph\EventStoreClient\EventStoreSyncConnection;
Expand All @@ -31,7 +30,6 @@
use Prooph\EventStoreClient\StreamEventsSlice;
use Prooph\EventStoreClient\StreamMetadata;
use Prooph\EventStoreClient\StreamMetadataResult;
use Prooph\EventStoreClient\SubscriptionDropped;
use Prooph\EventStoreClient\SystemSettings;
use Prooph\EventStoreClient\UserCredentials;
use Prooph\EventStoreClient\WriteResult;
Expand Down Expand Up @@ -273,27 +271,6 @@ public function deletePersistentSubscription(
return Promise\wait($this->asyncConnection->deletePersistentSubscriptionAsync($stream, $groupName, $userCredentials));
}

/** @throws Throwable */
public function connectToPersistentSubscription(
string $stream,
string $groupName,
EventAppearedOnSubscription $eventAppeared,
?SubscriptionDropped $subscriptionDropped = null,
int $bufferSize = 10,
bool $autoAck = true,
?UserCredentials $userCredentials = null
): EventStorePersistentSubscription {
return $this->asyncConnection->connectToPersistentSubscription(
$stream,
$groupName,
$eventAppeared,
$subscriptionDropped,
$bufferSize,
$autoAck,
$userCredentials
);
}

public function startTransaction(
string $stream,
int $expectedVersion,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ protected function given(): Generator
$this->resetEvent->resolve(true);
};

$this->conn->connectToPersistentSubscription(
yield $this->conn->connectToPersistentSubscriptionAsync(
$this->stream,
$this->group,
new class() implements EventAppearedOnPersistentSubscription {
Expand Down

0 comments on commit ad991dd

Please sign in to comment.