Skip to content

Commit

Permalink
Merge fbcecde into 7cc645f
Browse files Browse the repository at this point in the history
  • Loading branch information
prolic committed Oct 20, 2018
2 parents 7cc645f + fbcecde commit 9983171
Show file tree
Hide file tree
Showing 17 changed files with 41 additions and 35 deletions.
8 changes: 5 additions & 3 deletions src/EventStoreAsyncConnectionBuilder.php
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ public static function createFromSettings(
$credentials = $user ? new UserCredentials($user, $pass) : null;
if (null !== $credentials) {
$settings = new ConnectionSettings(
$settings->log(),
$settings->verboseLogging(),
$settings->maxQueueSize(),
$settings->maxConcurrentItems(),
$settings->maxRetries(),
Expand All @@ -80,13 +82,13 @@ public static function createFromSettings(
$settings->failOnNoServerResponse(),
$settings->heartbeatInterval(),
$settings->heartbeatTimeout(),
$settings->clientConnectionTimeout(),
$settings->clusterDns(),
$settings->gossipSeeds(),
$settings->maxDiscoverAttempts(),
$settings->externalGossipPort(),
$settings->gossipSeeds(),
$settings->gossipTimeout(),
$settings->preferRandomNode()
$settings->preferRandomNode(),
$settings->clientConnectionTimeout()
);
}

Expand Down
8 changes: 5 additions & 3 deletions src/EventStoreSyncConnectionBuilder.php
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ public static function createFromSettings(
$credentials = $user ? new UserCredentials($user, $pass) : null;
if (null !== $credentials) {
$settings = new ConnectionSettings(
$settings->log(),
$settings->verboseLogging(),
$settings->maxQueueSize(),
$settings->maxConcurrentItems(),
$settings->maxRetries(),
Expand All @@ -80,13 +82,13 @@ public static function createFromSettings(
$settings->failOnNoServerResponse(),
$settings->heartbeatInterval(),
$settings->heartbeatTimeout(),
$settings->clientConnectionTimeout(),
$settings->clusterDns(),
$settings->gossipSeeds(),
$settings->maxDiscoverAttempts(),
$settings->externalGossipPort(),
$settings->gossipSeeds(),
$settings->gossipTimeout(),
$settings->preferRandomNode()
$settings->preferRandomNode(),
$settings->clientConnectionTimeout()
);
}

Expand Down
2 changes: 1 addition & 1 deletion src/Internal/ClusterDnsEndPointDiscoverer.php
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public function __construct(
int $gossipTimeout,
bool $preferRandomNode
) {
$this->logger = $logger;
$this->log = $logger;
$this->clusterDns = $clusterDns;
$this->maxDiscoverAttempts = $maxDiscoverAttempts;
$this->managerExternalHttpPort = $managerExternalHttpPort;
Expand Down
6 changes: 6 additions & 0 deletions src/Internal/EventStoreStreamCatchUpSubscription.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
use Prooph\EventStoreClient\CatchUpSubscriptionSettings;
use Prooph\EventStoreClient\EventAppearedOnCatchupSubscription;
use Prooph\EventStoreClient\EventStoreAsyncConnection;
use Prooph\EventStoreClient\Exception\OutOfRangeException;
use Prooph\EventStoreClient\Exception\StreamDeletedException;
use Prooph\EventStoreClient\LiveProcessingStarted;
use Prooph\EventStoreClient\ResolvedEvent;
Expand Down Expand Up @@ -151,6 +152,11 @@ private function processEventsAsync(?int $lastEventNumber, StreamEventsSlice $sl
break;
case SliceReadStatus::STREAM_DELETED:
throw StreamDeletedException::with($this->streamId());
default:
throw new OutOfRangeException(\sprintf(
'Unexpected SliceReadStatus "%s" received',
$slice->status()->name()
));
}

if (! $done && $slice->isEndOfStream()) {
Expand Down
2 changes: 1 addition & 1 deletion src/Internal/OperationsManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public function checkTimeoutsAndRetry(TcpPackageConnection $connection): void
if ($operation->connectionId() !== $connection->connectionId()) {
$retryOperations[] = $operation;
} elseif ($operation->timeout() > 0
&& DateTimeUtil::utcNow()->format('U.u') - $operation->lastUpdated()->format('U.u') > $this->settings->operationTimeout()
&& (float) DateTimeUtil::utcNow()->format('U.u') - (float) $operation->lastUpdated()->format('U.u') > $this->settings->operationTimeout()
) {
$err = \sprintf(
'EventStoreNodeConnection \'%s\': subscription never got confirmation from server',
Expand Down
4 changes: 2 additions & 2 deletions src/Internal/StopWatch.php
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,15 @@ private function __construct(int $started)
public static function startNew(): self
{
$now = DateTimeUtil::utcNow();
$started = (int) \floor($now->format('U.u') * 1000);
$started = (int) \floor((float) $now->format('U.u') * 1000);

return new self($started);
}

public function elapsed(): int
{
$now = DateTimeUtil::utcNow();
$timestamp = (int) \floor($now->format('U.u') * 1000);
$timestamp = (int) \floor((float) $now->format('U.u') * 1000);

return $timestamp - $this->started;
}
Expand Down
2 changes: 1 addition & 1 deletion src/Internal/SubscriptionsManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public function checkTimeoutsAndRetry(TcpPackageConnection $connection): void
if ($subscription->connectionId() !== $connection->connectionId()) {
$this->retryPendingSubscriptions[] = $subscription;
} elseif ($subscription->timeout() > 0
&& DateTimeUtil::utcNow()->format('U.u') - $subscription->lastUpdated()->format('U.u') > $this->settings->operationTimeout()
&& (float) DateTimeUtil::utcNow()->format('U.u') - (float) $subscription->lastUpdated()->format('U.u') > $this->settings->operationTimeout()
) {
$err = \sprintf(
'EventStoreNodeConnection \'%s\': subscription never got confirmation from server',
Expand Down
3 changes: 1 addition & 2 deletions src/Projections/ProjectionsClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -351,8 +351,7 @@ public function getState(
$endPoint,
$httpSchema,
'/projection/%s/state',
$name,
$partition
$name
),
$userCredentials,
HttpStatusCode::OK
Expand Down
5 changes: 1 addition & 4 deletions src/Transport/Tcp/TcpPackageConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -176,19 +176,16 @@ public function enqueueSend(TcpPackage $package): void

private function incomingMessageArrived(string $data): void
{
$valid = false;

try {
$package = TcpPackage::fromRawData($data);
$valid = true;
($this->handlePackage)($this, $package);
} catch (Throwable $e) {
$this->connection->close();
$message = \sprintf(
'TcpPackageConnection: [%s, %s]: Error when processing TcpPackage %s: %s. Connection will be closed',
$this->remoteEndPoint,
$this->connectionId,
$valid ? $package->command()->name() : '<invalid package>',
isset($package) ? $package->command()->name() : '<invalid package>',
$e->getMessage()
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,14 +133,14 @@ public function the_subscription_gets_dropped(): void
$this->execute(function (): Generator {
try {
$result = yield Promise\timeout($this->resetEvent->promise(), 5000);

$this->assertTrue($result);
$this->assertTrue($this->reason->equals(SubscriptionDropReason::eventHandlerException()));
$this->assertInstanceOf(Exception::class, $this->exception);
$this->assertSame('test', $this->exception->getMessage());
} catch (TimeoutException $e) {
$this->fail('Timed out');
}

$this->assertTrue($result);
$this->assertTrue($this->reason->equals(SubscriptionDropReason::eventHandlerException()));
$this->assertInstanceOf(Exception::class, $this->exception);
$this->assertSame('test', $this->exception->getMessage());
});
}
}
8 changes: 4 additions & 4 deletions tests/catchup_subscription_handles_small_batch_sizes.php
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,12 @@ public function catchupSubscriptionToAllHandlesManyEventsWithSmallBatchSize(): v
try {
// we wait maximum 5 minutes
$result = yield timeout($deferred->promise(), 5 * 60 * 1000);

$this->assertTrue($result);
} catch (TimeoutException $e) {
$this->fail('Timed out waiting for test to complete');
}

$this->assertTrue($result);

$this->tearDownTestCase();
}));
}
Expand Down Expand Up @@ -135,12 +135,12 @@ public function catchupSubscriptionToStreamHandlesManyEventsWithSmallBatchSize()
try {
// we wait maximum 5 minutes
$result = yield timeout($deferred->promise(), 5 * 60 * 1000);

$this->assertTrue($result);
} catch (TimeoutException $e) {
$this->fail('Timed out waiting for test to complete');
}

$this->assertTrue($result);

$this->tearDownTestCase();
}));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
use Prooph\EventStoreClient\ExpectedVersion;
use Prooph\EventStoreClient\Internal\UuidGenerator;
use Prooph\EventStoreClient\PersistentSubscriptionSettings;
use Throwable;

class create_persistent_subscription_after_deleting_the_same extends TestCase
{
Expand Down
4 changes: 2 additions & 2 deletions tests/happy_case_catching_up_to_link_to_events_auto_ack.php
Original file line number Diff line number Diff line change
Expand Up @@ -142,11 +142,11 @@ public function __invoke(

try {
$result = yield Promise\timeout($this->eventsReceived->promise(), 5000);

$this->assertTrue($result);
} catch (TimeoutException $e) {
$this->fail('Timed out waiting for events');
}

$this->assertTrue($result);
});
}
}
4 changes: 2 additions & 2 deletions tests/happy_case_catching_up_to_link_to_events_manual_ack.php
Original file line number Diff line number Diff line change
Expand Up @@ -143,11 +143,11 @@ public function __invoke(

try {
$result = yield Promise\timeout($this->eventsReceived->promise(), 5000);

$this->assertTrue($result);
} catch (TimeoutException $e) {
$this->fail('Timed out waiting for events');
}

$this->assertTrue($result);
});
}
}
4 changes: 2 additions & 2 deletions tests/happy_case_catching_up_to_normal_events_auto_ack.php
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,11 @@ public function __invoke(

try {
$result = yield Promise\timeout($this->eventsReceived->promise(), 5000);

$this->assertTrue($result);
} catch (TimeoutException $e) {
$this->fail('Timed out waiting for events');
}

$this->assertTrue($result);
});
}
}
4 changes: 2 additions & 2 deletions tests/happy_case_catching_up_to_normal_events_manual_ack.php
Original file line number Diff line number Diff line change
Expand Up @@ -124,11 +124,11 @@ public function __invoke(

try {
$result = yield Promise\timeout($this->eventsReceived->promise(), 5000);

$this->assertTrue($result);
} catch (TimeoutException $e) {
$this->fail('Timed out waiting for events');
}

$this->assertTrue($result);
});
}
}
1 change: 0 additions & 1 deletion tests/read_all_events_backward_should.php
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ protected function when(): Generator
[],
[],
[],
[],
[]
)
),
Expand Down

0 comments on commit 9983171

Please sign in to comment.