diff --git a/src/MqttClient.php b/src/MqttClient.php index c68ef2c..0ba1a84 100644 --- a/src/MqttClient.php +++ b/src/MqttClient.php @@ -45,9 +45,10 @@ class MqttClient implements ClientContract public const SOCKET_READ_BUFFER_SIZE = 8192; private string $clientId; private ConnectionSettings $settings; - private string $buffer = ''; - private bool $connected = false; - private ?float $lastPingAt = null; + private string $buffer = ''; + private bool $connected = false; + private ?float $lastPingAt = null; + private ?float $pingResponseExpectedUntil = null; private MessageProcessor $messageProcessor; private Repository $repository; private LoggerInterface $logger; @@ -489,6 +490,7 @@ public function disconnect(): void stream_socket_shutdown($this->socket, STREAM_SHUT_WR); } + $this->socket = null; $this->connected = false; } @@ -650,6 +652,10 @@ public function loopOnce(float $loopStartedAt, bool $allowSleep = false, int $sl // This includes published messages, subscribe and unsubscribe requests. $this->resendPendingMessages(); + // If the last ping we sent has not been answered within the configured keep alive interval, + // we assume the connection is dead and try to reconnect if configured to do so. + $this->handlePingTimeout(); + // If the last message of the broker has been received more seconds ago // than specified by the keep alive time, we will send a ping to ensure // the connection is kept alive. @@ -869,6 +875,14 @@ protected function handleMessage(Message $message): void $this->writeToSocketWithAutoReconnect($this->messageProcessor->buildPingResponseMessage()); return; } + + // PINGRESP + if ($message->getType()->equals(MessageType::PING_RESPONSE())) { + // Set the ping response expectation to null, so we know the broker responded. + $this->logger->debug('Received ping response from the broker.'); + $this->pingResponseExpectedUntil = null; + return; + } } /** @@ -1020,6 +1034,39 @@ protected function ping(): void $this->logger->debug('Sending ping to the broker to keep the connection alive.'); $this->writeToSocketWithAutoReconnect($this->messageProcessor->buildPingRequestMessage()); + + // Set the deadline for receiving a PINGRESP message according to MQTT specification requirements. + $this->pingResponseExpectedUntil = microtime(true) + $this->settings->getKeepAliveInterval(); + } + + /** + * Handles a ping timeout by closing the socket and trying to reconnect if configured to do so. + * + * @throws DataTransferException + */ + protected function handlePingTimeout(): void + { + if ($this->pingResponseExpectedUntil === null || microtime(true) < $this->pingResponseExpectedUntil) { + return; + } + + $this->logger->warning( + 'The broker did not respond to our ping request within the configured keep alive interval. Assuming the connection is dead.' + ); + + $this->closeSocket(); + + $this->lastPingAt = null; + $this->pingResponseExpectedUntil = null; + + if (!$this->settings->shouldReconnectAutomatically()) { + throw new DataTransferException( + DataTransferException::EXCEPTION_RX_DATA, + 'No ping response received in time. The connection is dead.' + ); + } + + $this->reconnect(); } /** @@ -1231,6 +1278,7 @@ protected function closeSocket(): void ]); } - $this->socket = null; + $this->socket = null; + $this->connected = false; } }