From fd30ee240f4429d72896185e1cc01dff5d372f39 Mon Sep 17 00:00:00 2001 From: prolic Date: Thu, 23 Aug 2018 16:03:35 +0800 Subject: [PATCH] - refactor transaction interfaces - refactor EventStoreConnectionBuilder - add tests/appending_to_implicitly_created_stream_using_transaction --- examples/demo-async-cluster.php | 2 +- examples/demo-async-dns-cluster.php | 2 +- examples/demo-async-with-logger.php | 2 +- examples/demo-async.php | 2 +- examples/demo-subscription-with-logger.php | 2 +- examples/demo-subscription1.php | 2 +- examples/demo-subscription2.php | 2 +- examples/demo-subscription3.php | 2 +- examples/demo-subscription4.php | 2 +- examples/demo-subscription5.php | 2 +- examples/demo-subscription6.php | 2 +- examples/demo-sync.php | 2 +- .../StartAsyncTransactionOperation.php | 2 + src/EventStoreAsyncConnection.php | 12 + ...p => EventStoreAsyncConnectionBuilder.php} | 56 +-- src/EventStoreAsyncTransaction.php | 2 +- src/EventStoreSyncConnection.php | 11 + src/EventStoreSyncConnectionBuilder.php | 210 ++++++++++ src/EventStoreSyncTransaction.php | 2 +- .../EventStoreAsyncTransactionConnection.php | 12 - .../EventStoreSyncTransactionConnection.php | 11 - tests/Helper/Connection.php | 4 +- tests/Helper/OngoingTransaction.php | 52 +++ tests/Helper/TransactionalWriter.php | 49 +++ ...citly_created_stream_using_transaction.php | 382 ++++++++++++++++++ tests/not_connected_tests.php | 4 +- 26 files changed, 741 insertions(+), 92 deletions(-) rename src/{EventStoreConnectionBuilder.php => EventStoreAsyncConnectionBuilder.php} (83%) create mode 100644 src/EventStoreSyncConnectionBuilder.php create mode 100644 tests/Helper/OngoingTransaction.php create mode 100644 tests/Helper/TransactionalWriter.php create mode 100644 tests/appending_to_implicitly_created_stream_using_transaction.php diff --git a/examples/demo-async-cluster.php b/examples/demo-async-cluster.php index d2a8a3b8..f85533c0 100644 --- a/examples/demo-async-cluster.php +++ b/examples/demo-async-cluster.php @@ -24,7 +24,7 @@ new IpEndPoint('eventstore3', 2133), ]); - $connection = EventStoreConnectionBuilder::createAsyncFromSettings( + $connection = EventStoreAsyncConnectionBuilder::createFromSettings( null, $builder->build(), 'cluster-connection' diff --git a/examples/demo-async-dns-cluster.php b/examples/demo-async-dns-cluster.php index fb6eab09..3056f3cf 100644 --- a/examples/demo-async-dns-cluster.php +++ b/examples/demo-async-dns-cluster.php @@ -21,7 +21,7 @@ $builder->setClusterDns('escluster.net'); $builder->setClusterGossipPort(2113); - $connection = EventStoreConnectionBuilder::createAsyncFromSettings( + $connection = EventStoreAsyncConnectionBuilder::createFromSettings( null, $builder->build(), 'dns-cluster-connection' diff --git a/examples/demo-async-with-logger.php b/examples/demo-async-with-logger.php index b5fb51e5..7eace0c5 100644 --- a/examples/demo-async-with-logger.php +++ b/examples/demo-async-with-logger.php @@ -21,7 +21,7 @@ $builder->enableVerboseLogging(); $builder->useConsoleLogger(); - $connection = EventStoreConnectionBuilder::createAsyncFromIpEndPoint( + $connection = EventStoreAsyncConnectionBuilder::createFromIpEndPoint( new IpEndPoint('eventstore', 1113), $builder->build() ); diff --git a/examples/demo-async.php b/examples/demo-async.php index 0114b606..018e7df0 100644 --- a/examples/demo-async.php +++ b/examples/demo-async.php @@ -17,7 +17,7 @@ require __DIR__ . '/../vendor/autoload.php'; Loop::run(function () { - $connection = EventStoreConnectionBuilder::createAsyncFromIpEndPoint( + $connection = EventStoreAsyncConnectionBuilder::createFromIpEndPoint( new IpEndPoint('eventstore', 1113) ); diff --git a/examples/demo-subscription-with-logger.php b/examples/demo-subscription-with-logger.php index fdadcc56..93c11473 100644 --- a/examples/demo-subscription-with-logger.php +++ b/examples/demo-subscription-with-logger.php @@ -25,7 +25,7 @@ $builder->enableVerboseLogging(); $builder->useConsoleLogger(); - $connection = EventStoreConnectionBuilder::createAsyncFromIpEndPoint( + $connection = EventStoreAsyncConnectionBuilder::createFromIpEndPoint( new IpEndPoint('eventstore', 1113), $builder->build() ); diff --git a/examples/demo-subscription1.php b/examples/demo-subscription1.php index 2e2d8e99..c3f48b02 100644 --- a/examples/demo-subscription1.php +++ b/examples/demo-subscription1.php @@ -22,7 +22,7 @@ require __DIR__ . '/../vendor/autoload.php'; Loop::run(function () { - $connection = EventStoreConnectionBuilder::createAsyncFromIpEndPoint( + $connection = EventStoreAsyncConnectionBuilder::createFromIpEndPoint( new IpEndPoint('eventstore', 1113) ); diff --git a/examples/demo-subscription2.php b/examples/demo-subscription2.php index 9f191824..b4fffb46 100644 --- a/examples/demo-subscription2.php +++ b/examples/demo-subscription2.php @@ -22,7 +22,7 @@ require __DIR__ . '/../vendor/autoload.php'; Loop::run(function () { - $connection = EventStoreConnectionBuilder::createAsyncFromIpEndPoint( + $connection = EventStoreAsyncConnectionBuilder::createFromIpEndPoint( new IpEndPoint('eventstore', 1113) ); diff --git a/examples/demo-subscription3.php b/examples/demo-subscription3.php index d2f6800e..3fbf671f 100644 --- a/examples/demo-subscription3.php +++ b/examples/demo-subscription3.php @@ -22,7 +22,7 @@ require __DIR__ . '/../vendor/autoload.php'; Loop::run(function () { - $connection = EventStoreConnectionBuilder::createAsyncFromIpEndPoint( + $connection = EventStoreAsyncConnectionBuilder::createFromIpEndPoint( new IpEndPoint('eventstore', 1113) ); diff --git a/examples/demo-subscription4.php b/examples/demo-subscription4.php index b1c4051a..054ddbd7 100644 --- a/examples/demo-subscription4.php +++ b/examples/demo-subscription4.php @@ -22,7 +22,7 @@ require __DIR__ . '/../vendor/autoload.php'; Loop::run(function () { - $connection = EventStoreConnectionBuilder::createAsyncFromIpEndPoint( + $connection = EventStoreAsyncConnectionBuilder::createFromIpEndPoint( new IpEndPoint('eventstore', 1113) ); diff --git a/examples/demo-subscription5.php b/examples/demo-subscription5.php index 2bef6527..323a8e7b 100644 --- a/examples/demo-subscription5.php +++ b/examples/demo-subscription5.php @@ -25,7 +25,7 @@ require __DIR__ . '/../vendor/autoload.php'; Loop::run(function () { - $connection = EventStoreConnectionBuilder::createAsyncFromIpEndPoint( + $connection = EventStoreAsyncConnectionBuilder::createFromIpEndPoint( new IpEndPoint('eventstore', 1113) ); diff --git a/examples/demo-subscription6.php b/examples/demo-subscription6.php index e3fe5836..cca458db 100644 --- a/examples/demo-subscription6.php +++ b/examples/demo-subscription6.php @@ -24,7 +24,7 @@ require __DIR__ . '/../vendor/autoload.php'; Loop::run(function () { - $connection = EventStoreConnectionBuilder::createAsyncFromIpEndPoint( + $connection = EventStoreAsyncConnectionBuilder::createFromIpEndPoint( new IpEndPoint('eventstore', 1113) ); diff --git a/examples/demo-sync.php b/examples/demo-sync.php index ae2eb533..773f8b55 100644 --- a/examples/demo-sync.php +++ b/examples/demo-sync.php @@ -14,7 +14,7 @@ require __DIR__ . '/../vendor/autoload.php'; -$connection = EventStoreConnectionBuilder::createFromIpEndPoint( +$connection = EventStoreAsyncConnectionBuilder::createFromIpEndPoint( new IpEndPoint('eventstore', 1113) ); diff --git a/src/ClientOperations/StartAsyncTransactionOperation.php b/src/ClientOperations/StartAsyncTransactionOperation.php index 03ab8fae..d89bf3b8 100644 --- a/src/ClientOperations/StartAsyncTransactionOperation.php +++ b/src/ClientOperations/StartAsyncTransactionOperation.php @@ -72,6 +72,8 @@ protected function createRequestDto(): ProtobufMessage $message->setRequireMaster($this->requireMaster); $message->setEventStreamId($this->stream); $message->setExpectedVersion($this->expectedVersion); + + return $message; } protected function inspectResponse(ProtobufMessage $response): InspectionResult diff --git a/src/EventStoreAsyncConnection.php b/src/EventStoreAsyncConnection.php index 4a158d36..308944bb 100644 --- a/src/EventStoreAsyncConnection.php +++ b/src/EventStoreAsyncConnection.php @@ -122,6 +122,18 @@ public function getStreamMetadataAsync(string $stream, UserCredentials $userCred /** @return Promise */ public function setSystemSettingsAsync(SystemSettings $settings, UserCredentials $userCredentials = null): Promise; + /** @return Promise */ + public function startTransactionAsync( + string $stream, + int $expectedVersion, + UserCredentials $userCredentials = null + ): Promise; + + public function continueTransaction( + int $transactionId, + UserCredentials $userCredentials = null + ): EventStoreAsyncTransaction; + /** @return Promise */ public function createPersistentSubscriptionAsync( string $stream, diff --git a/src/EventStoreConnectionBuilder.php b/src/EventStoreAsyncConnectionBuilder.php similarity index 83% rename from src/EventStoreConnectionBuilder.php rename to src/EventStoreAsyncConnectionBuilder.php index 00192d81..cd0c7233 100644 --- a/src/EventStoreConnectionBuilder.php +++ b/src/EventStoreAsyncConnectionBuilder.php @@ -13,15 +13,13 @@ namespace Prooph\EventStoreClient; use Prooph\EventStoreClient\EventStoreAsyncConnection as AsyncConnection; -use Prooph\EventStoreClient\EventStoreSyncConnection as SyncConnection; use Prooph\EventStoreClient\Exception\InvalidArgumentException; use Prooph\EventStoreClient\Internal\ClusterDnsEndPointDiscoverer; use Prooph\EventStoreClient\Internal\EventStoreAsyncNodeConnection; -use Prooph\EventStoreClient\Internal\EventStoreSyncNodeConnection; use Prooph\EventStoreClient\Internal\SingleEndpointDiscoverer; use Prooph\EventStoreClient\Internal\StaticEndPointDiscoverer; -class EventStoreConnectionBuilder +class EventStoreAsyncConnectionBuilder { /** * Sub-delimiters used in user info, query strings and fragments. @@ -37,18 +35,18 @@ class EventStoreConnectionBuilder private const TCP_PORT_DEFAULT = 1113; /** @throws \Exception */ - public static function createAsyncFromBuilder( + public static function createFromBuilder( string $connectionString = null, ConnectionSettingsBuilder $builder = null, string $connectionName = '' ): AsyncConnection { $builder = $builder ?? new ConnectionSettingsBuilder(); - return self::createAsyncFromSettings($connectionString, $builder->build(), $connectionName); + return self::createFromSettings($connectionString, $builder->build(), $connectionName); } /** @throws \Exception */ - public static function createAsyncFromSettings( + public static function createFromSettings( string $connectionString = null, ConnectionSettings $settings = null, string $connectionName = '' @@ -113,7 +111,7 @@ public static function createAsyncFromSettings( throw new \Exception('Must specify uri, ClusterDNS or gossip seeds'); } - public static function createAsyncFromIpEndPoint( + public static function createFromIpEndPoint( IpEndPoint $endPoint, ConnectionSettings $settings = null, string $connectionName = null @@ -128,50 +126,6 @@ public static function createAsyncFromIpEndPoint( ); } - /** @throws \Exception */ - public static function createFromBuilder( - string $connectionString = null, - ConnectionSettingsBuilder $builder = null, - string $connectionName = '' - ): SyncConnection { - $connection = self::createAsyncFromBuilder( - $connectionString, - $builder, - $connectionName - ); - - return new EventStoreSyncNodeConnection($connection); - } - - /** @throws \Exception */ - public static function createFromSettings( - string $connectionString = null, - ConnectionSettings $settings = null, - string $connectionName = '' - ): SyncConnection { - $connection = self::createAsyncFromSettings( - $connectionString, - $settings, - $connectionName - ); - - return new EventStoreSyncNodeConnection($connection); - } - - public static function createFromIpEndPoint( - IpEndPoint $endPoint, - ConnectionSettings $settings = null, - string $connectionName = null - ): SyncConnection { - $connection = self::createAsyncFromIpEndPoint( - $endPoint, - $settings, - $connectionName - ); - - return new EventStoreSyncNodeConnection($connection); - } - private static function createWithClusterDnsEndPointDiscoverer( ConnectionSettings $settings, string $connectionName = null diff --git a/src/EventStoreAsyncTransaction.php b/src/EventStoreAsyncTransaction.php index 8a1b7649..997f13fb 100644 --- a/src/EventStoreAsyncTransaction.php +++ b/src/EventStoreAsyncTransaction.php @@ -44,7 +44,7 @@ public function transactionId(): int } /** @return Promise */ - public function commit(): Promise + public function commitAsync(): Promise { if ($this->isRolledBack) { throw new \RuntimeException('Cannot commit a rolledback transaction'); diff --git a/src/EventStoreSyncConnection.php b/src/EventStoreSyncConnection.php index cfa92ae3..b9b6e4fd 100644 --- a/src/EventStoreSyncConnection.php +++ b/src/EventStoreSyncConnection.php @@ -112,6 +112,17 @@ public function getStreamMetadata(string $stream, UserCredentials $userCredentia public function setSystemSettings(SystemSettings $settings, UserCredentials $userCredentials = null): WriteResult; + public function startTransaction( + string $stream, + int $expectedVersion, + UserCredentials $userCredentials = null + ): EventStoreSyncTransaction; + + public function continueTransaction( + int $transactionId, + UserCredentials $userCredentials = null + ): EventStoreSyncTransaction; + public function onConnected(callable $handler): ListenerHandler; public function onDisconnected(callable $handler): ListenerHandler; diff --git a/src/EventStoreSyncConnectionBuilder.php b/src/EventStoreSyncConnectionBuilder.php new file mode 100644 index 00000000..1c2fba5d --- /dev/null +++ b/src/EventStoreSyncConnectionBuilder.php @@ -0,0 +1,210 @@ + + * (c) 2018-2018 Sascha-Oliver Prolic + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +declare(strict_types=1); + +namespace Prooph\EventStoreClient; + +use Prooph\EventStoreClient\EventStoreSyncConnection as SyncConnection; +use Prooph\EventStoreClient\Exception\InvalidArgumentException; +use Prooph\EventStoreClient\Internal\ClusterDnsEndPointDiscoverer; +use Prooph\EventStoreClient\Internal\EventStoreSyncNodeConnection; +use Prooph\EventStoreClient\Internal\SingleEndpointDiscoverer; +use Prooph\EventStoreClient\Internal\StaticEndPointDiscoverer; + +class EventStoreSyncConnectionBuilder +{ + /** + * Sub-delimiters used in user info, query strings and fragments. + * @const string + */ + private const CHAR_SUB_DELIMS = '!\$&\'\(\)\*\+,;='; + /** + * Unreserved characters used in user info, paths, query strings, and fragments. + * @const string + */ + private const CHAR_UNRESERVED = 'a-zA-Z0-9_\-\.~\pL'; + + private const TCP_PORT_DEFAULT = 1113; + + /** @throws \Exception */ + public static function createFromBuilder( + string $connectionString = null, + ConnectionSettingsBuilder $builder = null, + string $connectionName = '' + ): SyncConnection { + $builder = $builder ?? new ConnectionSettingsBuilder(); + + return self::createFromSettings($connectionString, $builder->build(), $connectionName); + } + + /** @throws \Exception */ + public static function createFromSettings( + string $connectionString = null, + ConnectionSettings $settings = null, + string $connectionName = '' + ): SyncConnection { + if (null === $connectionString && (null === $settings || (empty($settings->gossipSeeds()) && empty($settings->clusterDns())))) { + throw new \Exception('Did not find ConnectTo, ClusterDNS or GossipSeeds in the connection string'); + } + + if (null !== $connectionString && (null === $settings || (empty($settings->gossipSeeds()) && empty($settings->clusterDns())))) { + throw new \Exception('Setting ConnectTo as well as GossipSeeds and/or ClusterDNS on the connection string is currently not supported'); + } + + if (null !== $connectionString) { + list($scheme, $host, $port, $user, $pass) = self::parseUri($connectionString); + $credentials = $user ? new UserCredentials($user, $pass) : null; + if (null !== $credentials) { + $settings = new ConnectionSettings( + $settings->maxQueueSize(), + $settings->maxConcurrentItems(), + $settings->maxRetries(), + $settings->maxReconnections(), + $settings->requireMaster(), + $settings->reconnectionDelay(), + $settings->operationTimeout(), + $settings->operationTimeoutCheckPeriod(), + $credentials, + $settings->useSslConnection(), + $settings->targetHost(), + $settings->validateServer(), + $settings->failOnNoServerResponse(), + $settings->heartbeatInterval(), + $settings->heartbeatTimeout(), + $settings->clientConnectionTimeout(), + $settings->clusterDns(), + $settings->gossipSeeds(), + $settings->maxDiscoverAttempts(), + $settings->externalGossipPort(), + $settings->gossipTimeout(), + $settings->preferRandomNode() + ); + } + + if ($scheme === 'discover') { + return self::createWithClusterDnsEndPointDiscoverer($settings, $connectionName); + } + + if ($scheme === 'tcp') { + return self::createWithSingleEndpointDiscoverer( + $connectionString, + $settings, + $connectionName + ); + } + + throw new \Exception('Unknown scheme for connection'); + } + + if (! empty($settings->gossipSeeds()) || ! empty($settings->clusterDns())) { + return self::createWithClusterDnsEndPointDiscoverer($settings, $connectionName); + } + + throw new \Exception('Must specify uri, ClusterDNS or gossip seeds'); + } + + public static function createFromIpEndPoint( + IpEndPoint $endPoint, + ConnectionSettings $settings = null, + string $connectionName = null + ): SyncConnection { + $settings = $settings ?? ConnectionSettings::default(); + + return new EventStoreSyncNodeConnection( + $settings, + null, + new StaticEndPointDiscoverer($endPoint, $settings->useSslConnection()), + $connectionName + ); + } + + private static function createWithClusterDnsEndPointDiscoverer( + ConnectionSettings $settings, + string $connectionName = null + ): SyncConnection { + $clusterSettings = new ClusterSettings( + $settings->clusterDns(), + $settings->maxDiscoverAttempts(), + $settings->externalGossipPort(), + $settings->gossipSeeds(), + $settings->gossipTimeout(), + $settings->preferRandomNode() + ); + + $endPointDiscoverer = new ClusterDnsEndPointDiscoverer( + $settings->log(), + $settings->clusterDns(), + $settings->maxDiscoverAttempts(), + $settings->externalGossipPort(), + $settings->gossipSeeds(), + $settings->gossipTimeout(), + $settings->preferRandomNode() + ); + + return new EventStoreSyncNodeConnection($settings, $clusterSettings, $endPointDiscoverer, $connectionName); + } + + private static function createWithSingleEndpointDiscoverer( + string $connectionString, + ConnectionSettings $settings, + string $connectionName = null + ): SyncConnection { + return new EventStoreSyncNodeConnection( + $settings, + null, + new SingleEndpointDiscoverer($connectionString, $settings->useSslConnection()), + $connectionName + ); + } + + private static function parseUri(string $connectionString): array + { + $parts = \parse_url($connectionString); + + if (false === $parts) { + throw new InvalidArgumentException( + 'The source URI string appears to be malformed' + ); + } + + $scheme = isset($parts['scheme']) ? self::filterScheme($parts['scheme']) : ''; + $host = isset($parts['host']) ? \strtolower($parts['host']) : ''; + $port = isset($parts['port']) ? (int) $parts['port'] : self::TCP_PORT_DEFAULT; + $user = isset($parts['user']) ? self::filterUserInfoPart($parts['user']) : ''; + $pass = $parts['pass'] ?? ''; + + return [ + $scheme, + $host, + $port, + $user, + $pass, + ]; + } + + private static function filterScheme(string $scheme): string + { + return \preg_replace('#:(//)?$#', '', \strtolower($scheme)); + } + + private static function filterUserInfoPart(string $part): string + { + // Note the addition of `%` to initial charset; this allows `|` portion + // to match and thus prevent double-encoding. + return \preg_replace_callback( + '/(?:[^%' . self::CHAR_UNRESERVED . self::CHAR_SUB_DELIMS . ']+|%(?![A-Fa-f0-9]{2}))/u', + function (array $matches): string { + return \rawurlencode($matches[0]); + }, + $part + ); + } +} diff --git a/src/EventStoreSyncTransaction.php b/src/EventStoreSyncTransaction.php index 9f4c9ba8..3d8408a1 100644 --- a/src/EventStoreSyncTransaction.php +++ b/src/EventStoreSyncTransaction.php @@ -60,7 +60,7 @@ public function commit(): WriteResult * @param EventData[] $events * @return void */ - public function writeAsync(array $events): void + public function write(array $events): void { if ($this->isRolledBack) { throw new \RuntimeException('Cannot commit a rolledback transaction'); diff --git a/src/Internal/EventStoreAsyncTransactionConnection.php b/src/Internal/EventStoreAsyncTransactionConnection.php index 1dce0470..79f39e19 100644 --- a/src/Internal/EventStoreAsyncTransactionConnection.php +++ b/src/Internal/EventStoreAsyncTransactionConnection.php @@ -19,18 +19,6 @@ /** @internal */ interface EventStoreAsyncTransactionConnection { - /** @return Promise */ - public function startTransactionAsync( - string $stream, - int $expectedVersion, - UserCredentials $userCredentials = null - ): Promise; - - public function continueTransaction( - int $transactionId, - UserCredentials $userCredentials = null - ): EventStoreAsyncTransaction; - public function transactionalWriteAsync( EventStoreAsyncTransaction $transaction, array $events, diff --git a/src/Internal/EventStoreSyncTransactionConnection.php b/src/Internal/EventStoreSyncTransactionConnection.php index 045b9763..a690bd31 100644 --- a/src/Internal/EventStoreSyncTransactionConnection.php +++ b/src/Internal/EventStoreSyncTransactionConnection.php @@ -19,17 +19,6 @@ /** @internal */ interface EventStoreSyncTransactionConnection { - public function startTransaction( - string $stream, - int $expectedVersion, - UserCredentials $userCredentials = null - ): EventStoreSyncTransaction; - - public function continueTransaction( - int $transactionId, - UserCredentials $userCredentials = null - ): EventStoreSyncTransaction; - public function transactionalWrite( EventStoreSyncTransaction $transaction, array $events, diff --git a/tests/Helper/Connection.php b/tests/Helper/Connection.php index 7a7a85a4..878cbd36 100644 --- a/tests/Helper/Connection.php +++ b/tests/Helper/Connection.php @@ -14,7 +14,7 @@ use Prooph\EventStoreClient\ConnectionSettingsBuilder; use Prooph\EventStoreClient\EventStoreAsyncConnection; -use Prooph\EventStoreClient\EventStoreConnectionBuilder; +use Prooph\EventStoreClient\EventStoreAsyncConnectionBuilder; use Prooph\EventStoreClient\EventStoreSyncConnection; use Prooph\EventStoreClient\Internal\EventStoreSyncNodeConnection; use Prooph\EventStoreClient\IpEndPoint; @@ -35,7 +35,7 @@ public static function createAsync(): EventStoreAsyncConnection $settingsBuilder = new ConnectionSettingsBuilder(); $settingsBuilder->setDefaultUserCredentials(new UserCredentials($user, $pass)); - return EventStoreConnectionBuilder::createAsyncFromIpEndPoint( + return EventStoreAsyncConnectionBuilder::createFromIpEndPoint( new IpEndPoint($host, $port), $settingsBuilder->build() ); diff --git a/tests/Helper/OngoingTransaction.php b/tests/Helper/OngoingTransaction.php new file mode 100644 index 00000000..ee5c4307 --- /dev/null +++ b/tests/Helper/OngoingTransaction.php @@ -0,0 +1,52 @@ + + * (c) 2018-2018 Sascha-Oliver Prolic + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +declare(strict_types=1); + +namespace ProophTest\EventStoreClient\Helper; + +use Amp\Promise; +use Generator; +use Prooph\EventStoreClient\EventData; +use Prooph\EventStoreClient\EventStoreAsyncTransaction; +use function Amp\call; + +/** @internal */ +class OngoingTransaction +{ + /** @var EventStoreAsyncTransaction */ + private $transaction; + + public function __construct(EventStoreAsyncTransaction $transaction) + { + $this->transaction = $transaction; + } + + /** + * @param EventData[] + * @return Promise + */ + public function writeAsync(array $events): Promise + { + return call(function () use ($events): Generator { + yield $this->transaction->writeAsync($events); + + return $this; + }); + } + + /** @return Promise */ + public function commitAsync(): Promise + { + return call(function (): Generator { + return yield $this->transaction->commitAsync(); + }); + } +} diff --git a/tests/Helper/TransactionalWriter.php b/tests/Helper/TransactionalWriter.php new file mode 100644 index 00000000..42a67657 --- /dev/null +++ b/tests/Helper/TransactionalWriter.php @@ -0,0 +1,49 @@ + + * (c) 2018-2018 Sascha-Oliver Prolic + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +declare(strict_types=1); + +namespace ProophTest\EventStoreClient\Helper; + +use Amp\Promise; +use Generator; +use Prooph\EventStoreClient\EventStoreAsyncConnection; +use function Amp\call; + +/** @internal */ +class TransactionalWriter +{ + /** @var EventStoreAsyncConnection */ + private $connection; + /** @var string */ + private $stream; + + public function __construct(EventStoreAsyncConnection $connection, string $stream) + { + $this->connection = $connection; + $this->stream = $stream; + } + + /** + * @param int $expectedVersion + * @return Promise + */ + public function startTransaction(int $expectedVersion): Promise + { + return call(function () use ($expectedVersion): Generator { + $transaction = yield $this->connection->startTransactionAsync( + $this->stream, + $expectedVersion + ); + + return new OngoingTransaction($transaction); + }); + } +} diff --git a/tests/appending_to_implicitly_created_stream_using_transaction.php b/tests/appending_to_implicitly_created_stream_using_transaction.php new file mode 100644 index 00000000..92e438d4 --- /dev/null +++ b/tests/appending_to_implicitly_created_stream_using_transaction.php @@ -0,0 +1,382 @@ + + * (c) 2018-2018 Sascha-Oliver Prolic + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +declare(strict_types=1); + +namespace ProophTest\EventStoreClient; + +use PHPUnit\Framework\TestCase; +use Prooph\EventStoreClient\Exception\WrongExpectedVersionException; +use Prooph\EventStoreClient\ExpectedVersion; +use Prooph\EventStoreClient\WriteResult; +use ProophTest\EventStoreClient\Helper\Connection; +use ProophTest\EventStoreClient\Helper\EventsStream; +use ProophTest\EventStoreClient\Helper\OngoingTransaction; +use ProophTest\EventStoreClient\Helper\TestEvent; +use ProophTest\EventStoreClient\Helper\TransactionalWriter; +use Throwable; +use function Amp\call; +use function Amp\Promise\wait; + +class appending_to_implicitly_created_stream_using_transaction extends TestCase +{ + /** + * @test + * @throws Throwable + */ + public function sequence_0em1_1e0_2e1_3e2_4e3_5e4_0em1_idempotent(): void + { + wait(call(function () { + $stream = 'appending_to_implicitly_created_stream_using_transaction_sequence_0em1_1e0_2e1_3e2_4e3_5e4_0em1_idempotent'; + + $connection = Connection::createAsync(); + yield $connection->connectAsync(); + + $events = TestEvent::newAmount(6); + $writer = new TransactionalWriter($connection, $stream); + + /** @var OngoingTransaction $ongoingTransaction */ + $ongoingTransaction = yield $writer->startTransaction(-1); + $ongoingTransaction = yield $ongoingTransaction->writeAsync($events); + /** @var WriteResult $writeResult */ + $writeResult = yield $ongoingTransaction->commitAsync(); + + $this->assertSame(5, $writeResult->nextExpectedVersion()); + + /** @var OngoingTransaction $ongoingTransaction */ + $ongoingTransaction = yield $writer->startTransaction(-1); + $ongoingTransaction = yield $ongoingTransaction->writeAsync([\current($events)]); + /** @var WriteResult $writeResult */ + $writeResult = yield $ongoingTransaction->commitAsync(); + + $this->assertSame(0, $writeResult->nextExpectedVersion()); + + $total = yield EventsStream::count($connection, $stream); + $this->assertSame(\count($events), $total); + })); + } + + /** + * @test + * @throws Throwable + */ + public function sequence_0em1_1e0_2e1_3e2_4e3_5e4_0any_idempotent(): void + { + wait(call(function () { + $stream = 'appending_to_implicitly_created_stream_using_transaction_sequence_0em1_1e0_2e1_3e2_4e3_5e4_0any_idempotent'; + + $connection = Connection::createAsync(); + yield $connection->connectAsync(); + + $events = TestEvent::newAmount(6); + $writer = new TransactionalWriter($connection, $stream); + + /** @var OngoingTransaction $ongoingTransaction */ + $ongoingTransaction = yield $writer->startTransaction(-1); + $ongoingTransaction = yield $ongoingTransaction->writeAsync($events); + /** @var WriteResult $writeResult */ + $writeResult = yield $ongoingTransaction->commitAsync(); + + $this->assertSame(5, $writeResult->nextExpectedVersion()); + + /** @var OngoingTransaction $ongoingTransaction */ + $ongoingTransaction = yield $writer->startTransaction(ExpectedVersion::ANY); + $ongoingTransaction = yield $ongoingTransaction->writeAsync([\current($events)]); + /** @var WriteResult $writeResult */ + $writeResult = yield $ongoingTransaction->commitAsync(); + + $this->assertSame(0, $writeResult->nextExpectedVersion()); + + $total = yield EventsStream::count($connection, $stream); + $this->assertSame(\count($events), $total); + })); + } + + /** + * @test + * @throws Throwable + */ + public function sequence_0em1_1e0_2e1_3e2_4e3_5e4_0e5_non_idempotent(): void + { + wait(call(function () { + $stream = 'appending_to_implicitly_created_stream_using_transaction_sequence_0em1_1e0_2e1_3e2_4e3_5e4_0e5_non_idempotent'; + + $connection = Connection::createAsync(); + yield $connection->connectAsync(); + + $events = TestEvent::newAmount(6); + $writer = new TransactionalWriter($connection, $stream); + + /** @var OngoingTransaction $ongoingTransaction */ + $ongoingTransaction = yield $writer->startTransaction(-1); + $ongoingTransaction = yield $ongoingTransaction->writeAsync($events); + /** @var WriteResult $writeResult */ + $writeResult = yield $ongoingTransaction->commitAsync(); + + $this->assertSame(5, $writeResult->nextExpectedVersion()); + + /** @var OngoingTransaction $ongoingTransaction */ + $ongoingTransaction = yield $writer->startTransaction(5); + $ongoingTransaction = yield $ongoingTransaction->writeAsync([\current($events)]); + /** @var WriteResult $writeResult */ + $writeResult = yield $ongoingTransaction->commitAsync(); + + $this->assertSame(6, $writeResult->nextExpectedVersion()); + + $total = yield EventsStream::count($connection, $stream); + $this->assertSame(\count($events) + 1, $total); + })); + } + + /** + * @test + * @throws Throwable + */ + public function sequence_0em1_1e0_2e1_3e2_4e3_5e4_0e6_wev(): void + { + wait(call(function () { + $stream = 'appending_to_implicitly_created_stream_using_transaction_sequence_0em1_1e0_2e1_3e2_4e3_5e4_0e6_wev'; + + $connection = Connection::createAsync(); + yield $connection->connectAsync(); + + $events = TestEvent::newAmount(6); + $writer = new TransactionalWriter($connection, $stream); + + /** @var OngoingTransaction $ongoingTransaction */ + $ongoingTransaction = yield $writer->startTransaction(-1); + $ongoingTransaction = yield $ongoingTransaction->writeAsync($events); + /** @var WriteResult $writeResult */ + $writeResult = yield $ongoingTransaction->commitAsync(); + + $this->assertSame(5, $writeResult->nextExpectedVersion()); + + /** @var OngoingTransaction $ongoingTransaction */ + $ongoingTransaction = yield $writer->startTransaction(6); + $ongoingTransaction = yield $ongoingTransaction->writeAsync([\current($events)]); + + $this->expectException(WrongExpectedVersionException::class); + + yield $ongoingTransaction->commitAsync(); + })); + } + + /** + * @test + * @throws Throwable + */ + public function sequence_0em1_1e0_2e1_3e2_4e3_5e4_0e4_wev(): void + { + wait(call(function () { + $stream = 'appending_to_implicitly_created_stream_using_transaction_sequence_0em1_1e0_2e1_3e2_4e3_5e4_0e4_wev'; + + $connection = Connection::createAsync(); + yield $connection->connectAsync(); + + $events = TestEvent::newAmount(6); + $writer = new TransactionalWriter($connection, $stream); + + /** @var OngoingTransaction $ongoingTransaction */ + $ongoingTransaction = yield $writer->startTransaction(-1); + $ongoingTransaction = yield $ongoingTransaction->writeAsync($events); + /** @var WriteResult $writeResult */ + $writeResult = yield $ongoingTransaction->commitAsync(); + + $this->assertSame(5, $writeResult->nextExpectedVersion()); + + /** @var OngoingTransaction $ongoingTransaction */ + $ongoingTransaction = yield $writer->startTransaction(4); + $ongoingTransaction = yield $ongoingTransaction->writeAsync([\current($events)]); + + $this->expectException(WrongExpectedVersionException::class); + + yield $ongoingTransaction->commitAsync(); + })); + } + + /** + * @test + * @throws Throwable + */ + public function sequence_0em1_0e0_non_idempotent(): void + { + wait(call(function () { + $stream = 'appending_to_implicitly_created_stream_using_transaction_sequence_0em1_0e0_non_idempotent'; + + $connection = Connection::createAsync(); + yield $connection->connectAsync(); + + $events = TestEvent::newAmount(1); + $writer = new TransactionalWriter($connection, $stream); + + /** @var OngoingTransaction $ongoingTransaction */ + $ongoingTransaction = yield $writer->startTransaction(-1); + $ongoingTransaction = yield $ongoingTransaction->writeAsync($events); + /** @var WriteResult $writeResult */ + $writeResult = yield $ongoingTransaction->commitAsync(); + + $this->assertSame(0, $writeResult->nextExpectedVersion()); + + /** @var OngoingTransaction $ongoingTransaction */ + $ongoingTransaction = yield $writer->startTransaction(0); + $ongoingTransaction = yield $ongoingTransaction->writeAsync([\current($events)]); + /** @var WriteResult $writeResult */ + $writeResult = yield $ongoingTransaction->commitAsync(); + + $this->assertSame(1, $writeResult->nextExpectedVersion()); + + $total = yield EventsStream::count($connection, $stream); + $this->assertSame(\count($events) + 1, $total); + })); + } + + /** + * @test + * @throws Throwable + */ + public function sequence_0em1_0any_idempotent(): void + { + wait(call(function () { + $stream = 'appending_to_implicitly_created_stream_using_transaction_sequence_0em1_0any_idempotent'; + + $connection = Connection::createAsync(); + yield $connection->connectAsync(); + + $events = TestEvent::newAmount(1); + $writer = new TransactionalWriter($connection, $stream); + + /** @var OngoingTransaction $ongoingTransaction */ + $ongoingTransaction = yield $writer->startTransaction(-1); + $ongoingTransaction = yield $ongoingTransaction->writeAsync($events); + /** @var WriteResult $writeResult */ + $writeResult = yield $ongoingTransaction->commitAsync(); + + $this->assertSame(0, $writeResult->nextExpectedVersion()); + + /** @var OngoingTransaction $ongoingTransaction */ + $ongoingTransaction = yield $writer->startTransaction(ExpectedVersion::ANY); + $ongoingTransaction = yield $ongoingTransaction->writeAsync([\current($events)]); + /** @var WriteResult $writeResult */ + $writeResult = yield $ongoingTransaction->commitAsync(); + + $this->assertSame(0, $writeResult->nextExpectedVersion()); + + $total = yield EventsStream::count($connection, $stream); + $this->assertSame(\count($events), $total); + })); + } + + /** + * @test + * @throws Throwable + */ + public function sequence_0em1_0em1_idempotent(): void + { + wait(call(function () { + $stream = 'appending_to_implicitly_created_stream_using_transaction_sequence_0em1_0em1_idempotent'; + + $connection = Connection::createAsync(); + yield $connection->connectAsync(); + + $events = TestEvent::newAmount(1); + $writer = new TransactionalWriter($connection, $stream); + + /** @var OngoingTransaction $ongoingTransaction */ + $ongoingTransaction = yield $writer->startTransaction(-1); + $ongoingTransaction = yield $ongoingTransaction->writeAsync($events); + /** @var WriteResult $writeResult */ + $writeResult = yield $ongoingTransaction->commitAsync(); + + $this->assertSame(0, $writeResult->nextExpectedVersion()); + + /** @var OngoingTransaction $ongoingTransaction */ + $ongoingTransaction = yield $writer->startTransaction(-1); + $ongoingTransaction = yield $ongoingTransaction->writeAsync([\current($events)]); + /** @var WriteResult $writeResult */ + $writeResult = yield $ongoingTransaction->commitAsync(); + + $this->assertSame(0, $writeResult->nextExpectedVersion()); + + $total = yield EventsStream::count($connection, $stream); + $this->assertSame(\count($events), $total); + })); + } + + /** + * @test + * @throws Throwable + */ + public function sequence_0em1_1e0_2e1_1any_1any_idempotent(): void + { + wait(call(function () { + $stream = 'appending_to_implicitly_created_stream_using_transaction_sequence_0em1_1e0_2e1_1any_1any_idempotent'; + + $connection = Connection::createAsync(); + yield $connection->connectAsync(); + + $events = TestEvent::newAmount(3); + $writer = new TransactionalWriter($connection, $stream); + + /** @var OngoingTransaction $ongoingTransaction */ + $ongoingTransaction = yield $writer->startTransaction(-1); + $ongoingTransaction = yield $ongoingTransaction->writeAsync($events); + /** @var WriteResult $writeResult */ + $writeResult = yield $ongoingTransaction->commitAsync(); + + $this->assertSame(2, $writeResult->nextExpectedVersion()); + + /** @var OngoingTransaction $ongoingTransaction */ + $ongoingTransaction = yield $writer->startTransaction(ExpectedVersion::ANY); + $ongoingTransaction = yield $ongoingTransaction->writeAsync([$events[1]]); + /** @var WriteResult $writeResult */ + $writeResult = yield $ongoingTransaction->commitAsync(); + + $this->assertSame(1, $writeResult->nextExpectedVersion()); + + $total = yield EventsStream::count($connection, $stream); + $this->assertSame(\count($events), $total); + })); + } + + /** + * @test + * @throws Throwable + */ + public function sequence_S_0em1_1em1_E_S_0em1_1em1_2em1_E_idempotancy_fail(): void + { + wait(call(function () { + $stream = 'appending_to_implicitly_created_stream_using_transaction_sequence_S_0em1_1em1_E_S_0em1_1em1_2em1_E_idempotancy_fail'; + + $connection = Connection::createAsync(); + yield $connection->connectAsync(); + + $events = TestEvent::newAmount(2); + $writer = new TransactionalWriter($connection, $stream); + + /** @var OngoingTransaction $ongoingTransaction */ + $ongoingTransaction = yield $writer->startTransaction(-1); + $ongoingTransaction = yield $ongoingTransaction->writeAsync($events); + /** @var WriteResult $writeResult */ + $writeResult = yield $ongoingTransaction->commitAsync(); + + $this->assertSame(1, $writeResult->nextExpectedVersion()); + + $events[] = TestEvent::new(); + + /** @var OngoingTransaction $ongoingTransaction */ + $ongoingTransaction = yield $writer->startTransaction(-1); + $ongoingTransaction = yield $ongoingTransaction->writeAsync($events); + + $this->expectException(WrongExpectedVersionException::class); + + yield $ongoingTransaction->commitAsync(); + })); + } +} diff --git a/tests/not_connected_tests.php b/tests/not_connected_tests.php index bd234bad..8a91a234 100644 --- a/tests/not_connected_tests.php +++ b/tests/not_connected_tests.php @@ -16,7 +16,7 @@ use Amp\TimeoutException; use PHPUnit\Framework\TestCase; use Prooph\EventStoreClient\ConnectionSettingsBuilder; -use Prooph\EventStoreClient\EventStoreConnectionBuilder; +use Prooph\EventStoreClient\EventStoreAsyncConnectionBuilder; use Prooph\EventStoreClient\IpEndPoint; use function Amp\call; use function Amp\Promise\timeout; @@ -40,7 +40,7 @@ public function should_timeout_connection_after_configured_amount_time_on_conenc $ip = '8.8.8.8'; //NOTE: This relies on Google DNS server being configured to swallow nonsense traffic $port = 4567; - $connection = EventStoreConnectionBuilder::createAsyncFromIpEndPoint( + $connection = EventStoreAsyncConnectionBuilder::createFromIpEndPoint( new IpEndPoint($ip, $port), $settingsBuilder->build(), 'test-connection'