Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue523 #543

Merged
merged 6 commits into from
Jul 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 105 additions & 0 deletions example/mqtt_server_client_failed_connection.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Package : mqtt_client
* Author : S. Hamblett <steve.hamblett@linux.com>
* Date : 31/05/2017
* Copyright : S.Hamblett
*/

import 'dart:async';
import 'dart:io';
import 'package:mqtt_client/mqtt_client.dart';
import 'package:mqtt_client/mqtt_server_client.dart';

/// An annotated connection attempt failed usage example for mqtt_server_client.
///
/// To run this example on a linux host please execute 'netcat -l 1883' at the command line.
/// Use a suitably equivalent command for other hosts.
///
/// First create a client, the client is constructed with a broker name, client identifier
/// and port if needed. The client identifier (short ClientId) is an identifier of each MQTT
/// client connecting to a MQTT broker. As the word identifier already suggests, it should be unique per broker.
/// The broker uses it for identifying the client and the current state of the client. If you don’t need a state
/// to be hold by the broker, in MQTT 3.1.1 you can set an empty ClientId, which results in a connection without any state.
/// A condition is that clean session connect flag is true, otherwise the connection will be rejected.
/// The client identifier can be a maximum length of 23 characters. If a port is not specified the standard port
/// of 1883 is used.

/// Connect to a resolvable host that is not running a broker, hence the connection will fail.
/// Set the maximum connection attempts to 3.
final client = MqttServerClient('localhost', '', maxConnectionAttempts: 3);

Future<int> main() async {
/// Set logging on if needed, defaults to off
client.logging(on: false);

/// Set the correct MQTT protocol for mosquito
client.setProtocolV311();

/// The connection timeout period can be set if needed, the default is 5 seconds.
client.connectTimeoutPeriod = 2000; // milliseconds

/// Add the unsolicited disconnection callback
client.onDisconnected = onDisconnected;

/// Add the failed connection attempt callback.
/// This callback will be called on every failed connection attempt, in the case of this
/// example it will be called 3 times at a period of 2 seconds.
client.onFailedConnectionAttempt = failedConnectionAttemptCallback;

/// Create a connection message to use or use the default one. The default one sets the
/// client identifier, any supplied username/password and clean session,
/// an example of a specific one below.
final connMess = MqttConnectMessage()
.withClientIdentifier('Mqtt_MyClientUniqueId')
.withWillTopic('willtopic') // If you set this you must set a will message
.withWillMessage('My Will message')
.startClean() // Non persistent session for testing
.withWillQos(MqttQos.atLeastOnce);
print('EXAMPLE::Mosquitto client connecting....');
client.connectionMessage = connMess;

/// Connect the client, any errors here are communicated via the failed
/// connection attempts callback

try {
await client.connect();
} on NoConnectionException catch (e) {
// Raised by the client when connection fails.
print('EXAMPLE::client exception - $e');
client.disconnect();
exit(-1);
} on SocketException catch (e) {
// Raised by the socket layer
print('EXAMPLE::socket exception - $e');
client.disconnect();
exit(-1);
}

/// Check we are not connected
if (client.connectionStatus!.state != MqttConnectionState.connected) {
print('EXAMPLE::Mosquitto client not connected');
}

exit(0);
}

/// Failed connection attempt callback
void failedConnectionAttemptCallback(int attempt) {
print('EXAMPLE::onFailedConnectionAttempt, attempt number is $attempt');
if (attempt == 3) {
client.disconnect();
}
}

