Skip to content

Commit

Permalink
Issue524 (#546)
Browse files Browse the repository at this point in the history
* Issue 524

* Issue 524

* Issue 524

* Issue 524
  • Loading branch information
shamblett committed Jul 8, 2024
1 parent bc94b23 commit 7093bb2
Show file tree
Hide file tree
Showing 8 changed files with 88 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,8 @@ abstract class MqttConnectionHandlerBase implements IMqttConnectionHandler {
'- state = connected');
connectionStatus.state = MqttConnectionState.connected;
connectionStatus.returnCode = MqttConnectReturnCode.connectionAccepted;
connectionStatus.connectAckMessage = msg;

// Call the connected callback if we have one
if (onConnected != null) {
onConnected!();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
part of '../../../mqtt_client.dart';

/// Message that indicates a connection acknowledgement.
///
/// On successful connection the [MqttClientConnectionStatus] class is updated
/// with this message as returned by the broker.
class MqttConnectAckMessage extends MqttMessage {
/// Initializes a new instance of the MqttConnectAckMessage class.
/// Only called via the MqttMessage.Create operation during processing
Expand Down Expand Up @@ -49,6 +52,13 @@ class MqttConnectAckMessage extends MqttMessage {
return this;
}

/// Sets the session present flag.
/// Can only be set if the protocol is 3.1.1
MqttConnectAckMessage withSessionPresent(bool present) {
variableHeader.sessionPresent = present;
return this;
}

@override
String toString() {
final sb = StringBuffer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,31 @@ class MqttConnectAckVariableHeader extends MqttVariableHeader {
MqttConnectAckVariableHeader.fromByteBuffer(super.headerStream)
: super.fromByteBuffer();

/// Session present flag.
/// Only available for the 3.1.1 protocol, for 3.1 this is always false.
bool _sessionPresent = false;
bool get sessionPresent => _sessionPresent;
set sessionPresent(bool present) {
if (Protocol.version == MqttClientConstants.mqttV311ProtocolVersion) {
_sessionPresent = present;
}
}

/// Writes the variable header for an MQTT Connect message to
/// the supplied stream.
@override
void writeTo(MqttByteBuffer variableHeaderStream) {
// Unused additional 'compression' byte used within the variable
// header acknowledgement.
variableHeaderStream.writeByte(0);
sessionPresent
? variableHeaderStream.writeByte(1)
: variableHeaderStream.writeByte(0);
writeReturnCode(variableHeaderStream);
}

/// Creates a variable header from the specified header stream.
@override
void readFrom(MqttByteBuffer variableHeaderStream) {
// Unused additional 'compression' byte used within the variable
// header acknowledgement.
variableHeaderStream.readByte();
final ackConnectFlags = variableHeaderStream.readByte();
sessionPresent = ackConnectFlags == 1;
readReturnCode(variableHeaderStream);
}

Expand All @@ -45,6 +54,6 @@ class MqttConnectAckVariableHeader extends MqttVariableHeader {

@override
String toString() =>
'Connect Variable Header: TopicNameCompressionResponse={0}, '
'Connect Variable Header: SessionPresent={$sessionPresent}, '
'ReturnCode={$returnCode}';
}
2 changes: 1 addition & 1 deletion lib/src/messages/mqtt_client_mqtt_variable_header.dart
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class MqttVariableHeader {
/// Protocol version
int protocolVersion = 0;

/// Conenct flags
/// Connect flags
late MqttConnectFlags connectFlags;

/// Defines the maximum allowable lag, in seconds, between expected messages.
Expand Down
3 changes: 3 additions & 0 deletions lib/src/mqtt_client_connection_status.dart
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ class MqttClientConnectionStatus {
/// Disconnection origin
MqttDisconnectionOrigin disconnectionOrigin = MqttDisconnectionOrigin.none;

/// The connect acknowledgement message from the broker for the current connection
MqttConnectAckMessage? connectAckMessage;

@override
String toString() {
final s = state.toString().split('.')[1];
Expand Down
4 changes: 4 additions & 0 deletions test/mqtt_client_connection_unsecure_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ void main() {
expect(ch.connectionStatus.state, MqttConnectionState.connected);
expect(ch.connectionStatus.returnCode,
MqttConnectReturnCode.connectionAccepted);
expect(ch.connectionStatus.connectAckMessage, isNotNull);
expect(connectCbCalled, isTrue);
final state = ch.disconnect();
expect(state, MqttConnectionState.disconnected);
Expand All @@ -270,13 +271,16 @@ void main() {

test('Successful response and disconnect with returned status', () async {
await IOOverrides.runZoned(() async {
Protocol.version = MqttClientConstants.mqttV311ProtocolVersion;
final clientEventBus = events.EventBus();
final ch = SynchronousMqttServerConnectionHandler(clientEventBus,
maxConnectionAttempts: 3, socketOptions: socketOptions);
final status = await ch.connect(mockBrokerAddress, mockBrokerPort,
MqttConnectMessage().withClientIdentifier(testClientId));
expect(status.state, MqttConnectionState.connected);
expect(status.returnCode, MqttConnectReturnCode.connectionAccepted);
expect(status.connectAckMessage, isNotNull);
expect(status.connectAckMessage?.variableHeader.sessionPresent, isTrue);
final state = ch.disconnect();
expect(state, MqttConnectionState.disconnected);
},
Expand Down
88 changes: 50 additions & 38 deletions test/mqtt_client_message_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -460,12 +460,6 @@ void main() {

group('Connect', () {
test('Basic deserialization', () {
// Our test deserialization message, with the following properties. Note this message is not
// yet a real MQTT message, because not everything is implemented, but it must be modified
// and ammeneded as work progresses
//
// Message Specs________________
// <10><15><00><06>MQIsdp<03><02><00><1E><00><07>andy111
final sampleMessage = <int>[
0x10,
0x1B,
Expand Down Expand Up @@ -532,12 +526,6 @@ void main() {
expect(bm.payload.password, 'Billy1Pass');
});
test('Payload - invalid client idenfier length', () {
// Our test deserialization message, with the following properties. Note this message is not
// yet a real MQTT message, because not everything is implemented, but it must be modified
// and ammeneded as work progresses
//
// Message Specs________________
// <10><15><00><06>MQIsdp<03><02><00><1E><00><07>andy111andy111andy111andy111
final sampleMessage = <int>[
0x10,
0x15,
Expand Down Expand Up @@ -628,12 +616,6 @@ void main() {

group('Connect Ack', () {
test('Deserialisation - Connection accepted', () {
// Our test deserialization message, with the following properties. Note this message is not
// yet a real MQTT message, because not everything is implemented, but it must be modified
// and amended as work progresses
//
// Message Specs________________
// <20><02><00><00>
final sampleMessage = typed.Uint8Buffer(4);
sampleMessage[0] = 0x20;
sampleMessage[1] = 0x02;
Expand Down Expand Up @@ -661,13 +643,37 @@ void main() {
expect(message.variableHeader.returnCode,
MqttConnectReturnCode.connectionAccepted);
});
test('Deserialisation - Connection accepted - session present', () {
Protocol.version = MqttClientConstants.mqttV311ProtocolVersion;
final sampleMessage = typed.Uint8Buffer(4);
sampleMessage[0] = 0x20;
sampleMessage[1] = 0x02;
sampleMessage[2] = 0x01;
sampleMessage[3] = 0x00;
final byteBuffer = MqttByteBuffer(sampleMessage);
final baseMessage = MqttMessage.createFrom(byteBuffer);
print('Connect Ack - Connection accepted::${baseMessage.toString()}');
// Check that the message was correctly identified as a connect ack message.
expect(baseMessage, const TypeMatcher<MqttConnectAckMessage>());
final message = baseMessage as MqttConnectAckMessage;
// Validate the message deserialization
expect(
message.header!.duplicate,
false,
);
expect(
message.header!.retain,
false,
);
expect(message.header!.qos, MqttQos.atMostOnce);
expect(message.header!.messageType, MqttMessageType.connectAck);
expect(message.header!.messageSize, 2);
// Validate the variable header
expect(message.variableHeader.returnCode,
MqttConnectReturnCode.connectionAccepted);
expect(message.variableHeader.sessionPresent, isTrue);
});
test('Deserialisation - Unacceptable protocol version', () {
// Our test deserialization message, with the following properties. Note this message is not
// yet a real MQTT message, because not everything is implemented, but it must be modified
// and amended as work progresses
//
// Message Specs________________
// <20><02><00><00>
final sampleMessage = typed.Uint8Buffer(4);
sampleMessage[0] = 0x20;
sampleMessage[1] = 0x02;
Expand Down Expand Up @@ -697,12 +703,6 @@ void main() {
MqttConnectReturnCode.unacceptedProtocolVersion);
});
test('Deserialisation - Identifier rejected', () {
// Our test deserialization message, with the following properties. Note this message is not
// yet a real MQTT message, because not everything is implemented, but it must be modified
// and amended as work progresses
//
// Message Specs________________
// <20><02><00><00>
final sampleMessage = typed.Uint8Buffer(4);
sampleMessage[0] = 0x20;
sampleMessage[1] = 0x02;
Expand Down Expand Up @@ -731,12 +731,6 @@ void main() {
MqttConnectReturnCode.identifierRejected);
});
test('Deserialisation - Broker unavailable', () {
// Our test deserialization message, with the following properties. Note this message is not
// yet a real MQTT message, because not everything is implemented, but it must be modified
// and amended as work progresses
//
// Message Specs________________
// <20><02><00><00>
final sampleMessage = typed.Uint8Buffer(4);
sampleMessage[0] = 0x20;
sampleMessage[1] = 0x02;
Expand Down Expand Up @@ -769,8 +763,8 @@ void main() {
final expected = typed.Uint8Buffer(4);
expected[0] = 0x20;
expected[1] = 0x02;
expected[2] = 0x0;
expected[3] = 0x0;
expected[2] = 0x00;
expected[3] = 0x00;
final msg = MqttConnectAckMessage()
.withReturnCode(MqttConnectReturnCode.connectionAccepted);
print('Connect Ack - Connection accepted::${msg.toString()}');
Expand All @@ -781,6 +775,24 @@ void main() {
expect(actual[2], expected[2]); // connect ack - compression? always empty
expect(actual[3], expected[3]); // return code.
});
test('Serialisation - Connection accepted - session present', () {
Protocol.version = MqttClientConstants.mqttV311ProtocolVersion;
final expected = typed.Uint8Buffer(4);
expected[0] = 0x20;
expected[1] = 0x02;
expected[2] = 0x01;
expected[3] = 0x00;
final msg = MqttConnectAckMessage()
.withReturnCode(MqttConnectReturnCode.connectionAccepted)
.withSessionPresent(true);
print('Connect Ack - Connection accepted::${msg.toString()}');
final actual = MessageSerializationHelper.getMessageBytes(msg);
expect(actual.length, expected.length);
expect(actual[0], expected[0]); // msg type of header
expect(actual[1], expected[1]); // remaining length
expect(actual[2], expected[2]); // connect ack - compression? always empty
expect(actual[3], expected[3]); // return code.
});
test('Serialisation - Unacceptable protocol version', () {
final expected = typed.Uint8Buffer(4);
expected[0] = 0x20;
Expand Down
3 changes: 2 additions & 1 deletion test/support/mqtt_client_mock_socket.dart
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ class MqttMockSocketSimpleConnect extends MockSocket {
mockBytes.addAll(data);
if (initial) {
final ack = MqttConnectAckMessage()
.withReturnCode(MqttConnectReturnCode.connectionAccepted);
.withReturnCode(MqttConnectReturnCode.connectionAccepted)
.withSessionPresent(true);
final buff = Uint8Buffer();
final ms = MqttByteBuffer(buff);
ack.writeTo(ms);
Expand Down

0 comments on commit 7093bb2

Please sign in to comment.