From 84b850b485d63f636c929dd8300505cb39e6985a Mon Sep 17 00:00:00 2001 From: exaby73 Date: Wed, 9 Oct 2024 11:15:36 +0530 Subject: [PATCH 1/3] feat: Add ability to close all connections attached to a driver --- src/Basic/Driver.php | 5 +++++ src/Bolt/BoltDriver.php | 5 +++++ src/Bolt/ConnectionPool.php | 13 +++++++++++-- src/Contracts/ConnectionPoolInterface.php | 5 +++++ src/Contracts/DriverInterface.php | 5 +++++ src/Http/HttpConnectionPool.php | 5 +++++ src/Http/HttpDriver.php | 5 +++++ src/Neo4j/Neo4jConnectionPool.php | 9 +++++++++ src/Neo4j/Neo4jDriver.php | 5 +++++ 9 files changed, 55 insertions(+), 2 deletions(-) diff --git a/src/Basic/Driver.php b/src/Basic/Driver.php index ca76954d..7b73a876 100644 --- a/src/Basic/Driver.php +++ b/src/Basic/Driver.php @@ -57,4 +57,9 @@ public static function create(string|UriInterface $uri, ?DriverConfiguration $co return new self($driver); } + + public function closeConnections(): void + { + $this->driver->closeConnections(); + } } diff --git a/src/Bolt/BoltDriver.php b/src/Bolt/BoltDriver.php index db31730b..0fa16706 100644 --- a/src/Bolt/BoltDriver.php +++ b/src/Bolt/BoltDriver.php @@ -109,4 +109,9 @@ public function verifyConnectivity(?SessionConfiguration $config = null): bool return true; } + + public function closeConnections(): void + { + $this->pool->close(); + } } diff --git a/src/Bolt/ConnectionPool.php b/src/Bolt/ConnectionPool.php index 905be531..38adef3b 100644 --- a/src/Bolt/ConnectionPool.php +++ b/src/Bolt/ConnectionPool.php @@ -46,8 +46,12 @@ public function __construct( private readonly ?Neo4jLogger $logger ) {} - public static function create(UriInterface $uri, AuthenticateInterface $auth, DriverConfiguration $conf, SemaphoreInterface $semaphore): self - { + public static function create( + UriInterface $uri, + AuthenticateInterface $auth, + DriverConfiguration $conf, + SemaphoreInterface $semaphore + ): self { return new self( $semaphore, BoltFactory::create($conf->getLogger()), @@ -165,4 +169,9 @@ private function returnAnyAvailableConnection(SessionConfiguration $config): ?Co return null; } + + public function close(): void + { + $this->activeConnections = []; + } } diff --git a/src/Contracts/ConnectionPoolInterface.php b/src/Contracts/ConnectionPoolInterface.php index 56978b66..98dd372c 100644 --- a/src/Contracts/ConnectionPoolInterface.php +++ b/src/Contracts/ConnectionPoolInterface.php @@ -44,4 +44,9 @@ public function acquire(SessionConfiguration $config): Generator; * Releases a connection back to the pool. */ public function release(ConnectionInterface $connection): void; + + /** + * Closes all connections in the pool. + */ + public function close(): void; } diff --git a/src/Contracts/DriverInterface.php b/src/Contracts/DriverInterface.php index ea0ca2b3..9d9b0018 100644 --- a/src/Contracts/DriverInterface.php +++ b/src/Contracts/DriverInterface.php @@ -38,4 +38,9 @@ public function createSession(?SessionConfiguration $config = null): SessionInte * Returns true if the driver can make a valid connection with the server. */ public function verifyConnectivity(?SessionConfiguration $config = null): bool; + + /** + * Closes all connections in the pool. + */ + public function closeConnections(): void; } diff --git a/src/Http/HttpConnectionPool.php b/src/Http/HttpConnectionPool.php index ac2fd8a2..2c0e3929 100644 --- a/src/Http/HttpConnectionPool.php +++ b/src/Http/HttpConnectionPool.php @@ -127,4 +127,9 @@ public function release(ConnectionInterface $connection): void { // Nothing to release in the current HTTP Protocol implementation } + + public function close(): void + { + // Nothing to close in the current HTTP Protocol implementation + } } diff --git a/src/Http/HttpDriver.php b/src/Http/HttpDriver.php index 8eb41220..76bcfa8d 100644 --- a/src/Http/HttpDriver.php +++ b/src/Http/HttpDriver.php @@ -198,4 +198,9 @@ private function tsxUrl(SessionConfiguration $config): Resolvable return str_replace('{databaseName}', $database, $tsx); }); } + + public function closeConnections(): void + { + // Nothing to close in the current HTTP Protocol implementation + } } diff --git a/src/Neo4j/Neo4jConnectionPool.php b/src/Neo4j/Neo4jConnectionPool.php index eff1c6c2..0a2bf890 100644 --- a/src/Neo4j/Neo4jConnectionPool.php +++ b/src/Neo4j/Neo4jConnectionPool.php @@ -233,4 +233,13 @@ private function createKey(ConnectionRequestData $data, ?SessionConfiguration $c ':', ], '|', $key); } + + public function close(): void + { + foreach (self::$pools as $pool) { + $pool->close(); + } + self::$pools = []; + $this->cache->clear(); + } } diff --git a/src/Neo4j/Neo4jDriver.php b/src/Neo4j/Neo4jDriver.php index 1dfccbb5..bc2a1695 100644 --- a/src/Neo4j/Neo4jDriver.php +++ b/src/Neo4j/Neo4jDriver.php @@ -111,4 +111,9 @@ public function verifyConnectivity(?SessionConfiguration $config = null): bool return true; } + + public function closeConnections(): void + { + $this->pool->close(); + } } From 6db72edffc3f03c62bda29c7630af43bbe73ffcf Mon Sep 17 00:00:00 2001 From: exaby73 Date: Wed, 9 Oct 2024 11:31:58 +0530 Subject: [PATCH 2/3] feat: Add explicit close() method to BoltConnection instead of relying on __destruct --- src/Bolt/BoltConnection.php | 38 +++++++++++++++++++++++---- src/Bolt/ConnectionPool.php | 3 +++ src/Contracts/ConnectionInterface.php | 5 ++++ 3 files changed, 41 insertions(+), 5 deletions(-) diff --git a/src/Bolt/BoltConnection.php b/src/Bolt/BoltConnection.php index 91c95a8c..cff96d5e 100644 --- a/src/Bolt/BoltConnection.php +++ b/src/Bolt/BoltConnection.php @@ -143,7 +143,23 @@ public function getAuthentication(): AuthenticateInterface */ public function isOpen(): bool { - return !in_array($this->protocol()->serverState, [ServerState::DISCONNECTED, ServerState::DEFUNCT], true); + return !in_array( + $this->protocol()->serverState, + [ServerState::DISCONNECTED, ServerState::DEFUNCT], + true + ); + } + + /** + * @psalm-mutation-free + */ + public function isStreaming(): bool + { + return in_array( + $this->protocol()->serverState, + [ServerState::STREAMING, ServerState::TX_STREAMING], + true + ); } public function setTimeout(float $timeout): void @@ -154,7 +170,8 @@ public function setTimeout(float $timeout): void public function consumeResults(): void { $this->logger?->log(LogLevel::DEBUG, 'Consuming results'); - if ($this->protocol()->serverState !== ServerState::STREAMING && $this->protocol()->serverState !== ServerState::TX_STREAMING) { + if ($this->protocol()->serverState !== ServerState::STREAMING && $this->protocol( + )->serverState !== ServerState::TX_STREAMING) { $this->subscribedResults = []; return; @@ -225,8 +242,14 @@ public function discard(?int $qid): void * * @return BoltMeta */ - public function run(string $text, array $parameters, ?string $database, ?float $timeout, BookmarkHolder $holder, ?AccessMode $mode): array - { + public function run( + string $text, + array $parameters, + ?string $database, + ?float $timeout, + BookmarkHolder $holder, + ?AccessMode $mode + ): array { $extra = $this->buildRunExtra($database, $timeout, $holder, $mode); $this->logger?->log(LogLevel::DEBUG, 'RUN', $extra); $response = $this->protocol() @@ -298,10 +321,15 @@ public function pull(?int $qid, ?int $fetchSize): array } public function __destruct() + { + $this->close(); + } + + public function close(): void { try { if ($this->isOpen()) { - if ($this->protocol()->serverState === ServerState::STREAMING || $this->protocol()->serverState === ServerState::TX_STREAMING) { + if ($this->isStreaming()) { $this->consumeResults(); } diff --git a/src/Bolt/ConnectionPool.php b/src/Bolt/ConnectionPool.php index 38adef3b..2b2bfa25 100644 --- a/src/Bolt/ConnectionPool.php +++ b/src/Bolt/ConnectionPool.php @@ -172,6 +172,9 @@ private function returnAnyAvailableConnection(SessionConfiguration $config): ?Co public function close(): void { + foreach ($this->activeConnections as $activeConnection) { + $activeConnection->close(); + } $this->activeConnections = []; } } diff --git a/src/Contracts/ConnectionInterface.php b/src/Contracts/ConnectionInterface.php index 3d917d99..63faa619 100644 --- a/src/Contracts/ConnectionInterface.php +++ b/src/Contracts/ConnectionInterface.php @@ -112,4 +112,9 @@ public function getEncryptionLevel(): string; * Returns the user agent handling this connection. */ public function getUserAgent(): string; + + /** + * Closes the connection. + */ + public function close(): void; } From 6a21e06d82929b94fadbfb394a53f4e9330a8d1d Mon Sep 17 00:00:00 2001 From: exaby73 Date: Wed, 9 Oct 2024 11:53:23 +0530 Subject: [PATCH 3/3] refactor: Refactor consumeResults --- src/Bolt/BoltConnection.php | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/Bolt/BoltConnection.php b/src/Bolt/BoltConnection.php index cff96d5e..4c58f95d 100644 --- a/src/Bolt/BoltConnection.php +++ b/src/Bolt/BoltConnection.php @@ -170,8 +170,7 @@ public function setTimeout(float $timeout): void public function consumeResults(): void { $this->logger?->log(LogLevel::DEBUG, 'Consuming results'); - if ($this->protocol()->serverState !== ServerState::STREAMING && $this->protocol( - )->serverState !== ServerState::TX_STREAMING) { + if (!$this->isStreaming()) { $this->subscribedResults = []; return;