Skip to content

Commit

Permalink
Merge 76fa634 into e52901e
Browse files Browse the repository at this point in the history
  • Loading branch information
prolic committed Jul 21, 2019
2 parents e52901e + 76fa634 commit 3aef466
Show file tree
Hide file tree
Showing 17 changed files with 99 additions and 63 deletions.
8 changes: 4 additions & 4 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@
},
"require-dev": {
"amphp/log": "^1.0",
"phpspec/prophecy": "^1.7.2",
"phpunit/phpunit": "^8.2.2",
"doctrine/instantiator": "^1.1",
"sebastian/object-enumerator": "^3.0.3",
"php-coveralls/php-coveralls": "^2.1",
"prooph/php-cs-fixer-config": "^0.3"
"phpspec/prophecy": "^1.7.2",
"phpunit/phpunit": "^8.2.2",
"prooph/php-cs-fixer-config": "^0.3",
"sebastian/object-enumerator": "^3.0.3"
},
"suggest": {
"ext/protobuf": "for protobuf PHP extension (pecl)"
Expand Down
6 changes: 3 additions & 3 deletions src/Internal/ClusterDnsEndPointDiscoverer.php
Original file line number Diff line number Diff line change
Expand Up @@ -311,9 +311,9 @@ private function tryDetermineBestNode(array $members, bool $preferRandomNode): ?

$this->log->info(\sprintf(
'Discovering: found best choice [%s, %s] (%s)',
$normTcp,
null === $secTcp ? 'n/a' : $secTcp->__toString(),
$node->state()
(string) $normTcp,
null === $secTcp ? 'n/a' : (string) $secTcp,
(string) $node->state()
));

return new NodeEndPoints($normTcp, $secTcp);
Expand Down
8 changes: 5 additions & 3 deletions src/Internal/ConnectingPhase.php
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ final class ConnectingPhase
public const IDENTIFICATION = 5;
public const CONNECTED = 6;

/** @var string */
private $name;
/** @var int */
private $value;

private function __construct(string $name)
Expand Down Expand Up @@ -87,7 +89,7 @@ public static function fromName(string $value): self
return self::{$value}();
}