/// The unsolicited disconnect callback
void onDisconnected() {
print('EXAMPLE::OnDisconnected client callback - Client disconnection');
if (client.connectionStatus!.disconnectionOrigin ==
MqttDisconnectionOrigin.solicited) {
print('EXAMPLE::OnDisconnected callback is solicited, this is correct');
} else {
print(
'EXAMPLE::OnDisconnected callback is unsolicited or none, this is incorrect - exiting');
exit(-1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,27 +78,45 @@ class SynchronousMqttBrowserConnectionHandler
// We're the sync connection handler so we need to wait for the
// brokers acknowledgement of the connections
await connectTimer.sleep();
connectionAttempts++;
MqttLogger.log(
'SynchronousMqttBrowserConnectionHandler::internalConnect - '
'post sleep, state = $connectionStatus');
if (connectionStatus.state != MqttConnectionState.connected) {
if (!autoReconnectInProgress) {
MqttLogger.log(
'SynchronousMqttBrowserConnectionHandler::internalConnect failed, attempt $connectionAttempts');
if (onFailedConnectionAttempt != null) {
MqttLogger.log(
'SynchronousMqttBrowserConnectionHandler::calling onFailedConnectionAttempt');
onFailedConnectionAttempt!(connectionAttempts);
}
}
}
} while (connectionStatus.state != MqttConnectionState.connected &&
++connectionAttempts < maxConnectionAttempts!);
connectionAttempts < maxConnectionAttempts!);
// If we've failed to handshake with the broker, throw an exception.
if (connectionStatus.state != MqttConnectionState.connected) {
if (!autoReconnectInProgress) {
MqttLogger.log(
'SynchronousMqttBrowserConnectionHandler::internalConnect failed');
if (connectionStatus.returnCode ==
MqttConnectReturnCode.noneSpecified) {
throw NoConnectionException('The maximum allowed connection attempts '
'({$maxConnectionAttempts}) were exceeded. '
'The broker is not responding to the connection request message '
'(Missing Connection Acknowledgement?');
if (onFailedConnectionAttempt == null) {
if (connectionStatus.returnCode ==
MqttConnectReturnCode.noneSpecified) {
throw NoConnectionException(
'The maximum allowed connection attempts '
'({$maxConnectionAttempts}) were exceeded. '
'The broker is not responding to the connection request message '
'(Missing Connection Acknowledgement?');
} else {
throw NoConnectionException(
'The maximum allowed connection attempts '
'({$maxConnectionAttempts}) were exceeded. '
'The broker is not responding to the connection request message correctly '
'The return code is ${connectionStatus.returnCode}');
}
} else {
throw NoConnectionException('The maximum allowed connection attempts '
'({$maxConnectionAttempts}) were exceeded. '
'The broker is not responding to the connection request message correctly '
'The return code is ${connectionStatus.returnCode}');
connectionStatus.state = MqttConnectionState.faulted;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ abstract class IMqttConnectionHandler {
/// Auto reconnected callback
AutoReconnectCompleteCallback? onAutoReconnected;

/// Failed connection attempt callback
FailedConnectionAttemptCallback? onFailedConnectionAttempt;

/// Auto reconnect in progress
bool autoReconnectInProgress = false;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ abstract class MqttConnectionHandlerBase implements IMqttConnectionHandler {
@override
AutoReconnectCompleteCallback? onAutoReconnected;

/// Failed connection attempt callback
@override
FailedConnectionAttemptCallback? onFailedConnectionAttempt;

/// Auto reconnect in progress
@override
bool autoReconnectInProgress = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,27 +108,45 @@ class SynchronousMqttServerConnectionHandler
// We're the sync connection handler so we need to wait for the
// brokers acknowledgement of the connections
await connectTimer.sleep();
connectionAttempts++;
MqttLogger.log(
'SynchronousMqttServerConnectionHandler::internalConnect - '
'post sleep, state = $connectionStatus');
if (connectionStatus.state != MqttConnectionState.connected) {
if (!autoReconnectInProgress) {
MqttLogger.log(
'SynchronousMqttServerConnectionHandler::internalConnect failed, attempt $connectionAttempts');
if (onFailedConnectionAttempt != null) {
MqttLogger.log(
'SynchronousMqttServerConnectionHandler::calling onFailedConnectionAttempt');
onFailedConnectionAttempt!(connectionAttempts);
}
}
}
} while (connectionStatus.state != MqttConnectionState.connected &&
++connectionAttempts < maxConnectionAttempts!);
connectionAttempts < maxConnectionAttempts!);
// If we've failed to handshake with the broker, throw an exception.
if (connectionStatus.state != MqttConnectionState.connected) {
if (!autoReconnectInProgress) {
MqttLogger.log(
'SynchronousMqttServerConnectionHandler::internalConnect failed');
if (connectionStatus.returnCode ==
MqttConnectReturnCode.noneSpecified) {
throw NoConnectionException('The maximum allowed connection attempts '
'({$maxConnectionAttempts}) were exceeded. '
'The broker is not responding to the connection request message '
'(Missing Connection Acknowledgement?');
if (onFailedConnectionAttempt == null) {
if (connectionStatus.returnCode ==
MqttConnectReturnCode.noneSpecified) {
throw NoConnectionException(
'The maximum allowed connection attempts '
'({$maxConnectionAttempts}) were exceeded. '
'The broker is not responding to the connection request message '
'(Missing Connection Acknowledgement?');
} else {
throw NoConnectionException(
'The maximum allowed connection attempts '
'({$maxConnectionAttempts}) were exceeded. '
'The broker is not responding to the connection request message correctly '
'The return code is ${connectionStatus.returnCode}');
}
} else {
throw NoConnectionException('The maximum allowed connection attempts '
'({$maxConnectionAttempts}) were exceeded. '
'The broker is not responding to the connection request message correctly '
'The return code is ${connectionStatus.returnCode}');
connectionStatus.state = MqttConnectionState.faulted;
}
}
}
Expand Down
6 changes: 4 additions & 2 deletions lib/src/mqtt_browser_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ class MqttBrowserClient extends MqttClient {
MqttBrowserClient(
super.server,
super.clientIdentifier, {
this.maxConnectionAttempts = 3,
this.maxConnectionAttempts =
MqttClientConstants.defaultMaxConnectionAttempts,
});

/// Initializes a new instance of the MqttServerClient class using
Expand All @@ -27,7 +28,8 @@ class MqttBrowserClient extends MqttClient {
super.server,
super.clientIdentifier,
super.port, {
this.maxConnectionAttempts = 3,
this.maxConnectionAttempts =
MqttClientConstants.defaultMaxConnectionAttempts,
}) : super.withPort();

/// Max connection attempts
Expand Down
13 changes: 13 additions & 0 deletions lib/src/mqtt_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ typedef AutoReconnectCallback = void Function();
/// The client auto reconnect complete callback type
typedef AutoReconnectCompleteCallback = void Function();

/// The client failed connection attempt callback
typedef FailedConnectionAttemptCallback = void Function(int attemptNumber);

/// A client class for interacting with MQTT Data Packets.
/// Do not instantiate this class directly, instead instantiate
/// either a [MqttClientServer] class or an [MqttBrowserClient] as needed.
Expand Down Expand Up @@ -201,6 +204,14 @@ class MqttClient {
/// perform any post auto reconnect actions.
AutoReconnectCompleteCallback? onAutoReconnected;

/// Failed Connection attempt callback.
/// Called on every failed connection attempt, if [maxConnectionAttempts] is
/// set to 5 say this will be called 5 times if the connection fails,
/// one for every failed attempt. Note this is never called
/// if [autoReconnect] is set, also the [NoConnectionException] is not raised
/// if this callback is supplied.
FailedConnectionAttemptCallback? onFailedConnectionAttempt;

/// Subscribed callback, function returns a void and takes a
/// string parameter, the topic that has been subscribed to.
SubscribeCallback? _onSubscribed;
Expand Down Expand Up @@ -288,6 +299,8 @@ class MqttClient {
connectionHandler.onConnected = onConnected;
connectionHandler.onAutoReconnect = onAutoReconnect;
connectionHandler.onAutoReconnected = onAutoReconnected;
connectionHandler.onFailedConnectionAttempt = onFailedConnectionAttempt;

MqttLogger.log(
'MqttClient::connect - Connection timeout period is $connectTimeoutPeriod milliseconds');
publishingManager = PublishingManager(connectionHandler, clientEventBus);
Expand Down
5 changes: 4 additions & 1 deletion lib/src/mqtt_client_constants.dart
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ class MqttClientConstants {

/// Default keep alive in seconds.
/// The default of 0 disables keep alive.
static int defaultKeepAlive = 0;
static const int defaultKeepAlive = 0;

/// Default maximum connection attempts
static const int defaultMaxConnectionAttempts = 3;

/// Protocol variants
/// V3
Expand Down
6 changes: 4 additions & 2 deletions lib/src/mqtt_server_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ class MqttServerClient extends MqttClient {
MqttServerClient(
super.server,
super.clientIdentifier, {
this.maxConnectionAttempts = 3,
this.maxConnectionAttempts =
MqttClientConstants.defaultMaxConnectionAttempts,
});

/// Initializes a new instance of the MqttServerClient class using
Expand All @@ -27,7 +28,8 @@ class MqttServerClient extends MqttClient {
super.server,
super.clientIdentifier,
super.port, {
this.maxConnectionAttempts = 3,
this.maxConnectionAttempts =
MqttClientConstants.defaultMaxConnectionAttempts,
}) : super.withPort();

/// The security context for secure usage
Expand Down
7 changes: 7 additions & 0 deletions test/mqtt_client_connection_autoreconnect_nobroker_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ void main() {
await IOOverrides.runZoned(() async {
var autoReconnectCallbackCalled = false;
var disconnectCallbackCalled = false;
var connectionFailedCallbackCalled = false;

void autoReconnect() {
autoReconnectCallbackCalled = true;
Expand All @@ -30,6 +31,10 @@ void main() {
disconnectCallbackCalled = true;
}

void connectionFailed(int attempt) {
connectionFailedCallbackCalled = true;
}

final client = MqttServerClient('localhost', testClientId);
client.logging(on: true);
client.keepAlivePeriod = 1;
Expand All @@ -38,6 +43,7 @@ void main() {
client.socketOptions.add(socketOption);
client.onAutoReconnect = autoReconnect;
client.onDisconnected = disconnect;
client.onFailedConnectionAttempt = connectionFailed;
const username = 'unused 4';
print(username);
const password = 'password 4';
Expand All @@ -48,6 +54,7 @@ void main() {
await MqttUtilities.asyncSleep(2);
expect(autoReconnectCallbackCalled, isTrue);
expect(disconnectCallbackCalled, isFalse);
expect(connectionFailedCallbackCalled, isFalse);
expect(client.connectionStatus!.state == MqttConnectionState.connecting,
isTrue);
},
Expand Down
Loading
Loading