Skip to content

Commit

Permalink
Issue 516 (#517)
Browse files Browse the repository at this point in the history
  • Loading branch information
shamblett committed Feb 19, 2024
1 parent 026182d commit a3c376d
Showing 1 changed file with 15 additions and 4 deletions.
19 changes: 15 additions & 4 deletions lib/src/mqtt_client_publishing_manager.dart
Original file line number Diff line number Diff line change
Expand Up @@ -157,12 +157,12 @@ class PublishingManager implements IPublishingManager {
if (pubMsg.header!.qos == MqttQos.atMostOnce) {
// QOS AtMostOnce 0 require no response.
// Send the message for processing to whoever is waiting.
_clientEventBus!.fire(MessageReceived(topic, msg));
_fireMessageReceived(topic, msg);
_notifyPublish(msg);
} else if (pubMsg.header!.qos == MqttQos.atLeastOnce) {
// QOS AtLeastOnce 1 requires an acknowledgement
// Send the message for processing to whoever is waiting.
_clientEventBus!.fire(MessageReceived(topic, msg));
_fireMessageReceived(topic, msg);
_notifyPublish(msg);
// If configured the client will send the acknowledgement, else the user must.
final messageIdentifier = pubMsg.variableHeader!.messageIdentifier;
Expand Down Expand Up @@ -205,7 +205,7 @@ class PublishingManager implements IPublishingManager {
if (pubMsg != null) {
// Send the message for processing to whoever is waiting.
final topic = PublicationTopic(pubMsg.variableHeader!.topicName);
_clientEventBus!.fire(MessageReceived(topic, pubMsg));
_fireMessageReceived(topic, pubMsg);
final compMsg = MqttPublishCompleteMessage()
.withMessageIdentifier(pubMsg.variableHeader!.messageIdentifier);
connectionHandler!.sendMessage(compMsg);
Expand Down Expand Up @@ -244,12 +244,23 @@ class PublishingManager implements IPublishingManager {
return true;
}

/// On publish complete add the message to the published stream if needed
// On publish complete add the message to the published stream if needed
void _notifyPublish(MqttPublishMessage? message) {
if (_published.hasListener && message != null) {
MqttLogger.log(
'PublishingManager::_notifyPublish - adding message to published stream for topic ${message.variableHeader!.topicName}');
_published.add(message);
}
}

// Guarded event bus fire for the received message.
void _fireMessageReceived(PublicationTopic topic, MqttMessage msg) {
if (_clientEventBus != null &&
!_clientEventBus!.streamController.isClosed) {
_clientEventBus?.fire(MessageReceived(topic, msg));
} else {
MqttLogger.log(
'PublishingManager::_fireMessageReceived - event not fired, event bus closed');
}
}
}

0 comments on commit a3c376d

Please sign in to comment.