public static function fromValue($value): self
public static function fromValue(int $value): self
{
foreach (self::OPTIONS as $name => $v) {
if ($v === $value) {
Expand All @@ -100,15 +102,15 @@ public static function fromValue($value): self

public function equals(ConnectingPhase $other): bool
{
return \get_class($this) === \get_class($other) && $this->name === $other->name;
return $this->name === $other->name;
}

public function name(): string
{
return $this->name;
}

public function value()
public function value(): int
{
return $this->value;
}
Expand Down
8 changes: 5 additions & 3 deletions src/Internal/ConnectionState.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ final class ConnectionState
public const CONNECTED = 2;
public const CLOSED = 3;

/** @var string */
private $name;
/** @var int */
private $value;

private function __construct(string $name)
Expand Down Expand Up @@ -66,7 +68,7 @@ public static function fromName(string $value): self
return self::{$value}();
}

public static function fromValue($value): self
public static function fromValue(int $value): self
{
foreach (self::OPTIONS as $name => $v) {
if ($v === $value) {
Expand All @@ -79,15 +81,15 @@ public static function fromValue($value): self

public function equals(ConnectionState $other): bool
{
return \get_class($this) === \get_class($other) && $this->name === $other->name;
return $this->name === $other->name;
}

public function name(): string
{
return $this->name;
}

public function value()
public function value(): int
{
return $this->value;
}
Expand Down
38 changes: 23 additions & 15 deletions src/Internal/EventStoreConnectionLogicHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ public function totalOperationCount(): int

public function enqueueMessage(Message $message): void
{
$this->logDebug(\sprintf('enqueing message %s', $message));
$this->logDebug(\sprintf('enqueing message %s', (string) $message));

$this->handler->handle($message);
}
Expand Down Expand Up @@ -306,7 +306,7 @@ private function establishTcpConnection(NodeEndPoints $endPoints): void
return;
}

$this->logDebug('EstablishTcpConnection to [%s]', $endPoint);
$this->logDebug('EstablishTcpConnection to [%s]', (string) $endPoint);

if (! $this->state->equals(ConnectionState::connecting())
|| ! $this->connectingPhase->equals(ConnectingPhase::endPointDiscovery())
Expand Down Expand Up @@ -400,7 +400,11 @@ private function tcpConnectionClosed(TcpPackageConnection $connection): void
$this->state = ConnectionState::connecting();
$this->connectingPhase = ConnectingPhase::reconnecting();

$this->logDebug('TCP connection to [%s, %s] closed', $connection->remoteEndPoint(), $connection->connectionId());
$this->logDebug(
'TCP connection to [%s, %s] closed',
(string) $connection->remoteEndPoint(),
$connection->connectionId()
);

$this->subscriptions->purgeSubscribedAndDroppedSubscriptions($this->connection->connectionId());

Expand Down Expand Up @@ -433,7 +437,11 @@ private function tcpConnectionEstablished(TcpPackageConnection $connection): voi
return;
}

$this->logDebug('TCP connection to [%s, %s] established', $connection->remoteEndPoint(), $connection->connectionId());
$this->logDebug(
'TCP connection to [%s, %s] established',
(string) $connection->remoteEndPoint(),
$connection->connectionId()
);
$elapsed = $this->stopWatch->elapsed();

$this->heartbeatInfo = new HeartbeatInfo($this->packageNumber, true, $elapsed);
Expand Down Expand Up @@ -627,7 +635,7 @@ private function startOperation(ClientOperation $operation, int $maxRetries, int
$this->logDebug(
'StartOperation schedule %s, %s, %s, %s',
$operation->name(),
$operation,
(string) $operation,
$maxRetries,
$timeout
);
Expand Down Expand Up @@ -668,7 +676,7 @@ function (): ?TcpPackageConnection {
'StartSubscription %s %s, %s, MaxRetries: %d, Timeout: %d',
$this->state->equals(ConnectionState::connected()) ? 'fire' : 'enqueue',
$operation->name(),
$operation,
(string) $operation,
$message->maxRetries(),
$message->timeout()
);
Expand Down Expand Up @@ -718,7 +726,7 @@ function (): ?TcpPackageConnection {
'StartSubscription %s %s, %s, MaxRetries: %d, Timeout: %d',
$this->state->equals(ConnectionState::connected()) ? 'fire' : 'enqueue',
$operation->name(),
$operation,
(string) $operation,
$message->maxRetries(),
$message->timeout()
);
Expand Down Expand Up @@ -748,7 +756,7 @@ private function handleTcpPackage(TcpPackageConnection $connection, TcpPackage $
$this->logDebug(
'IGNORED: HandleTcpPackage connId %s, package %s, %s',
$connection->connectionId(),
$package->command(),
(string) $package->command(),
$package->correlationId()
);

Expand All @@ -758,7 +766,7 @@ private function handleTcpPackage(TcpPackageConnection $connection, TcpPackage $
$this->logDebug(
'HandleTcpPackage connId %s, package %s, %s',
$this->connection->connectionId(),
$package->command(),
(string) $package->command(),
$package->correlationId()
);

Expand Down Expand Up @@ -818,9 +826,9 @@ private function handleTcpPackage(TcpPackageConnection $connection, TcpPackage $

$this->logDebug(
'HandleTcpPackage OPERATION DECISION %s (%s), %s',
$result->decision(),
(string) $result->decision(),
$result->description(),
$operation
(string) $operation
);

switch ($result->decision()->value()) {
Expand All @@ -847,9 +855,9 @@ private function handleTcpPackage(TcpPackageConnection $connection, TcpPackage $
$this->logDebug(
'HandleTcpPackage %s SUBSCRIPTION DECISION %s (%s), %s',
$package->correlationId(),
$result->decision(),
(string) $result->decision(),
$result->description(),
$operation
(string) $operation
);

switch ($result->decision()->value()) {
Expand All @@ -873,7 +881,7 @@ private function handleTcpPackage(TcpPackageConnection $connection, TcpPackage $
$this->logDebug(
'HandleTcpPackage UNMAPPED PACKAGE with CorrelationId %s, Command: %s',
$package->correlationId(),
$package->command()
(string) $package->command()
);
}
}
Expand All @@ -900,7 +908,7 @@ private function reconnectTo(NodeEndPoints $endPoints): void
$msg = \sprintf(
'EventStoreNodeConnection \'%s\': going to reconnect to [%s]. Current end point: [%s]',
$this->esConnection->connectionName(),
$endPoint,
(string) $endPoint,
$this->connection->remoteEndPoint()
);

Expand Down
2 changes: 1 addition & 1 deletion src/Internal/OperationItem.php
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public function __toString(): string
'Operation %s (%s): %s, retry count: %d, created: %s, last updated: %s',
$this->operation->name(),
$this->correlationId,
$this->operation,
(string) $this->operation,
$this->retryCount,
DateTime::format($this->created),
DateTime::format($this->lastUpdated)
Expand Down
14 changes: 7 additions & 7 deletions src/Internal/OperationsManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ public function checkTimeoutsAndRetry(TcpPackageConnection $connection): void
$oldCorrId = $operation->correlationId();
$operation->setCorrelationId(Guid::generateAsHex());
$operation->incRetryCount();
$this->logDebug('retrying, old corrId %s, operation %s', $oldCorrId, $operation);
$this->logDebug('retrying, old corrId %s, operation %s', $oldCorrId, (string) $operation);
$this->scheduleOperation($operation, $connection);
}

Expand All @@ -159,7 +159,7 @@ public function scheduleOperationRetry(OperationItem $operation): void
return;
}

$this->logDebug('ScheduleOperationRetry for %s', $operation);
$this->logDebug('ScheduleOperationRetry for %s', (string) $operation);
if ($operation->maxRetries() >= 0 && $operation->retryCount() >= $operation->maxRetries()) {
$operation->operation()->fail(
RetriesLimitReached::with($operation->retryCount())
Expand All @@ -174,13 +174,13 @@ public function scheduleOperationRetry(OperationItem $operation): void
public function removeOperation(OperationItem $operation): bool
{
if (! isset($this->activeOperations[$operation->correlationId()])) {
$this->logDebug('RemoveOperation FAILED for %s', $operation);
$this->logDebug('RemoveOperation FAILED for %s', (string) $operation);

return false;
}

unset($this->activeOperations[$operation->correlationId()]);
$this->logDebug('RemoveOperation SUCCEEDED for %s', $operation);
$this->logDebug('RemoveOperation SUCCEEDED for %s', (string) $operation);

return true;
}
Expand All @@ -207,16 +207,16 @@ public function executeOperation(OperationItem $operation, TcpPackageConnection
$package = $operation->operation()->createNetworkPackage($correlationId);

$this->logDebug('ExecuteOperation package %s, %s, %s',
$package->command(),
(string) $package->command(),
$package->correlationId(),
$operation
(string) $operation
);
$connection->enqueueSend($package);
}

public function enqueueOperation(OperationItem $operation): void
{
$this->logDebug('EnqueueOperation WAITING for %s', $operation);
$this->logDebug('EnqueueOperation WAITING for %s', (string) $operation);
$this->waitingOperations->enqueue($operation);
}

Expand Down
2 changes: 1 addition & 1 deletion src/Internal/SubscriptionItem.php
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public function __toString(): string
'Subscription %s (%s): %s, is subscribed: %s, retry count: %d, created: %s, last updated: %s',
$this->operation->name(),
$this->correlationId,
$this->operation,
(string) $this->operation,
$this->isSubscribed ? 'yes' : 'no',
$this->retryCount,
DateTime::format($this->created),
Expand Down
14 changes: 7 additions & 7 deletions src/Internal/SubscriptionsManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ public function checkTimeoutsAndRetry(TcpPackageConnection $connection): void
public function removeSubscription(SubscriptionItem $subscription): bool
{
$result = isset($this->activeSubscriptions[$subscription->correlationId()]);
$this->logDebug('RemoveSubscription %s, result %s', $subscription, $result ? 'yes' : 'no');
$this->logDebug('RemoveSubscription %s, result %s', (string) $subscription, $result ? 'yes' : 'no');
unset($this->activeSubscriptions[$subscription->correlationId()]);

return $result;
Expand All @@ -168,13 +168,13 @@ public function removeSubscription(SubscriptionItem $subscription): bool
public function scheduleSubscriptionRetry(SubscriptionItem $subscription): void
{
if (! $this->removeSubscription($subscription)) {
$this->logDebug('RemoveSubscription failed when trying to retry %s', $subscription);
$this->logDebug('RemoveSubscription failed when trying to retry %s', (string) $subscription);

return;
}

if ($subscription->maxRetries() >= 0 && $subscription->retryCount() >= $subscription->maxRetries()) {
$this->logDebug('RETRIES LIMIT REACHED when trying to retry %s', $subscription);
$this->logDebug('RETRIES LIMIT REACHED when trying to retry %s', (string) $subscription);
$subscription->operation()->dropSubscription(
SubscriptionDropReason::subscribingError(),
RetriesLimitReached::with($subscription->retryCount())
Expand All @@ -183,7 +183,7 @@ public function scheduleSubscriptionRetry(SubscriptionItem $subscription): void
return;
}

$this->logDebug('retrying subscription %s', $subscription);
$this->logDebug('retrying subscription %s', (string) $subscription);
$this->retryPendingSubscriptions[] = $subscription;
}

Expand All @@ -195,7 +195,7 @@ public function enqueueSubscription(SubscriptionItem $subscriptionItem): void
public function startSubscription(SubscriptionItem $subscription, TcpPackageConnection $connection): void
{
if ($subscription->isSubscribed()) {
$this->logDebug('StartSubscription REMOVING due to already subscribed %s', $subscription);
$this->logDebug('StartSubscription REMOVING due to already subscribed %s', (string) $subscription);
$this->removeSubscription($subscription);

return;
Expand All @@ -209,10 +209,10 @@ public function startSubscription(SubscriptionItem $subscription, TcpPackageConn
$this->activeSubscriptions[$correlationId] = $subscription;

if (! $subscription->operation()->subscribe($correlationId, $connection)) {
$this->logDebug('StartSubscription REMOVING AS COULD NOT SUBSCRIBE %s', $subscription);
$this->logDebug('StartSubscription REMOVING AS COULD NOT SUBSCRIBE %s', (string) $subscription);
$this->removeSubscription($subscription);
}
$this->logDebug('StartSubscription SUBSCRIBING %s', $subscription);
$this->logDebug('StartSubscription SUBSCRIBING %s', (string) $subscription);
}

private function logDebug(string $message, ...$parameters): void
Expand Down
1 change: 1 addition & 0 deletions src/Messages/ClusterMessages/ClusterInfoDto.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ class ClusterInfoDto
/** @var MemberInfoDto[] */
private $members = [];

/** @param MemberInfoDto[] */
public function __construct(array $members = [])
{
foreach ($members as $member) {
Expand Down
2 changes: 1 addition & 1 deletion src/Messages/ClusterMessages/MemberInfoDto.php
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public function __construct(array $data = [])
}
}

public function instanceId()
public function instanceId(): string
{
return $this->instanceId;
}
Expand Down

0 comments on commit 3aef466

Please sign in to comment.