From e0f2513b50e3419e58d24cdc96cff6cd754afe4d Mon Sep 17 00:00:00 2001 From: Marvin Mall Date: Tue, 30 Sep 2025 19:34:24 +0200 Subject: [PATCH 1/2] Add ping response timeout handling --- src/MqttClient.php | 43 +++++++++++++++++++++++++++++++++++++++---- 1 file changed, 39 insertions(+), 4 deletions(-) diff --git a/src/MqttClient.php b/src/MqttClient.php index c68ef2c..1545804 100644 --- a/src/MqttClient.php +++ b/src/MqttClient.php @@ -45,9 +45,10 @@ class MqttClient implements ClientContract public const SOCKET_READ_BUFFER_SIZE = 8192; private string $clientId; private ConnectionSettings $settings; - private string $buffer = ''; - private bool $connected = false; - private ?float $lastPingAt = null; + private string $buffer = ''; + private bool $connected = false; + private ?float $lastPingAt = null; + private ?float $pingResponseExpectedUntil = null; private MessageProcessor $messageProcessor; private Repository $repository; private LoggerInterface $logger; @@ -489,6 +490,7 @@ public function disconnect(): void stream_socket_shutdown($this->socket, STREAM_SHUT_WR); } + $this->socket = null; $this->connected = false; } @@ -650,6 +652,28 @@ public function loopOnce(float $loopStartedAt, bool $allowSleep = false, int $sl // This includes published messages, subscribe and unsubscribe requests. $this->resendPendingMessages(); + // If the last ping we sent has not been answered within the configured keep alive interval, + // we assume the connection is dead and try to reconnect if configured to do so. + if ($this->pingResponseExpectedUntil !== null && microtime(true) > $this->pingResponseExpectedUntil) { + $this->logger->warning( + 'The broker did not respond to our ping request within the configured keep alive interval. Assuming the connection is dead.' + ); + + $this->closeSocket(); + + $this->lastPingAt = null; + $this->pingResponseExpectedUntil = null; + + if (!$this->settings->shouldReconnectAutomatically()) { + throw new DataTransferException( + DataTransferException::EXCEPTION_RX_DATA, + 'No ping response received in time. The connection is dead.' + ); + } + + $this->reconnect(); + } + // If the last message of the broker has been received more seconds ago // than specified by the keep alive time, we will send a ping to ensure // the connection is kept alive. @@ -869,6 +893,14 @@ protected function handleMessage(Message $message): void $this->writeToSocketWithAutoReconnect($this->messageProcessor->buildPingResponseMessage()); return; } + + // PINGRESP + if ($message->getType()->equals(MessageType::PING_RESPONSE())) { + // Set the ping response expectation to null, so we know the broker responded. + $this->logger->debug('Received ping response from the broker.'); + $this->pingResponseExpectedUntil = null; + return; + } } /** @@ -1020,6 +1052,8 @@ protected function ping(): void $this->logger->debug('Sending ping to the broker to keep the connection alive.'); $this->writeToSocketWithAutoReconnect($this->messageProcessor->buildPingRequestMessage()); + + $this->pingResponseExpectedUntil = microtime(true) + $this->settings->getKeepAliveInterval(); } /** @@ -1231,6 +1265,7 @@ protected function closeSocket(): void ]); } - $this->socket = null; + $this->socket = null; + $this->connected = false; } } From 13367b7bb2b7c3a4add4072de2146528f6d7bf2c Mon Sep 17 00:00:00 2001 From: Marvin Mall Date: Tue, 30 Sep 2025 19:47:19 +0200 Subject: [PATCH 2/2] Implement pr feedback --- src/MqttClient.php | 53 +++++++++++++++++++++++++++++----------------- 1 file changed, 33 insertions(+), 20 deletions(-) diff --git a/src/MqttClient.php b/src/MqttClient.php index 1545804..0ba1a84 100644 --- a/src/MqttClient.php +++ b/src/MqttClient.php @@ -654,25 +654,7 @@ public function loopOnce(float $loopStartedAt, bool $allowSleep = false, int $sl // If the last ping we sent has not been answered within the configured keep alive interval, // we assume the connection is dead and try to reconnect if configured to do so. - if ($this->pingResponseExpectedUntil !== null && microtime(true) > $this->pingResponseExpectedUntil) { - $this->logger->warning( - 'The broker did not respond to our ping request within the configured keep alive interval. Assuming the connection is dead.' - ); - - $this->closeSocket(); - - $this->lastPingAt = null; - $this->pingResponseExpectedUntil = null; - - if (!$this->settings->shouldReconnectAutomatically()) { - throw new DataTransferException( - DataTransferException::EXCEPTION_RX_DATA, - 'No ping response received in time. The connection is dead.' - ); - } - - $this->reconnect(); - } + $this->handlePingTimeout(); // If the last message of the broker has been received more seconds ago // than specified by the keep alive time, we will send a ping to ensure @@ -1052,10 +1034,41 @@ protected function ping(): void $this->logger->debug('Sending ping to the broker to keep the connection alive.'); $this->writeToSocketWithAutoReconnect($this->messageProcessor->buildPingRequestMessage()); - + + // Set the deadline for receiving a PINGRESP message according to MQTT specification requirements. $this->pingResponseExpectedUntil = microtime(true) + $this->settings->getKeepAliveInterval(); } + /** + * Handles a ping timeout by closing the socket and trying to reconnect if configured to do so. + * + * @throws DataTransferException + */ + protected function handlePingTimeout(): void + { + if ($this->pingResponseExpectedUntil === null || microtime(true) < $this->pingResponseExpectedUntil) { + return; + } + + $this->logger->warning( + 'The broker did not respond to our ping request within the configured keep alive interval. Assuming the connection is dead.' + ); + + $this->closeSocket(); + + $this->lastPingAt = null; + $this->pingResponseExpectedUntil = null; + + if (!$this->settings->shouldReconnectAutomatically()) { + throw new DataTransferException( + DataTransferException::EXCEPTION_RX_DATA, + 'No ping response received in time. The connection is dead.' + ); + } + + $this->reconnect(); + } + /** * Sends a disconnect message to the broker. Does not close the socket. *