From 5e2e4fbbae7bcb8595af265732cb5dcbb8650aaa Mon Sep 17 00:00:00 2001 From: prolic Date: Sun, 11 Nov 2018 21:42:22 +0800 Subject: [PATCH 1/2] change factory methods - EventStoreAsyncConnectionFactory - EventStoreSyncConnectionFactory - Allow connecting via connection string - Allow connection via URI resolves https://github.com/prooph/event-store-client/issues/27 --- examples/demo-async-cluster.php | 1 - examples/demo-async-dns-cluster.php | 1 - examples/demo-async-with-logger.php | 2 +- examples/demo-async.php | 2 +- examples/demo-subscription-with-logger.php | 2 +- examples/demo-subscription1.php | 2 +- examples/demo-subscription2.php | 2 +- examples/demo-subscription3.php | 2 +- examples/demo-subscription4.php | 2 +- examples/demo-subscription5.php | 2 +- examples/demo-sync.php | 2 +- src/ClusterSettings.php | 56 ++++- src/ConnectionSettings.php | 10 +- src/ConnectionString.php | 167 +++++++++++++ src/EventStoreAsyncConnectionFactory.php | 271 +++++++++------------ src/EventStoreSyncConnectionFactory.php | 271 +++++++++------------ src/Internal/SingleEndpointDiscoverer.php | 20 +- src/Uri.php | 102 ++++++++ tests/Helper/TestConnection.php | 4 +- tests/connect.php | 16 +- tests/connection_string.php | 71 ++++++ tests/not_connected_tests.php | 2 +- 22 files changed, 651 insertions(+), 359 deletions(-) create mode 100644 src/ConnectionString.php create mode 100644 src/Uri.php create mode 100644 tests/connection_string.php diff --git a/examples/demo-async-cluster.php b/examples/demo-async-cluster.php index 7cc2394f..4980ebe5 100644 --- a/examples/demo-async-cluster.php +++ b/examples/demo-async-cluster.php @@ -26,7 +26,6 @@ ]); $connection = EventStoreAsyncConnectionFactory::createFromSettings( - null, $builder->build(), 'cluster-connection' ); diff --git a/examples/demo-async-dns-cluster.php b/examples/demo-async-dns-cluster.php index 2f0672d6..15432aad 100644 --- a/examples/demo-async-dns-cluster.php +++ b/examples/demo-async-dns-cluster.php @@ -23,7 +23,6 @@ $builder->setClusterGossipPort(2113); $connection = EventStoreAsyncConnectionFactory::createFromSettings( - null, $builder->build(), 'dns-cluster-connection' ); diff --git a/examples/demo-async-with-logger.php b/examples/demo-async-with-logger.php index 43350b0b..0eb15541 100644 --- a/examples/demo-async-with-logger.php +++ b/examples/demo-async-with-logger.php @@ -22,7 +22,7 @@ $builder->enableVerboseLogging(); $builder->useConsoleLogger(); - $connection = EventStoreAsyncConnectionFactory::createFromSettingsWithEndPoint( + $connection = EventStoreAsyncConnectionFactory::createFromEndPoint( new EndPoint('eventstore', 1113), $builder->build() ); diff --git a/examples/demo-async.php b/examples/demo-async.php index 4b077a8f..a5b44683 100644 --- a/examples/demo-async.php +++ b/examples/demo-async.php @@ -18,7 +18,7 @@ require __DIR__ . '/../vendor/autoload.php'; Loop::run(function () { - $connection = EventStoreAsyncConnectionFactory::createFromSettingsWithEndPoint( + $connection = EventStoreAsyncConnectionFactory::createFromEndPoint( new EndPoint('eventstore', 1113) ); diff --git a/examples/demo-subscription-with-logger.php b/examples/demo-subscription-with-logger.php index 7b585981..f7aa74ee 100644 --- a/examples/demo-subscription-with-logger.php +++ b/examples/demo-subscription-with-logger.php @@ -27,7 +27,7 @@ $builder->enableVerboseLogging(); $builder->useConsoleLogger(); - $connection = EventStoreAsyncConnectionFactory::createFromSettingsWithEndPoint( + $connection = EventStoreAsyncConnectionFactory::createFromEndPoint( new EndPoint('eventstore', 1113), $builder->build() ); diff --git a/examples/demo-subscription1.php b/examples/demo-subscription1.php index 13e6352d..7e2e0018 100644 --- a/examples/demo-subscription1.php +++ b/examples/demo-subscription1.php @@ -23,7 +23,7 @@ require __DIR__ . '/../vendor/autoload.php'; Loop::run(function () { - $connection = EventStoreAsyncConnectionFactory::createFromSettingsWithEndPoint( + $connection = EventStoreAsyncConnectionFactory::createFromEndPoint( new EndPoint('eventstore', 1113) ); diff --git a/examples/demo-subscription2.php b/examples/demo-subscription2.php index 5e768f76..8e4f45dd 100644 --- a/examples/demo-subscription2.php +++ b/examples/demo-subscription2.php @@ -23,7 +23,7 @@ require __DIR__ . '/../vendor/autoload.php'; Loop::run(function () { - $connection = EventStoreAsyncConnectionFactory::createFromSettingsWithEndPoint( + $connection = EventStoreAsyncConnectionFactory::createFromEndPoint( new EndPoint('eventstore', 1113) ); diff --git a/examples/demo-subscription3.php b/examples/demo-subscription3.php index e7a74ee8..711ed359 100644 --- a/examples/demo-subscription3.php +++ b/examples/demo-subscription3.php @@ -23,7 +23,7 @@ require __DIR__ . '/../vendor/autoload.php'; Loop::run(function () { - $connection = EventStoreAsyncConnectionFactory::createFromSettingsWithEndPoint( + $connection = EventStoreAsyncConnectionFactory::createFromEndPoint( new EndPoint('eventstore', 1113) ); diff --git a/examples/demo-subscription4.php b/examples/demo-subscription4.php index 3fad8b6d..0b39525d 100644 --- a/examples/demo-subscription4.php +++ b/examples/demo-subscription4.php @@ -23,7 +23,7 @@ require __DIR__ . '/../vendor/autoload.php'; Loop::run(function () { - $connection = EventStoreAsyncConnectionFactory::createFromSettingsWithEndPoint( + $connection = EventStoreAsyncConnectionFactory::createFromEndPoint( new EndPoint('eventstore', 1113) ); diff --git a/examples/demo-subscription5.php b/examples/demo-subscription5.php index 01e83538..d45a92a4 100644 --- a/examples/demo-subscription5.php +++ b/examples/demo-subscription5.php @@ -25,7 +25,7 @@ require __DIR__ . '/../vendor/autoload.php'; Loop::run(function () { - $connection = EventStoreAsyncConnectionFactory::createFromSettingsWithEndPoint( + $connection = EventStoreAsyncConnectionFactory::createFromEndPoint( new EndPoint('eventstore', 1113) ); diff --git a/examples/demo-sync.php b/examples/demo-sync.php index ba378902..142fff98 100644 --- a/examples/demo-sync.php +++ b/examples/demo-sync.php @@ -15,7 +15,7 @@ require __DIR__ . '/../vendor/autoload.php'; -$connection = EventStoreSyncConnectionFactory::createFromSettingsWithEndPoint( +$connection = EventStoreSyncConnectionFactory::createFromEndPoint( new EndPoint('eventstore', 1113) ); diff --git a/src/ClusterSettings.php b/src/ClusterSettings.php index 20bc7414..e50f3b14 100644 --- a/src/ClusterSettings.php +++ b/src/ClusterSettings.php @@ -13,41 +13,71 @@ namespace Prooph\EventStoreClient; +use Prooph\EventStoreClient\Exception\InvalidArgumentException; + /** * All times are milliseconds */ class ClusterSettings { - /** @var string */ + /** @var string|null */ private $clusterDns; /** @var int */ private $maxDiscoverAttempts; /** @var int */ private $externalGossipPort; - /** @var GossipSeed[] */ + /** @var GossipSeed[]|null */ private $gossipSeeds; /** @var int */ private $gossipTimeout; /** @var bool */ private $preferRandomNode; - public function __construct( + public static function fromGossipSeeds( + array $gossipSeeds, + int $maxDiscoverAttempts, + int $gossipTimeout, + bool $preferRandomNode + ): self { + $clusterSettings = new self(); + + foreach ($gossipSeeds as $gossipSeed) { + if (! $gossipSeed instanceof GossipSeed) { + throw new InvalidArgumentException(\sprintf( + 'Expected an array of %s', + GossipSeed::class + )); + } + + $clusterSettings->gossipSeeds[] = $gossipSeed; + } + + $clusterSettings->maxDiscoverAttempts = $maxDiscoverAttempts; + $clusterSettings->gossipTimeout = $gossipTimeout; + $clusterSettings->preferRandomNode = $preferRandomNode; + + return $clusterSettings; + } + + public static function fromClusterDns( string $clusterDns, int $maxDiscoverAttempts, int $externalGossipPort, - array $gossipSeeds, int $gossipTimeout, bool $preferRandomNode - ) { - $this->clusterDns = $clusterDns; - $this->maxDiscoverAttempts = $maxDiscoverAttempts; - $this->externalGossipPort = $externalGossipPort; - $this->gossipSeeds = $gossipSeeds; - $this->gossipTimeout = $gossipTimeout; - $this->preferRandomNode = $preferRandomNode; + ): self { + $clusterSettings = new self(); + + $clusterSettings->clusterDns = $clusterDns; + $clusterSettings->maxDiscoverAttempts = $maxDiscoverAttempts; + $clusterSettings->externalGossipPort = $externalGossipPort; + $clusterSettings->gossipTimeout = $gossipTimeout; + $clusterSettings->preferRandomNode = $preferRandomNode; + + return $clusterSettings; } - public function clusterDns(): string + public function clusterDns(): ?string { return $this->clusterDns; } @@ -62,7 +92,7 @@ public function externalGossipPort(): int return $this->externalGossipPort; } - public function gossipSeeds(): array + public function gossipSeeds(): ?array { return $this->gossipSeeds; } diff --git a/src/ConnectionSettings.php b/src/ConnectionSettings.php index 7518bb6b..f104c73b 100644 --- a/src/ConnectionSettings.php +++ b/src/ConnectionSettings.php @@ -62,7 +62,7 @@ class ConnectionSettings /** @var int */ private $externalGossipPort; /** @var GossipSeed[] */ - private $gossipSeeds; + private $gossipSeeds = []; /** @var int */ private $gossipTimeout; /** @var bool */ @@ -256,4 +256,12 @@ public function clientConnectionTimeout(): int { return $this->clientConnectionTimeout; } + + public function withDefaultCredentials(UserCredentials $userCredentials): self + { + $clone = clone $this; + $clone->defaultUserCredentials = $userCredentials; + + return $clone; + } } diff --git a/src/ConnectionString.php b/src/ConnectionString.php new file mode 100644 index 00000000..de57e692 --- /dev/null +++ b/src/ConnectionString.php @@ -0,0 +1,167 @@ + + * (c) 2018-2018 Sascha-Oliver Prolic + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +declare(strict_types=1); + +namespace Prooph\EventStoreClient; + +use Prooph\EventStoreClient\Exception\InvalidArgumentException; +use ReflectionObject; + +class ConnectionString +{ + private static $allowedValues = [ + 'verboselogging' => 'bool', + 'maxqueuesize' => 'int', + 'maxconcurrentitems' => 'int', + 'maxretries' => 'int', + 'maxreconnections' => 'int', + 'requiremaster' => 'bool', + 'reconnectiondelay' => 'int', + 'operationtimeout' => 'int', + 'operationtimeoutcheckperiod' => 'int', + 'defaultusercredentials' => UserCredentials::class, + 'usesslconnection' => 'bool', + 'targethost' => 'string', + 'validateserver' => 'bool', + 'failonnoserverresponse' => 'bool', + 'heartbeatinterval' => 'int', + 'heartbeattimeout' => 'int', + 'clusterdns' => 'string', + 'maxdiscoverattempts' => 'int', + 'externalgossipport' => 'int', + 'gossipseeds' => GossipSeed::class, + 'gossiptimeout' => 'int', + 'preferrandomNode' => 'bool', + 'clientconnectiontimeout' => 'int', + ]; + + public static function getConnectionSettings( + string $connectionString, + ?ConnectionSettings $settings = null + ): ConnectionSettings { + $settings = $settings ?? ConnectionSettings::default(); + $reflection = new ReflectionObject($settings); + $properties = $reflection->getProperties(); + $values = \explode(';', $connectionString); + + foreach ($values as $value) { + list($key, $value) = \explode('=', $value); + $key = \strtolower($key); + + if (! \array_key_exists($key, self::$allowedValues)) { + throw new InvalidArgumentException(\sprintf( + 'Key %s is not an allowed key in %s', + $key, + __CLASS__ + )); + } + + $type = self::$allowedValues[$key]; + + switch ($type) { + case 'bool': + $filteredValue = \filter_var($value, \FILTER_VALIDATE_BOOLEAN); + break; + case 'int': + $filteredValue = \filter_var($value, \FILTER_VALIDATE_INT); + + if (false === $filteredValue) { + throw new InvalidArgumentException(\sprintf( + 'Expected type for key %s is %s, but %s given', + $key, + $type, + $value + )); + } + break; + case 'string': + $filteredValue = $value; + break; + case UserCredentials::class: + $exploded = \explode(':', $value); + + if (\count($exploded) !== 2) { + throw new InvalidArgumentException(\sprintf( + 'Expected user credentials in format user:pass, %s given', + $value + )); + } + + $filteredValue = new UserCredentials($exploded[0], $exploded[1]); + break; + case GossipSeed::class: + $gossipSeeds = []; + + foreach (\explode(',', $value) as $v) { + $exploded = \explode(':', $v); + + if (\count($exploded) !== 2) { + throw new InvalidArgumentException(\sprintf( + 'Expected user credentials in format user:pass, %s given', + $value + )); + } + + $host = $exploded[0]; + $port = \filter_var($exploded[1], \FILTER_VALIDATE_INT); + + if (false === $port) { + throw new InvalidArgumentException(\sprintf( + 'Expected type for port of gossip seed is int, but %s given', + $exploded[1] + )); + } + + $gossipSeeds[] = new GossipSeed(new EndPoint($host, $port)); + } + + if (empty($gossipSeeds)) { + throw new InvalidArgumentException(\sprintf( + 'No gossip seeds specified in connection string' + )); + } + + $filteredValue = $gossipSeeds; + break; + } + + foreach ($properties as $property) { + if (\strtolower($property->getName()) === $key) { + $property->setAccessible(true); + $property->setValue($settings, $filteredValue); + break; + } + } + } + + return $settings; + } + + public static function getUriFromConnectionString(string $connectionString): ?Uri + { + $values = \explode(';', $connectionString); + + if (1 === \count($values)) { + return new Uri($connectionString); + } + + foreach ($values as $value) { + list($key, $value) = \explode('=', $value); + + if (\strtolower($key) === 'connectto') { + return new Uri($value); + } + } + + return null; + } +} diff --git a/src/EventStoreAsyncConnectionFactory.php b/src/EventStoreAsyncConnectionFactory.php index cb708b17..0612b699 100644 --- a/src/EventStoreAsyncConnectionFactory.php +++ b/src/EventStoreAsyncConnectionFactory.php @@ -22,206 +22,167 @@ class EventStoreAsyncConnectionFactory { - /** - * Sub-delimiters used in user info, query strings and fragments. - * @const string - */ - private const CHAR_SUB_DELIMS = '!\$&\'\(\)\*\+,;='; - /** - * Unreserved characters used in user info, paths, query strings, and fragments. - * @const string - */ - private const CHAR_UNRESERVED = 'a-zA-Z0-9_\-\.~\pL'; - - private const TCP_PORT_DEFAULT = 1113; - - /** @throws \Exception */ - public static function createFromBuilder( - ?string $connectionString = null, - ?ConnectionSettingsBuilder $builder = null, + public static function createFromConnectionString( + string $connectionString, + ConnectionSettings $settings = null, string $connectionName = '' ): AsyncConnection { - $builder = $builder ?? new ConnectionSettingsBuilder(); + $settings = ConnectionString::getConnectionSettings( + $connectionString, + $settings ?? ConnectionSettings::default() + ); - return self::createFromSettings($connectionString, $builder->build(), $connectionName); - } + $uri = ConnectionString::getUriFromConnectionString($connectionString); - /** @throws \Exception */ - public static function createFromSettings( - ?string $connectionString = null, - ?ConnectionSettings $settings = null, - string $connectionName = '' - ): AsyncConnection { - if (null === $connectionString && (null === $settings || (empty($settings->gossipSeeds()) && empty($settings->clusterDns())))) { - throw new \Exception('Did not find ConnectTo, ClusterDNS or GossipSeeds in the connection string'); + if (null === $uri && empty($settings->gossipSeeds())) { + throw new InvalidArgumentException( + 'Did not find ConnectTo or GossipSeeds in the connection string' + ); } - if (null !== $connectionString && (null === $settings || (empty($settings->gossipSeeds()) && empty($settings->clusterDns())))) { - throw new \Exception('Setting ConnectTo as well as GossipSeeds and/or ClusterDNS on the connection string is currently not supported'); + if (null !== $uri && ! empty($settings->gossipSeeds())) { + throw new InvalidArgumentException( + 'Setting ConnectTo as well as GossipSeeds on the connection string is currently not supported' + ); } - if (null === $settings) { - $settings = ConnectionSettings::default(); - } + return self::createFromUri($uri, $settings, $connectionName); + } + + public static function createFromUri( + ?Uri $uri, + ?ConnectionSettings $connectionSettings = null, + string $connectionName = '' + ): AsyncConnection { + $connectionSettings = $connectionSettings ?? ConnectionSettings::default(); + + if (null !== $uri) { + $scheme = \strtolower($uri->scheme()); + $credentials = $uri->userCredentials(); - if (null !== $connectionString) { - list($scheme, $host, $port, $user, $pass) = self::parseUri($connectionString); - $credentials = $user ? new UserCredentials($user, $pass) : null; if (null !== $credentials) { - $settings = new ConnectionSettings( - $settings->log(), - $settings->verboseLogging(), - $settings->maxQueueSize(), - $settings->maxConcurrentItems(), - $settings->maxRetries(), - $settings->maxReconnections(), - $settings->requireMaster(), - $settings->reconnectionDelay(), - $settings->operationTimeout(), - $settings->operationTimeoutCheckPeriod(), - $credentials, - $settings->useSslConnection(), - $settings->targetHost(), - $settings->validateServer(), - $settings->failOnNoServerResponse(), - $settings->heartbeatInterval(), - $settings->heartbeatTimeout(), - $settings->clusterDns(), - $settings->maxDiscoverAttempts(), - $settings->externalGossipPort(), - $settings->gossipSeeds(), - $settings->gossipTimeout(), - $settings->preferRandomNode(), - $settings->clientConnectionTimeout() - ); + $connectionSettings = $connectionSettings->withDefaultCredentials($credentials); } - if ($scheme === 'discover') { - return self::createWithClusterDnsEndPointDiscoverer($settings, $connectionName); + if ('discover' === $scheme) { + $clusterSettings = ClusterSettings::fromClusterDns( + $uri->host(), + $connectionSettings->maxDiscoverAttempts(), + $uri->port(), + $connectionSettings->gossipTimeout(), + $connectionSettings->preferRandomNode() + ); + + $endPointDiscoverer = new ClusterDnsEndPointDiscoverer( + $connectionSettings->log(), + $clusterSettings->clusterDns(), + $clusterSettings->maxDiscoverAttempts(), + $clusterSettings->externalGossipPort(), + $clusterSettings->gossipSeeds(), + $clusterSettings->gossipTimeout(), + $clusterSettings->preferRandomNode() + ); + + return new EventStoreAsyncNodeConnection( + $connectionSettings, + $clusterSettings, + $endPointDiscoverer, + $connectionName + ); } - if ($scheme === 'tcp') { - return self::createWithSingleEndpointDiscoverer( - $connectionString, - $settings, + if ('tcp' === $scheme) { + return new EventStoreAsyncNodeConnection( + $connectionSettings, + null, + new SingleEndpointDiscoverer( + $uri, + $connectionSettings->useSslConnection() + ), $connectionName ); } - throw new \Exception('Unknown scheme for connection'); + throw new InvalidArgumentException(\sprintf( + 'Unknown scheme for connection \'%s\'', + $scheme + )); } - if (! empty($settings->gossipSeeds()) || ! empty($settings->clusterDns())) { - return self::createWithClusterDnsEndPointDiscoverer($settings, $connectionName); - } + if (! empty($connectionSettings->gossipSeeds())) { + $clusterSettings = ClusterSettings::fromGossipSeeds( + $connectionSettings->gossipSeeds(), + $connectionSettings->maxDiscoverAttempts(), + $connectionSettings->gossipTimeout(), + $connectionSettings->preferRandomNode() + ); - throw new \Exception('Must specify uri, ClusterDNS or gossip seeds'); - } + $endPointDiscoverer = new ClusterDnsEndPointDiscoverer( + $connectionSettings->log(), + $clusterSettings->clusterDns(), + $clusterSettings->maxDiscoverAttempts(), + $clusterSettings->externalGossipPort(), + $clusterSettings->gossipSeeds(), + $clusterSettings->gossipTimeout(), + $clusterSettings->preferRandomNode() + ); - public static function createFromBuilderWithIpEndPoint( - EndPoint $endPoint, - ?ConnectionSettingsBuilder $builder = null, - ?string $connectionName = null - ): AsyncConnection { - $builder = $builder ?? new ConnectionSettingsBuilder(); + return new EventStoreAsyncNodeConnection( + $connectionSettings, + $clusterSettings, + $endPointDiscoverer, + $connectionName + ); + } - return self::createFromSettingsWithEndPoint($endPoint, $builder->build(), $connectionName); + throw new InvalidArgumentException('Must specify uri or gossip seeds'); } - public static function createFromSettingsWithEndPoint( + public static function createFromEndPoint( EndPoint $endPoint, ?ConnectionSettings $settings = null, - ?string $connectionName = null + string $connectionName = '' ): AsyncConnection { $settings = $settings ?? ConnectionSettings::default(); return new EventStoreAsyncNodeConnection( $settings, null, - new StaticEndPointDiscoverer($endPoint, $settings->useSslConnection()), + new StaticEndPointDiscoverer( + $endPoint, + $settings->useSslConnection() + ), $connectionName ); } - private static function createWithClusterDnsEndPointDiscoverer( + public static function createFromSettings( ConnectionSettings $settings, - ?string $connectionName = null + string $connectionName = '' ): AsyncConnection { - $clusterSettings = new ClusterSettings( - $settings->clusterDns(), - $settings->maxDiscoverAttempts(), - $settings->externalGossipPort(), - $settings->gossipSeeds(), - $settings->gossipTimeout(), - $settings->preferRandomNode() - ); + return self::createFromUri(null, $settings, $connectionName); + } + public static function createFromClusterSettings( + ConnectionSettings $connectionSettings, + ClusterSettings $clusterSettings, + string $connectionName = '' + ): AsyncConnection { $endPointDiscoverer = new ClusterDnsEndPointDiscoverer( - $settings->log(), - $settings->clusterDns(), - $settings->maxDiscoverAttempts(), - $settings->externalGossipPort(), - $settings->gossipSeeds(), - $settings->gossipTimeout(), - $settings->preferRandomNode() + $connectionSettings->log(), + $clusterSettings->clusterDns(), + $clusterSettings->maxDiscoverAttempts(), + $clusterSettings->externalGossipPort(), + $clusterSettings->gossipSeeds(), + $clusterSettings->gossipTimeout(), + $clusterSettings->preferRandomNode() ); - return new EventStoreAsyncNodeConnection($settings, $clusterSettings, $endPointDiscoverer, $connectionName); - } - - private static function createWithSingleEndpointDiscoverer( - string $connectionString, - ConnectionSettings $settings, - ?string $connectionName = null - ): AsyncConnection { return new EventStoreAsyncNodeConnection( - $settings, - null, - new SingleEndpointDiscoverer($connectionString, $settings->useSslConnection()), + $connectionSettings, + $clusterSettings, + $endPointDiscoverer, $connectionName ); } - - private static function parseUri(string $connectionString): array - { - $parts = \parse_url($connectionString); - - if (false === $parts) { - throw new InvalidArgumentException( - 'The source URI string appears to be malformed' - ); - } - - $scheme = isset($parts['scheme']) ? self::filterScheme($parts['scheme']) : ''; - $host = isset($parts['host']) ? \strtolower($parts['host']) : ''; - $port = isset($parts['port']) ? (int) $parts['port'] : self::TCP_PORT_DEFAULT; - $user = isset($parts['user']) ? self::filterUserInfoPart($parts['user']) : ''; - $pass = $parts['pass'] ?? ''; - - return [ - $scheme, - $host, - $port, - $user, - $pass, - ]; - } - - private static function filterScheme(string $scheme): string - { - return \preg_replace('#:(//)?$#', '', \strtolower($scheme)); - } - - private static function filterUserInfoPart(string $part): string - { - // Note the addition of `%` to initial charset; this allows `|` portion - // to match and thus prevent double-encoding. - return \preg_replace_callback( - '/(?:[^%' . self::CHAR_UNRESERVED . self::CHAR_SUB_DELIMS . ']+|%(?![A-Fa-f0-9]{2}))/u', - function (array $matches): string { - return \rawurlencode($matches[0]); - }, - $part - ); - } } diff --git a/src/EventStoreSyncConnectionFactory.php b/src/EventStoreSyncConnectionFactory.php index 13ffe43a..bcff9918 100644 --- a/src/EventStoreSyncConnectionFactory.php +++ b/src/EventStoreSyncConnectionFactory.php @@ -22,206 +22,167 @@ class EventStoreSyncConnectionFactory { - /** - * Sub-delimiters used in user info, query strings and fragments. - * @const string - */ - private const CHAR_SUB_DELIMS = '!\$&\'\(\)\*\+,;='; - /** - * Unreserved characters used in user info, paths, query strings, and fragments. - * @const string - */ - private const CHAR_UNRESERVED = 'a-zA-Z0-9_\-\.~\pL'; - - private const TCP_PORT_DEFAULT = 1113; - - /** @throws \Exception */ - public static function createFromBuilder( - ?string $connectionString = null, - ?ConnectionSettingsBuilder $builder = null, + public static function createFromConnectionString( + string $connectionString, + ConnectionSettings $settings = null, string $connectionName = '' ): SyncConnection { - $builder = $builder ?? new ConnectionSettingsBuilder(); + $settings = ConnectionString::getConnectionSettings( + $connectionString, + $settings ?? ConnectionSettings::default() + ); - return self::createFromSettings($connectionString, $builder->build(), $connectionName); - } + $uri = ConnectionString::getUriFromConnectionString($connectionString); - /** @throws \Exception */ - public static function createFromSettings( - ?string $connectionString = null, - ?ConnectionSettings $settings = null, - string $connectionName = '' - ): SyncConnection { - if (null === $connectionString && (null === $settings || (empty($settings->gossipSeeds()) && empty($settings->clusterDns())))) { - throw new \Exception('Did not find ConnectTo, ClusterDNS or GossipSeeds in the connection string'); + if (null === $uri && empty($settings->gossipSeeds())) { + throw new InvalidArgumentException( + 'Did not find ConnectTo or GossipSeeds in the connection string' + ); } - if (null !== $connectionString && (null === $settings || (empty($settings->gossipSeeds()) && empty($settings->clusterDns())))) { - throw new \Exception('Setting ConnectTo as well as GossipSeeds and/or ClusterDNS on the connection string is currently not supported'); + if (null !== $uri && ! empty($settings->gossipSeeds())) { + throw new InvalidArgumentException( + 'Setting ConnectTo as well as GossipSeeds on the connection string is currently not supported' + ); } - if (null === $settings) { - $settings = ConnectionSettings::default(); - } + return self::createFromUri($uri, $settings, $connectionName); + } + + public static function createFromUri( + ?Uri $uri, + ?ConnectionSettings $connectionSettings = null, + string $connectionName = '' + ): SyncConnection { + $connectionSettings = $connectionSettings ?? ConnectionSettings::default(); + + if (null !== $uri) { + $scheme = \strtolower($uri->scheme()); + $credentials = $uri->userCredentials(); - if (null !== $connectionString) { - list($scheme, $host, $port, $user, $pass) = self::parseUri($connectionString); - $credentials = $user ? new UserCredentials($user, $pass) : null; if (null !== $credentials) { - $settings = new ConnectionSettings( - $settings->log(), - $settings->verboseLogging(), - $settings->maxQueueSize(), - $settings->maxConcurrentItems(), - $settings->maxRetries(), - $settings->maxReconnections(), - $settings->requireMaster(), - $settings->reconnectionDelay(), - $settings->operationTimeout(), - $settings->operationTimeoutCheckPeriod(), - $credentials, - $settings->useSslConnection(), - $settings->targetHost(), - $settings->validateServer(), - $settings->failOnNoServerResponse(), - $settings->heartbeatInterval(), - $settings->heartbeatTimeout(), - $settings->clusterDns(), - $settings->maxDiscoverAttempts(), - $settings->externalGossipPort(), - $settings->gossipSeeds(), - $settings->gossipTimeout(), - $settings->preferRandomNode(), - $settings->clientConnectionTimeout() - ); + $connectionSettings = $connectionSettings->withDefaultCredentials($credentials); } - if ($scheme === 'discover') { - return self::createWithClusterDnsEndPointDiscoverer($settings, $connectionName); + if ('discover' === $scheme) { + $clusterSettings = ClusterSettings::fromClusterDns( + $uri->host(), + $connectionSettings->maxDiscoverAttempts(), + $uri->port(), + $connectionSettings->gossipTimeout(), + $connectionSettings->preferRandomNode() + ); + + $endPointDiscoverer = new ClusterDnsEndPointDiscoverer( + $connectionSettings->log(), + $clusterSettings->clusterDns(), + $clusterSettings->maxDiscoverAttempts(), + $clusterSettings->externalGossipPort(), + $clusterSettings->gossipSeeds(), + $clusterSettings->gossipTimeout(), + $clusterSettings->preferRandomNode() + ); + + return new EventStoreSyncNodeConnection( + $connectionSettings, + $clusterSettings, + $endPointDiscoverer, + $connectionName + ); } - if ($scheme === 'tcp') { - return self::createWithSingleEndpointDiscoverer( - $connectionString, - $settings, + if ('tcp' === $scheme) { + return new EventStoreSyncNodeConnection( + $connectionSettings, + null, + new SingleEndpointDiscoverer( + $uri, + $connectionSettings->useSslConnection() + ), $connectionName ); } - throw new \Exception('Unknown scheme for connection'); + throw new InvalidArgumentException(\sprintf( + 'Unknown scheme for connection \'%s\'', + $scheme + )); } - if (! empty($settings->gossipSeeds()) || ! empty($settings->clusterDns())) { - return self::createWithClusterDnsEndPointDiscoverer($settings, $connectionName); - } + if (! empty($connectionSettings->gossipSeeds())) { + $clusterSettings = ClusterSettings::fromGossipSeeds( + $connectionSettings->gossipSeeds(), + $connectionSettings->maxDiscoverAttempts(), + $connectionSettings->gossipTimeout(), + $connectionSettings->preferRandomNode() + ); - throw new \Exception('Must specify uri, ClusterDNS or gossip seeds'); - } + $endPointDiscoverer = new ClusterDnsEndPointDiscoverer( + $connectionSettings->log(), + $clusterSettings->clusterDns(), + $clusterSettings->maxDiscoverAttempts(), + $clusterSettings->externalGossipPort(), + $clusterSettings->gossipSeeds(), + $clusterSettings->gossipTimeout(), + $clusterSettings->preferRandomNode() + ); - public static function createFromBuilderWithEndPoint( - EndPoint $endPoint, - ?ConnectionSettingsBuilder $builder = null, - ?string $connectionName = null - ): SyncConnection { - $builder = $builder ?? new ConnectionSettingsBuilder(); + return new EventStoreSyncNodeConnection( + $connectionSettings, + $clusterSettings, + $endPointDiscoverer, + $connectionName + ); + } - return self::createFromSettingsWithEndPoint($endPoint, $builder->build(), $connectionName); + throw new InvalidArgumentException('Must specify uri or gossip seeds'); } - public static function createFromSettingsWithEndPoint( + public static function createFromEndPoint( EndPoint $endPoint, ?ConnectionSettings $settings = null, - ?string $connectionName = null + string $connectionName = '' ): SyncConnection { $settings = $settings ?? ConnectionSettings::default(); return new EventStoreSyncNodeConnection( $settings, null, - new StaticEndPointDiscoverer($endPoint, $settings->useSslConnection()), + new StaticEndPointDiscoverer( + $endPoint, + $settings->useSslConnection() + ), $connectionName ); } - private static function createWithClusterDnsEndPointDiscoverer( + public static function createFromSettings( ConnectionSettings $settings, - ?string $connectionName = null + string $connectionName = '' ): SyncConnection { - $clusterSettings = new ClusterSettings( - $settings->clusterDns(), - $settings->maxDiscoverAttempts(), - $settings->externalGossipPort(), - $settings->gossipSeeds(), - $settings->gossipTimeout(), - $settings->preferRandomNode() - ); + return self::createFromUri(null, $settings, $connectionName); + } + public static function createFromClusterSettings( + ConnectionSettings $connectionSettings, + ClusterSettings $clusterSettings, + string $connectionName = '' + ): SyncConnection { $endPointDiscoverer = new ClusterDnsEndPointDiscoverer( - $settings->log(), - $settings->clusterDns(), - $settings->maxDiscoverAttempts(), - $settings->externalGossipPort(), - $settings->gossipSeeds(), - $settings->gossipTimeout(), - $settings->preferRandomNode() + $connectionSettings->log(), + $clusterSettings->clusterDns(), + $clusterSettings->maxDiscoverAttempts(), + $clusterSettings->externalGossipPort(), + $clusterSettings->gossipSeeds(), + $clusterSettings->gossipTimeout(), + $clusterSettings->preferRandomNode() ); - return new EventStoreSyncNodeConnection($settings, $clusterSettings, $endPointDiscoverer, $connectionName); - } - - private static function createWithSingleEndpointDiscoverer( - string $connectionString, - ConnectionSettings $settings, - ?string $connectionName = null - ): SyncConnection { return new EventStoreSyncNodeConnection( - $settings, - null, - new SingleEndpointDiscoverer($connectionString, $settings->useSslConnection()), + $connectionSettings, + $clusterSettings, + $endPointDiscoverer, $connectionName ); } - - private static function parseUri(string $connectionString): array - { - $parts = \parse_url($connectionString); - - if (false === $parts) { - throw new InvalidArgumentException( - 'The source URI string appears to be malformed' - ); - } - - $scheme = isset($parts['scheme']) ? self::filterScheme($parts['scheme']) : ''; - $host = isset($parts['host']) ? \strtolower($parts['host']) : ''; - $port = isset($parts['port']) ? (int) $parts['port'] : self::TCP_PORT_DEFAULT; - $user = isset($parts['user']) ? self::filterUserInfoPart($parts['user']) : ''; - $pass = $parts['pass'] ?? ''; - - return [ - $scheme, - $host, - $port, - $user, - $pass, - ]; - } - - private static function filterScheme(string $scheme): string - { - return \preg_replace('#:(//)?$#', '', \strtolower($scheme)); - } - - private static function filterUserInfoPart(string $part): string - { - // Note the addition of `%` to initial charset; this allows `|` portion - // to match and thus prevent double-encoding. - return \preg_replace_callback( - '/(?:[^%' . self::CHAR_UNRESERVED . self::CHAR_SUB_DELIMS . ']+|%(?![A-Fa-f0-9]{2}))/u', - function (array $matches): string { - return \rawurlencode($matches[0]); - }, - $part - ); - } } diff --git a/src/Internal/SingleEndpointDiscoverer.php b/src/Internal/SingleEndpointDiscoverer.php index 97cd6c1a..a70c95c0 100644 --- a/src/Internal/SingleEndpointDiscoverer.php +++ b/src/Internal/SingleEndpointDiscoverer.php @@ -13,36 +13,28 @@ namespace Prooph\EventStoreClient\Internal; -use Amp\Failure; use Amp\Promise; use Amp\Success; -use Amp\Uri\InvalidUriException; -use Amp\Uri\Uri; use Prooph\EventStoreClient\EndPoint; +use Prooph\EventStoreClient\Uri; /** @internal */ final class SingleEndpointDiscoverer implements EndPointDiscoverer { - /** @var string */ - private $connectionString; + /** @var Uri */ + private $uri; /** @var bool */ private $useSslConnection; - public function __construct(string $connectionString, bool $useSslConnection) + public function __construct(Uri $uri, bool $useSslConnection) { - $this->connectionString = $connectionString; + $this->uri = $uri; $this->useSslConnection = $useSslConnection; } public function discoverAsync(?EndPoint $failedTcpEndPoint): Promise { - try { - $uri = new Uri($this->connectionString); - } catch (InvalidUriException $e) { - return new Failure($e); - } - - $endPoint = new EndPoint($uri->getHost(), $uri->getPort()); + $endPoint = new EndPoint($this->uri->host(), $this->uri->port()); return new Success( new NodeEndPoints( diff --git a/src/Uri.php b/src/Uri.php new file mode 100644 index 00000000..5dcaf416 --- /dev/null +++ b/src/Uri.php @@ -0,0 +1,102 @@ + + * (c) 2018-2018 Sascha-Oliver Prolic + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +declare(strict_types=1); + +namespace Prooph\EventStoreClient; + +use Prooph\EventStoreClient\Exception\InvalidArgumentException; + +class Uri +{ + /** + * Sub-delimiters used in user info, query strings and fragments. + * @const string + */ + private const CHAR_SUB_DELIMS = '!\$&\'\(\)\*\+,;='; + + /** + * Unreserved characters used in user info, paths, query strings, and fragments. + * @const string + */ + private const CHAR_UNRESERVED = 'a-zA-Z0-9_\-\.~\pL'; + + private const TCP_PORT_DEFAULT = 1113; + + /** @var string */ + private $scheme; + /** @var UserCredentials|null */ + private $userCredentials; + /** @var string */ + private $host; + /** @var int */ + private $port; + + public function __construct(string $uri) + { + $parts = \parse_url($uri); + + if (false === $parts) { + throw new InvalidArgumentException( + 'The source URI string appears to be malformed' + ); + } + + $this->scheme = isset($parts['scheme']) ? self::filterScheme($parts['scheme']) : ''; + $this->host = isset($parts['host']) ? \strtolower($parts['host']) : ''; + $this->port = isset($parts['port']) ? (int) $parts['port'] : self::TCP_PORT_DEFAULT; + + if (isset($parts['user'])) { + $user = self::filterUserInfoPart($parts['user']); + $pass = $pass['pass'] ?? ''; + + $this->userCredentials = new UserCredentials($user, $pass); + } + } + + public function scheme(): string + { + return $this->scheme; + } + + public function userCredentials(): ?UserCredentials + { + return $this->userCredentials; + } + + public function host(): string + { + return $this->host; + } + + public function port(): int + { + return $this->port; + } + + private static function filterScheme(string $scheme): string + { + return \preg_replace('#:(//)?$#', '', \strtolower($scheme)); + } + + private static function filterUserInfoPart(string $part): string + { + // Note the addition of `%` to initial charset; this allows `|` portion + // to match and thus prevent double-encoding. + return \preg_replace_callback( + '/(?:[^%' . self::CHAR_UNRESERVED . self::CHAR_SUB_DELIMS . ']+|%(?![A-Fa-f0-9]{2}))/u', + function (array $matches): string { + return \rawurlencode($matches[0]); + }, + $part + ); + } +} diff --git a/tests/Helper/TestConnection.php b/tests/Helper/TestConnection.php index fdf3f3d5..a95a1c9e 100644 --- a/tests/Helper/TestConnection.php +++ b/tests/Helper/TestConnection.php @@ -29,7 +29,7 @@ public static function createAsync(?UserCredentials $userCredentials = null): Ev { self::checkRequiredEnvironmentSettings(); - return EventStoreAsyncConnectionFactory::createFromSettingsWithEndPoint( + return EventStoreAsyncConnectionFactory::createFromEndPoint( self::endPoint(), self::settings($userCredentials) ); @@ -39,7 +39,7 @@ public static function createSync(?UserCredentials $userCredentials = null): Eve { self::checkRequiredEnvironmentSettings(); - return EventStoreSyncConnectionFactory::createFromSettingsWithEndPoint( + return EventStoreSyncConnectionFactory::createFromEndPoint( self::endPoint(), self::settings($userCredentials) ); diff --git a/tests/connect.php b/tests/connect.php index eeec6e9e..0edb3752 100644 --- a/tests/connect.php +++ b/tests/connect.php @@ -48,7 +48,7 @@ protected function setUp(): void public function should_not_throw_exception_when_server_is_down(): void { wait(call(function () { - $connection = EventStoreAsyncConnectionFactory::createFromSettingsWithEndPoint( + $connection = EventStoreAsyncConnectionFactory::createFromEndPoint( $this->blackhole ); @@ -70,11 +70,12 @@ public function should_throw_exception_when_trying_to_reopen_closed_connection() ->limitReconnectionsTo(0) ->withConnectionTimeoutOf(10000) ->setReconnectionDelayTo(0) - ->failOnNoServerResponse(); + ->failOnNoServerResponse() + ->build(); - $connection = EventStoreAsyncConnectionFactory::createFromSettingsWithEndPoint( + $connection = EventStoreAsyncConnectionFactory::createFromEndPoint( $this->blackhole, - $settings->build() + $settings ); $connection->onClosed(function () use ($closed) { @@ -107,11 +108,12 @@ public function should_close_connection_after_configured_amount_of_failed_reconn ->limitReconnectionsTo(0) ->withConnectionTimeoutOf(10000) ->setReconnectionDelayTo(0) - ->failOnNoServerResponse(); + ->failOnNoServerResponse() + ->build(); - $connection = EventStoreAsyncConnectionFactory::createFromSettingsWithEndPoint( + $connection = EventStoreAsyncConnectionFactory::createFromEndPoint( $this->blackhole, - $settings->build() + $settings ); $connection->onClosed(function (ClientClosedEventArgs $args) use ($closed) { diff --git a/tests/connection_string.php b/tests/connection_string.php new file mode 100644 index 00000000..34511b88 --- /dev/null +++ b/tests/connection_string.php @@ -0,0 +1,71 @@ + + * (c) 2018-2018 Sascha-Oliver Prolic + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +declare(strict_types=1); + +namespace ProophTest\EventStoreClient; + +use PHPUnit\Framework\TestCase; +use Prooph\EventStoreClient\ConnectionString; + +class connection_string extends TestCase +{ + /** @test */ + public function can_set_string_value(): void + { + $settings = ConnectionString::getConnectionSettings('targethost=testtest'); + $this->assertEquals('testtest', $settings->targetHost()); + } + + /** @test */ + public function can_set_bool_value_with_string(): void + { + $settings = ConnectionString::getConnectionSettings('verboselogging=true'); + $this->assertTrue($settings->verboseLogging()); + } + + /** @test */ + public function can_set_int(): void + { + $settings = ConnectionString::getConnectionSettings('maxretries=55'); + $this->assertSame(55, $settings->maxRetries()); + } + + /** @test */ + public function can_set_multiple_values(): void + { + $settings = ConnectionString::getConnectionSettings('heartbeattimeout=5555;maxretries=55'); + $this->assertSame(5555, $settings->heartbeatTimeout()); + $this->assertSame(55, $settings->maxRetries()); + } + + /** @test */ + public function can_set_mixed_case(): void + { + $settings = ConnectionString::getConnectionSettings('heArtbeAtTimeout=5555'); + $this->assertSame(5555, $settings->heartbeatTimeout()); + } + + /** @test */ + public function can_set_gossip_seeds(): void + { + $settings = ConnectionString::getConnectionSettings('gossipseeds=111.222.222.111:1111,111.222.222.111:1112,111.222.222.111:1113'); + $this->assertCount(3, $settings->gossipSeeds()); + } + + /** @test */ + public function can_set_user_credentials(): void + { + $settings = ConnectionString::getConnectionSettings('DefaultUserCredentials=foo:bar'); + $this->assertEquals('foo', $settings->defaultUserCredentials()->username()); + $this->assertEquals('bar', $settings->defaultUserCredentials()->password()); + } +} diff --git a/tests/not_connected_tests.php b/tests/not_connected_tests.php index d5bb7cc3..da6759fc 100644 --- a/tests/not_connected_tests.php +++ b/tests/not_connected_tests.php @@ -41,7 +41,7 @@ public function should_timeout_connection_after_configured_amount_time_on_connec $ip = '8.8.8.8'; //NOTE: This relies on Google DNS server being configured to swallow nonsense traffic $port = 4567; - $connection = EventStoreAsyncConnectionFactory::createFromSettingsWithEndPoint( + $connection = EventStoreAsyncConnectionFactory::createFromEndPoint( new EndPoint($ip, $port), $settingsBuilder->build(), 'test-connection' From 4d15d4eb9e69f8f7344ba1ff4d24758e7ed3b5f3 Mon Sep 17 00:00:00 2001 From: prolic Date: Sun, 11 Nov 2018 21:50:42 +0800 Subject: [PATCH 2/2] add another uri constructor --- src/Uri.php | 25 ++++++++++++++++++++----- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/src/Uri.php b/src/Uri.php index 5dcaf416..670a93f7 100644 --- a/src/Uri.php +++ b/src/Uri.php @@ -40,7 +40,19 @@ class Uri /** @var int */ private $port; - public function __construct(string $uri) + public function __construct( + string $scheme, + string $host, + int $port, + ?UserCredentials $userCredentials = null + ) { + $this->scheme = $scheme; + $this->host = $host; + $this->port = $port; + $this->userCredentials = $userCredentials; + } + + public static function fromString(string $uri): self { $parts = \parse_url($uri); @@ -50,16 +62,19 @@ public function __construct(string $uri) ); } - $this->scheme = isset($parts['scheme']) ? self::filterScheme($parts['scheme']) : ''; - $this->host = isset($parts['host']) ? \strtolower($parts['host']) : ''; - $this->port = isset($parts['port']) ? (int) $parts['port'] : self::TCP_PORT_DEFAULT; + $scheme = isset($parts['scheme']) ? self::filterScheme($parts['scheme']) : ''; + $host = isset($parts['host']) ? \strtolower($parts['host']) : ''; + $port = isset($parts['port']) ? (int) $parts['port'] : self::TCP_PORT_DEFAULT; + $userCredentials = null; if (isset($parts['user'])) { $user = self::filterUserInfoPart($parts['user']); $pass = $pass['pass'] ?? ''; - $this->userCredentials = new UserCredentials($user, $pass); + $userCredentials = new UserCredentials($user, $pass); } + + return new self($scheme, $host, $port, $userCredentials); } public function scheme(): string