Skip to content

Conversation

@virajdere
Copy link
Contributor

@virajdere virajdere commented Oct 25, 2025

Adds feature #535

This pull request addresses #535 and introduces the MQTT Entity Sink feature to the cppagent, enabling observations to MQTT brokers using a flat, per-data-item topic structure.

Key changes include:

  • New MqttEntitySink class and integration with agent configuration.
  • Support for all relevant MQTT connection, authentication, and topic options.
  • Documentation and configuration examples for agent setup.

Checklist

  • Existing sinks unchanged
  • All tests passing/No failures
  • Builds cleanly
  • Added additional tests for the feature
  • Existing features remain unchanged/unaffected

Future development directions:

  • Identify and fix any performance bottlenecks for the Entity format sink (currently no performance issues identified for brokers emqx/moquette as a part of initial testing)
  • Add support for persistent buffer for all the data during broker disconnections/restarts.
  • All persisted data should be relayed once network is restored
  • Add support for durableRelay which allows users to configure whether they want to relay the persisted data
  • This feature will be improved further with additional asset and device publishing
Test Config
Sinks {
  MqttEntitySink {
    # MQTT Broker Connection
    MqttHost = localhost
    MqttPort = 1883
    
    # MQTT Authentication (optional)
    # MqttUserName = mtconnect
    # MqttPassword = password123
    
    # MQTT Client ID (optional, auto-generated if not specified)
    # MqttClientId = client_id
    
    # QoS Levels:
    # 0 = At Most Once (fire and forget)
    # 1 = At Least Once (acknowledged delivery) - DEFAULT
    # 2 = Exactly Once (guaranteed delivery)
    # MqttQOS = at_least_once
    
    # Retain Messages
    # When true, broker retains last message for new subscribers
    # MqttRetain = true
    
    # Topic Structure Configuration
    # These use [device] placeholder which gets replaced with device UUID
    ObservationTopicPrefix = MTConnect/Devices/[device]/Observations
    DeviceTopicPrefix = MTConnect/Probe/[device]
    AssetTopicPrefix = MTConnect/Asset/[device]
    MqttLastWillTopic = MTConnect/Probe/[device]/Availability
    
    # TLS Configuration (if needed)
    # MqttTls = true
    # MqttCaCert = GroupCACertificate.pem
    # MqttCert = certificate.pem
    # MqttPrivateKey = privateKey.pem
  }
}

@wsobel

@wsobel wsobel self-requested a review October 27, 2025 16:15
@wsobel
Copy link
Member

wsobel commented Oct 28, 2025

I'll review the code and test locally.

@wsobel
Copy link
Member

wsobel commented Oct 31, 2025

Reviewing issues with build in CI/CD process. Creating new branch on cppagent for testing.

@virajdere
Copy link
Contributor Author

Reviewing issues with build in CI/CD process. Creating new branch on cppagent for testing.

issues probably with the newly added test file mqtt_entity_sink_test.cpp or the underlying logic in mqtt_entity_sink. a bad use of shared pointers? I am currently not able to figure out the issue. a robust review and some code update will be helpful. Feel free to make any changes necessary.

@wsobel
Copy link
Member

wsobel commented Oct 31, 2025 via email

@wsobel
Copy link
Member

wsobel commented Nov 3, 2025

I have created a patch file that should fix the issue with the tests. The issue was the client was being closed after the test had exited and the local objects were not there. I just added a method to close the client before the function exits.

I also ran it through clangformat, so some of the changes are just whitespace related.

PR_571.patch

@wsobel
Copy link
Member

wsobel commented Nov 3, 2025

There are still some issues. Using a heap sanitizer and allocation debugger to test.

@wsobel
Copy link
Member

wsobel commented Nov 3, 2025

Fixed another timing issue when checking for unavailable to exclude conditions and the agent device.

PR_571_2.patch

@wsobel
Copy link
Member

wsobel commented Nov 3, 2025

I created a local branch: feature/mqtt-entity-sink if you want to merge against a branch.

Copy link
Member

@wsobel wsobel left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good. If it passes, I'll merge and update the version.


### SAMPLE Observation
```json
{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you try the json entity printer that generates a similar representation for the observations. It also supports all the extended types like data sets and tables. I didn't see that code in the formatter.

What are your thoughts?

}

void TearDown() override {
if (m_client) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You need to shut the agent down first. Otherwise there are issues. I can send patch.

const std::string& topic,
const std::string& payload) {
receivedJson = json::parse(payload);
if (receivedJson.contains("name"))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The callback is called during cleanup of the client. You need to either shut down the client or remove the handler before exiting this test. Otherwise the json object has been destructed and is no longer viable.

@wsobel wsobel merged commit 77ff518 into mtconnect:main Nov 4, 2025
7 checks passed
@virajdere virajdere deleted the feature/mqtt-entity-sink branch November 7, 2025 05:16
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants