From 7093bb2452a7579771eaad0a62d75d82fa21fd2e Mon Sep 17 00:00:00 2001 From: Steve Hamblett Date: Mon, 8 Jul 2024 14:51:10 +0100 Subject: [PATCH] Issue524 (#546) * Issue 524 * Issue 524 * Issue 524 * Issue 524 --- ...t_client_mqtt_connection_handler_base.dart | 2 + .../mqtt_client_mqtt_connect_ack_message.dart | 10 +++ ...ient_mqtt_connect_ack_variable_header.dart | 23 +++-- .../mqtt_client_mqtt_variable_header.dart | 2 +- lib/src/mqtt_client_connection_status.dart | 3 + .../mqtt_client_connection_unsecure_test.dart | 4 + test/mqtt_client_message_test.dart | 88 +++++++++++-------- test/support/mqtt_client_mock_socket.dart | 3 +- 8 files changed, 88 insertions(+), 47 deletions(-) diff --git a/lib/src/connectionhandling/mqtt_client_mqtt_connection_handler_base.dart b/lib/src/connectionhandling/mqtt_client_mqtt_connection_handler_base.dart index fde412a..e7400d6 100644 --- a/lib/src/connectionhandling/mqtt_client_mqtt_connection_handler_base.dart +++ b/lib/src/connectionhandling/mqtt_client_mqtt_connection_handler_base.dart @@ -281,6 +281,8 @@ abstract class MqttConnectionHandlerBase implements IMqttConnectionHandler { '- state = connected'); connectionStatus.state = MqttConnectionState.connected; connectionStatus.returnCode = MqttConnectReturnCode.connectionAccepted; + connectionStatus.connectAckMessage = msg; + // Call the connected callback if we have one if (onConnected != null) { onConnected!(); diff --git a/lib/src/messages/connectack/mqtt_client_mqtt_connect_ack_message.dart b/lib/src/messages/connectack/mqtt_client_mqtt_connect_ack_message.dart index 84fcaed..93dd5a1 100644 --- a/lib/src/messages/connectack/mqtt_client_mqtt_connect_ack_message.dart +++ b/lib/src/messages/connectack/mqtt_client_mqtt_connect_ack_message.dart @@ -8,6 +8,9 @@ part of '../../../mqtt_client.dart'; /// Message that indicates a connection acknowledgement. +/// +/// On successful connection the [MqttClientConnectionStatus] class is updated +/// with this message as returned by the broker. class MqttConnectAckMessage extends MqttMessage { /// Initializes a new instance of the MqttConnectAckMessage class. /// Only called via the MqttMessage.Create operation during processing @@ -49,6 +52,13 @@ class MqttConnectAckMessage extends MqttMessage { return this; } + /// Sets the session present flag. + /// Can only be set if the protocol is 3.1.1 + MqttConnectAckMessage withSessionPresent(bool present) { + variableHeader.sessionPresent = present; + return this; + } + @override String toString() { final sb = StringBuffer(); diff --git a/lib/src/messages/connectack/mqtt_client_mqtt_connect_ack_variable_header.dart b/lib/src/messages/connectack/mqtt_client_mqtt_connect_ack_variable_header.dart index 5b5c634..21a7b49 100644 --- a/lib/src/messages/connectack/mqtt_client_mqtt_connect_ack_variable_header.dart +++ b/lib/src/messages/connectack/mqtt_client_mqtt_connect_ack_variable_header.dart @@ -16,22 +16,31 @@ class MqttConnectAckVariableHeader extends MqttVariableHeader { MqttConnectAckVariableHeader.fromByteBuffer(super.headerStream) : super.fromByteBuffer(); + /// Session present flag. + /// Only available for the 3.1.1 protocol, for 3.1 this is always false. + bool _sessionPresent = false; + bool get sessionPresent => _sessionPresent; + set sessionPresent(bool present) { + if (Protocol.version == MqttClientConstants.mqttV311ProtocolVersion) { + _sessionPresent = present; + } + } + /// Writes the variable header for an MQTT Connect message to /// the supplied stream. @override void writeTo(MqttByteBuffer variableHeaderStream) { - // Unused additional 'compression' byte used within the variable - // header acknowledgement. - variableHeaderStream.writeByte(0); + sessionPresent + ? variableHeaderStream.writeByte(1) + : variableHeaderStream.writeByte(0); writeReturnCode(variableHeaderStream); } /// Creates a variable header from the specified header stream. @override void readFrom(MqttByteBuffer variableHeaderStream) { - // Unused additional 'compression' byte used within the variable - // header acknowledgement. - variableHeaderStream.readByte(); + final ackConnectFlags = variableHeaderStream.readByte(); + sessionPresent = ackConnectFlags == 1; readReturnCode(variableHeaderStream); } @@ -45,6 +54,6 @@ class MqttConnectAckVariableHeader extends MqttVariableHeader { @override String toString() => - 'Connect Variable Header: TopicNameCompressionResponse={0}, ' + 'Connect Variable Header: SessionPresent={$sessionPresent}, ' 'ReturnCode={$returnCode}'; } diff --git a/lib/src/messages/mqtt_client_mqtt_variable_header.dart b/lib/src/messages/mqtt_client_mqtt_variable_header.dart index 1fb90c5..55681b9 100644 --- a/lib/src/messages/mqtt_client_mqtt_variable_header.dart +++ b/lib/src/messages/mqtt_client_mqtt_variable_header.dart @@ -60,7 +60,7 @@ class MqttVariableHeader { /// Protocol version int protocolVersion = 0; - /// Conenct flags + /// Connect flags late MqttConnectFlags connectFlags; /// Defines the maximum allowable lag, in seconds, between expected messages. diff --git a/lib/src/mqtt_client_connection_status.dart b/lib/src/mqtt_client_connection_status.dart index f8b26e8..c9bf1c6 100644 --- a/lib/src/mqtt_client_connection_status.dart +++ b/lib/src/mqtt_client_connection_status.dart @@ -18,6 +18,9 @@ class MqttClientConnectionStatus { /// Disconnection origin MqttDisconnectionOrigin disconnectionOrigin = MqttDisconnectionOrigin.none; + /// The connect acknowledgement message from the broker for the current connection + MqttConnectAckMessage? connectAckMessage; + @override String toString() { final s = state.toString().split('.')[1]; diff --git a/test/mqtt_client_connection_unsecure_test.dart b/test/mqtt_client_connection_unsecure_test.dart index c9c30d8..a8c6ee1 100644 --- a/test/mqtt_client_connection_unsecure_test.dart +++ b/test/mqtt_client_connection_unsecure_test.dart @@ -254,6 +254,7 @@ void main() { expect(ch.connectionStatus.state, MqttConnectionState.connected); expect(ch.connectionStatus.returnCode, MqttConnectReturnCode.connectionAccepted); + expect(ch.connectionStatus.connectAckMessage, isNotNull); expect(connectCbCalled, isTrue); final state = ch.disconnect(); expect(state, MqttConnectionState.disconnected); @@ -270,6 +271,7 @@ void main() { test('Successful response and disconnect with returned status', () async { await IOOverrides.runZoned(() async { + Protocol.version = MqttClientConstants.mqttV311ProtocolVersion; final clientEventBus = events.EventBus(); final ch = SynchronousMqttServerConnectionHandler(clientEventBus, maxConnectionAttempts: 3, socketOptions: socketOptions); @@ -277,6 +279,8 @@ void main() { MqttConnectMessage().withClientIdentifier(testClientId)); expect(status.state, MqttConnectionState.connected); expect(status.returnCode, MqttConnectReturnCode.connectionAccepted); + expect(status.connectAckMessage, isNotNull); + expect(status.connectAckMessage?.variableHeader.sessionPresent, isTrue); final state = ch.disconnect(); expect(state, MqttConnectionState.disconnected); }, diff --git a/test/mqtt_client_message_test.dart b/test/mqtt_client_message_test.dart index 21f7411..c336137 100644 --- a/test/mqtt_client_message_test.dart +++ b/test/mqtt_client_message_test.dart @@ -460,12 +460,6 @@ void main() { group('Connect', () { test('Basic deserialization', () { - // Our test deserialization message, with the following properties. Note this message is not - // yet a real MQTT message, because not everything is implemented, but it must be modified - // and ammeneded as work progresses - // - // Message Specs________________ - // <10><15><00><06>MQIsdp<03><02><00><1E><00><07>andy111 final sampleMessage = [ 0x10, 0x1B, @@ -532,12 +526,6 @@ void main() { expect(bm.payload.password, 'Billy1Pass'); }); test('Payload - invalid client idenfier length', () { - // Our test deserialization message, with the following properties. Note this message is not - // yet a real MQTT message, because not everything is implemented, but it must be modified - // and ammeneded as work progresses - // - // Message Specs________________ - // <10><15><00><06>MQIsdp<03><02><00><1E><00><07>andy111andy111andy111andy111 final sampleMessage = [ 0x10, 0x15, @@ -628,12 +616,6 @@ void main() { group('Connect Ack', () { test('Deserialisation - Connection accepted', () { - // Our test deserialization message, with the following properties. Note this message is not - // yet a real MQTT message, because not everything is implemented, but it must be modified - // and amended as work progresses - // - // Message Specs________________ - // <20><02><00><00> final sampleMessage = typed.Uint8Buffer(4); sampleMessage[0] = 0x20; sampleMessage[1] = 0x02; @@ -661,13 +643,37 @@ void main() { expect(message.variableHeader.returnCode, MqttConnectReturnCode.connectionAccepted); }); + test('Deserialisation - Connection accepted - session present', () { + Protocol.version = MqttClientConstants.mqttV311ProtocolVersion; + final sampleMessage = typed.Uint8Buffer(4); + sampleMessage[0] = 0x20; + sampleMessage[1] = 0x02; + sampleMessage[2] = 0x01; + sampleMessage[3] = 0x00; + final byteBuffer = MqttByteBuffer(sampleMessage); + final baseMessage = MqttMessage.createFrom(byteBuffer); + print('Connect Ack - Connection accepted::${baseMessage.toString()}'); + // Check that the message was correctly identified as a connect ack message. + expect(baseMessage, const TypeMatcher()); + final message = baseMessage as MqttConnectAckMessage; + // Validate the message deserialization + expect( + message.header!.duplicate, + false, + ); + expect( + message.header!.retain, + false, + ); + expect(message.header!.qos, MqttQos.atMostOnce); + expect(message.header!.messageType, MqttMessageType.connectAck); + expect(message.header!.messageSize, 2); + // Validate the variable header + expect(message.variableHeader.returnCode, + MqttConnectReturnCode.connectionAccepted); + expect(message.variableHeader.sessionPresent, isTrue); + }); test('Deserialisation - Unacceptable protocol version', () { - // Our test deserialization message, with the following properties. Note this message is not - // yet a real MQTT message, because not everything is implemented, but it must be modified - // and amended as work progresses - // - // Message Specs________________ - // <20><02><00><00> final sampleMessage = typed.Uint8Buffer(4); sampleMessage[0] = 0x20; sampleMessage[1] = 0x02; @@ -697,12 +703,6 @@ void main() { MqttConnectReturnCode.unacceptedProtocolVersion); }); test('Deserialisation - Identifier rejected', () { - // Our test deserialization message, with the following properties. Note this message is not - // yet a real MQTT message, because not everything is implemented, but it must be modified - // and amended as work progresses - // - // Message Specs________________ - // <20><02><00><00> final sampleMessage = typed.Uint8Buffer(4); sampleMessage[0] = 0x20; sampleMessage[1] = 0x02; @@ -731,12 +731,6 @@ void main() { MqttConnectReturnCode.identifierRejected); }); test('Deserialisation - Broker unavailable', () { - // Our test deserialization message, with the following properties. Note this message is not - // yet a real MQTT message, because not everything is implemented, but it must be modified - // and amended as work progresses - // - // Message Specs________________ - // <20><02><00><00> final sampleMessage = typed.Uint8Buffer(4); sampleMessage[0] = 0x20; sampleMessage[1] = 0x02; @@ -769,8 +763,8 @@ void main() { final expected = typed.Uint8Buffer(4); expected[0] = 0x20; expected[1] = 0x02; - expected[2] = 0x0; - expected[3] = 0x0; + expected[2] = 0x00; + expected[3] = 0x00; final msg = MqttConnectAckMessage() .withReturnCode(MqttConnectReturnCode.connectionAccepted); print('Connect Ack - Connection accepted::${msg.toString()}'); @@ -781,6 +775,24 @@ void main() { expect(actual[2], expected[2]); // connect ack - compression? always empty expect(actual[3], expected[3]); // return code. }); + test('Serialisation - Connection accepted - session present', () { + Protocol.version = MqttClientConstants.mqttV311ProtocolVersion; + final expected = typed.Uint8Buffer(4); + expected[0] = 0x20; + expected[1] = 0x02; + expected[2] = 0x01; + expected[3] = 0x00; + final msg = MqttConnectAckMessage() + .withReturnCode(MqttConnectReturnCode.connectionAccepted) + .withSessionPresent(true); + print('Connect Ack - Connection accepted::${msg.toString()}'); + final actual = MessageSerializationHelper.getMessageBytes(msg); + expect(actual.length, expected.length); + expect(actual[0], expected[0]); // msg type of header + expect(actual[1], expected[1]); // remaining length + expect(actual[2], expected[2]); // connect ack - compression? always empty + expect(actual[3], expected[3]); // return code. + }); test('Serialisation - Unacceptable protocol version', () { final expected = typed.Uint8Buffer(4); expected[0] = 0x20; diff --git a/test/support/mqtt_client_mock_socket.dart b/test/support/mqtt_client_mock_socket.dart index 4e12363..0f7feea 100644 --- a/test/support/mqtt_client_mock_socket.dart +++ b/test/support/mqtt_client_mock_socket.dart @@ -91,7 +91,8 @@ class MqttMockSocketSimpleConnect extends MockSocket { mockBytes.addAll(data); if (initial) { final ack = MqttConnectAckMessage() - .withReturnCode(MqttConnectReturnCode.connectionAccepted); + .withReturnCode(MqttConnectReturnCode.connectionAccepted) + .withSessionPresent(true); final buff = Uint8Buffer(); final ms = MqttByteBuffer(buff); ack.writeTo(ms);