Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 52 additions & 4 deletions src/MqttClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,10 @@ class MqttClient implements ClientContract
public const SOCKET_READ_BUFFER_SIZE = 8192;
private string $clientId;
private ConnectionSettings $settings;
private string $buffer = '';
private bool $connected = false;
private ?float $lastPingAt = null;
private string $buffer = '';
private bool $connected = false;
private ?float $lastPingAt = null;
private ?float $pingResponseExpectedUntil = null;
private MessageProcessor $messageProcessor;
private Repository $repository;
private LoggerInterface $logger;
Expand Down Expand Up @@ -489,6 +490,7 @@ public function disconnect(): void
stream_socket_shutdown($this->socket, STREAM_SHUT_WR);
}

$this->socket = null;
$this->connected = false;
}

Expand Down Expand Up @@ -650,6 +652,10 @@ public function loopOnce(float $loopStartedAt, bool $allowSleep = false, int $sl
// This includes published messages, subscribe and unsubscribe requests.
$this->resendPendingMessages();

// If the last ping we sent has not been answered within the configured keep alive interval,
// we assume the connection is dead and try to reconnect if configured to do so.
$this->handlePingTimeout();

// If the last message of the broker has been received more seconds ago
// than specified by the keep alive time, we will send a ping to ensure
// the connection is kept alive.
Expand Down Expand Up @@ -869,6 +875,14 @@ protected function handleMessage(Message $message): void
$this->writeToSocketWithAutoReconnect($this->messageProcessor->buildPingResponseMessage());
return;
}

// PINGRESP
if ($message->getType()->equals(MessageType::PING_RESPONSE())) {
// Set the ping response expectation to null, so we know the broker responded.
$this->logger->debug('Received ping response from the broker.');
$this->pingResponseExpectedUntil = null;
return;
}
}

/**
Expand Down Expand Up @@ -1020,6 +1034,39 @@ protected function ping(): void
$this->logger->debug('Sending ping to the broker to keep the connection alive.');

$this->writeToSocketWithAutoReconnect($this->messageProcessor->buildPingRequestMessage());

// Set the deadline for receiving a PINGRESP message according to MQTT specification requirements.
$this->pingResponseExpectedUntil = microtime(true) + $this->settings->getKeepAliveInterval();
}

/**
* Handles a ping timeout by closing the socket and trying to reconnect if configured to do so.
*
* @throws DataTransferException
*/
protected function handlePingTimeout(): void
{
if ($this->pingResponseExpectedUntil === null || microtime(true) < $this->pingResponseExpectedUntil) {
return;
}

$this->logger->warning(
'The broker did not respond to our ping request within the configured keep alive interval. Assuming the connection is dead.'
);

$this->closeSocket();

$this->lastPingAt = null;
$this->pingResponseExpectedUntil = null;

if (!$this->settings->shouldReconnectAutomatically()) {
throw new DataTransferException(
DataTransferException::EXCEPTION_RX_DATA,
'No ping response received in time. The connection is dead.'
);
}

$this->reconnect();
}

/**
Expand Down Expand Up @@ -1231,6 +1278,7 @@ protected function closeSocket(): void
]);
}

$this->socket = null;
$this->socket = null;
$this->connected = false;
}
}