Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue524 #546

Merged
merged 5 commits into from
Jul 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,8 @@ abstract class MqttConnectionHandlerBase implements IMqttConnectionHandler {
'- state = connected');
connectionStatus.state = MqttConnectionState.connected;
connectionStatus.returnCode = MqttConnectReturnCode.connectionAccepted;
connectionStatus.connectAckMessage = msg;

// Call the connected callback if we have one
if (onConnected != null) {
onConnected!();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
part of '../../../mqtt_client.dart';

/// Message that indicates a connection acknowledgement.
///
/// On successful connection the [MqttClientConnectionStatus] class is updated
/// with this message as returned by the broker.
class MqttConnectAckMessage extends MqttMessage {
/// Initializes a new instance of the MqttConnectAckMessage class.
/// Only called via the MqttMessage.Create operation during processing
Expand Down Expand Up @@ -49,6 +52,13 @@ class MqttConnectAckMessage extends MqttMessage {
return this;
}

/// Sets the session present flag.
/// Can only be set if the protocol is 3.1.1
MqttConnectAckMessage withSessionPresent(bool present) {
variableHeader.sessionPresent = present;
return this;
}

@override
String toString() {
final sb = StringBuffer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,31 @@ class MqttConnectAckVariableHeader extends MqttVariableHeader {
MqttConnectAckVariableHeader.fromByteBuffer(super.headerStream)
: super.fromByteBuffer();

/// Session present flag.
/// Only available for the 3.1.1 protocol, for 3.1 this is always false.
bool _sessionPresent = false;
bool get sessionPresent => _sessionPresent;
set sessionPresent(bool present) {
if (Protocol.version == MqttClientConstants.mqttV311ProtocolVersion) {
_sessionPresent = present;
}
}

/// Writes the variable header for an MQTT Connect message to
/// the supplied stream.
@override
void writeTo(MqttByteBuffer variableHeaderStream) {
// Unused additional 'compression' byte used within the variable
// header acknowledgement.
variableHeaderStream.writeByte(0);
sessionPresent
? variableHeaderStream.writeByte(1)
: variableHeaderStream.writeByte(0);
writeReturnCode(variableHeaderStream);
}

/// Creates a variable header from the specified header stream.
@override
void readFrom(MqttByteBuffer variableHeaderStream) {
// Unused additional 'compression' byte used within the variable
// header acknowledgement.
variableHeaderStream.readByte();
final ackConnectFlags = variableHeaderStream.readByte();
sessionPresent = ackConnectFlags == 1;
readReturnCode(variableHeaderStream);
}

Expand All @@ -45,6 +54,6 @@ class MqttConnectAckVariableHeader extends MqttVariableHeader {

@override
String toString() =>
'Connect Variable Header: TopicNameCompressionResponse={0}, '
'Connect Variable Header: SessionPresent={$sessionPresent}, '
'ReturnCode={$returnCode}';
}
2 changes: 1 addition & 1 deletion lib/src/messages/mqtt_client_mqtt_variable_header.dart
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class MqttVariableHeader {
/// Protocol version
int protocolVersion = 0;

/// Conenct flags
/// Connect flags
late MqttConnectFlags connectFlags;

/// Defines the maximum allowable lag, in seconds, between expected messages.
Expand Down
3 changes: 3 additions & 0 deletions lib/src/mqtt_client_connection_status.dart
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ class MqttClientConnectionStatus {
/// Disconnection origin
MqttDisconnectionOrigin disconnectionOrigin = MqttDisconnectionOrigin.none;

/// The connect acknowledgement message from the broker for the current connection
MqttConnectAckMessage? connectAckMessage;

@override
String toString() {
final s = state.toString().split('.')[1];
Expand Down
4 changes: 4 additions & 0 deletions test/mqtt_client_connection_unsecure_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ void main() {
expect(ch.connectionStatus.state, MqttConnectionState.connected);
expect(ch.connectionStatus.returnCode,
MqttConnectReturnCode.connectionAccepted);
expect(ch.connectionStatus.connectAckMessage, isNotNull);
expect(connectCbCalled, isTrue);
final state = ch.disconnect();
expect(state, MqttConnectionState.disconnected);
Expand All @@ -270,13 +271,16 @@ void main() {

test('Successful response and disconnect with returned status', () async {
await IOOverrides.runZoned(() async {
Protocol.version = MqttClientConstants.mqttV311ProtocolVersion;
final clientEventBus = events.EventBus();
final ch = SynchronousMqttServerConnectionHandler(clientEventBus,
maxConnectionAttempts: 3, socketOptions: socketOptions);
final status = await ch.connect(mockBrokerAddress, mockBrokerPort,
MqttConnectMessage().withClientIdentifier(testClientId));
expect(status.state, MqttConnectionState.connected);
expect(status.returnCode, MqttConnectReturnCode.connectionAccepted);
expect(status.connectAckMessage, isNotNull);
expect(status.connectAckMessage?.variableHeader.sessionPresent, isTrue);
final state = ch.disconnect();
expect(state, MqttConnectionState.disconnected);
},
Expand Down
88 changes: 50 additions & 38 deletions test/mqtt_client_message_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -460,12 +460,6 @@ void main() {

group('Connect', () {
test('Basic deserialization', () {
// Our test deserialization message, with the following properties. Note this message is not
// yet a real MQTT message, because not everything is implemented, but it must be modified
// and ammeneded as work progresses
//
// Message Specs________________
// <10><15><00><06>MQIsdp<03><02><00><1E><00><07>andy111
final sampleMessage = <int>[
0x10,
0x1B,
Expand Down Expand Up @@ -532,12 +526,6 @@ void main() {
expect(bm.payload.password, 'Billy1Pass');
});
test('Payload - invalid client idenfier length', () {
// Our test deserialization message, with the following properties. Note this message is not
// yet a real MQTT message, because not everything is implemented, but it must be modified
// and ammeneded as work progresses
//
// Message Specs________________
// <10><15><00><06>MQIsdp<03><02><00><1E><00><07>andy111andy111andy111andy111
final sampleMessage = <int>[
0x10,
0x15,
Expand Down Expand Up @@ -628,12 +616,6 @@ void main() {

group('Connect Ack', () {
test('Deserialisation - Connection accepted', () {
// Our test deserialization message, with the following properties. Note this message is not
// yet a real MQTT message, because not everything is implemented, but it must be modified
// and amended as work progresses
//
// Message Specs________________
// <20><02><00><00>
final sampleMessage = typed.Uint8Buffer(4);
sampleMessage[0] = 0x20;
sampleMessage[1] = 0x02;
Expand Down Expand Up @@ -661,13 +643,37 @@ void main() {
expect(message.variableHeader.returnCode,
MqttConnectReturnCode.connectionAccepted);
});
test('Deserialisation - Connection accepted - session present', () {
Protocol.version = MqttClientConstants.mqttV311ProtocolVersion;
final sampleMessage = typed.Uint8Buffer(4);
sampleMessage[0] = 0x20;
sampleMessage[1] = 0x02;
sampleMessage[2] = 0x01;
sampleMessage[3] = 0x00;
final byteBuffer = MqttByteBuffer(sampleMessage);
final baseMessage = MqttMessage.createFrom(byteBuffer);
print('Connect Ack - Connection accepted::${baseMessage.toString()}');
// Check that the message was correctly identified as a connect ack message.
expect(baseMessage, const TypeMatcher<MqttConnectAckMessage>());
final message = baseMessage as MqttConnectAckMessage;
// Validate the message deserialization
expect(
message.header!.duplicate,
false,
);
expect(
message.header!.retain,
false,
);
expect(message.header!.qos, MqttQos.atMostOnce);
expect(message.header!.messageType, MqttMessageType.connectAck);
expect(message.header!.messageSize, 2);
// Validate the variable header
expect(message.variableHeader.returnCode,
MqttConnectReturnCode.connectionAccepted);
expect(message.variableHeader.sessionPresent, isTrue);
});
test('Deserialisation - Unacceptable protocol version', () {
// Our test deserialization message, with the following properties. Note this message is not
// yet a real MQTT message, because not everything is implemented, but it must be modified
// and amended as work progresses
//
// Message Specs________________
// <20><02><00><00>
final sampleMessage = typed.Uint8Buffer(4);
sampleMessage[0] = 0x20;
sampleMessage[1] = 0x02;
Expand Down Expand Up @@ -697,12 +703,6 @@ void main() {
MqttConnectReturnCode.unacceptedProtocolVersion);
});
test('Deserialisation - Identifier rejected', () {
// Our test deserialization message, with the following properties. Note this message is not
// yet a real MQTT message, because not everything is implemented, but it must be modified
// and amended as work progresses
//
// Message Specs________________
// <20><02><00><00>
final sampleMessage = typed.Uint8Buffer(4);
sampleMessage[0] = 0x20;
sampleMessage[1] = 0x02;
Expand Down Expand Up @@ -731,12 +731,6 @@ void main() {
MqttConnectReturnCode.identifierRejected);
});
test('Deserialisation - Broker unavailable', () {
// Our test deserialization message, with the following properties. Note this message is not
// yet a real MQTT message, because not everything is implemented, but it must be modified
// and amended as work progresses
//
// Message Specs________________
// <20><02><00><00>
final sampleMessage = typed.Uint8Buffer(4);
sampleMessage[0] = 0x20;
sampleMessage[1] = 0x02;
Expand Down Expand Up @@ -769,8 +763,8 @@ void main() {
final expected = typed.Uint8Buffer(4);
expected[0] = 0x20;
expected[1] = 0x02;
expected[2] = 0x0;
expected[3] = 0x0;
expected[2] = 0x00;
expected[3] = 0x00;
final msg = MqttConnectAckMessage()
.withReturnCode(MqttConnectReturnCode.connectionAccepted);
print('Connect Ack - Connection accepted::${msg.toString()}');
Expand All @@ -781,6 +775,24 @@ void main() {
expect(actual[2], expected[2]); // connect ack - compression? always empty
expect(actual[3], expected[3]); // return code.
});
test('Serialisation - Connection accepted - session present', () {
Protocol.version = MqttClientConstants.mqttV311ProtocolVersion;
final expected = typed.Uint8Buffer(4);
expected[0] = 0x20;
expected[1] = 0x02;
expected[2] = 0x01;
expected[3] = 0x00;
final msg = MqttConnectAckMessage()
.withReturnCode(MqttConnectReturnCode.connectionAccepted)
.withSessionPresent(true);
print('Connect Ack - Connection accepted::${msg.toString()}');
final actual = MessageSerializationHelper.getMessageBytes(msg);
expect(actual.length, expected.length);
expect(actual[0], expected[0]); // msg type of header
expect(actual[1], expected[1]); // remaining length
expect(actual[2], expected[2]); // connect ack - compression? always empty
expect(actual[3], expected[3]); // return code.
});
test('Serialisation - Unacceptable protocol version', () {
final expected = typed.Uint8Buffer(4);
expected[0] = 0x20;
Expand Down
3 changes: 2 additions & 1 deletion test/support/mqtt_client_mock_socket.dart
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ class MqttMockSocketSimpleConnect extends MockSocket {
mockBytes.addAll(data);
if (initial) {
final ack = MqttConnectAckMessage()
.withReturnCode(MqttConnectReturnCode.connectionAccepted);
.withReturnCode(MqttConnectReturnCode.connectionAccepted)
.withSessionPresent(true);
final buff = Uint8Buffer();
final ms = MqttByteBuffer(buff);
ack.writeTo(ms);
Expand Down
Loading