From 4f2958568a25eb4ded02cc367491b44d73a11977 Mon Sep 17 00:00:00 2001 From: virajdere Date: Thu, 23 Oct 2025 19:02:45 +0000 Subject: [PATCH 1/9] added feature that publishes observations in a per-data-item topic structure --- agent_lib/CMakeLists.txt | 16 + src/mtconnect/configuration/agent_config.cpp | 2 + .../configuration/config_options.hpp | 3 + .../mqtt_entity_sink/mqtt_entity_sink.cpp | 525 ++++++++++++++++++ .../mqtt_entity_sink/mqtt_entity_sink.hpp | 121 ++++ 5 files changed, 667 insertions(+) create mode 100644 src/mtconnect/sink/mqtt_entity_sink/mqtt_entity_sink.cpp create mode 100644 src/mtconnect/sink/mqtt_entity_sink/mqtt_entity_sink.hpp diff --git a/agent_lib/CMakeLists.txt b/agent_lib/CMakeLists.txt index 37cfdef25..343a2c1c8 100644 --- a/agent_lib/CMakeLists.txt +++ b/agent_lib/CMakeLists.txt @@ -256,6 +256,22 @@ set(AGENT_SOURCES #src/sink/mqtt_sink SOURCE_FILES_ONLY "${SOURCE_DIR}/sink/mqtt_sink/mqtt_service.cpp" + +# src/sink/mqtt_sink HEADER_FILE_ONLY + + "${SOURCE_DIR}/sink/mqtt_sink/mqtt_service.hpp" + +#src/sink/mqtt_sink SOURCE_FILES_ONLY + + "${SOURCE_DIR}/sink/mqtt_sink/mqtt_service.cpp" + +# src/sink/mqtt_entity_sink HEADER_FILE_ONLY + + "${SOURCE_DIR}/sink/mqtt_entity_sink/mqtt_entity_sink.hpp" + +#src/sink/mqtt_entity_sink SOURCE_FILES_ONLY + + "${SOURCE_DIR}/sink/mqtt_entity_sink/mqtt_entity_sink.cpp" # src/sink/rest_sink HEADER_FILE_ONLY diff --git a/src/mtconnect/configuration/agent_config.cpp b/src/mtconnect/configuration/agent_config.cpp index 48ee6f8b2..90941458a 100644 --- a/src/mtconnect/configuration/agent_config.cpp +++ b/src/mtconnect/configuration/agent_config.cpp @@ -59,6 +59,7 @@ #include "mtconnect/device_model/device.hpp" #include "mtconnect/printer/xml_printer.hpp" #include "mtconnect/sink/mqtt_sink/mqtt_service.hpp" +#include "mtconnect/sink/mqtt_entity_sink/mqtt_entity_sink.hpp" #include "mtconnect/sink/rest_sink/rest_service.hpp" #include "mtconnect/source/adapter/agent_adapter/agent_adapter.hpp" #include "mtconnect/source/adapter/mqtt/mqtt_adapter.hpp" @@ -111,6 +112,7 @@ namespace mtconnect::configuration { sink::mqtt_sink::MqttService::registerFactory(m_sinkFactory); sink::rest_sink::RestService::registerFactory(m_sinkFactory); + sink::mqtt_entity_sink::MqttEntitySink::registerFactory(m_sinkFactory); adapter::shdr::ShdrAdapter::registerFactory(m_sourceFactory); adapter::mqtt_adapter::MqttAdapter::registerFactory(m_sourceFactory); adapter::agent_adapter::AgentAdapter::registerFactory(m_sourceFactory); diff --git a/src/mtconnect/configuration/config_options.hpp b/src/mtconnect/configuration/config_options.hpp index b192c70d4..e2ac36aec 100644 --- a/src/mtconnect/configuration/config_options.hpp +++ b/src/mtconnect/configuration/config_options.hpp @@ -107,6 +107,9 @@ namespace mtconnect { DECLARE_CONFIGURATION(MqttMaxTopicDepth); DECLARE_CONFIGURATION(MqttLastWillTopic); DECLARE_CONFIGURATION(MqttXPath); + DECLARE_CONFIGURATION(ObservationTopicPrefix); + DECLARE_CONFIGURATION(DeviceTopicPrefix); + DECLARE_CONFIGURATION(AssetTopicPrefix); ///@} /// @name Adapter Configuration diff --git a/src/mtconnect/sink/mqtt_entity_sink/mqtt_entity_sink.cpp b/src/mtconnect/sink/mqtt_entity_sink/mqtt_entity_sink.cpp new file mode 100644 index 000000000..488e50ba8 --- /dev/null +++ b/src/mtconnect/sink/mqtt_entity_sink/mqtt_entity_sink.cpp @@ -0,0 +1,525 @@ +#include "mqtt_entity_sink.hpp" + +#include +#include + +#include + +#include "mtconnect/configuration/config_options.hpp" +#include "mtconnect/entity/entity.hpp" +#include "mtconnect/mqtt/mqtt_client_impl.hpp" +#include "mtconnect/observation/observation.hpp" + +using ptree = boost::property_tree::ptree; +using json = nlohmann::json; + +using namespace std; +using namespace mtconnect; +using namespace mtconnect::asset; +using namespace mtconnect::observation; + +namespace asio = boost::asio; +namespace config = ::mtconnect::configuration; + +namespace mtconnect { + namespace sink { + namespace mqtt_entity_sink { + + MqttEntitySink::MqttEntitySink(boost::asio::io_context& context, + sink::SinkContractPtr&& contract, const ConfigOptions& options, + const ptree& config) + : Sink("MqttEntitySink", std::move(contract)), + m_context(context), + m_strand(context), + m_options(options) + { + GetOptions(config, m_options, options); + + AddOptions(config, m_options, + {{configuration::MqttCaCert, string()}, + {configuration::MqttPrivateKey, string()}, + {configuration::MqttCert, string()}, + {configuration::MqttClientId, string()}, + {configuration::MqttUserName, string()}, + {configuration::MqttPassword, string()}}); + + AddDefaultedOptions( + config, m_options, + {{configuration::MqttHost, "127.0.0.1"s}, + {configuration::ObservationTopicPrefix, "MTConnect/Devices/[device]/Observations"s}, + {configuration::DeviceTopicPrefix, "MTConnect/Probe/[device]"s}, + {configuration::AssetTopicPrefix, "MTConnect/Asset/[device]"s}, + {configuration::MqttLastWillTopic, "MTConnect/Probe/[device]/Availability"s}, + {configuration::MqttPort, 1883}, + {configuration::MqttTls, false}, + {configuration::MqttQOS, 1}, + {configuration::MqttRetain, false}}); + + m_observationTopicPrefix = get(m_options[configuration::ObservationTopicPrefix]); + m_deviceTopicPrefix = get(m_options[configuration::DeviceTopicPrefix]); + m_assetTopicPrefix = get(m_options[configuration::AssetTopicPrefix]); + } + + void MqttEntitySink::start() + { + if (!m_client) + { + auto clientHandler = make_unique(); + clientHandler->m_connected = [this](shared_ptr client) { + LOG(debug) << "MqttEntitySink: Client connected to broker"; + client->connectComplete(); + + auto agentDevice = m_sinkContract->getDeviceByName("Agent"); + if (agentDevice) + { + auto lwtTopic = get(m_options[configuration::MqttLastWillTopic]); + boost::replace_all(lwtTopic, "[device]", *agentDevice->getUuid()); + + // Get QoS and Retain from options + auto qos = static_cast( + GetOption(m_options, configuration::MqttQOS).value_or(1)); + bool retain = GetOption(m_options, configuration::MqttRetain).value_or(false); + LOG(debug) << "Publishing availability to: " << lwtTopic; + + client->publish(lwtTopic, "AVAILABLE", retain, qos); + } + + // Publish initial content + publishInitialContent(); + }; + + auto agentDevice = m_sinkContract->getDeviceByName("Agent"); + auto lwtTopic = get(m_options[configuration::MqttLastWillTopic]); + if (agentDevice) + { + boost::replace_all(lwtTopic, "[device]", *agentDevice->getUuid()); + } + m_lastWillTopic = lwtTopic; + + if (IsOptionSet(m_options, configuration::MqttTls)) + { + m_client = make_shared(m_context, m_options, std::move(clientHandler), + m_lastWillTopic, "UNAVAILABLE"s); + } + else + { + m_client = make_shared(m_context, m_options, std::move(clientHandler), + m_lastWillTopic, "UNAVAILABLE"s); + } + } + LOG(debug) << "Starting MQTT Entity Sink client"; + m_client->start(); + } + + void MqttEntitySink::stop() + { + if (m_client) + { + // Publish UNAVAILABLE before disconnecting + if (m_client->isConnected()) + { + auto qos = static_cast( + GetOption(m_options, configuration::MqttQOS).value_or(1)); + m_client->publish(m_lastWillTopic, "UNAVAILABLE", true, qos); + } + m_client->stop(); + } + } + + void MqttEntitySink::publishInitialContent() + { + LOG(debug) << "MqttEntitySink: Publishing initial content"; + + // Publish all devices + int deviceCount = 0; + for (auto& dev : m_sinkContract->getDevices()) + { + publish(dev); + deviceCount++; + } + LOG(debug) << "Published " << deviceCount << " devices"; + + // Publish current observations for all devices + int obsCount = 0; + for (auto& dev : m_sinkContract->getDevices()) + { + auto& buffer = m_sinkContract->getCircularBuffer(); + std::lock_guard lock(buffer); + + auto latest = buffer.getLatest(); + observation::ObservationList observations; + + for (auto& di : dev->getDeviceDataItems()) + { + auto dataItem = di.lock(); + if (dataItem) + { + auto obs = latest.getObservation(dataItem->getId()); + if (obs) + { + observations.push_back(obs); + } + } + } + + // Publish each observation + for (auto& obs : observations) + { + auto obsCopy = obs; + if (m_client && m_client->isConnected()) + { + publish(obsCopy); + obsCount++; + } + else + { + std::lock_guard lock(m_queueMutex); + if (m_queuedObservations.size() >= MAX_QUEUE_SIZE) + { + m_queuedObservations.erase(m_queuedObservations.begin()); + } + m_queuedObservations.push_back(obsCopy); + obsCount++; + } + } + } + LOG(debug) << "Published " << obsCount << " initial observations"; + } + + std::string MqttEntitySink::formatTimestamp(const Timestamp& timestamp) + { + using namespace date; + using namespace std::chrono; + + std::ostringstream oss; + oss << date::format("%FT%TZ", floor(timestamp)); + return oss.str(); + } + + std::string MqttEntitySink::getObservationValue( + const observation::ObservationPtr& observation) + { + if (observation->isUnavailable()) + { + return "UNAVAILABLE"; + } + + auto value = observation->getValue(); + + if (holds_alternative(value)) + { + return get(value); + } + else if (holds_alternative(value)) + { + return std::to_string(get(value)); + } + else if (holds_alternative(value)) + { + std::ostringstream oss; + oss << std::fixed << std::setprecision(6) << get(value); + return oss.str(); + } + else if (holds_alternative(value)) + { + auto& vec = get(value); + std::ostringstream oss; + for (size_t i = 0; i < vec.size(); ++i) + { + if (i > 0) + oss << " "; + oss << std::fixed << std::setprecision(6) << vec[i]; + } + return oss.str(); + } + else if (holds_alternative(value)) + { + // For DataSet, return as JSON string + json j = json::object(); + auto& ds = get(value); + for (auto& entry : ds) + { + // Convert the variant value to appropriate JSON type + if (holds_alternative(entry.m_value)) + { + j[entry.m_key] = get(entry.m_value); + } + else if (holds_alternative(entry.m_value)) + { + j[entry.m_key] = get(entry.m_value); + } + else if (holds_alternative(entry.m_value)) + { + j[entry.m_key] = get(entry.m_value); + } + } + return j.dump(); + } + + return "UNAVAILABLE"; + } + + std::string MqttEntitySink::formatObservationJson( + const observation::ObservationPtr& observation) + { + json j; + + try + { + auto dataItem = observation->getDataItem(); + if (!dataItem) + { + LOG(error) << "Observation has no data item"; + return "{}"; + } + + j["dataItemId"] = dataItem->getId(); + + const auto& name = dataItem->getName(); + if (name && !name->empty()) + { + j["name"] = *name; + } + + j["type"] = dataItem->getType(); + + auto subType = dataItem->maybeGet("subType"); + if (subType && !subType->empty()) + { + j["subType"] = *subType; + } + + j["timestamp"] = formatTimestamp(observation->getTimestamp()); + + // Get the category + auto category = dataItem->getCategory(); + if (category == device_model::data_item::DataItem::SAMPLE) + { + j["category"] = "SAMPLE"; + } + else if (category == device_model::data_item::DataItem::EVENT) + { + j["category"] = "EVENT"; + } + else if (category == device_model::data_item::DataItem::CONDITION) + { + j["category"] = "CONDITION"; + } + + // Add the result/value + j["result"] = getObservationValue(observation); + + // Add sequence number + j["sequence"] = observation->getSequence(); + + auto result = j.dump(); + LOG(trace) << "Formatted observation JSON: " << result; + return result; + } + catch (const std::exception& e) + { + LOG(error) << "Exception formatting observation: " << e.what(); + return "{}"; + } + } + + std::string MqttEntitySink::formatConditionJson(const observation::ConditionPtr& condition) + { + json j; + + auto dataItem = condition->getDataItem(); + if (!dataItem) + { + return "{}"; + } + + j["dataItemId"] = dataItem->getId(); + + const auto& name = dataItem->getName(); + if (name && !name->empty()) + { + j["name"] = *name; + } + + j["type"] = dataItem->getType(); + + auto subType = dataItem->maybeGet("subType"); + if (subType && !subType->empty()) + { + j["subType"] = *subType; + } + + j["timestamp"] = formatTimestamp(condition->getTimestamp()); + j["category"] = "CONDITION"; + + // Add condition-specific fields + switch (condition->getLevel()) + { + case Condition::NORMAL: + j["level"] = "NORMAL"; + break; + case Condition::WARNING: + j["level"] = "WARNING"; + break; + case Condition::FAULT: + j["level"] = "FAULT"; + break; + case Condition::UNAVAILABLE: + j["level"] = "UNAVAILABLE"; + break; + } + + // Add native code if present + if (condition->hasProperty("nativeCode")) + { + j["nativeCode"] = condition->get("nativeCode"); + } + + // Add condition ID if present + if (!condition->getCode().empty()) + { + j["conditionId"] = condition->getCode(); + } + + // Add message/value if present + if (condition->hasValue()) + { + j["message"] = getObservationValue(condition); + } + + // Add sequence number + j["sequence"] = condition->getSequence(); + + return j.dump(); + } + + std::string MqttEntitySink::getObservationTopic( + const observation::ObservationPtr& observation) + { + auto dataItem = observation->getDataItem(); + if (!dataItem) + { + return ""; + } + + auto device = dataItem->getComponent()->getDevice(); + if (!device) + { + return ""; + } + + std::string topic = m_observationTopicPrefix; + boost::replace_all(topic, "[device]", *device->getUuid()); + + // Append data item ID for flat structure + topic += "/" + dataItem->getId(); + + return topic; + } + + bool MqttEntitySink::publish(observation::ObservationPtr& observation) + { + auto dataItem = observation->getDataItem(); + if (!dataItem) + { + LOG(warning) << "MqttEntitySink::publish: Observation has no data item"; + return false; + } + + if (!m_client || !m_client->isConnected()) + { + std::lock_guard lock(m_queueMutex); + if (m_queuedObservations.size() >= MAX_QUEUE_SIZE) + { + LOG(warning) << "MqttEntitySink::publish: Observation queue full (" << MAX_QUEUE_SIZE + << "), dropping oldest observation for " + << m_queuedObservations.front()->getDataItem()->getId(); + m_queuedObservations.erase(m_queuedObservations.begin()); + } + LOG(debug) << "MqttEntitySink::publish: Client not connected, queuing observation for " + << dataItem->getId(); + m_queuedObservations.push_back(observation); + return false; + } + + std::string topic = getObservationTopic(observation); + if (topic.empty()) + { + LOG(warning) << "MqttEntitySink::publish: Empty topic for " << dataItem->getId(); + return false; + } + + // Get QoS setting + auto qos = static_cast( + GetOption(m_options, configuration::MqttQOS).value_or(1)); + bool retain = GetOption(m_options, configuration::MqttRetain).value_or(false); + + try + { + auto condition = dynamic_pointer_cast(observation); + if (condition) + { + observation::ConditionList condList; + condition->getFirst()->getConditionList(condList); + + for (auto& cond : condList) + { + std::string payload = formatConditionJson(cond); + LOG(debug) << "Publishing condition to: " << topic + << ", payload size: " << payload.size(); + m_client->publish(topic, payload, retain, qos); + } + } + else + { + std::string payload = formatObservationJson(observation); + LOG(debug) << "Publishing observation to: " << topic << ", size: " << payload.size(); + m_client->publish(topic, payload, retain, qos); + } + + return true; + } + catch (const std::exception& e) + { + LOG(error) << "Exception publishing observation: " << e.what(); + return false; + } + } + + bool MqttEntitySink::publish(device_model::DevicePtr device) + { + if (!m_client || !m_client->isConnected()) + { + return false; + } + + // For device, we could publish the device XML or JSON + // For now, just return true as device publishing is optional + return true; + } + + bool MqttEntitySink::publish(asset::AssetPtr asset) + { + if (!m_client || !m_client->isConnected()) + { + return false; + } + + // Asset publishing can be added here if needed + return true; + } + + void MqttEntitySink::registerFactory(SinkFactory& factory) + { + factory.registerFactory( + "MqttEntitySink", + [](const std::string& name, boost::asio::io_context& io, SinkContractPtr&& contract, + const ConfigOptions& options, const boost::property_tree::ptree& block) -> SinkPtr { + auto sink = std::make_shared(io, std::move(contract), options, block); + return sink; + }); + } + + } // namespace mqtt_entity_sink + } // namespace sink +} // namespace mtconnect diff --git a/src/mtconnect/sink/mqtt_entity_sink/mqtt_entity_sink.hpp b/src/mtconnect/sink/mqtt_entity_sink/mqtt_entity_sink.hpp new file mode 100644 index 000000000..a0ae72f15 --- /dev/null +++ b/src/mtconnect/sink/mqtt_entity_sink/mqtt_entity_sink.hpp @@ -0,0 +1,121 @@ +#pragma once + +#include "boost/asio/io_context.hpp" +#include + +#include + +#include "mtconnect/buffer/checkpoint.hpp" +#include "mtconnect/config.hpp" +#include "mtconnect/configuration/agent_config.hpp" +#include "mtconnect/mqtt/mqtt_client.hpp" +#include "mtconnect/observation/observation.hpp" +#include "mtconnect/sink/sink.hpp" +#include "mtconnect/utilities.hpp" + +using namespace std; +using namespace mtconnect; +using namespace mtconnect::mqtt_client; +using json = nlohmann::json; + +namespace mtconnect { + namespace sink { + namespace mqtt_entity_sink { + + /// @brief MTConnect Entity MQTT Sink - publishes observations per data item + class AGENT_LIB_API MqttEntitySink : public sink::Sink + { + public: + /// @brief Create an MQTT Entity Sink + /// @param context the boost asio io_context + /// @param contract the Sink Contract from the agent + /// @param options configuration options + /// @param config additional configuration options + MqttEntitySink(boost::asio::io_context& context, sink::SinkContractPtr&& contract, + const ConfigOptions& options, const boost::property_tree::ptree& config); + + ~MqttEntitySink() = default; + + // Sink Methods + /// @brief Start the MQTT Entity service + void start() override; + + /// @brief Shutdown the MQTT Entity service + void stop() override; + + /// @brief Receive an observation and publish it + /// @param observation shared pointer to the observation + /// @return `true` if the publishing was successful + bool publish(observation::ObservationPtr& observation) override; + + /// @brief Receive an asset + /// @param asset shared point to the asset + /// @return `true` if successful + bool publish(asset::AssetPtr asset) override; + + /// @brief Receive a device + /// @param device shared pointer to the device + /// @return `true` if successful + bool publish(device_model::DevicePtr device) override; + + /// @brief Register the Sink factory to create this sink + /// @param factory + static void registerFactory(SinkFactory& factory); + + /// @brief Gets the MQTT Client + /// @return MqttClient + std::shared_ptr getClient() { return m_client; } + + /// @brief Check if MQTT Client is Connected + /// @return `true` when the client is connected + bool isConnected() { return m_client && m_client->isConnected(); } + + protected: + /// @brief Format observation as JSON matching MTConnect.NET format + /// @param observation the observation to format + /// @return JSON string + std::string formatObservationJson(const observation::ObservationPtr& observation); + + /// @brief Format condition observation as JSON + /// @param condition the condition observation + /// @return JSON string + std::string formatConditionJson(const observation::ConditionPtr& condition); + + /// @brief Get topic for observation using flat structure + /// @param observation the observation + /// @return formatted topic string + std::string getObservationTopic(const observation::ObservationPtr& observation); + + /// @brief Get value from observation as string + /// @param observation the observation + /// @return value as string + std::string getObservationValue(const observation::ObservationPtr& observation); + + /// @brief Publish initial device and current observations + void publishInitialContent(); + + /// @brief Convert timestamp to ISO 8601 format + /// @param timestamp the timestamp + /// @return ISO 8601 string + std::string formatTimestamp(const Timestamp& timestamp); + + protected: + static constexpr size_t MAX_QUEUE_SIZE = 10000; // Maximum queued observations + + std::string m_observationTopicPrefix; //! Observation topic prefix + std::string m_deviceTopicPrefix; //! Device topic prefix + std::string m_assetTopicPrefix; //! Asset topic prefix + std::string m_lastWillTopic; //! Topic to publish last will + + boost::asio::io_context& m_context; + boost::asio::io_context::strand m_strand; + + ConfigOptions m_options; + + std::shared_ptr m_client; + std::vector m_queuedObservations; + std::mutex m_queueMutex; + }; + } // namespace mqtt_entity_sink + } // namespace sink +} // namespace mtconnect From f1629206e71c8db8c897e6c6fcce51be72c07659 Mon Sep 17 00:00:00 2001 From: virajdere Date: Fri, 24 Oct 2025 21:53:44 +0000 Subject: [PATCH 2/9] updated qos config and added tests with feature documentation --- README.md | 4 + schemas/cfg.schema.json | 67 ++ src/mtconnect/mqtt/mqtt_server_impl.hpp | 11 +- src/mtconnect/sink/mqtt_entity_sink/README.md | 243 ++++++ .../mqtt_entity_sink/mqtt_entity_sink.cpp | 46 +- .../mqtt_entity_sink/mqtt_entity_sink.hpp | 17 + test_package/CMakeLists.txt | 1 + test_package/agent_test_helper.hpp | 22 + test_package/mqtt_entity_sink_test.cpp | 706 ++++++++++++++++++ 9 files changed, 1106 insertions(+), 11 deletions(-) create mode 100644 src/mtconnect/sink/mqtt_entity_sink/README.md create mode 100644 test_package/mqtt_entity_sink_test.cpp diff --git a/README.md b/README.md index da2f859ce..e1647b101 100755 --- a/README.md +++ b/README.md @@ -904,6 +904,10 @@ Sinks { *Default*: All data items +## MQTT Entity Sink Documentation + +For detailed configuration, usage, and message format for the MQTT Entity Sink, see: [docs: MTConnect MQTT Entity Sink](src/mtconnect/sink/mqtt_entity_sink/README.md) + ### Adapter Configuration Items ### * `Adapters` - Contains a list of device blocks. If there are no Adapters diff --git a/schemas/cfg.schema.json b/schemas/cfg.schema.json index 182341b3a..b81c9c9bf 100644 --- a/schemas/cfg.schema.json +++ b/schemas/cfg.schema.json @@ -314,6 +314,73 @@ "enum":["at_least_once", "at_most_once", "exactly_once"] } } + }, + "MqttEntitySink": { + "type": "object", + "properties": { + "MqttHost": { + "type": "string", + "description": "IP Address or name of the MQTT Broker", + "default": "127.0.0.1" + }, + "MqttPort": { + "type": "integer", + "description": "Port number of MQTT Broker", + "default": 1883 + }, + "MqttTls": { + "type": "boolean", + "description": "TLS Certificate for secure connection to the MQTT Broker", + "default": false + }, + "MqttUserName": { + "type": "string", + "description": "Username for MQTT authentication" + }, + "MqttPassword": { + "type": "string", + "description": "Password for MQTT authentication" + }, + "MqttClientId": { + "type": "string", + "description": "MQTT client identifier" + }, + "MqttQOS": { + "type": "string", + "description": "The quality of service level for the MQTT connection. Options are: at_least_once, at_most_once, and exactly_once", + "default": "at_least_once", + "enum": [ + "at_least_once", + "at_most_once", + "exactly_once" + ] + }, + "MqttRetain": { + "type": "boolean", + "description": "Retain the last message sent to the broker", + "default": false + }, + "ObservationTopicPrefix": { + "type": "string", + "description": "Prefix for the Observations topic", + "default": "MTConnect/Devices/[device]/Observations" + }, + "DeviceTopicPrefix": { + "type": "string", + "description": "Prefix for the Device topic", + "default": "MTConnect/Probe/[device]" + }, + "AssetTopicPrefix": { + "type": "string", + "description": "Prefix for the Asset topic", + "default": "MTConnect/Asset/[device]" + }, + "MqttLastWillTopic": { + "type": "string", + "description": "The topic used for the last will and testament for an agent", + "default": "MTConnect/Probe/[device]/Availability" + } + } } } }, diff --git a/src/mtconnect/mqtt/mqtt_server_impl.hpp b/src/mtconnect/mqtt/mqtt_server_impl.hpp index ce8a9f621..b962d14ab 100644 --- a/src/mtconnect/mqtt/mqtt_server_impl.hpp +++ b/src/mtconnect/mqtt/mqtt_server_impl.hpp @@ -26,6 +26,7 @@ #include #include #include +#include #include "mqtt_server.hpp" #include "mtconnect/configuration/config_options.hpp" @@ -217,12 +218,12 @@ namespace mtconnect { LOG(debug) << "Server topic_name: " << topic_name; LOG(debug) << "Server contents: " << contents; - auto const &idx = m_subs.get(); - auto r = idx.equal_range(topic_name); - for (; r.first != r.second; ++r.first) + for (const auto& sub : m_subs) { - r.first->con->publish(topic_name, contents, - std::min(r.first->qos_value, pubopts.get_qos())); + if (mqtt::broker::compare_topic_filter(sub.topic, topic_name)) + { + sub.con->publish(topic_name, contents, std::min(sub.qos_value, pubopts.get_qos())); + } } return true; diff --git a/src/mtconnect/sink/mqtt_entity_sink/README.md b/src/mtconnect/sink/mqtt_entity_sink/README.md new file mode 100644 index 000000000..c01d2eea7 --- /dev/null +++ b/src/mtconnect/sink/mqtt_entity_sink/README.md @@ -0,0 +1,243 @@ +# MTConnect MQTT Entity Sink + +## Overview + +The MQTT Entity Sink publishes MTConnect observations to an external MQTT broker in a flat, per-data-item topic structure compatible with MTConnect.NET's MQTT Relay Entity format. + +## Features + +- **Real-time streaming** of observations +- **Flat topic structure**: `MTConnect/Devices/[device-uuid]/Observations/[dataItemId]` +- **JSON format** with full observation metadata +- **ISO 8601 timestamps** +- **Supports SAMPLE, EVENT, and CONDITION observations** +- **Last Will Testament** for availability +- **TLS/SSL support** for secure communication +- **Configurable QoS levels** for MQTT message delivery +- **Message retention** support for new subscribers + +## Quick Start + +### 1. Configuration + +Add to `agent.cfg`: + +```properties +Sinks { + MqttEntitySink { + MqttHost = localhost + MqttPort = 1883 + MqttTls = false + + # QoS levels (string values): + # at_most_once = Fire and forget + # at_least_once = Acknowledged delivery (default) + # exactly_once = Guaranteed delivery + MqttQOS = at_least_once + + # Retain last message on broker for new subscribers + MqttRetain = false + } +} +``` + +## Configuration Options + +### Connection Settings + +| Parameter | Type | Default | Description | +|-------------------|---------|-------------|-----------------------------------------------| +| **MqttHost** | string | `127.0.0.1` | MQTT broker hostname or IP address | +| **MqttPort** | integer | `1883` | MQTT broker port (1883 for TCP, 8883 for TLS) | +| **MqttTls** | boolean | `false` | Enable TLS encryption | +| **MqttClientId** | string | auto-gen | MQTT client identifier (unique per connection) | + +### Authentication + +| Parameter | Type | Default | Description | +|-------------------|---------|---------|-----------------------------------------------| +| **MqttUserName** | string | - | Username for MQTT authentication (optional) | +| **MqttPassword** | string | - | Password for MQTT authentication (optional) | + +### TLS/SSL Settings + +| Parameter | Type | Default | Description | +|-------------------|---------|---------|-----------------------------------------------| +| **MqttCaCert** | string | - | Path to CA certificate for TLS verification | +| **MqttCert** | string | - | Path to client certificate for mutual TLS | +| **MqttPrivateKey**| string | - | Path to client private key | + +### Topic Configuration + +| Parameter | Type | Default | Description | +|--------------------------|---------|-------------------------------------------|----------------------------------------------| +| **ObservationTopicPrefix**| string | `MTConnect/Devices/[device]/Observations` | Topic prefix for observations. `[device]` is replaced with device UUID | +| **DeviceTopicPrefix** | string | `MTConnect/Probe/[device]` | Topic prefix for device models | +| **AssetTopicPrefix** | string | `MTConnect/Asset/[device]` | Topic prefix for assets | +| **MqttLastWillTopic** | string | `MTConnect/Probe/[device]/Availability` | Last Will Testament topic for availability | + +### MQTT Protocol Settings + +| Parameter | Type | Default | Description | +|---------------|---------|-----------------|---------------------------------------------| +| **MqttQOS** | string | `at_least_once` | Quality of Service level for MQTT messages | +| **MqttRetain**| boolean | `false` | Retain messages on the broker for new subscribers | + +#### MqttQOS Values + +The Quality of Service (QoS) level determines how messages are delivered: + +- **at_most_once** (Fire and forget) + - Fastest delivery + - No acknowledgment required + - Message may be lost if network fails + - **Use case**: High-frequency data where occasional loss is acceptable + +- **at_least_once** (Default) + - Guaranteed delivery with acknowledgment + - Messages may be duplicated + - Balance of reliability and performance + - **Use case**: General-purpose MTConnect data streaming + +- **exactly_once** + - Guaranteed exactly-once delivery + - Highest reliability, slowest performance + - Four-way handshake protocol + - **Use case**: Critical alarms, conditions, or important events + +```properties +# Example: High-reliability configuration for critical data +MqttQOS = exactly_once +``` + +#### MqttRetain Flag + +The retain flag determines if the broker stores the last message for each topic: + +- **false** (Default) + - Messages are not retained + - New subscribers only receive future messages + - Lower broker storage requirements + - **Use case**: High-frequency streaming data + +- **true** + - Broker stores the last message per topic + - New subscribers immediately receive last known value + - Useful for status and current values + - **Use case**: Availability, current state, alarms + +```properties +# Example: Retain messages for immediate status updates +MqttRetain = true +``` + +**⚠️ Important Notes:** +- Retained messages persist on the broker until explicitly cleared +- Use retention sparingly with high-frequency data to avoid broker storage issues +- Retained availability messages help dashboards show immediate device status + +## Complete Configuration Example + +```properties +Sinks { + MqttEntitySink { + MqttHost = localhost + MqttPort = 1883 + MqttTls = false + MqttQOS = at_least_once + MqttRetain = false + ObservationTopicPrefix = MTConnect/Devices/[device]/Observations + DeviceTopicPrefix = MTConnect/Probe/[device] + AssetTopicPrefix = MTConnect/Asset/[device] + MqttLastWillTopic = MTConnect/Probe/[device]/Availability + } +} +``` + +## Message Format + +### SAMPLE Observation +```json +{ + "dataItemId": "Xact", + "name": "X_actual", + "type": "POSITION", + "subType": "ACTUAL", + "category": "SAMPLE", + "timestamp": "2025-10-20T15:30:45.123456Z", + "result": "150.500000", + "sequence": 1234 +} +``` + +### EVENT Observation +```json +{ + "dataItemId": "mode", + "name": "ControllerMode", + "type": "CONTROLLER_MODE", + "category": "EVENT", + "timestamp": "2025-10-20T15:30:46.000000Z", + "result": "AUTOMATIC", + "sequence": 1235 +} +``` + +### CONDITION Observation +```json +{ + "dataItemId": "system", + "name": "SystemCondition", + "type": "SYSTEM", + "category": "CONDITION", + "level": "FAULT", + "nativeCode": "E001", + "conditionId": "E001", + "message": "Hydraulic pressure low", + "timestamp": "2025-10-20T15:30:47.000000Z", + "sequence": 1236 +} +``` + +## Topic Structure + +``` +MTConnect/ +├── Devices/ +│ └── {device-uuid}/ +│ └── Observations/ +│ ├── {dataItemId1} → Observation JSON +│ ├── {dataItemId2} → Observation JSON +│ └── ... +└── Probe/ + └── {device-uuid}/ + └── Availability → AVAILABLE/UNAVAILABLE (LWT) +``` + +**Example Topics:** +``` +MTConnect/Devices/OKUMA.123456/Observations/Xact +MTConnect/Devices/OKUMA.123456/Observations/avail +MTConnect/Probe/OKUMA.123456/Availability +``` + +## Performance Considerations + +### QoS Impact + +- **at_most_once**: ~2x faster than at_least_once, best for high-frequency sampling data +- **at_least_once**: Balanced performance, suitable for most use cases +- **exactly_once**: ~2x slower than at_least_once, use only for critical data + +### Retain Flag Impact + +- **Without Retain**: Minimal broker storage, suitable for streaming +- **With Retain**: Increases broker storage per topic, use selectively for: + - Availability status + - Current operating mode + - Alarm states + - Device health indicators + +**💡 Recommendation**: Use `MqttRetain = true` only for low-frequency, stateful data. + +--- diff --git a/src/mtconnect/sink/mqtt_entity_sink/mqtt_entity_sink.cpp b/src/mtconnect/sink/mqtt_entity_sink/mqtt_entity_sink.cpp index 488e50ba8..8c288919a 100644 --- a/src/mtconnect/sink/mqtt_entity_sink/mqtt_entity_sink.cpp +++ b/src/mtconnect/sink/mqtt_entity_sink/mqtt_entity_sink.cpp @@ -1,3 +1,20 @@ +// +// Copyright Copyright 2009-2023, AMT – The Association For Manufacturing Technology (“AMT”) +// All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + #include "mqtt_entity_sink.hpp" #include @@ -60,6 +77,26 @@ namespace mtconnect { m_assetTopicPrefix = get(m_options[configuration::AssetTopicPrefix]); } + static MqttClient::QOS parseQos(const ConfigOptions& options) + { + if (auto qosInt = GetOption(options, configuration::MqttQOS)) + { + return *qosInt == 0 ? MqttClient::QOS::at_most_once + : *qosInt == 2 ? MqttClient::QOS::exactly_once + : MqttClient::QOS::at_least_once; + } + + if (auto qosStr = GetOption(options, configuration::MqttQOS)) + { + if (*qosStr == "at_most_once" || *qosStr == "0") + return MqttClient::QOS::at_most_once; + if (*qosStr == "exactly_once" || *qosStr == "2") + return MqttClient::QOS::exactly_once; + } + + return MqttClient::QOS::at_least_once; + } + void MqttEntitySink::start() { if (!m_client) @@ -76,8 +113,7 @@ namespace mtconnect { boost::replace_all(lwtTopic, "[device]", *agentDevice->getUuid()); // Get QoS and Retain from options - auto qos = static_cast( - GetOption(m_options, configuration::MqttQOS).value_or(1)); + auto qos = parseQos(m_options); bool retain = GetOption(m_options, configuration::MqttRetain).value_or(false); LOG(debug) << "Publishing availability to: " << lwtTopic; @@ -118,8 +154,7 @@ namespace mtconnect { // Publish UNAVAILABLE before disconnecting if (m_client->isConnected()) { - auto qos = static_cast( - GetOption(m_options, configuration::MqttQOS).value_or(1)); + auto qos = parseQos(m_options); m_client->publish(m_lastWillTopic, "UNAVAILABLE", true, qos); } m_client->stop(); @@ -450,8 +485,7 @@ namespace mtconnect { } // Get QoS setting - auto qos = static_cast( - GetOption(m_options, configuration::MqttQOS).value_or(1)); + auto qos = parseQos(m_options); bool retain = GetOption(m_options, configuration::MqttRetain).value_or(false); try diff --git a/src/mtconnect/sink/mqtt_entity_sink/mqtt_entity_sink.hpp b/src/mtconnect/sink/mqtt_entity_sink/mqtt_entity_sink.hpp index a0ae72f15..34e248ad6 100644 --- a/src/mtconnect/sink/mqtt_entity_sink/mqtt_entity_sink.hpp +++ b/src/mtconnect/sink/mqtt_entity_sink/mqtt_entity_sink.hpp @@ -1,3 +1,20 @@ +// +// Copyright Copyright 2009-2023, AMT – The Association For Manufacturing Technology (“AMT”) +// All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + #pragma once #include "boost/asio/io_context.hpp" diff --git a/test_package/CMakeLists.txt b/test_package/CMakeLists.txt index 2cd035cc7..2dc65edb9 100644 --- a/test_package/CMakeLists.txt +++ b/test_package/CMakeLists.txt @@ -297,6 +297,7 @@ add_agent_test(table TRUE observation) add_agent_test(checkpoint FALSE buffer) add_agent_test(circular_buffer FALSE buffer) +add_agent_test(mqtt_entity_sink FALSE sink/mqtt_entity_sink TRUE) if (WITH_RUBY) diff --git a/test_package/agent_test_helper.hpp b/test_package/agent_test_helper.hpp index 45ff79c00..fcbeb0402 100644 --- a/test_package/agent_test_helper.hpp +++ b/test_package/agent_test_helper.hpp @@ -29,6 +29,7 @@ #include "mtconnect/configuration/agent_config.hpp" #include "mtconnect/configuration/config_options.hpp" #include "mtconnect/pipeline/pipeline.hpp" +#include "mtconnect/sink/mqtt_entity_sink/mqtt_entity_sink.hpp" #include "mtconnect/sink/mqtt_sink/mqtt_service.hpp" #include "mtconnect/sink/rest_sink/response.hpp" #include "mtconnect/sink/rest_sink/rest_service.hpp" @@ -169,6 +170,13 @@ class AgentTestHelper return rest; } + std::shared_ptr getMqttEntitySink() + { + using namespace mtconnect::sink::mqtt_entity_sink; + auto sink = m_agent->findSink("MqttEntitySink"); + return std::dynamic_pointer_cast(sink); + } + std::shared_ptr getMqttService() { using namespace mtconnect; @@ -189,6 +197,7 @@ class AgentTestHelper sink::rest_sink::RestService::registerFactory(m_sinkFactory); sink::mqtt_sink::MqttService::registerFactory(m_sinkFactory); + sink::mqtt_entity_sink::MqttEntitySink::registerFactory(m_sinkFactory); source::adapter::shdr::ShdrAdapter::registerFactory(m_sourceFactory); ConfigOptions options = ops; @@ -230,6 +239,17 @@ class AgentTestHelper m_agent->addSink(m_mqttService); } + if (HasOption(options, "MqttEntitySink")) + { + auto mqttEntityContract = m_agent->makeSinkContract(); + mqttEntityContract->m_pipelineContext = m_context; + auto mqttEntitySink = m_sinkFactory.make("MqttEntitySink", "MqttEntitySink", m_ioContext, + std::move(mqttEntityContract), options, ptree {}); + m_mqttEntitySink = + std::dynamic_pointer_cast(mqttEntitySink); + m_agent->addSink(m_mqttEntitySink); + } + m_agent->initialize(m_context); if (observe) @@ -303,6 +323,8 @@ class AgentTestHelper std::shared_ptr m_context; std::shared_ptr m_adapter; std::shared_ptr m_mqttService; + std::shared_ptr + m_mqttEntitySink; std::shared_ptr m_restService; std::shared_ptr m_loopback; diff --git a/test_package/mqtt_entity_sink_test.cpp b/test_package/mqtt_entity_sink_test.cpp new file mode 100644 index 000000000..7ea003c7a --- /dev/null +++ b/test_package/mqtt_entity_sink_test.cpp @@ -0,0 +1,706 @@ +// +// Copyright Copyright 2009-2025, AMT – The Association For Manufacturing Technology ("AMT") +// All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +// Ensure that gtest is the first header otherwise Windows raises an error +#include +// Keep this comment to keep gtest.h above. (clang-format off/on is not working here!) + +#include +#include + +#include + +#include "agent_test_helper.hpp" +#include "json_helper.hpp" +#include "mtconnect/buffer/checkpoint.hpp" +#include "mtconnect/device_model/data_item/data_item.hpp" +#include "mtconnect/entity/entity.hpp" +#include "mtconnect/entity/json_parser.hpp" +#include "mtconnect/mqtt/mqtt_client_impl.hpp" +#include "mtconnect/mqtt/mqtt_server_impl.hpp" +#include "mtconnect/printer/json_printer.hpp" +#include "mtconnect/sink/mqtt_entity_sink/mqtt_entity_sink.hpp" +#include "test_utilities.hpp" + +using namespace std; +using namespace mtconnect; +using namespace mtconnect::device_model::data_item; +using namespace mtconnect::sink::mqtt_entity_sink; +using namespace mtconnect::asset; +using namespace mtconnect::configuration; + +// main +int main(int argc, char* argv[]) +{ + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} + +using json = nlohmann::json; + +class MqttEntitySinkTest : public testing::Test +{ +protected: + std::unique_ptr m_agentTestHelper; + std::shared_ptr m_server; + std::shared_ptr m_client; + std::unique_ptr m_jsonPrinter; + uint16_t m_port {0}; + + void SetUp() override { + m_agentTestHelper = std::make_unique(); + m_jsonPrinter = std::make_unique(2, true); + } + + void TearDown() override { + if (m_client) { + m_client->stop(); + m_agentTestHelper->m_ioContext.run_for(500ms); + m_client.reset(); + } + if (m_server) { + m_server->stop(); + m_agentTestHelper->m_ioContext.run_for(500ms); + m_server.reset(); + } + m_agentTestHelper.reset(); + m_jsonPrinter.reset(); + } + + void createAgent(std::string testFile = {}, ConfigOptions options = {}) { + if (testFile == "") + testFile = "/samples/test_config.xml"; + ConfigOptions opts(options); + MergeOptions( + opts, + { + {"MqttEntitySink", true}, + {configuration::MqttPort, m_port}, + {MqttCurrentInterval, 200ms}, + {MqttSampleInterval, 100ms}, + {configuration::MqttHost, "127.0.0.1"s}, + {configuration::ObservationTopicPrefix, "MTConnect/Devices/[device]/Observations"s}, + {configuration::DeviceTopicPrefix, "MTConnect/Probe/[device]"s}, + {configuration::AssetTopicPrefix, "MTConnect/Asset/[device]"s}, + {configuration::MqttLastWillTopic, "MTConnect/Probe/[device]/Availability"s}, + }); + m_agentTestHelper->createAgent("/samples/test_config.xml", 8, 4, "2.0", 25, false, true, opts); + addAdapter(); + m_agentTestHelper->getAgent()->start(); + } + + void createServer(const ConfigOptions& options) { + using namespace mtconnect::configuration; + ConfigOptions opts(options); + MergeOptions(opts, {{ServerIp, "127.0.0.1"s}, + {MqttPort, 0}, + {MqttTls, false}, + {AutoAvailable, false}, + {RealTime, false}}); + m_server = std::make_shared(m_agentTestHelper->m_ioContext, opts); + } + + template + bool waitFor(const chrono::duration& time, function pred) { + boost::asio::steady_timer timer(m_agentTestHelper->m_ioContext); + timer.expires_after(time); + bool timeout = false; + timer.async_wait([&timeout](boost::system::error_code ec) { + if (!ec) timeout = true; + }); + while (!timeout && !pred()) { + m_agentTestHelper->m_ioContext.run_for(100ms); + } + timer.cancel(); + return pred(); + } + + void startServer() { + if (m_server) { + bool start = m_server->start(); + if (start) { + m_port = m_server->getPort(); + m_agentTestHelper->m_ioContext.run_for(500ms); + } + } + } + + void createClient(const ConfigOptions& options, unique_ptr&& handler) { + ConfigOptions opts(options); + MergeOptions(opts, {{MqttHost, "127.0.0.1"s}, + {MqttPort, m_port}, + {MqttTls, false}, + {AutoAvailable, false}, + {RealTime, false}}); + m_client = make_shared(m_agentTestHelper->m_ioContext, + opts, std::move(handler)); + } + + bool startClient() { + bool started = m_client && m_client->start(); + if (started) { + return waitFor(1s, [this]() { return m_client->isConnected(); }); + } + return started; + } + + void addAdapter(ConfigOptions options = ConfigOptions {}) { + m_agentTestHelper->addAdapter(options, "localhost", 0, + m_agentTestHelper->m_agent->getDefaultDevice()->getName()); + } +}; + +TEST_F(MqttEntitySinkTest, mqtt_entity_sink_should_use_flat_topic_structure) +{ + bool gotMessage = false; + std::string receivedTopic; + + createServer({}); + startServer(); + + auto client = mqtt::make_async_client(m_agentTestHelper->m_ioContext.get(), "localhost", m_port); + + client->set_client_id("test_client"); + client->set_clean_session(true); + client->set_keep_alive_sec(30); + + bool subscribed = false; + client->set_connack_handler( + [client, &subscribed](bool sp, mqtt::connect_return_code connack_return_code) { + if (connack_return_code == mqtt::connect_return_code::accepted) + { + auto pid = client->acquire_unique_packet_id(); + client->async_subscribe(pid, "MTConnect/#", MQTT_NS::qos::at_least_once, + [](MQTT_NS::error_code ec) { EXPECT_FALSE(ec); }); + } + return true; + }); + + client->set_suback_handler( + [&subscribed](std::uint16_t packet_id, std::vector results) { + subscribed = true; + return true; + }); + + client->set_publish_handler([&gotMessage, &receivedTopic](mqtt::optional packet_id, + mqtt::publish_options pubopts, + mqtt::buffer topic_name, + mqtt::buffer contents) { + receivedTopic = topic_name; + std::cout << "Received topic: " << topic_name << " payload: " << contents << std::endl; + if (topic_name.find("MTConnect/Devices/") == 0 && + topic_name.find("/Observations/") != std::string::npos) + { + gotMessage = true; + } + return true; + }); + + client->async_connect([](mqtt::error_code ec) { ASSERT_FALSE(ec) << "Cannot connect"; }); + + // Wait for subscription to complete + ASSERT_TRUE(waitFor(10s, [&subscribed]() { return subscribed; })) + << "Subscription never completed"; + + // Create the agent and wait for its MQTT sink to connect. + createAgent(); + auto sink = m_agentTestHelper->getMqttEntitySink(); + ASSERT_TRUE(sink != nullptr); + ASSERT_TRUE(waitFor(10s, [&sink]() { return sink->isConnected(); })) + << "MqttEntitySink failed to connect to broker"; + + gotMessage = false; + receivedTopic.clear(); + + m_agentTestHelper->m_adapter->processData("2021-02-01T12:00:00Z|line|204"); + ASSERT_TRUE(waitFor(10s, [&gotMessage]() { return gotMessage; })) + << "Timeout waiting for adapter data. Last topic: " << receivedTopic; + + EXPECT_TRUE(receivedTopic.find("MTConnect/Devices/") == 0) + << "Topic doesn't start with MTConnect/Devices/: " << receivedTopic; + EXPECT_TRUE(receivedTopic.find("/Observations/") != std::string::npos) + << "Topic doesn't contain /Observations/: " << receivedTopic; + EXPECT_EQ("MTConnect/Devices/000/Observations/p3", receivedTopic) + << "Topic does not match expected format: " << receivedTopic; + + client->async_disconnect(); +} + +TEST_F(MqttEntitySinkTest, mqtt_entity_sink_should_publish_entity_json_format) +{ + ConfigOptions options; + + createServer({}); + startServer(); + + auto handler = make_unique(); + bool gotMessage = false; + json receivedJson; + + handler->m_receive = [&gotMessage, &receivedJson](std::shared_ptr client, + const std::string& topic, + const std::string& payload) { + try + { + receivedJson = json::parse(payload); + // Only set gotMessage if result matches expected value + if (receivedJson.contains("result") && receivedJson["result"] == "204") + { + gotMessage = true; + } + } + catch (const std::exception& e) + { + LOG(error) << "Failed to parse JSON: " << e.what(); + } + }; + + createClient(options, std::move(handler)); + ASSERT_TRUE(startClient()); + m_client->subscribe("MTConnect/Devices/#"); + // Ensure subscription is active before sending data + m_agentTestHelper->m_ioContext.run_for(200ms); + + createAgent(); + + auto sink = m_agentTestHelper->getAgent()->findSink("MqttEntitySink"); + auto mqttSink = dynamic_pointer_cast(sink); + ASSERT_TRUE(waitFor(10s, [&mqttSink]() { return mqttSink->isConnected(); })); + + // Wait a bit to ensure sink and client are ready + m_agentTestHelper->m_ioContext.run_for(200ms); + m_agentTestHelper->m_adapter->processData("2021-02-01T12:00:00Z|line|204"); + ASSERT_TRUE(waitFor(10s, [&gotMessage]() { return gotMessage; })); + + // Verify Entity JSON format + EXPECT_TRUE(receivedJson.contains("dataItemId")); + EXPECT_TRUE(receivedJson.contains("timestamp")); + EXPECT_TRUE(receivedJson.contains("result")); + EXPECT_TRUE(receivedJson.contains("sequence")); + EXPECT_TRUE(receivedJson.contains("type")); + EXPECT_TRUE(receivedJson.contains("category")); + + EXPECT_EQ("204", receivedJson["result"].get()); +} + +TEST_F(MqttEntitySinkTest, mqtt_entity_sink_should_include_optional_fields) +{ + ConfigOptions options; + + createServer({}); + startServer(); + + auto handler = make_unique(); + bool gotMessage = false; + json receivedJson; + + handler->m_receive = [&gotMessage, &receivedJson](std::shared_ptr client, + const std::string& topic, + const std::string& payload) { + receivedJson = json::parse(payload); + if (receivedJson.contains("name")) + { + gotMessage = true; + } + }; + + createClient(options, std::move(handler)); + ASSERT_TRUE(startClient()); + m_client->subscribe("MTConnect/Devices/#"); + + createAgent(); + + auto sink = m_agentTestHelper->getAgent()->findSink("MqttEntitySink"); + auto mqttSink = dynamic_pointer_cast(sink); + ASSERT_TRUE(waitFor(10s, [&mqttSink]() { return mqttSink->isConnected(); })); + + m_agentTestHelper->m_adapter->processData("2021-02-01T12:00:00Z|line|204"); + ASSERT_TRUE(waitFor(10s, [&gotMessage]() { return gotMessage; })); + + // Verify optional fields + if (receivedJson.contains("name")) + { + EXPECT_TRUE(receivedJson["name"].is_string()); + } + if (receivedJson.contains("subType")) + { + EXPECT_TRUE(receivedJson["subType"].is_string()); + } +} + +TEST_F(MqttEntitySinkTest, mqtt_entity_sink_should_publish_samples) +{ + ConfigOptions options; + + createServer({}); + startServer(); + + auto handler = make_unique(); + bool gotSample = false; + json receivedJson; + + handler->m_receive = [&gotSample, &receivedJson](std::shared_ptr client, + const std::string& topic, + const std::string& payload) { + try + { + receivedJson = json::parse(payload); + if (receivedJson.contains("dataItemId") && receivedJson["dataItemId"] == "z2" && + receivedJson.contains("category") && receivedJson["category"] == "SAMPLE" && + receivedJson.contains("result") && receivedJson["result"] == "204.000000") + { + gotSample = true; + } + } + catch (const std::exception& e) + { + LOG(error) << "Failed to parse JSON: " << e.what(); + } + }; + + createClient(options, std::move(handler)); + ASSERT_TRUE(startClient()); + m_client->subscribe("MTConnect/Devices/#"); + m_agentTestHelper->m_ioContext.run_for(200ms); + + createAgent(); + + auto sink = m_agentTestHelper->getAgent()->findSink("MqttEntitySink"); + auto mqttSink = dynamic_pointer_cast(sink); + + ASSERT_TRUE(waitFor(10s, [&mqttSink]() { return mqttSink->isConnected(); })); + + m_agentTestHelper->m_ioContext.run_for(200ms); + m_agentTestHelper->m_adapter->processData("2021-02-01T12:00:00Z|z2|204"); + ASSERT_TRUE(waitFor(10s, [&gotSample]() { return gotSample; })); + + EXPECT_EQ("SAMPLE", receivedJson["category"].get()); + EXPECT_EQ("204.000000", receivedJson["result"].get()); +} + +TEST_F(MqttEntitySinkTest, mqtt_entity_sink_should_publish_events) +{ + ConfigOptions options; + + createServer({}); + startServer(); + + auto handler = make_unique(); + bool gotEvent = false; + json receivedJson; + + handler->m_receive = [&gotEvent, &receivedJson](std::shared_ptr client, + const std::string& topic, + const std::string& payload) { + try { + receivedJson = json::parse(payload); + if (receivedJson.contains("category") && receivedJson["category"] == "EVENT" && + receivedJson.contains("dataItemId") && receivedJson["dataItemId"] == "p4" && + receivedJson.contains("result") && receivedJson["result"] == "READY") + { + gotEvent = true; + } + } catch (const std::exception& e) { + LOG(error) << "Failed to parse JSON: " << e.what(); + } + }; + + createClient(options, std::move(handler)); + ASSERT_TRUE(startClient()); + m_client->subscribe("MTConnect/Devices/#"); + m_agentTestHelper->m_ioContext.run_for(200ms); + + createAgent(); + + auto sink = m_agentTestHelper->getAgent()->findSink("MqttEntitySink"); + auto mqttSink = dynamic_pointer_cast(sink); + ASSERT_TRUE(waitFor(10s, [&mqttSink]() { return mqttSink->isConnected(); })); + + m_agentTestHelper->m_ioContext.run_for(200ms); + m_agentTestHelper->m_adapter->processData("2021-02-01T12:00:00Z|p4|READY"); + ASSERT_TRUE(waitFor(10s, [&gotEvent]() { return gotEvent; })); + + EXPECT_EQ("EVENT", receivedJson["category"].get()); + EXPECT_EQ("READY", receivedJson["result"].get()); +} + +TEST_F(MqttEntitySinkTest, mqtt_entity_sink_should_publish_conditions) +{ + ConfigOptions options; + + createServer({}); + startServer(); + + auto handler = make_unique(); + bool gotCondition = false; + json receivedJson; + + handler->m_receive = [&gotCondition, &receivedJson](std::shared_ptr client, + const std::string& topic, + const std::string& payload) { + try { + receivedJson = json::parse(payload); + if (receivedJson.contains("category") && receivedJson["category"] == "CONDITION" && + receivedJson.contains("dataItemId") && receivedJson["dataItemId"] == "zlc" && + receivedJson.contains("level") && receivedJson["level"] == "FAULT") + { + gotCondition = true; + } + } catch (const std::exception& e) { + LOG(error) << "Failed to parse JSON: " << e.what(); + } + }; + + createClient(options, std::move(handler)); + ASSERT_TRUE(startClient()); + m_client->subscribe("MTConnect/Devices/#"); + m_agentTestHelper->m_ioContext.run_for(200ms); + + createAgent(); + + auto sink = m_agentTestHelper->getAgent()->findSink("MqttEntitySink"); + auto mqttSink = dynamic_pointer_cast(sink); + ASSERT_TRUE(waitFor(10s, [&mqttSink]() { return mqttSink->isConnected(); })); + + m_agentTestHelper->m_ioContext.run_for(200ms); + m_agentTestHelper->m_adapter->processData( + "2021-02-01T12:00:00Z|zlc|FAULT|1234|LOW|Hydraulic pressure low"); + ASSERT_TRUE(waitFor(10s, [&gotCondition]() { return gotCondition; })); + + EXPECT_EQ("CONDITION", receivedJson["category"].get()); + EXPECT_EQ("FAULT", receivedJson["level"].get()); + if (receivedJson.contains("nativeCode")) + { + EXPECT_EQ("1234", receivedJson["nativeCode"].get()); + } +} + +TEST_F(MqttEntitySinkTest, mqtt_entity_sink_should_publish_availability) +{ + ConfigOptions options; + + createServer({}); + startServer(); + + auto handler = make_unique(); + bool gotAvailable = false; + std::string availabilityValue; + + handler->m_receive = [&gotAvailable, &availabilityValue](std::shared_ptr client, + const std::string& topic, + const std::string& payload) { + if (topic.find("Availability") != std::string::npos) + { + availabilityValue = payload; + gotAvailable = true; + } + }; + + createClient(options, std::move(handler)); + ASSERT_TRUE(startClient()); + m_client->subscribe("MTConnect/Probe/#"); + + createAgent(); + + auto sink = m_agentTestHelper->getAgent()->findSink("MqttEntitySink"); + auto mqttSink = dynamic_pointer_cast(sink); + ASSERT_TRUE(waitFor(10s, [&mqttSink]() { return mqttSink->isConnected(); })); + + ASSERT_TRUE(waitFor(5s, [&gotAvailable]() { return gotAvailable; })); + EXPECT_EQ("AVAILABLE", availabilityValue); +} + +TEST_F(MqttEntitySinkTest, mqtt_entity_sink_should_publish_initial_observations) +{ + ConfigOptions options; + + createServer({}); + startServer(); + + auto handler = make_unique(); + int messageCount = 0; + + handler->m_receive = [&messageCount](std::shared_ptr client, const std::string& topic, + const std::string& payload) { + if (topic.find("/Observations/") != std::string::npos) + { + messageCount++; + } + }; + + createClient(options, std::move(handler)); + ASSERT_TRUE(startClient()); + m_client->subscribe("MTConnect/Devices/#"); + + createAgent(); + + auto sink = m_agentTestHelper->getAgent()->findSink("MqttEntitySink"); + auto mqttSink = dynamic_pointer_cast(sink); + ASSERT_TRUE(waitFor(10s, [&mqttSink]() { return mqttSink->isConnected(); })); + ASSERT_TRUE(waitFor(10s, [&messageCount]() { return messageCount > 0; })); + EXPECT_GT(messageCount, 0); +} + +TEST_F(MqttEntitySinkTest, mqtt_entity_sink_should_handle_unavailable) +{ + ConfigOptions options; + + createServer({}); + startServer(); + + auto handler = make_unique(); + bool gotUnavailable = false; + json receivedJson; + + handler->m_receive = [&gotUnavailable, &receivedJson](std::shared_ptr client, + const std::string& topic, + const std::string& payload) { + receivedJson = json::parse(payload); + if (receivedJson["result"] == "UNAVAILABLE") + { + gotUnavailable = true; + } + }; + + createClient(options, std::move(handler)); + ASSERT_TRUE(startClient()); + m_client->subscribe("MTConnect/Devices/#"); + + createAgent(); + + auto sink = m_agentTestHelper->getAgent()->findSink("MqttEntitySink"); + auto mqttSink = dynamic_pointer_cast(sink); + ASSERT_TRUE(waitFor(10s, [&mqttSink]() { return mqttSink->isConnected(); })); + + // Initial observations should include UNAVAILABLE values + ASSERT_TRUE(waitFor(10s, [&gotUnavailable]() { return gotUnavailable; })); + EXPECT_EQ("UNAVAILABLE", receivedJson["result"].get()); +} + +TEST_F(MqttEntitySinkTest, mqtt_entity_sink_should_support_authentication) +{ + ConfigOptions options; + options["MqttUserName"] = "mtconnect"; + options["MqttPassword"] = "password123"; + options["MqttClientId"] = "auth-client"; + + createServer({}); + startServer(); + + bool connected = false; + auto handler = std::make_unique(); + handler->m_connected = [&connected](std::shared_ptr) { + connected = true; + }; + + createClient(options, std::move(handler)); + startClient(); + + auto start = std::chrono::steady_clock::now(); + while (!connected && std::chrono::steady_clock::now() - start < std::chrono::seconds(5)) { + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + } + + ASSERT_TRUE(connected) << "MQTT client did not connect with authentication"; +} + +TEST_F(MqttEntitySinkTest, mqtt_entity_sink_should_support_qos_levels) +{ + ConfigOptions options; + options["MqttQOS"] = "exactly_once"; + options["MqttClientId"] = "qos-client"; + + createServer({}); + startServer(); + + auto handler = std::make_unique(); + bool received = false; + handler->m_receive = [&received](std::shared_ptr client, const std::string& topic, const std::string& payload) { + received = true; + }; + + createClient(options, std::move(handler)); + ASSERT_TRUE(startClient()); + m_client->subscribe("MTConnect/Devices/#"); + + createAgent(); + auto sink = m_agentTestHelper->getAgent()->findSink("MqttEntitySink"); + auto mqttSink = std::dynamic_pointer_cast(sink); + ASSERT_TRUE(waitFor(10s, [&mqttSink]() { return mqttSink && mqttSink->isConnected(); })); + + ASSERT_TRUE(waitFor(5s, [&received]() { return received; })); +} + +TEST_F(MqttEntitySinkTest, mqtt_entity_sink_should_support_retained_messages) +{ + ConfigOptions options; + options["MqttRetain"] = "true"; + options["MqttClientId"] = "retain-client"; + + createServer({}); + startServer(); + + auto handler = std::make_unique(); + bool retainedReceived = false; + std::string retainedPayload; + handler->m_receive = [&retainedReceived, &retainedPayload](std::shared_ptr client, const std::string& topic, const std::string& payload) { + retainedReceived = true; + retainedPayload = payload; + }; + + createClient(options, std::move(handler)); + ASSERT_TRUE(startClient()); + m_client->subscribe("MTConnect/Devices/#"); + + createAgent(); + auto sink = m_agentTestHelper->getAgent()->findSink("MqttEntitySink"); + auto mqttSink = std::dynamic_pointer_cast(sink); + ASSERT_TRUE(waitFor(10s, [&mqttSink]() { return mqttSink && mqttSink->isConnected(); })); + + ASSERT_TRUE(waitFor(5s, [&retainedReceived]() { return retainedReceived; })); + ASSERT_FALSE(retainedPayload.empty()); +} + +TEST_F(MqttEntitySinkTest, mqtt_entity_sink_should_publish_last_will) +{ + ConfigOptions options; + options["MqttLastWillTopic"] = "MTConnect/Probe/J55-411045-cpp/Availability"; + options["MqttClientId"] = "lastwill-client"; + + createServer({}); + startServer(); + + auto handler = std::make_unique(); + bool lastWillReceived = false; + handler->m_receive = [&lastWillReceived](std::shared_ptr client, const std::string& topic, const std::string& payload) { + if (topic.find("Availability") != std::string::npos) { + lastWillReceived = true; + } + }; + + createClient(options, std::move(handler)); + ASSERT_TRUE(startClient()); + m_client->subscribe("MTConnect/Probe/#"); + + createAgent(); + auto sink = m_agentTestHelper->getAgent()->findSink("MqttEntitySink"); + auto mqttSink = std::dynamic_pointer_cast(sink); + ASSERT_TRUE(waitFor(10s, [&mqttSink]() { return mqttSink && mqttSink->isConnected(); })); + + m_client->stop(); + ASSERT_TRUE(waitFor(5s, [&lastWillReceived]() { return lastWillReceived; })); +} From bb5953b16e5f4898fbf4e85e4a75a1e0039136cb Mon Sep 17 00:00:00 2001 From: virajdere Date: Fri, 24 Oct 2025 21:58:12 +0000 Subject: [PATCH 3/9] removed duplicate entry --- agent_lib/CMakeLists.txt | 8 -------- 1 file changed, 8 deletions(-) diff --git a/agent_lib/CMakeLists.txt b/agent_lib/CMakeLists.txt index 343a2c1c8..780bb73d8 100644 --- a/agent_lib/CMakeLists.txt +++ b/agent_lib/CMakeLists.txt @@ -256,14 +256,6 @@ set(AGENT_SOURCES #src/sink/mqtt_sink SOURCE_FILES_ONLY "${SOURCE_DIR}/sink/mqtt_sink/mqtt_service.cpp" - -# src/sink/mqtt_sink HEADER_FILE_ONLY - - "${SOURCE_DIR}/sink/mqtt_sink/mqtt_service.hpp" - -#src/sink/mqtt_sink SOURCE_FILES_ONLY - - "${SOURCE_DIR}/sink/mqtt_sink/mqtt_service.cpp" # src/sink/mqtt_entity_sink HEADER_FILE_ONLY From 31aa5160e40ab9c69bcb5ef14d3ef88543bf0b27 Mon Sep 17 00:00:00 2001 From: virajdere Date: Sat, 25 Oct 2025 05:35:23 +0000 Subject: [PATCH 4/9] updated cmake for entity_sink to use /bigobj for windows --- agent_lib/CMakeLists.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/agent_lib/CMakeLists.txt b/agent_lib/CMakeLists.txt index 780bb73d8..4e979e0c1 100644 --- a/agent_lib/CMakeLists.txt +++ b/agent_lib/CMakeLists.txt @@ -349,6 +349,7 @@ if(MSVC) # The modules including Beast required the /bigobj option in Windows set_property(SOURCE "${SOURCE_DIR}/sink/mqtt_sink/mqtt_service.cpp" + "${SOURCE_DIR}/sink/mqtt_entity_sink/mqtt_entity_sink.cpp" "${SOURCE_DIR}/sink/rest_sink/session_impl.cpp" "${SOURCE_DIR}/source/adapter/mqtt/mqtt_adapter.cpp" "${SOURCE_DIR}/source/adapter/agent_adapter/agent_adapter.cpp" From a8f5ba88447f574f8dcea9bf26ce2def0a886cec Mon Sep 17 00:00:00 2001 From: virajdere Date: Sat, 25 Oct 2025 17:16:44 +0000 Subject: [PATCH 5/9] Changed copyright from 2009 to 2025 --- src/mtconnect/sink/mqtt_entity_sink/mqtt_entity_sink.cpp | 2 +- src/mtconnect/sink/mqtt_entity_sink/mqtt_entity_sink.hpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/mtconnect/sink/mqtt_entity_sink/mqtt_entity_sink.cpp b/src/mtconnect/sink/mqtt_entity_sink/mqtt_entity_sink.cpp index 8c288919a..c826d0871 100644 --- a/src/mtconnect/sink/mqtt_entity_sink/mqtt_entity_sink.cpp +++ b/src/mtconnect/sink/mqtt_entity_sink/mqtt_entity_sink.cpp @@ -1,5 +1,5 @@ // -// Copyright Copyright 2009-2023, AMT – The Association For Manufacturing Technology (“AMT”) +// Copyright Copyright 2009-2025, AMT – The Association For Manufacturing Technology (“AMT”) // All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/src/mtconnect/sink/mqtt_entity_sink/mqtt_entity_sink.hpp b/src/mtconnect/sink/mqtt_entity_sink/mqtt_entity_sink.hpp index 34e248ad6..76586a92f 100644 --- a/src/mtconnect/sink/mqtt_entity_sink/mqtt_entity_sink.hpp +++ b/src/mtconnect/sink/mqtt_entity_sink/mqtt_entity_sink.hpp @@ -1,5 +1,5 @@ // -// Copyright Copyright 2009-2023, AMT – The Association For Manufacturing Technology (“AMT”) +// Copyright Copyright 2009-2025, AMT – The Association For Manufacturing Technology (“AMT”) // All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); From 3b0881227d6892dfc3dbfdf8eb92d6b3c919fbbf Mon Sep 17 00:00:00 2001 From: Will Sobel Date: Fri, 31 Oct 2025 16:53:20 +0100 Subject: [PATCH 6/9] Version 2.6.0.4 --- CMakeLists.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index d1220305c..c89d74038 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2,7 +2,7 @@ set(AGENT_VERSION_MAJOR 2) set(AGENT_VERSION_MINOR 6) set(AGENT_VERSION_PATCH 0) -set(AGENT_VERSION_BUILD 3) +set(AGENT_VERSION_BUILD 4) set(AGENT_VERSION_RC "") # This minimum version is to support Visual Studio 2019 and C++ feature checking and FetchContent From aedfff3928fd928af6c5c7f8889b63b4fe1de763 Mon Sep 17 00:00:00 2001 From: Will Sobel Date: Mon, 3 Nov 2025 12:25:37 +0100 Subject: [PATCH 7/9] Fixed test issues --- src/mtconnect/configuration/agent_config.cpp | 3 +- src/mtconnect/entity/xml_printer.cpp | 2 +- src/mtconnect/mqtt/mqtt_server_impl.hpp | 4 +- src/mtconnect/pipeline/pipeline.hpp | 3 +- src/mtconnect/printer/xml_printer.cpp | 2 +- .../mqtt_entity_sink/mqtt_entity_sink.cpp | 2 +- .../mqtt_entity_sink/mqtt_entity_sink.hpp | 4 +- src/mtconnect/sink/rest_sink/rest_service.cpp | 1 - .../adapter/agent_adapter/agent_adapter.cpp | 2 +- .../source/adapter/shdr/connector.hpp | 2 +- src/mtconnect/source/source.cpp | 12 +- src/mtconnect/source/source.hpp | 2 +- src/mtconnect/utilities.hpp | 11 +- test_package/agent_test_helper.hpp | 4 +- test_package/http_server_test.cpp | 8 +- test_package/json_printer_probe_test.cpp | 6 +- test_package/mqtt_entity_sink_test.cpp | 437 ++++++++++-------- test_package/tls_http_server_test.cpp | 6 +- test_package/websockets_test.cpp | 15 +- 19 files changed, 300 insertions(+), 226 deletions(-) diff --git a/src/mtconnect/configuration/agent_config.cpp b/src/mtconnect/configuration/agent_config.cpp index 90941458a..2d7db5882 100644 --- a/src/mtconnect/configuration/agent_config.cpp +++ b/src/mtconnect/configuration/agent_config.cpp @@ -58,8 +58,8 @@ #include "mtconnect/configuration/config_options.hpp" #include "mtconnect/device_model/device.hpp" #include "mtconnect/printer/xml_printer.hpp" -#include "mtconnect/sink/mqtt_sink/mqtt_service.hpp" #include "mtconnect/sink/mqtt_entity_sink/mqtt_entity_sink.hpp" +#include "mtconnect/sink/mqtt_sink/mqtt_service.hpp" #include "mtconnect/sink/rest_sink/rest_service.hpp" #include "mtconnect/source/adapter/agent_adapter/agent_adapter.hpp" #include "mtconnect/source/adapter/mqtt/mqtt_adapter.hpp" @@ -1254,4 +1254,3 @@ namespace mtconnect::configuration { return false; } } // namespace mtconnect::configuration - diff --git a/src/mtconnect/entity/xml_printer.cpp b/src/mtconnect/entity/xml_printer.cpp index 3cfb0ccb4..66d461886 100644 --- a/src/mtconnect/entity/xml_printer.cpp +++ b/src/mtconnect/entity/xml_printer.cpp @@ -19,8 +19,8 @@ #include -#include #include +#include #include "mtconnect/logging.hpp" #include "mtconnect/printer/xml_printer_helper.hpp" diff --git a/src/mtconnect/mqtt/mqtt_server_impl.hpp b/src/mtconnect/mqtt/mqtt_server_impl.hpp index b962d14ab..0a147d9ac 100644 --- a/src/mtconnect/mqtt/mqtt_server_impl.hpp +++ b/src/mtconnect/mqtt/mqtt_server_impl.hpp @@ -24,9 +24,9 @@ #include #include +#include #include #include -#include #include "mqtt_server.hpp" #include "mtconnect/configuration/config_options.hpp" @@ -218,7 +218,7 @@ namespace mtconnect { LOG(debug) << "Server topic_name: " << topic_name; LOG(debug) << "Server contents: " << contents; - for (const auto& sub : m_subs) + for (const auto &sub : m_subs) { if (mqtt::broker::compare_topic_filter(sub.topic, topic_name)) { diff --git a/src/mtconnect/pipeline/pipeline.hpp b/src/mtconnect/pipeline/pipeline.hpp index 8fa7b8594..a6ce14c67 100644 --- a/src/mtconnect/pipeline/pipeline.hpp +++ b/src/mtconnect/pipeline/pipeline.hpp @@ -17,9 +17,10 @@ #pragma once -#include #include +#include + #include "mtconnect/config.hpp" #include "pipeline_context.hpp" #include "pipeline_contract.hpp" diff --git a/src/mtconnect/printer/xml_printer.cpp b/src/mtconnect/printer/xml_printer.cpp index 3ea09f277..06c0da048 100644 --- a/src/mtconnect/printer/xml_printer.cpp +++ b/src/mtconnect/printer/xml_printer.cpp @@ -24,8 +24,8 @@ #include #include -#include #include +#include #include "mtconnect/asset/asset.hpp" #include "mtconnect/asset/cutting_tool.hpp" diff --git a/src/mtconnect/sink/mqtt_entity_sink/mqtt_entity_sink.cpp b/src/mtconnect/sink/mqtt_entity_sink/mqtt_entity_sink.cpp index c826d0871..6dbdd23fc 100644 --- a/src/mtconnect/sink/mqtt_entity_sink/mqtt_entity_sink.cpp +++ b/src/mtconnect/sink/mqtt_entity_sink/mqtt_entity_sink.cpp @@ -555,5 +555,5 @@ namespace mtconnect { } } // namespace mqtt_entity_sink - } // namespace sink + } // namespace sink } // namespace mtconnect diff --git a/src/mtconnect/sink/mqtt_entity_sink/mqtt_entity_sink.hpp b/src/mtconnect/sink/mqtt_entity_sink/mqtt_entity_sink.hpp index 76586a92f..db71cf0ad 100644 --- a/src/mtconnect/sink/mqtt_entity_sink/mqtt_entity_sink.hpp +++ b/src/mtconnect/sink/mqtt_entity_sink/mqtt_entity_sink.hpp @@ -117,7 +117,7 @@ namespace mtconnect { std::string formatTimestamp(const Timestamp& timestamp); protected: - static constexpr size_t MAX_QUEUE_SIZE = 10000; // Maximum queued observations + static constexpr size_t MAX_QUEUE_SIZE = 10000; // Maximum queued observations std::string m_observationTopicPrefix; //! Observation topic prefix std::string m_deviceTopicPrefix; //! Device topic prefix @@ -134,5 +134,5 @@ namespace mtconnect { std::mutex m_queueMutex; }; } // namespace mqtt_entity_sink - } // namespace sink + } // namespace sink } // namespace mtconnect diff --git a/src/mtconnect/sink/rest_sink/rest_service.cpp b/src/mtconnect/sink/rest_sink/rest_service.cpp index 46775087c..fe19d8436 100644 --- a/src/mtconnect/sink/rest_sink/rest_service.cpp +++ b/src/mtconnect/sink/rest_sink/rest_service.cpp @@ -1616,4 +1616,3 @@ namespace mtconnect { } // namespace sink::rest_sink } // namespace mtconnect - diff --git a/src/mtconnect/source/adapter/agent_adapter/agent_adapter.cpp b/src/mtconnect/source/adapter/agent_adapter/agent_adapter.cpp index b99bff088..255c44872 100644 --- a/src/mtconnect/source/adapter/agent_adapter/agent_adapter.cpp +++ b/src/mtconnect/source/adapter/agent_adapter/agent_adapter.cpp @@ -135,7 +135,7 @@ namespace mtconnect::source::adapter::agent_adapter { m_name = m_url.getUrlText(m_sourceDevice); m_identity = CreateIdentityHash(m_name); - + m_options.insert_or_assign(configuration::AdapterIdentity, m_identity); m_feedbackId = "XmlTransformFeedback:" + m_identity; diff --git a/src/mtconnect/source/adapter/shdr/connector.hpp b/src/mtconnect/source/adapter/shdr/connector.hpp index ad0e9ed14..66d4ff3c4 100644 --- a/src/mtconnect/source/adapter/shdr/connector.hpp +++ b/src/mtconnect/source/adapter/shdr/connector.hpp @@ -105,7 +105,7 @@ namespace mtconnect::source::adapter::shdr { void resolved(const boost::system::error_code &error, boost::asio::ip::tcp::resolver::results_type results); void connected(const boost::system::error_code &error, - const boost::asio::ip::tcp::endpoint& endpoint); + const boost::asio::ip::tcp::endpoint &endpoint); void writer(boost::system::error_code ec, std::size_t length); void reader(boost::system::error_code ec, std::size_t length); bool parseSocketBuffer(); diff --git a/src/mtconnect/source/source.cpp b/src/mtconnect/source/source.cpp index d67880b97..97d5ebb83 100644 --- a/src/mtconnect/source/source.cpp +++ b/src/mtconnect/source/source.cpp @@ -15,12 +15,12 @@ // limitations under the License. // -#include - #include "mtconnect/source/source.hpp" #include +#include + #include "mtconnect/logging.hpp" namespace mtconnect::source { @@ -45,18 +45,18 @@ namespace mtconnect::source { std::string CreateIdentityHash(const std::string &input) { using namespace std; - + boost::uuids::detail::sha1 sha1; sha1.process_bytes(input.c_str(), input.length()); boost::uuids::detail::sha1::digest_type digest; sha1.get_digest(digest); - + ostringstream identity; identity << '_' << std::hex; for (int i = 0; i < 5; i++) - identity << (uint16_t) digest[i]; + identity << (uint16_t)digest[i]; return identity.str(); } - + } // namespace mtconnect::source diff --git a/src/mtconnect/source/source.hpp b/src/mtconnect/source/source.hpp index 8ff888bb4..cdef79b11 100644 --- a/src/mtconnect/source/source.hpp +++ b/src/mtconnect/source/source.hpp @@ -95,7 +95,7 @@ namespace mtconnect { std::string m_name; boost::asio::io_context::strand m_strand; }; - + /// @brief create a unique identity hash for an XML id starting with an `_` and 10 hex digits /// @param text the text to create the hashed id /// @returns a string with the hashed result diff --git a/src/mtconnect/utilities.hpp b/src/mtconnect/utilities.hpp index 70bddd58f..c303caffc 100644 --- a/src/mtconnect/utilities.hpp +++ b/src/mtconnect/utilities.hpp @@ -224,7 +224,7 @@ namespace mtconnect { #else namespace tzchrono = date; #endif - + switch (format) { case HUM_READ: @@ -819,7 +819,8 @@ namespace mtconnect { /// @param[in] sha the sha1 namespace to use as context /// @param[in] id the id to use transform /// @returns Returns the first 16 characters of the base 64 encoded sha1 - inline std::string makeUniqueId(const ::boost::uuids::detail::sha1 &contextSha, const std::string &id) + inline std::string makeUniqueId(const ::boost::uuids::detail::sha1 &contextSha, + const std::string &id) { using namespace std; using namespace boost::uuids::detail; @@ -833,11 +834,11 @@ namespace mtconnect { }; sha.process_bytes(id.data(), id.length()); - sha1::digest_type digest; + sha1::digest_type digest; sha.get_digest(digest); - auto data = (unsigned int *) digest; - + auto data = (unsigned int *)digest; + string s(32, ' '); auto len = boost::beast::detail::base64::encode(s.data(), data, sizeof(digest)); diff --git a/test_package/agent_test_helper.hpp b/test_package/agent_test_helper.hpp index fcbeb0402..f1aa59a7d 100644 --- a/test_package/agent_test_helper.hpp +++ b/test_package/agent_test_helper.hpp @@ -125,6 +125,7 @@ class AgentTestHelper ~AgentTestHelper() { m_mqttService.reset(); + m_mqttEntitySink.reset(); m_restService.reset(); m_adapter.reset(); if (m_agent) @@ -323,8 +324,7 @@ class AgentTestHelper std::shared_ptr m_context; std::shared_ptr m_adapter; std::shared_ptr m_mqttService; - std::shared_ptr - m_mqttEntitySink; + std::shared_ptr m_mqttEntitySink; std::shared_ptr m_restService; std::shared_ptr m_loopback; diff --git a/test_package/http_server_test.cpp b/test_package/http_server_test.cpp index ff73298c3..81c0a1c56 100644 --- a/test_package/http_server_test.cpp +++ b/test_package/http_server_test.cpp @@ -243,8 +243,9 @@ class Client cout << "spawnRequest: done: false" << endl; m_done = false; m_count = 0; - asio::spawn(m_context, std::bind(&Client::request, this, verb, target, body, close, contentType, - std::placeholders::_1), + asio::spawn(m_context, + std::bind(&Client::request, this, verb, target, body, close, contentType, + std::placeholders::_1), boost::asio::detached); while (!m_done && m_context.run_for(20ms) > 0) @@ -311,8 +312,7 @@ class HttpServerTest : public testing::Test asio::spawn(m_context, std::bind(&Client::connect, m_client.get(), static_cast(m_server->getPort()), std::placeholders::_1), - boost::asio::detached); - + boost::asio::detached); while (!m_client->m_connected) m_context.run_one(); diff --git a/test_package/json_printer_probe_test.cpp b/test_package/json_printer_probe_test.cpp index 1c2a1cb65..2aae84e22 100644 --- a/test_package/json_printer_probe_test.cpp +++ b/test_package/json_printer_probe_test.cpp @@ -362,7 +362,8 @@ TEST_F(JsonPrinterProbeTest, PrintDataItemRelationships) auto dir2 = load.at("/Relationships/1"_json_pointer); ASSERT_TRUE(dir2.is_object()); ASSERT_EQ(string("LIMIT"), dir2.at("/SpecificationRelationship/type"_json_pointer).get()); - ASSERT_EQ(string("spec1"), dir2.at("/SpecificationRelationship/idRef"_json_pointer).get()); + ASSERT_EQ(string("spec1"), + dir2.at("/SpecificationRelationship/idRef"_json_pointer).get()); auto limits = linear.at("/DataItems/5/DataItem"_json_pointer); ASSERT_TRUE(load.is_object()); @@ -371,7 +372,8 @@ TEST_F(JsonPrinterProbeTest, PrintDataItemRelationships) auto dir3 = limits.at("/Relationships/0"_json_pointer); ASSERT_TRUE(dir3.is_object()); ASSERT_EQ(string("bob"), dir3.at("/DataItemRelationship/name"_json_pointer).get()); - ASSERT_EQ(string("OBSERVATION"), dir3.at("/DataItemRelationship/type"_json_pointer).get()); + ASSERT_EQ(string("OBSERVATION"), + dir3.at("/DataItemRelationship/type"_json_pointer).get()); ASSERT_EQ(string("xlc"), dir3.at("/DataItemRelationship/idRef"_json_pointer).get()); } diff --git a/test_package/mqtt_entity_sink_test.cpp b/test_package/mqtt_entity_sink_test.cpp index 7ea003c7a..bb868908e 100644 --- a/test_package/mqtt_entity_sink_test.cpp +++ b/test_package/mqtt_entity_sink_test.cpp @@ -60,142 +60,181 @@ class MqttEntitySinkTest : public testing::Test std::shared_ptr m_client; std::unique_ptr m_jsonPrinter; uint16_t m_port {0}; - - void SetUp() override { + + void SetUp() override + { m_agentTestHelper = std::make_unique(); m_jsonPrinter = std::make_unique(2, true); } + + void TearDown() override + { + stopAgent(); + stopClient(); + stopServer(); - void TearDown() override { - if (m_client) { - m_client->stop(); - m_agentTestHelper->m_ioContext.run_for(500ms); - m_client.reset(); - } - if (m_server) { - m_server->stop(); - m_agentTestHelper->m_ioContext.run_for(500ms); - m_server.reset(); - } m_agentTestHelper.reset(); m_jsonPrinter.reset(); } - - void createAgent(std::string testFile = {}, ConfigOptions options = {}) { + + void createAgent(std::string testFile = {}, ConfigOptions options = {}) + { if (testFile == "") testFile = "/samples/test_config.xml"; ConfigOptions opts(options); MergeOptions( - opts, - { - {"MqttEntitySink", true}, - {configuration::MqttPort, m_port}, - {MqttCurrentInterval, 200ms}, - {MqttSampleInterval, 100ms}, - {configuration::MqttHost, "127.0.0.1"s}, - {configuration::ObservationTopicPrefix, "MTConnect/Devices/[device]/Observations"s}, - {configuration::DeviceTopicPrefix, "MTConnect/Probe/[device]"s}, - {configuration::AssetTopicPrefix, "MTConnect/Asset/[device]"s}, - {configuration::MqttLastWillTopic, "MTConnect/Probe/[device]/Availability"s}, - }); + opts, + { + {"MqttEntitySink", true}, + {configuration::MqttPort, m_port}, + {MqttCurrentInterval, 200ms}, + {MqttSampleInterval, 100ms}, + {configuration::MqttHost, "127.0.0.1"s}, + {configuration::ObservationTopicPrefix, "MTConnect/Devices/[device]/Observations"s}, + {configuration::DeviceTopicPrefix, "MTConnect/Probe/[device]"s}, + {configuration::AssetTopicPrefix, "MTConnect/Asset/[device]"s}, + {configuration::MqttLastWillTopic, "MTConnect/Probe/[device]/Availability"s}, + }); m_agentTestHelper->createAgent("/samples/test_config.xml", 8, 4, "2.0", 25, false, true, opts); addAdapter(); m_agentTestHelper->getAgent()->start(); } - - void createServer(const ConfigOptions& options) { + + void createServer(const ConfigOptions& options) + { using namespace mtconnect::configuration; ConfigOptions opts(options); MergeOptions(opts, {{ServerIp, "127.0.0.1"s}, - {MqttPort, 0}, - {MqttTls, false}, - {AutoAvailable, false}, - {RealTime, false}}); - m_server = std::make_shared(m_agentTestHelper->m_ioContext, opts); + {MqttPort, 0}, + {MqttTls, false}, + {AutoAvailable, false}, + {RealTime, false}}); + m_server = std::make_shared( + m_agentTestHelper->m_ioContext, opts); } - + template - bool waitFor(const chrono::duration& time, function pred) { + bool waitFor(const chrono::duration& time, function pred) + { boost::asio::steady_timer timer(m_agentTestHelper->m_ioContext); timer.expires_after(time); bool timeout = false; timer.async_wait([&timeout](boost::system::error_code ec) { - if (!ec) timeout = true; + if (!ec) + timeout = true; }); - while (!timeout && !pred()) { + while (!timeout && !pred()) + { m_agentTestHelper->m_ioContext.run_for(100ms); } timer.cancel(); return pred(); } - - void startServer() { - if (m_server) { + + void startServer() + { + if (m_server) + { bool start = m_server->start(); - if (start) { + if (start) + { m_port = m_server->getPort(); m_agentTestHelper->m_ioContext.run_for(500ms); } } } - - void createClient(const ConfigOptions& options, unique_ptr&& handler) { + + void createClient(const ConfigOptions& options, unique_ptr&& handler) + { ConfigOptions opts(options); MergeOptions(opts, {{MqttHost, "127.0.0.1"s}, - {MqttPort, m_port}, - {MqttTls, false}, - {AutoAvailable, false}, - {RealTime, false}}); + {MqttPort, m_port}, + {MqttTls, false}, + {AutoAvailable, false}, + {RealTime, false}}); m_client = make_shared(m_agentTestHelper->m_ioContext, opts, std::move(handler)); } - - bool startClient() { + + bool startClient() + { bool started = m_client && m_client->start(); - if (started) { + if (started) + { return waitFor(1s, [this]() { return m_client->isConnected(); }); } return started; } - - void addAdapter(ConfigOptions options = ConfigOptions {}) { + + void addAdapter(ConfigOptions options = ConfigOptions {}) + { m_agentTestHelper->addAdapter(options, "localhost", 0, m_agentTestHelper->m_agent->getDefaultDevice()->getName()); } + + void stopAgent() + { + const auto agent = m_agentTestHelper->getAgent(); + if (agent) + { + m_agentTestHelper->getAgent()->stop(); + m_agentTestHelper->m_ioContext.run_for(100ms); + } + } + + void stopClient() + { + if (m_client) + { + m_client->stop(); + m_agentTestHelper->m_ioContext.run_for(500ms); + m_client.reset(); + } + } + + void stopServer() + { + if (m_server) + { + m_server->stop(); + m_agentTestHelper->m_ioContext.run_for(500ms); + m_server.reset(); + } + } }; TEST_F(MqttEntitySinkTest, mqtt_entity_sink_should_use_flat_topic_structure) { bool gotMessage = false; std::string receivedTopic; - + createServer({}); startServer(); - + auto client = mqtt::make_async_client(m_agentTestHelper->m_ioContext.get(), "localhost", m_port); - + client->set_client_id("test_client"); client->set_clean_session(true); client->set_keep_alive_sec(30); - + bool subscribed = false; client->set_connack_handler( - [client, &subscribed](bool sp, mqtt::connect_return_code connack_return_code) { - if (connack_return_code == mqtt::connect_return_code::accepted) - { - auto pid = client->acquire_unique_packet_id(); - client->async_subscribe(pid, "MTConnect/#", MQTT_NS::qos::at_least_once, - [](MQTT_NS::error_code ec) { EXPECT_FALSE(ec); }); - } - return true; - }); - + [client, &subscribed](bool sp, mqtt::connect_return_code connack_return_code) { + if (connack_return_code == mqtt::connect_return_code::accepted) + { + auto pid = client->acquire_unique_packet_id(); + client->async_subscribe(pid, "MTConnect/#", MQTT_NS::qos::at_least_once, + [](MQTT_NS::error_code ec) { EXPECT_FALSE(ec); }); + } + return true; + }); + client->set_suback_handler( - [&subscribed](std::uint16_t packet_id, std::vector results) { - subscribed = true; - return true; - }); - + [&subscribed](std::uint16_t packet_id, std::vector results) { + subscribed = true; + return true; + }); + client->set_publish_handler([&gotMessage, &receivedTopic](mqtt::optional packet_id, mqtt::publish_options pubopts, mqtt::buffer topic_name, @@ -209,48 +248,49 @@ TEST_F(MqttEntitySinkTest, mqtt_entity_sink_should_use_flat_topic_structure) } return true; }); - + client->async_connect([](mqtt::error_code ec) { ASSERT_FALSE(ec) << "Cannot connect"; }); - + // Wait for subscription to complete ASSERT_TRUE(waitFor(10s, [&subscribed]() { return subscribed; })) - << "Subscription never completed"; - + << "Subscription never completed"; + // Create the agent and wait for its MQTT sink to connect. createAgent(); auto sink = m_agentTestHelper->getMqttEntitySink(); ASSERT_TRUE(sink != nullptr); ASSERT_TRUE(waitFor(10s, [&sink]() { return sink->isConnected(); })) - << "MqttEntitySink failed to connect to broker"; - + << "MqttEntitySink failed to connect to broker"; + gotMessage = false; receivedTopic.clear(); - + m_agentTestHelper->m_adapter->processData("2021-02-01T12:00:00Z|line|204"); ASSERT_TRUE(waitFor(10s, [&gotMessage]() { return gotMessage; })) - << "Timeout waiting for adapter data. Last topic: " << receivedTopic; - + << "Timeout waiting for adapter data. Last topic: " << receivedTopic; + EXPECT_TRUE(receivedTopic.find("MTConnect/Devices/") == 0) - << "Topic doesn't start with MTConnect/Devices/: " << receivedTopic; + << "Topic doesn't start with MTConnect/Devices/: " << receivedTopic; EXPECT_TRUE(receivedTopic.find("/Observations/") != std::string::npos) - << "Topic doesn't contain /Observations/: " << receivedTopic; + << "Topic doesn't contain /Observations/: " << receivedTopic; EXPECT_EQ("MTConnect/Devices/000/Observations/p3", receivedTopic) - << "Topic does not match expected format: " << receivedTopic; - + << "Topic does not match expected format: " << receivedTopic; + client->async_disconnect(); + stopClient(); } TEST_F(MqttEntitySinkTest, mqtt_entity_sink_should_publish_entity_json_format) { ConfigOptions options; - + createServer({}); startServer(); - + auto handler = make_unique(); bool gotMessage = false; json receivedJson; - + handler->m_receive = [&gotMessage, &receivedJson](std::shared_ptr client, const std::string& topic, const std::string& payload) { @@ -268,24 +308,24 @@ TEST_F(MqttEntitySinkTest, mqtt_entity_sink_should_publish_entity_json_format) LOG(error) << "Failed to parse JSON: " << e.what(); } }; - + createClient(options, std::move(handler)); ASSERT_TRUE(startClient()); m_client->subscribe("MTConnect/Devices/#"); // Ensure subscription is active before sending data m_agentTestHelper->m_ioContext.run_for(200ms); - + createAgent(); - + auto sink = m_agentTestHelper->getAgent()->findSink("MqttEntitySink"); auto mqttSink = dynamic_pointer_cast(sink); ASSERT_TRUE(waitFor(10s, [&mqttSink]() { return mqttSink->isConnected(); })); - + // Wait a bit to ensure sink and client are ready m_agentTestHelper->m_ioContext.run_for(200ms); m_agentTestHelper->m_adapter->processData("2021-02-01T12:00:00Z|line|204"); ASSERT_TRUE(waitFor(10s, [&gotMessage]() { return gotMessage; })); - + // Verify Entity JSON format EXPECT_TRUE(receivedJson.contains("dataItemId")); EXPECT_TRUE(receivedJson.contains("timestamp")); @@ -293,44 +333,47 @@ TEST_F(MqttEntitySinkTest, mqtt_entity_sink_should_publish_entity_json_format) EXPECT_TRUE(receivedJson.contains("sequence")); EXPECT_TRUE(receivedJson.contains("type")); EXPECT_TRUE(receivedJson.contains("category")); - + EXPECT_EQ("204", receivedJson["result"].get()); + + stopClient(); } TEST_F(MqttEntitySinkTest, mqtt_entity_sink_should_include_optional_fields) { ConfigOptions options; - + createServer({}); startServer(); - + auto handler = make_unique(); bool gotMessage = false; json receivedJson; - + handler->m_receive = [&gotMessage, &receivedJson](std::shared_ptr client, const std::string& topic, const std::string& payload) { - receivedJson = json::parse(payload); - if (receivedJson.contains("name")) + json js = json::parse(payload); + if (js.contains("name")) { gotMessage = true; } + receivedJson = js; }; - + createClient(options, std::move(handler)); ASSERT_TRUE(startClient()); m_client->subscribe("MTConnect/Devices/#"); - + createAgent(); - + auto sink = m_agentTestHelper->getAgent()->findSink("MqttEntitySink"); auto mqttSink = dynamic_pointer_cast(sink); ASSERT_TRUE(waitFor(10s, [&mqttSink]() { return mqttSink->isConnected(); })); - + m_agentTestHelper->m_adapter->processData("2021-02-01T12:00:00Z|line|204"); ASSERT_TRUE(waitFor(10s, [&gotMessage]() { return gotMessage; })); - + // Verify optional fields if (receivedJson.contains("name")) { @@ -340,19 +383,21 @@ TEST_F(MqttEntitySinkTest, mqtt_entity_sink_should_include_optional_fields) { EXPECT_TRUE(receivedJson["subType"].is_string()); } + + stopClient(); } TEST_F(MqttEntitySinkTest, mqtt_entity_sink_should_publish_samples) { ConfigOptions options; - + createServer({}); startServer(); - + auto handler = make_unique(); bool gotSample = false; json receivedJson; - + handler->m_receive = [&gotSample, &receivedJson](std::shared_ptr client, const std::string& topic, const std::string& payload) { @@ -371,42 +416,45 @@ TEST_F(MqttEntitySinkTest, mqtt_entity_sink_should_publish_samples) LOG(error) << "Failed to parse JSON: " << e.what(); } }; - + createClient(options, std::move(handler)); ASSERT_TRUE(startClient()); m_client->subscribe("MTConnect/Devices/#"); m_agentTestHelper->m_ioContext.run_for(200ms); - + createAgent(); - + auto sink = m_agentTestHelper->getAgent()->findSink("MqttEntitySink"); auto mqttSink = dynamic_pointer_cast(sink); - + ASSERT_TRUE(waitFor(10s, [&mqttSink]() { return mqttSink->isConnected(); })); - + m_agentTestHelper->m_ioContext.run_for(200ms); m_agentTestHelper->m_adapter->processData("2021-02-01T12:00:00Z|z2|204"); ASSERT_TRUE(waitFor(10s, [&gotSample]() { return gotSample; })); - + EXPECT_EQ("SAMPLE", receivedJson["category"].get()); EXPECT_EQ("204.000000", receivedJson["result"].get()); + + stopClient(); } TEST_F(MqttEntitySinkTest, mqtt_entity_sink_should_publish_events) { ConfigOptions options; - + createServer({}); startServer(); - + auto handler = make_unique(); bool gotEvent = false; json receivedJson; - + handler->m_receive = [&gotEvent, &receivedJson](std::shared_ptr client, const std::string& topic, const std::string& payload) { - try { + try + { receivedJson = json::parse(payload); if (receivedJson.contains("category") && receivedJson["category"] == "EVENT" && receivedJson.contains("dataItemId") && receivedJson["dataItemId"] == "p4" && @@ -414,45 +462,50 @@ TEST_F(MqttEntitySinkTest, mqtt_entity_sink_should_publish_events) { gotEvent = true; } - } catch (const std::exception& e) { + } + catch (const std::exception& e) + { LOG(error) << "Failed to parse JSON: " << e.what(); } }; - + createClient(options, std::move(handler)); ASSERT_TRUE(startClient()); m_client->subscribe("MTConnect/Devices/#"); m_agentTestHelper->m_ioContext.run_for(200ms); - + createAgent(); - + auto sink = m_agentTestHelper->getAgent()->findSink("MqttEntitySink"); auto mqttSink = dynamic_pointer_cast(sink); ASSERT_TRUE(waitFor(10s, [&mqttSink]() { return mqttSink->isConnected(); })); - + m_agentTestHelper->m_ioContext.run_for(200ms); m_agentTestHelper->m_adapter->processData("2021-02-01T12:00:00Z|p4|READY"); ASSERT_TRUE(waitFor(10s, [&gotEvent]() { return gotEvent; })); - + EXPECT_EQ("EVENT", receivedJson["category"].get()); EXPECT_EQ("READY", receivedJson["result"].get()); + + stopClient(); } TEST_F(MqttEntitySinkTest, mqtt_entity_sink_should_publish_conditions) { ConfigOptions options; - + createServer({}); startServer(); - + auto handler = make_unique(); bool gotCondition = false; json receivedJson; - + handler->m_receive = [&gotCondition, &receivedJson](std::shared_ptr client, const std::string& topic, const std::string& payload) { - try { + try + { receivedJson = json::parse(payload); if (receivedJson.contains("category") && receivedJson["category"] == "CONDITION" && receivedJson.contains("dataItemId") && receivedJson["dataItemId"] == "zlc" && @@ -460,46 +513,50 @@ TEST_F(MqttEntitySinkTest, mqtt_entity_sink_should_publish_conditions) { gotCondition = true; } - } catch (const std::exception& e) { + } + catch (const std::exception& e) + { LOG(error) << "Failed to parse JSON: " << e.what(); } }; - + createClient(options, std::move(handler)); ASSERT_TRUE(startClient()); m_client->subscribe("MTConnect/Devices/#"); m_agentTestHelper->m_ioContext.run_for(200ms); - + createAgent(); - + auto sink = m_agentTestHelper->getAgent()->findSink("MqttEntitySink"); auto mqttSink = dynamic_pointer_cast(sink); ASSERT_TRUE(waitFor(10s, [&mqttSink]() { return mqttSink->isConnected(); })); - + m_agentTestHelper->m_ioContext.run_for(200ms); m_agentTestHelper->m_adapter->processData( - "2021-02-01T12:00:00Z|zlc|FAULT|1234|LOW|Hydraulic pressure low"); + "2021-02-01T12:00:00Z|zlc|FAULT|1234|LOW|Hydraulic pressure low"); ASSERT_TRUE(waitFor(10s, [&gotCondition]() { return gotCondition; })); - + EXPECT_EQ("CONDITION", receivedJson["category"].get()); EXPECT_EQ("FAULT", receivedJson["level"].get()); if (receivedJson.contains("nativeCode")) { EXPECT_EQ("1234", receivedJson["nativeCode"].get()); } + + stopClient(); } TEST_F(MqttEntitySinkTest, mqtt_entity_sink_should_publish_availability) { ConfigOptions options; - + createServer({}); startServer(); - + auto handler = make_unique(); bool gotAvailable = false; std::string availabilityValue; - + handler->m_receive = [&gotAvailable, &availabilityValue](std::shared_ptr client, const std::string& topic, const std::string& payload) { @@ -509,31 +566,33 @@ TEST_F(MqttEntitySinkTest, mqtt_entity_sink_should_publish_availability) gotAvailable = true; } }; - + createClient(options, std::move(handler)); ASSERT_TRUE(startClient()); m_client->subscribe("MTConnect/Probe/#"); - + createAgent(); - + auto sink = m_agentTestHelper->getAgent()->findSink("MqttEntitySink"); auto mqttSink = dynamic_pointer_cast(sink); ASSERT_TRUE(waitFor(10s, [&mqttSink]() { return mqttSink->isConnected(); })); - + ASSERT_TRUE(waitFor(5s, [&gotAvailable]() { return gotAvailable; })); EXPECT_EQ("AVAILABLE", availabilityValue); + + stopClient(); } TEST_F(MqttEntitySinkTest, mqtt_entity_sink_should_publish_initial_observations) { ConfigOptions options; - + createServer({}); startServer(); - + auto handler = make_unique(); int messageCount = 0; - + handler->m_receive = [&messageCount](std::shared_ptr client, const std::string& topic, const std::string& payload) { if (topic.find("/Observations/") != std::string::npos) @@ -541,31 +600,33 @@ TEST_F(MqttEntitySinkTest, mqtt_entity_sink_should_publish_initial_observations) messageCount++; } }; - + createClient(options, std::move(handler)); ASSERT_TRUE(startClient()); m_client->subscribe("MTConnect/Devices/#"); - + createAgent(); - + auto sink = m_agentTestHelper->getAgent()->findSink("MqttEntitySink"); auto mqttSink = dynamic_pointer_cast(sink); ASSERT_TRUE(waitFor(10s, [&mqttSink]() { return mqttSink->isConnected(); })); ASSERT_TRUE(waitFor(10s, [&messageCount]() { return messageCount > 0; })); EXPECT_GT(messageCount, 0); + + stopClient(); } TEST_F(MqttEntitySinkTest, mqtt_entity_sink_should_handle_unavailable) { ConfigOptions options; - + createServer({}); startServer(); - + auto handler = make_unique(); bool gotUnavailable = false; json receivedJson; - + handler->m_receive = [&gotUnavailable, &receivedJson](std::shared_ptr client, const std::string& topic, const std::string& payload) { @@ -575,20 +636,22 @@ TEST_F(MqttEntitySinkTest, mqtt_entity_sink_should_handle_unavailable) gotUnavailable = true; } }; - + createClient(options, std::move(handler)); ASSERT_TRUE(startClient()); m_client->subscribe("MTConnect/Devices/#"); - + createAgent(); - + auto sink = m_agentTestHelper->getAgent()->findSink("MqttEntitySink"); auto mqttSink = dynamic_pointer_cast(sink); ASSERT_TRUE(waitFor(10s, [&mqttSink]() { return mqttSink->isConnected(); })); - + // Initial observations should include UNAVAILABLE values ASSERT_TRUE(waitFor(10s, [&gotUnavailable]() { return gotUnavailable; })); EXPECT_EQ("UNAVAILABLE", receivedJson["result"].get()); + + stopClient(); } TEST_F(MqttEntitySinkTest, mqtt_entity_sink_should_support_authentication) @@ -597,24 +660,23 @@ TEST_F(MqttEntitySinkTest, mqtt_entity_sink_should_support_authentication) options["MqttUserName"] = "mtconnect"; options["MqttPassword"] = "password123"; options["MqttClientId"] = "auth-client"; - + createServer({}); startServer(); - + bool connected = false; auto handler = std::make_unique(); - handler->m_connected = [&connected](std::shared_ptr) { - connected = true; - }; - + handler->m_connected = [&connected](std::shared_ptr) { connected = true; }; + createClient(options, std::move(handler)); startClient(); - + auto start = std::chrono::steady_clock::now(); - while (!connected && std::chrono::steady_clock::now() - start < std::chrono::seconds(5)) { + while (!connected && std::chrono::steady_clock::now() - start < std::chrono::seconds(5)) + { std::this_thread::sleep_for(std::chrono::milliseconds(50)); } - + ASSERT_TRUE(connected) << "MQTT client did not connect with authentication"; } @@ -623,25 +685,24 @@ TEST_F(MqttEntitySinkTest, mqtt_entity_sink_should_support_qos_levels) ConfigOptions options; options["MqttQOS"] = "exactly_once"; options["MqttClientId"] = "qos-client"; - + createServer({}); startServer(); - + auto handler = std::make_unique(); bool received = false; - handler->m_receive = [&received](std::shared_ptr client, const std::string& topic, const std::string& payload) { - received = true; - }; - + handler->m_receive = [&received](std::shared_ptr client, const std::string& topic, + const std::string& payload) { received = true; }; + createClient(options, std::move(handler)); ASSERT_TRUE(startClient()); m_client->subscribe("MTConnect/Devices/#"); - + createAgent(); auto sink = m_agentTestHelper->getAgent()->findSink("MqttEntitySink"); auto mqttSink = std::dynamic_pointer_cast(sink); ASSERT_TRUE(waitFor(10s, [&mqttSink]() { return mqttSink && mqttSink->isConnected(); })); - + ASSERT_TRUE(waitFor(5s, [&received]() { return received; })); } @@ -650,29 +711,33 @@ TEST_F(MqttEntitySinkTest, mqtt_entity_sink_should_support_retained_messages) ConfigOptions options; options["MqttRetain"] = "true"; options["MqttClientId"] = "retain-client"; - + createServer({}); startServer(); - + auto handler = std::make_unique(); bool retainedReceived = false; std::string retainedPayload; - handler->m_receive = [&retainedReceived, &retainedPayload](std::shared_ptr client, const std::string& topic, const std::string& payload) { + handler->m_receive = [&retainedReceived, &retainedPayload](std::shared_ptr client, + const std::string& topic, + const std::string& payload) { retainedReceived = true; retainedPayload = payload; }; - + createClient(options, std::move(handler)); ASSERT_TRUE(startClient()); m_client->subscribe("MTConnect/Devices/#"); - + createAgent(); auto sink = m_agentTestHelper->getAgent()->findSink("MqttEntitySink"); auto mqttSink = std::dynamic_pointer_cast(sink); ASSERT_TRUE(waitFor(10s, [&mqttSink]() { return mqttSink && mqttSink->isConnected(); })); - + ASSERT_TRUE(waitFor(5s, [&retainedReceived]() { return retainedReceived; })); ASSERT_FALSE(retainedPayload.empty()); + + stopClient(); } TEST_F(MqttEntitySinkTest, mqtt_entity_sink_should_publish_last_will) @@ -680,27 +745,31 @@ TEST_F(MqttEntitySinkTest, mqtt_entity_sink_should_publish_last_will) ConfigOptions options; options["MqttLastWillTopic"] = "MTConnect/Probe/J55-411045-cpp/Availability"; options["MqttClientId"] = "lastwill-client"; - + createServer({}); startServer(); - + auto handler = std::make_unique(); bool lastWillReceived = false; - handler->m_receive = [&lastWillReceived](std::shared_ptr client, const std::string& topic, const std::string& payload) { - if (topic.find("Availability") != std::string::npos) { + handler->m_receive = [&lastWillReceived](std::shared_ptr client, + const std::string& topic, const std::string& payload) { + if (topic.find("Availability") != std::string::npos) + { lastWillReceived = true; } }; - + createClient(options, std::move(handler)); ASSERT_TRUE(startClient()); m_client->subscribe("MTConnect/Probe/#"); - + createAgent(); auto sink = m_agentTestHelper->getAgent()->findSink("MqttEntitySink"); auto mqttSink = std::dynamic_pointer_cast(sink); ASSERT_TRUE(waitFor(10s, [&mqttSink]() { return mqttSink && mqttSink->isConnected(); })); - + m_client->stop(); ASSERT_TRUE(waitFor(5s, [&lastWillReceived]() { return lastWillReceived; })); + + stopClient(); } diff --git a/test_package/tls_http_server_test.cpp b/test_package/tls_http_server_test.cpp index 13a047719..262e25081 100644 --- a/test_package/tls_http_server_test.cpp +++ b/test_package/tls_http_server_test.cpp @@ -252,8 +252,10 @@ class Client cout << "spawnRequest: done: false" << endl; m_done = false; m_count = 0; - asio::spawn(m_context, std::bind(&Client::request, this, verb, target, body, close, contentType, - std::placeholders::_1), boost::asio::detached); + asio::spawn(m_context, + std::bind(&Client::request, this, verb, target, body, close, contentType, + std::placeholders::_1), + boost::asio::detached); while (!m_done && !m_failed && m_context.run_for(20ms) > 0) ; diff --git a/test_package/websockets_test.cpp b/test_package/websockets_test.cpp index 07044401f..91b7a0e0f 100644 --- a/test_package/websockets_test.cpp +++ b/test_package/websockets_test.cpp @@ -241,8 +241,9 @@ TEST_F(WebsocketsTest, should_make_simple_request) start(); startClient(); - asio::spawn(m_context, std::bind(&Client::request, m_client.get(), - "{\"id\":\"1\",\"request\":\"probe\"}"s, std::placeholders::_1), + asio::spawn(m_context, + std::bind(&Client::request, m_client.get(), "{\"id\":\"1\",\"request\":\"probe\"}"s, + std::placeholders::_1), boost::asio::detached); m_client->waitFor(2s, [this]() { return m_client->m_done; }); @@ -270,8 +271,9 @@ TEST_F(WebsocketsTest, should_return_error_when_there_is_no_id) start(); startClient(); - asio::spawn(m_context, std::bind(&Client::request, m_client.get(), "{\"request\":\"probe\"}"s, - std::placeholders::_1), + asio::spawn(m_context, + std::bind(&Client::request, m_client.get(), "{\"request\":\"probe\"}"s, + std::placeholders::_1), boost::asio::detached); m_client->waitFor(2s, [this]() { return m_client->m_done; }); @@ -362,8 +364,7 @@ TEST_F(WebsocketsTest, should_return_error_when_bad_json_is_sent) start(); startClient(); - asio::spawn(m_context, - std::bind(&Client::request, m_client.get(), "!}}"s, std::placeholders::_1), + asio::spawn(m_context, std::bind(&Client::request, m_client.get(), "!}}"s, std::placeholders::_1), boost::asio::detached); m_client->waitFor(2s, [this]() { return m_client->m_done; }); @@ -399,7 +400,7 @@ TEST_F(WebsocketsTest, should_return_multiple_errors_when_parameters_are_invalid std::bind(&Client::request, m_client.get(), R"DOC({"id": 3, "request": "sample", "interval": 99999999999,"to": -1 })DOC", std::placeholders::_1), - boost::asio::detached); + boost::asio::detached); m_client->waitFor(2s, [this]() { return m_client->m_done; }); From b7ab1669b61828fb3d899b7c9d09a7693f4bcc00 Mon Sep 17 00:00:00 2001 From: Will Sobel Date: Mon, 3 Nov 2025 13:03:14 +0100 Subject: [PATCH 8/9] Added check to make sure unavailable check excludes conditions for validation --- test_package/mqtt_entity_sink_test.cpp | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/test_package/mqtt_entity_sink_test.cpp b/test_package/mqtt_entity_sink_test.cpp index bb868908e..46110ce9b 100644 --- a/test_package/mqtt_entity_sink_test.cpp +++ b/test_package/mqtt_entity_sink_test.cpp @@ -277,6 +277,8 @@ TEST_F(MqttEntitySinkTest, mqtt_entity_sink_should_use_flat_topic_structure) << "Topic does not match expected format: " << receivedTopic; client->async_disconnect(); + + stopAgent(); stopClient(); } @@ -630,10 +632,15 @@ TEST_F(MqttEntitySinkTest, mqtt_entity_sink_should_handle_unavailable) handler->m_receive = [&gotUnavailable, &receivedJson](std::shared_ptr client, const std::string& topic, const std::string& payload) { - receivedJson = json::parse(payload); - if (receivedJson["result"] == "UNAVAILABLE") + + json j = json::parse(payload); + if (j["category"] != "CONDITION") { - gotUnavailable = true; + if (j["result"] == "UNAVAILABLE") + { + gotUnavailable = true; + } + receivedJson = j; } }; @@ -648,6 +655,9 @@ TEST_F(MqttEntitySinkTest, mqtt_entity_sink_should_handle_unavailable) ASSERT_TRUE(waitFor(10s, [&mqttSink]() { return mqttSink->isConnected(); })); // Initial observations should include UNAVAILABLE values + // The issue is the initial values will be received in a random order and the conditions have a + // level instead of a result. If in the interviening time a condition is received, the json may + // not be correct. ASSERT_TRUE(waitFor(10s, [&gotUnavailable]() { return gotUnavailable; })); EXPECT_EQ("UNAVAILABLE", receivedJson["result"].get()); From 08caeea2ac4c2904a5c228ab09f7b8f8b7700808 Mon Sep 17 00:00:00 2001 From: Will Sobel Date: Mon, 3 Nov 2025 13:16:07 +0100 Subject: [PATCH 9/9] Exclude the agent device as well --- test_package/mqtt_entity_sink_test.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test_package/mqtt_entity_sink_test.cpp b/test_package/mqtt_entity_sink_test.cpp index 46110ce9b..3a6a513b2 100644 --- a/test_package/mqtt_entity_sink_test.cpp +++ b/test_package/mqtt_entity_sink_test.cpp @@ -634,7 +634,7 @@ TEST_F(MqttEntitySinkTest, mqtt_entity_sink_should_handle_unavailable) const std::string& payload) { json j = json::parse(payload); - if (j["category"] != "CONDITION") + if (j["category"] != "CONDITION" && topic.starts_with("MTConnect/Devices/000/Observations/")) { if (j["result"] == "UNAVAILABLE") {