diff --git a/README.md b/README.md index da2f859ce..e1647b101 100755 --- a/README.md +++ b/README.md @@ -904,6 +904,10 @@ Sinks { *Default*: All data items +## MQTT Entity Sink Documentation + +For detailed configuration, usage, and message format for the MQTT Entity Sink, see: [docs: MTConnect MQTT Entity Sink](src/mtconnect/sink/mqtt_entity_sink/README.md) + ### Adapter Configuration Items ### * `Adapters` - Contains a list of device blocks. If there are no Adapters diff --git a/agent_lib/CMakeLists.txt b/agent_lib/CMakeLists.txt index 37cfdef25..4e979e0c1 100644 --- a/agent_lib/CMakeLists.txt +++ b/agent_lib/CMakeLists.txt @@ -257,6 +257,14 @@ set(AGENT_SOURCES "${SOURCE_DIR}/sink/mqtt_sink/mqtt_service.cpp" +# src/sink/mqtt_entity_sink HEADER_FILE_ONLY + + "${SOURCE_DIR}/sink/mqtt_entity_sink/mqtt_entity_sink.hpp" + +#src/sink/mqtt_entity_sink SOURCE_FILES_ONLY + + "${SOURCE_DIR}/sink/mqtt_entity_sink/mqtt_entity_sink.cpp" + # src/sink/rest_sink HEADER_FILE_ONLY "${SOURCE_DIR}/sink/rest_sink/cached_file.hpp" @@ -341,6 +349,7 @@ if(MSVC) # The modules including Beast required the /bigobj option in Windows set_property(SOURCE "${SOURCE_DIR}/sink/mqtt_sink/mqtt_service.cpp" + "${SOURCE_DIR}/sink/mqtt_entity_sink/mqtt_entity_sink.cpp" "${SOURCE_DIR}/sink/rest_sink/session_impl.cpp" "${SOURCE_DIR}/source/adapter/mqtt/mqtt_adapter.cpp" "${SOURCE_DIR}/source/adapter/agent_adapter/agent_adapter.cpp" diff --git a/schemas/cfg.schema.json b/schemas/cfg.schema.json index 182341b3a..b81c9c9bf 100644 --- a/schemas/cfg.schema.json +++ b/schemas/cfg.schema.json @@ -314,6 +314,73 @@ "enum":["at_least_once", "at_most_once", "exactly_once"] } } + }, + "MqttEntitySink": { + "type": "object", + "properties": { + "MqttHost": { + "type": "string", + "description": "IP Address or name of the MQTT Broker", + "default": "127.0.0.1" + }, + "MqttPort": { + "type": "integer", + "description": "Port number of MQTT Broker", + "default": 1883 + }, + "MqttTls": { + "type": "boolean", + "description": "TLS Certificate for secure connection to the MQTT Broker", + "default": false + }, + "MqttUserName": { + "type": "string", + "description": "Username for MQTT authentication" + }, + "MqttPassword": { + "type": "string", + "description": "Password for MQTT authentication" + }, + "MqttClientId": { + "type": "string", + "description": "MQTT client identifier" + }, + "MqttQOS": { + "type": "string", + "description": "The quality of service level for the MQTT connection. Options are: at_least_once, at_most_once, and exactly_once", + "default": "at_least_once", + "enum": [ + "at_least_once", + "at_most_once", + "exactly_once" + ] + }, + "MqttRetain": { + "type": "boolean", + "description": "Retain the last message sent to the broker", + "default": false + }, + "ObservationTopicPrefix": { + "type": "string", + "description": "Prefix for the Observations topic", + "default": "MTConnect/Devices/[device]/Observations" + }, + "DeviceTopicPrefix": { + "type": "string", + "description": "Prefix for the Device topic", + "default": "MTConnect/Probe/[device]" + }, + "AssetTopicPrefix": { + "type": "string", + "description": "Prefix for the Asset topic", + "default": "MTConnect/Asset/[device]" + }, + "MqttLastWillTopic": { + "type": "string", + "description": "The topic used for the last will and testament for an agent", + "default": "MTConnect/Probe/[device]/Availability" + } + } } } }, diff --git a/src/mtconnect/configuration/agent_config.cpp b/src/mtconnect/configuration/agent_config.cpp index 2b6b9a67c..1d4c31632 100644 --- a/src/mtconnect/configuration/agent_config.cpp +++ b/src/mtconnect/configuration/agent_config.cpp @@ -58,6 +58,7 @@ #include "mtconnect/configuration/config_options.hpp" #include "mtconnect/device_model/device.hpp" #include "mtconnect/printer/xml_printer.hpp" +#include "mtconnect/sink/mqtt_entity_sink/mqtt_entity_sink.hpp" #include "mtconnect/sink/mqtt_sink/mqtt_service.hpp" #include "mtconnect/sink/rest_sink/rest_service.hpp" #include "mtconnect/source/adapter/agent_adapter/agent_adapter.hpp" @@ -111,6 +112,7 @@ namespace mtconnect::configuration { sink::mqtt_sink::MqttService::registerFactory(m_sinkFactory); sink::rest_sink::RestService::registerFactory(m_sinkFactory); + sink::mqtt_entity_sink::MqttEntitySink::registerFactory(m_sinkFactory); adapter::shdr::ShdrAdapter::registerFactory(m_sourceFactory); adapter::mqtt_adapter::MqttAdapter::registerFactory(m_sourceFactory); adapter::agent_adapter::AgentAdapter::registerFactory(m_sourceFactory); diff --git a/src/mtconnect/configuration/config_options.hpp b/src/mtconnect/configuration/config_options.hpp index b192c70d4..e2ac36aec 100644 --- a/src/mtconnect/configuration/config_options.hpp +++ b/src/mtconnect/configuration/config_options.hpp @@ -107,6 +107,9 @@ namespace mtconnect { DECLARE_CONFIGURATION(MqttMaxTopicDepth); DECLARE_CONFIGURATION(MqttLastWillTopic); DECLARE_CONFIGURATION(MqttXPath); + DECLARE_CONFIGURATION(ObservationTopicPrefix); + DECLARE_CONFIGURATION(DeviceTopicPrefix); + DECLARE_CONFIGURATION(AssetTopicPrefix); ///@} /// @name Adapter Configuration diff --git a/src/mtconnect/mqtt/mqtt_server_impl.hpp b/src/mtconnect/mqtt/mqtt_server_impl.hpp index ce8a9f621..0a147d9ac 100644 --- a/src/mtconnect/mqtt/mqtt_server_impl.hpp +++ b/src/mtconnect/mqtt/mqtt_server_impl.hpp @@ -24,6 +24,7 @@ #include #include +#include #include #include @@ -217,12 +218,12 @@ namespace mtconnect { LOG(debug) << "Server topic_name: " << topic_name; LOG(debug) << "Server contents: " << contents; - auto const &idx = m_subs.get(); - auto r = idx.equal_range(topic_name); - for (; r.first != r.second; ++r.first) + for (const auto &sub : m_subs) { - r.first->con->publish(topic_name, contents, - std::min(r.first->qos_value, pubopts.get_qos())); + if (mqtt::broker::compare_topic_filter(sub.topic, topic_name)) + { + sub.con->publish(topic_name, contents, std::min(sub.qos_value, pubopts.get_qos())); + } } return true; diff --git a/src/mtconnect/sink/mqtt_entity_sink/README.md b/src/mtconnect/sink/mqtt_entity_sink/README.md new file mode 100644 index 000000000..c01d2eea7 --- /dev/null +++ b/src/mtconnect/sink/mqtt_entity_sink/README.md @@ -0,0 +1,243 @@ +# MTConnect MQTT Entity Sink + +## Overview + +The MQTT Entity Sink publishes MTConnect observations to an external MQTT broker in a flat, per-data-item topic structure compatible with MTConnect.NET's MQTT Relay Entity format. + +## Features + +- **Real-time streaming** of observations +- **Flat topic structure**: `MTConnect/Devices/[device-uuid]/Observations/[dataItemId]` +- **JSON format** with full observation metadata +- **ISO 8601 timestamps** +- **Supports SAMPLE, EVENT, and CONDITION observations** +- **Last Will Testament** for availability +- **TLS/SSL support** for secure communication +- **Configurable QoS levels** for MQTT message delivery +- **Message retention** support for new subscribers + +## Quick Start + +### 1. Configuration + +Add to `agent.cfg`: + +```properties +Sinks { + MqttEntitySink { + MqttHost = localhost + MqttPort = 1883 + MqttTls = false + + # QoS levels (string values): + # at_most_once = Fire and forget + # at_least_once = Acknowledged delivery (default) + # exactly_once = Guaranteed delivery + MqttQOS = at_least_once + + # Retain last message on broker for new subscribers + MqttRetain = false + } +} +``` + +## Configuration Options + +### Connection Settings + +| Parameter | Type | Default | Description | +|-------------------|---------|-------------|-----------------------------------------------| +| **MqttHost** | string | `127.0.0.1` | MQTT broker hostname or IP address | +| **MqttPort** | integer | `1883` | MQTT broker port (1883 for TCP, 8883 for TLS) | +| **MqttTls** | boolean | `false` | Enable TLS encryption | +| **MqttClientId** | string | auto-gen | MQTT client identifier (unique per connection) | + +### Authentication + +| Parameter | Type | Default | Description | +|-------------------|---------|---------|-----------------------------------------------| +| **MqttUserName** | string | - | Username for MQTT authentication (optional) | +| **MqttPassword** | string | - | Password for MQTT authentication (optional) | + +### TLS/SSL Settings + +| Parameter | Type | Default | Description | +|-------------------|---------|---------|-----------------------------------------------| +| **MqttCaCert** | string | - | Path to CA certificate for TLS verification | +| **MqttCert** | string | - | Path to client certificate for mutual TLS | +| **MqttPrivateKey**| string | - | Path to client private key | + +### Topic Configuration + +| Parameter | Type | Default | Description | +|--------------------------|---------|-------------------------------------------|----------------------------------------------| +| **ObservationTopicPrefix**| string | `MTConnect/Devices/[device]/Observations` | Topic prefix for observations. `[device]` is replaced with device UUID | +| **DeviceTopicPrefix** | string | `MTConnect/Probe/[device]` | Topic prefix for device models | +| **AssetTopicPrefix** | string | `MTConnect/Asset/[device]` | Topic prefix for assets | +| **MqttLastWillTopic** | string | `MTConnect/Probe/[device]/Availability` | Last Will Testament topic for availability | + +### MQTT Protocol Settings + +| Parameter | Type | Default | Description | +|---------------|---------|-----------------|---------------------------------------------| +| **MqttQOS** | string | `at_least_once` | Quality of Service level for MQTT messages | +| **MqttRetain**| boolean | `false` | Retain messages on the broker for new subscribers | + +#### MqttQOS Values + +The Quality of Service (QoS) level determines how messages are delivered: + +- **at_most_once** (Fire and forget) + - Fastest delivery + - No acknowledgment required + - Message may be lost if network fails + - **Use case**: High-frequency data where occasional loss is acceptable + +- **at_least_once** (Default) + - Guaranteed delivery with acknowledgment + - Messages may be duplicated + - Balance of reliability and performance + - **Use case**: General-purpose MTConnect data streaming + +- **exactly_once** + - Guaranteed exactly-once delivery + - Highest reliability, slowest performance + - Four-way handshake protocol + - **Use case**: Critical alarms, conditions, or important events + +```properties +# Example: High-reliability configuration for critical data +MqttQOS = exactly_once +``` + +#### MqttRetain Flag + +The retain flag determines if the broker stores the last message for each topic: + +- **false** (Default) + - Messages are not retained + - New subscribers only receive future messages + - Lower broker storage requirements + - **Use case**: High-frequency streaming data + +- **true** + - Broker stores the last message per topic + - New subscribers immediately receive last known value + - Useful for status and current values + - **Use case**: Availability, current state, alarms + +```properties +# Example: Retain messages for immediate status updates +MqttRetain = true +``` + +**⚠️ Important Notes:** +- Retained messages persist on the broker until explicitly cleared +- Use retention sparingly with high-frequency data to avoid broker storage issues +- Retained availability messages help dashboards show immediate device status + +## Complete Configuration Example + +```properties +Sinks { + MqttEntitySink { + MqttHost = localhost + MqttPort = 1883 + MqttTls = false + MqttQOS = at_least_once + MqttRetain = false + ObservationTopicPrefix = MTConnect/Devices/[device]/Observations + DeviceTopicPrefix = MTConnect/Probe/[device] + AssetTopicPrefix = MTConnect/Asset/[device] + MqttLastWillTopic = MTConnect/Probe/[device]/Availability + } +} +``` + +## Message Format + +### SAMPLE Observation +```json +{ + "dataItemId": "Xact", + "name": "X_actual", + "type": "POSITION", + "subType": "ACTUAL", + "category": "SAMPLE", + "timestamp": "2025-10-20T15:30:45.123456Z", + "result": "150.500000", + "sequence": 1234 +} +``` + +### EVENT Observation +```json +{ + "dataItemId": "mode", + "name": "ControllerMode", + "type": "CONTROLLER_MODE", + "category": "EVENT", + "timestamp": "2025-10-20T15:30:46.000000Z", + "result": "AUTOMATIC", + "sequence": 1235 +} +``` + +### CONDITION Observation +```json +{ + "dataItemId": "system", + "name": "SystemCondition", + "type": "SYSTEM", + "category": "CONDITION", + "level": "FAULT", + "nativeCode": "E001", + "conditionId": "E001", + "message": "Hydraulic pressure low", + "timestamp": "2025-10-20T15:30:47.000000Z", + "sequence": 1236 +} +``` + +## Topic Structure + +``` +MTConnect/ +├── Devices/ +│ └── {device-uuid}/ +│ └── Observations/ +│ ├── {dataItemId1} → Observation JSON +│ ├── {dataItemId2} → Observation JSON +│ └── ... +└── Probe/ + └── {device-uuid}/ + └── Availability → AVAILABLE/UNAVAILABLE (LWT) +``` + +**Example Topics:** +``` +MTConnect/Devices/OKUMA.123456/Observations/Xact +MTConnect/Devices/OKUMA.123456/Observations/avail +MTConnect/Probe/OKUMA.123456/Availability +``` + +## Performance Considerations + +### QoS Impact + +- **at_most_once**: ~2x faster than at_least_once, best for high-frequency sampling data +- **at_least_once**: Balanced performance, suitable for most use cases +- **exactly_once**: ~2x slower than at_least_once, use only for critical data + +### Retain Flag Impact + +- **Without Retain**: Minimal broker storage, suitable for streaming +- **With Retain**: Increases broker storage per topic, use selectively for: + - Availability status + - Current operating mode + - Alarm states + - Device health indicators + +**💡 Recommendation**: Use `MqttRetain = true` only for low-frequency, stateful data. + +--- diff --git a/src/mtconnect/sink/mqtt_entity_sink/mqtt_entity_sink.cpp b/src/mtconnect/sink/mqtt_entity_sink/mqtt_entity_sink.cpp new file mode 100644 index 000000000..6dbdd23fc --- /dev/null +++ b/src/mtconnect/sink/mqtt_entity_sink/mqtt_entity_sink.cpp @@ -0,0 +1,559 @@ +// +// Copyright Copyright 2009-2025, AMT – The Association For Manufacturing Technology (“AMT”) +// All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +#include "mqtt_entity_sink.hpp" + +#include +#include + +#include + +#include "mtconnect/configuration/config_options.hpp" +#include "mtconnect/entity/entity.hpp" +#include "mtconnect/mqtt/mqtt_client_impl.hpp" +#include "mtconnect/observation/observation.hpp" + +using ptree = boost::property_tree::ptree; +using json = nlohmann::json; + +using namespace std; +using namespace mtconnect; +using namespace mtconnect::asset; +using namespace mtconnect::observation; + +namespace asio = boost::asio; +namespace config = ::mtconnect::configuration; + +namespace mtconnect { + namespace sink { + namespace mqtt_entity_sink { + + MqttEntitySink::MqttEntitySink(boost::asio::io_context& context, + sink::SinkContractPtr&& contract, const ConfigOptions& options, + const ptree& config) + : Sink("MqttEntitySink", std::move(contract)), + m_context(context), + m_strand(context), + m_options(options) + { + GetOptions(config, m_options, options); + + AddOptions(config, m_options, + {{configuration::MqttCaCert, string()}, + {configuration::MqttPrivateKey, string()}, + {configuration::MqttCert, string()}, + {configuration::MqttClientId, string()}, + {configuration::MqttUserName, string()}, + {configuration::MqttPassword, string()}}); + + AddDefaultedOptions( + config, m_options, + {{configuration::MqttHost, "127.0.0.1"s}, + {configuration::ObservationTopicPrefix, "MTConnect/Devices/[device]/Observations"s}, + {configuration::DeviceTopicPrefix, "MTConnect/Probe/[device]"s}, + {configuration::AssetTopicPrefix, "MTConnect/Asset/[device]"s}, + {configuration::MqttLastWillTopic, "MTConnect/Probe/[device]/Availability"s}, + {configuration::MqttPort, 1883}, + {configuration::MqttTls, false}, + {configuration::MqttQOS, 1}, + {configuration::MqttRetain, false}}); + + m_observationTopicPrefix = get(m_options[configuration::ObservationTopicPrefix]); + m_deviceTopicPrefix = get(m_options[configuration::DeviceTopicPrefix]); + m_assetTopicPrefix = get(m_options[configuration::AssetTopicPrefix]); + } + + static MqttClient::QOS parseQos(const ConfigOptions& options) + { + if (auto qosInt = GetOption(options, configuration::MqttQOS)) + { + return *qosInt == 0 ? MqttClient::QOS::at_most_once + : *qosInt == 2 ? MqttClient::QOS::exactly_once + : MqttClient::QOS::at_least_once; + } + + if (auto qosStr = GetOption(options, configuration::MqttQOS)) + { + if (*qosStr == "at_most_once" || *qosStr == "0") + return MqttClient::QOS::at_most_once; + if (*qosStr == "exactly_once" || *qosStr == "2") + return MqttClient::QOS::exactly_once; + } + + return MqttClient::QOS::at_least_once; + } + + void MqttEntitySink::start() + { + if (!m_client) + { + auto clientHandler = make_unique(); + clientHandler->m_connected = [this](shared_ptr client) { + LOG(debug) << "MqttEntitySink: Client connected to broker"; + client->connectComplete(); + + auto agentDevice = m_sinkContract->getDeviceByName("Agent"); + if (agentDevice) + { + auto lwtTopic = get(m_options[configuration::MqttLastWillTopic]); + boost::replace_all(lwtTopic, "[device]", *agentDevice->getUuid()); + + // Get QoS and Retain from options + auto qos = parseQos(m_options); + bool retain = GetOption(m_options, configuration::MqttRetain).value_or(false); + LOG(debug) << "Publishing availability to: " << lwtTopic; + + client->publish(lwtTopic, "AVAILABLE", retain, qos); + } + + // Publish initial content + publishInitialContent(); + }; + + auto agentDevice = m_sinkContract->getDeviceByName("Agent"); + auto lwtTopic = get(m_options[configuration::MqttLastWillTopic]); + if (agentDevice) + { + boost::replace_all(lwtTopic, "[device]", *agentDevice->getUuid()); + } + m_lastWillTopic = lwtTopic; + + if (IsOptionSet(m_options, configuration::MqttTls)) + { + m_client = make_shared(m_context, m_options, std::move(clientHandler), + m_lastWillTopic, "UNAVAILABLE"s); + } + else + { + m_client = make_shared(m_context, m_options, std::move(clientHandler), + m_lastWillTopic, "UNAVAILABLE"s); + } + } + LOG(debug) << "Starting MQTT Entity Sink client"; + m_client->start(); + } + + void MqttEntitySink::stop() + { + if (m_client) + { + // Publish UNAVAILABLE before disconnecting + if (m_client->isConnected()) + { + auto qos = parseQos(m_options); + m_client->publish(m_lastWillTopic, "UNAVAILABLE", true, qos); + } + m_client->stop(); + } + } + + void MqttEntitySink::publishInitialContent() + { + LOG(debug) << "MqttEntitySink: Publishing initial content"; + + // Publish all devices + int deviceCount = 0; + for (auto& dev : m_sinkContract->getDevices()) + { + publish(dev); + deviceCount++; + } + LOG(debug) << "Published " << deviceCount << " devices"; + + // Publish current observations for all devices + int obsCount = 0; + for (auto& dev : m_sinkContract->getDevices()) + { + auto& buffer = m_sinkContract->getCircularBuffer(); + std::lock_guard lock(buffer); + + auto latest = buffer.getLatest(); + observation::ObservationList observations; + + for (auto& di : dev->getDeviceDataItems()) + { + auto dataItem = di.lock(); + if (dataItem) + { + auto obs = latest.getObservation(dataItem->getId()); + if (obs) + { + observations.push_back(obs); + } + } + } + + // Publish each observation + for (auto& obs : observations) + { + auto obsCopy = obs; + if (m_client && m_client->isConnected()) + { + publish(obsCopy); + obsCount++; + } + else + { + std::lock_guard lock(m_queueMutex); + if (m_queuedObservations.size() >= MAX_QUEUE_SIZE) + { + m_queuedObservations.erase(m_queuedObservations.begin()); + } + m_queuedObservations.push_back(obsCopy); + obsCount++; + } + } + } + LOG(debug) << "Published " << obsCount << " initial observations"; + } + + std::string MqttEntitySink::formatTimestamp(const Timestamp& timestamp) + { + using namespace date; + using namespace std::chrono; + + std::ostringstream oss; + oss << date::format("%FT%TZ", floor(timestamp)); + return oss.str(); + } + + std::string MqttEntitySink::getObservationValue( + const observation::ObservationPtr& observation) + { + if (observation->isUnavailable()) + { + return "UNAVAILABLE"; + } + + auto value = observation->getValue(); + + if (holds_alternative(value)) + { + return get(value); + } + else if (holds_alternative(value)) + { + return std::to_string(get(value)); + } + else if (holds_alternative(value)) + { + std::ostringstream oss; + oss << std::fixed << std::setprecision(6) << get(value); + return oss.str(); + } + else if (holds_alternative(value)) + { + auto& vec = get(value); + std::ostringstream oss; + for (size_t i = 0; i < vec.size(); ++i) + { + if (i > 0) + oss << " "; + oss << std::fixed << std::setprecision(6) << vec[i]; + } + return oss.str(); + } + else if (holds_alternative(value)) + { + // For DataSet, return as JSON string + json j = json::object(); + auto& ds = get(value); + for (auto& entry : ds) + { + // Convert the variant value to appropriate JSON type + if (holds_alternative(entry.m_value)) + { + j[entry.m_key] = get(entry.m_value); + } + else if (holds_alternative(entry.m_value)) + { + j[entry.m_key] = get(entry.m_value); + } + else if (holds_alternative(entry.m_value)) + { + j[entry.m_key] = get(entry.m_value); + } + } + return j.dump(); + } + + return "UNAVAILABLE"; + } + + std::string MqttEntitySink::formatObservationJson( + const observation::ObservationPtr& observation) + { + json j; + + try + { + auto dataItem = observation->getDataItem(); + if (!dataItem) + { + LOG(error) << "Observation has no data item"; + return "{}"; + } + + j["dataItemId"] = dataItem->getId(); + + const auto& name = dataItem->getName(); + if (name && !name->empty()) + { + j["name"] = *name; + } + + j["type"] = dataItem->getType(); + + auto subType = dataItem->maybeGet("subType"); + if (subType && !subType->empty()) + { + j["subType"] = *subType; + } + + j["timestamp"] = formatTimestamp(observation->getTimestamp()); + + // Get the category + auto category = dataItem->getCategory(); + if (category == device_model::data_item::DataItem::SAMPLE) + { + j["category"] = "SAMPLE"; + } + else if (category == device_model::data_item::DataItem::EVENT) + { + j["category"] = "EVENT"; + } + else if (category == device_model::data_item::DataItem::CONDITION) + { + j["category"] = "CONDITION"; + } + + // Add the result/value + j["result"] = getObservationValue(observation); + + // Add sequence number + j["sequence"] = observation->getSequence(); + + auto result = j.dump(); + LOG(trace) << "Formatted observation JSON: " << result; + return result; + } + catch (const std::exception& e) + { + LOG(error) << "Exception formatting observation: " << e.what(); + return "{}"; + } + } + + std::string MqttEntitySink::formatConditionJson(const observation::ConditionPtr& condition) + { + json j; + + auto dataItem = condition->getDataItem(); + if (!dataItem) + { + return "{}"; + } + + j["dataItemId"] = dataItem->getId(); + + const auto& name = dataItem->getName(); + if (name && !name->empty()) + { + j["name"] = *name; + } + + j["type"] = dataItem->getType(); + + auto subType = dataItem->maybeGet("subType"); + if (subType && !subType->empty()) + { + j["subType"] = *subType; + } + + j["timestamp"] = formatTimestamp(condition->getTimestamp()); + j["category"] = "CONDITION"; + + // Add condition-specific fields + switch (condition->getLevel()) + { + case Condition::NORMAL: + j["level"] = "NORMAL"; + break; + case Condition::WARNING: + j["level"] = "WARNING"; + break; + case Condition::FAULT: + j["level"] = "FAULT"; + break; + case Condition::UNAVAILABLE: + j["level"] = "UNAVAILABLE"; + break; + } + + // Add native code if present + if (condition->hasProperty("nativeCode")) + { + j["nativeCode"] = condition->get("nativeCode"); + } + + // Add condition ID if present + if (!condition->getCode().empty()) + { + j["conditionId"] = condition->getCode(); + } + + // Add message/value if present + if (condition->hasValue()) + { + j["message"] = getObservationValue(condition); + } + + // Add sequence number + j["sequence"] = condition->getSequence(); + + return j.dump(); + } + + std::string MqttEntitySink::getObservationTopic( + const observation::ObservationPtr& observation) + { + auto dataItem = observation->getDataItem(); + if (!dataItem) + { + return ""; + } + + auto device = dataItem->getComponent()->getDevice(); + if (!device) + { + return ""; + } + + std::string topic = m_observationTopicPrefix; + boost::replace_all(topic, "[device]", *device->getUuid()); + + // Append data item ID for flat structure + topic += "/" + dataItem->getId(); + + return topic; + } + + bool MqttEntitySink::publish(observation::ObservationPtr& observation) + { + auto dataItem = observation->getDataItem(); + if (!dataItem) + { + LOG(warning) << "MqttEntitySink::publish: Observation has no data item"; + return false; + } + + if (!m_client || !m_client->isConnected()) + { + std::lock_guard lock(m_queueMutex); + if (m_queuedObservations.size() >= MAX_QUEUE_SIZE) + { + LOG(warning) << "MqttEntitySink::publish: Observation queue full (" << MAX_QUEUE_SIZE + << "), dropping oldest observation for " + << m_queuedObservations.front()->getDataItem()->getId(); + m_queuedObservations.erase(m_queuedObservations.begin()); + } + LOG(debug) << "MqttEntitySink::publish: Client not connected, queuing observation for " + << dataItem->getId(); + m_queuedObservations.push_back(observation); + return false; + } + + std::string topic = getObservationTopic(observation); + if (topic.empty()) + { + LOG(warning) << "MqttEntitySink::publish: Empty topic for " << dataItem->getId(); + return false; + } + + // Get QoS setting + auto qos = parseQos(m_options); + bool retain = GetOption(m_options, configuration::MqttRetain).value_or(false); + + try + { + auto condition = dynamic_pointer_cast(observation); + if (condition) + { + observation::ConditionList condList; + condition->getFirst()->getConditionList(condList); + + for (auto& cond : condList) + { + std::string payload = formatConditionJson(cond); + LOG(debug) << "Publishing condition to: " << topic + << ", payload size: " << payload.size(); + m_client->publish(topic, payload, retain, qos); + } + } + else + { + std::string payload = formatObservationJson(observation); + LOG(debug) << "Publishing observation to: " << topic << ", size: " << payload.size(); + m_client->publish(topic, payload, retain, qos); + } + + return true; + } + catch (const std::exception& e) + { + LOG(error) << "Exception publishing observation: " << e.what(); + return false; + } + } + + bool MqttEntitySink::publish(device_model::DevicePtr device) + { + if (!m_client || !m_client->isConnected()) + { + return false; + } + + // For device, we could publish the device XML or JSON + // For now, just return true as device publishing is optional + return true; + } + + bool MqttEntitySink::publish(asset::AssetPtr asset) + { + if (!m_client || !m_client->isConnected()) + { + return false; + } + + // Asset publishing can be added here if needed + return true; + } + + void MqttEntitySink::registerFactory(SinkFactory& factory) + { + factory.registerFactory( + "MqttEntitySink", + [](const std::string& name, boost::asio::io_context& io, SinkContractPtr&& contract, + const ConfigOptions& options, const boost::property_tree::ptree& block) -> SinkPtr { + auto sink = std::make_shared(io, std::move(contract), options, block); + return sink; + }); + } + + } // namespace mqtt_entity_sink + } // namespace sink +} // namespace mtconnect diff --git a/src/mtconnect/sink/mqtt_entity_sink/mqtt_entity_sink.hpp b/src/mtconnect/sink/mqtt_entity_sink/mqtt_entity_sink.hpp new file mode 100644 index 000000000..db71cf0ad --- /dev/null +++ b/src/mtconnect/sink/mqtt_entity_sink/mqtt_entity_sink.hpp @@ -0,0 +1,138 @@ +// +// Copyright Copyright 2009-2025, AMT – The Association For Manufacturing Technology (“AMT”) +// All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +#pragma once + +#include "boost/asio/io_context.hpp" +#include + +#include + +#include "mtconnect/buffer/checkpoint.hpp" +#include "mtconnect/config.hpp" +#include "mtconnect/configuration/agent_config.hpp" +#include "mtconnect/mqtt/mqtt_client.hpp" +#include "mtconnect/observation/observation.hpp" +#include "mtconnect/sink/sink.hpp" +#include "mtconnect/utilities.hpp" + +using namespace std; +using namespace mtconnect; +using namespace mtconnect::mqtt_client; +using json = nlohmann::json; + +namespace mtconnect { + namespace sink { + namespace mqtt_entity_sink { + + /// @brief MTConnect Entity MQTT Sink - publishes observations per data item + class AGENT_LIB_API MqttEntitySink : public sink::Sink + { + public: + /// @brief Create an MQTT Entity Sink + /// @param context the boost asio io_context + /// @param contract the Sink Contract from the agent + /// @param options configuration options + /// @param config additional configuration options + MqttEntitySink(boost::asio::io_context& context, sink::SinkContractPtr&& contract, + const ConfigOptions& options, const boost::property_tree::ptree& config); + + ~MqttEntitySink() = default; + + // Sink Methods + /// @brief Start the MQTT Entity service + void start() override; + + /// @brief Shutdown the MQTT Entity service + void stop() override; + + /// @brief Receive an observation and publish it + /// @param observation shared pointer to the observation + /// @return `true` if the publishing was successful + bool publish(observation::ObservationPtr& observation) override; + + /// @brief Receive an asset + /// @param asset shared point to the asset + /// @return `true` if successful + bool publish(asset::AssetPtr asset) override; + + /// @brief Receive a device + /// @param device shared pointer to the device + /// @return `true` if successful + bool publish(device_model::DevicePtr device) override; + + /// @brief Register the Sink factory to create this sink + /// @param factory + static void registerFactory(SinkFactory& factory); + + /// @brief Gets the MQTT Client + /// @return MqttClient + std::shared_ptr getClient() { return m_client; } + + /// @brief Check if MQTT Client is Connected + /// @return `true` when the client is connected + bool isConnected() { return m_client && m_client->isConnected(); } + + protected: + /// @brief Format observation as JSON matching MTConnect.NET format + /// @param observation the observation to format + /// @return JSON string + std::string formatObservationJson(const observation::ObservationPtr& observation); + + /// @brief Format condition observation as JSON + /// @param condition the condition observation + /// @return JSON string + std::string formatConditionJson(const observation::ConditionPtr& condition); + + /// @brief Get topic for observation using flat structure + /// @param observation the observation + /// @return formatted topic string + std::string getObservationTopic(const observation::ObservationPtr& observation); + + /// @brief Get value from observation as string + /// @param observation the observation + /// @return value as string + std::string getObservationValue(const observation::ObservationPtr& observation); + + /// @brief Publish initial device and current observations + void publishInitialContent(); + + /// @brief Convert timestamp to ISO 8601 format + /// @param timestamp the timestamp + /// @return ISO 8601 string + std::string formatTimestamp(const Timestamp& timestamp); + + protected: + static constexpr size_t MAX_QUEUE_SIZE = 10000; // Maximum queued observations + + std::string m_observationTopicPrefix; //! Observation topic prefix + std::string m_deviceTopicPrefix; //! Device topic prefix + std::string m_assetTopicPrefix; //! Asset topic prefix + std::string m_lastWillTopic; //! Topic to publish last will + + boost::asio::io_context& m_context; + boost::asio::io_context::strand m_strand; + + ConfigOptions m_options; + + std::shared_ptr m_client; + std::vector m_queuedObservations; + std::mutex m_queueMutex; + }; + } // namespace mqtt_entity_sink + } // namespace sink +} // namespace mtconnect diff --git a/test_package/CMakeLists.txt b/test_package/CMakeLists.txt index 2cd035cc7..2dc65edb9 100644 --- a/test_package/CMakeLists.txt +++ b/test_package/CMakeLists.txt @@ -297,6 +297,7 @@ add_agent_test(table TRUE observation) add_agent_test(checkpoint FALSE buffer) add_agent_test(circular_buffer FALSE buffer) +add_agent_test(mqtt_entity_sink FALSE sink/mqtt_entity_sink TRUE) if (WITH_RUBY) diff --git a/test_package/agent_test_helper.hpp b/test_package/agent_test_helper.hpp index 45ff79c00..f1aa59a7d 100644 --- a/test_package/agent_test_helper.hpp +++ b/test_package/agent_test_helper.hpp @@ -29,6 +29,7 @@ #include "mtconnect/configuration/agent_config.hpp" #include "mtconnect/configuration/config_options.hpp" #include "mtconnect/pipeline/pipeline.hpp" +#include "mtconnect/sink/mqtt_entity_sink/mqtt_entity_sink.hpp" #include "mtconnect/sink/mqtt_sink/mqtt_service.hpp" #include "mtconnect/sink/rest_sink/response.hpp" #include "mtconnect/sink/rest_sink/rest_service.hpp" @@ -124,6 +125,7 @@ class AgentTestHelper ~AgentTestHelper() { m_mqttService.reset(); + m_mqttEntitySink.reset(); m_restService.reset(); m_adapter.reset(); if (m_agent) @@ -169,6 +171,13 @@ class AgentTestHelper return rest; } + std::shared_ptr getMqttEntitySink() + { + using namespace mtconnect::sink::mqtt_entity_sink; + auto sink = m_agent->findSink("MqttEntitySink"); + return std::dynamic_pointer_cast(sink); + } + std::shared_ptr getMqttService() { using namespace mtconnect; @@ -189,6 +198,7 @@ class AgentTestHelper sink::rest_sink::RestService::registerFactory(m_sinkFactory); sink::mqtt_sink::MqttService::registerFactory(m_sinkFactory); + sink::mqtt_entity_sink::MqttEntitySink::registerFactory(m_sinkFactory); source::adapter::shdr::ShdrAdapter::registerFactory(m_sourceFactory); ConfigOptions options = ops; @@ -230,6 +240,17 @@ class AgentTestHelper m_agent->addSink(m_mqttService); } + if (HasOption(options, "MqttEntitySink")) + { + auto mqttEntityContract = m_agent->makeSinkContract(); + mqttEntityContract->m_pipelineContext = m_context; + auto mqttEntitySink = m_sinkFactory.make("MqttEntitySink", "MqttEntitySink", m_ioContext, + std::move(mqttEntityContract), options, ptree {}); + m_mqttEntitySink = + std::dynamic_pointer_cast(mqttEntitySink); + m_agent->addSink(m_mqttEntitySink); + } + m_agent->initialize(m_context); if (observe) @@ -303,6 +324,7 @@ class AgentTestHelper std::shared_ptr m_context; std::shared_ptr m_adapter; std::shared_ptr m_mqttService; + std::shared_ptr m_mqttEntitySink; std::shared_ptr m_restService; std::shared_ptr m_loopback; diff --git a/test_package/mqtt_entity_sink_test.cpp b/test_package/mqtt_entity_sink_test.cpp new file mode 100644 index 000000000..3a6a513b2 --- /dev/null +++ b/test_package/mqtt_entity_sink_test.cpp @@ -0,0 +1,785 @@ +// +// Copyright Copyright 2009-2025, AMT – The Association For Manufacturing Technology ("AMT") +// All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +// Ensure that gtest is the first header otherwise Windows raises an error +#include +// Keep this comment to keep gtest.h above. (clang-format off/on is not working here!) + +#include +#include + +#include + +#include "agent_test_helper.hpp" +#include "json_helper.hpp" +#include "mtconnect/buffer/checkpoint.hpp" +#include "mtconnect/device_model/data_item/data_item.hpp" +#include "mtconnect/entity/entity.hpp" +#include "mtconnect/entity/json_parser.hpp" +#include "mtconnect/mqtt/mqtt_client_impl.hpp" +#include "mtconnect/mqtt/mqtt_server_impl.hpp" +#include "mtconnect/printer/json_printer.hpp" +#include "mtconnect/sink/mqtt_entity_sink/mqtt_entity_sink.hpp" +#include "test_utilities.hpp" + +using namespace std; +using namespace mtconnect; +using namespace mtconnect::device_model::data_item; +using namespace mtconnect::sink::mqtt_entity_sink; +using namespace mtconnect::asset; +using namespace mtconnect::configuration; + +// main +int main(int argc, char* argv[]) +{ + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} + +using json = nlohmann::json; + +class MqttEntitySinkTest : public testing::Test +{ +protected: + std::unique_ptr m_agentTestHelper; + std::shared_ptr m_server; + std::shared_ptr m_client; + std::unique_ptr m_jsonPrinter; + uint16_t m_port {0}; + + void SetUp() override + { + m_agentTestHelper = std::make_unique(); + m_jsonPrinter = std::make_unique(2, true); + } + + void TearDown() override + { + stopAgent(); + stopClient(); + stopServer(); + + m_agentTestHelper.reset(); + m_jsonPrinter.reset(); + } + + void createAgent(std::string testFile = {}, ConfigOptions options = {}) + { + if (testFile == "") + testFile = "/samples/test_config.xml"; + ConfigOptions opts(options); + MergeOptions( + opts, + { + {"MqttEntitySink", true}, + {configuration::MqttPort, m_port}, + {MqttCurrentInterval, 200ms}, + {MqttSampleInterval, 100ms}, + {configuration::MqttHost, "127.0.0.1"s}, + {configuration::ObservationTopicPrefix, "MTConnect/Devices/[device]/Observations"s}, + {configuration::DeviceTopicPrefix, "MTConnect/Probe/[device]"s}, + {configuration::AssetTopicPrefix, "MTConnect/Asset/[device]"s}, + {configuration::MqttLastWillTopic, "MTConnect/Probe/[device]/Availability"s}, + }); + m_agentTestHelper->createAgent("/samples/test_config.xml", 8, 4, "2.0", 25, false, true, opts); + addAdapter(); + m_agentTestHelper->getAgent()->start(); + } + + void createServer(const ConfigOptions& options) + { + using namespace mtconnect::configuration; + ConfigOptions opts(options); + MergeOptions(opts, {{ServerIp, "127.0.0.1"s}, + {MqttPort, 0}, + {MqttTls, false}, + {AutoAvailable, false}, + {RealTime, false}}); + m_server = std::make_shared( + m_agentTestHelper->m_ioContext, opts); + } + + template + bool waitFor(const chrono::duration& time, function pred) + { + boost::asio::steady_timer timer(m_agentTestHelper->m_ioContext); + timer.expires_after(time); + bool timeout = false; + timer.async_wait([&timeout](boost::system::error_code ec) { + if (!ec) + timeout = true; + }); + while (!timeout && !pred()) + { + m_agentTestHelper->m_ioContext.run_for(100ms); + } + timer.cancel(); + return pred(); + } + + void startServer() + { + if (m_server) + { + bool start = m_server->start(); + if (start) + { + m_port = m_server->getPort(); + m_agentTestHelper->m_ioContext.run_for(500ms); + } + } + } + + void createClient(const ConfigOptions& options, unique_ptr&& handler) + { + ConfigOptions opts(options); + MergeOptions(opts, {{MqttHost, "127.0.0.1"s}, + {MqttPort, m_port}, + {MqttTls, false}, + {AutoAvailable, false}, + {RealTime, false}}); + m_client = make_shared(m_agentTestHelper->m_ioContext, + opts, std::move(handler)); + } + + bool startClient() + { + bool started = m_client && m_client->start(); + if (started) + { + return waitFor(1s, [this]() { return m_client->isConnected(); }); + } + return started; + } + + void addAdapter(ConfigOptions options = ConfigOptions {}) + { + m_agentTestHelper->addAdapter(options, "localhost", 0, + m_agentTestHelper->m_agent->getDefaultDevice()->getName()); + } + + void stopAgent() + { + const auto agent = m_agentTestHelper->getAgent(); + if (agent) + { + m_agentTestHelper->getAgent()->stop(); + m_agentTestHelper->m_ioContext.run_for(100ms); + } + } + + void stopClient() + { + if (m_client) + { + m_client->stop(); + m_agentTestHelper->m_ioContext.run_for(500ms); + m_client.reset(); + } + } + + void stopServer() + { + if (m_server) + { + m_server->stop(); + m_agentTestHelper->m_ioContext.run_for(500ms); + m_server.reset(); + } + } +}; + +TEST_F(MqttEntitySinkTest, mqtt_entity_sink_should_use_flat_topic_structure) +{ + bool gotMessage = false; + std::string receivedTopic; + + createServer({}); + startServer(); + + auto client = mqtt::make_async_client(m_agentTestHelper->m_ioContext.get(), "localhost", m_port); + + client->set_client_id("test_client"); + client->set_clean_session(true); + client->set_keep_alive_sec(30); + + bool subscribed = false; + client->set_connack_handler( + [client, &subscribed](bool sp, mqtt::connect_return_code connack_return_code) { + if (connack_return_code == mqtt::connect_return_code::accepted) + { + auto pid = client->acquire_unique_packet_id(); + client->async_subscribe(pid, "MTConnect/#", MQTT_NS::qos::at_least_once, + [](MQTT_NS::error_code ec) { EXPECT_FALSE(ec); }); + } + return true; + }); + + client->set_suback_handler( + [&subscribed](std::uint16_t packet_id, std::vector results) { + subscribed = true; + return true; + }); + + client->set_publish_handler([&gotMessage, &receivedTopic](mqtt::optional packet_id, + mqtt::publish_options pubopts, + mqtt::buffer topic_name, + mqtt::buffer contents) { + receivedTopic = topic_name; + std::cout << "Received topic: " << topic_name << " payload: " << contents << std::endl; + if (topic_name.find("MTConnect/Devices/") == 0 && + topic_name.find("/Observations/") != std::string::npos) + { + gotMessage = true; + } + return true; + }); + + client->async_connect([](mqtt::error_code ec) { ASSERT_FALSE(ec) << "Cannot connect"; }); + + // Wait for subscription to complete + ASSERT_TRUE(waitFor(10s, [&subscribed]() { return subscribed; })) + << "Subscription never completed"; + + // Create the agent and wait for its MQTT sink to connect. + createAgent(); + auto sink = m_agentTestHelper->getMqttEntitySink(); + ASSERT_TRUE(sink != nullptr); + ASSERT_TRUE(waitFor(10s, [&sink]() { return sink->isConnected(); })) + << "MqttEntitySink failed to connect to broker"; + + gotMessage = false; + receivedTopic.clear(); + + m_agentTestHelper->m_adapter->processData("2021-02-01T12:00:00Z|line|204"); + ASSERT_TRUE(waitFor(10s, [&gotMessage]() { return gotMessage; })) + << "Timeout waiting for adapter data. Last topic: " << receivedTopic; + + EXPECT_TRUE(receivedTopic.find("MTConnect/Devices/") == 0) + << "Topic doesn't start with MTConnect/Devices/: " << receivedTopic; + EXPECT_TRUE(receivedTopic.find("/Observations/") != std::string::npos) + << "Topic doesn't contain /Observations/: " << receivedTopic; + EXPECT_EQ("MTConnect/Devices/000/Observations/p3", receivedTopic) + << "Topic does not match expected format: " << receivedTopic; + + client->async_disconnect(); + + stopAgent(); + stopClient(); +} + +TEST_F(MqttEntitySinkTest, mqtt_entity_sink_should_publish_entity_json_format) +{ + ConfigOptions options; + + createServer({}); + startServer(); + + auto handler = make_unique(); + bool gotMessage = false; + json receivedJson; + + handler->m_receive = [&gotMessage, &receivedJson](std::shared_ptr client, + const std::string& topic, + const std::string& payload) { + try + { + receivedJson = json::parse(payload); + // Only set gotMessage if result matches expected value + if (receivedJson.contains("result") && receivedJson["result"] == "204") + { + gotMessage = true; + } + } + catch (const std::exception& e) + { + LOG(error) << "Failed to parse JSON: " << e.what(); + } + }; + + createClient(options, std::move(handler)); + ASSERT_TRUE(startClient()); + m_client->subscribe("MTConnect/Devices/#"); + // Ensure subscription is active before sending data + m_agentTestHelper->m_ioContext.run_for(200ms); + + createAgent(); + + auto sink = m_agentTestHelper->getAgent()->findSink("MqttEntitySink"); + auto mqttSink = dynamic_pointer_cast(sink); + ASSERT_TRUE(waitFor(10s, [&mqttSink]() { return mqttSink->isConnected(); })); + + // Wait a bit to ensure sink and client are ready + m_agentTestHelper->m_ioContext.run_for(200ms); + m_agentTestHelper->m_adapter->processData("2021-02-01T12:00:00Z|line|204"); + ASSERT_TRUE(waitFor(10s, [&gotMessage]() { return gotMessage; })); + + // Verify Entity JSON format + EXPECT_TRUE(receivedJson.contains("dataItemId")); + EXPECT_TRUE(receivedJson.contains("timestamp")); + EXPECT_TRUE(receivedJson.contains("result")); + EXPECT_TRUE(receivedJson.contains("sequence")); + EXPECT_TRUE(receivedJson.contains("type")); + EXPECT_TRUE(receivedJson.contains("category")); + + EXPECT_EQ("204", receivedJson["result"].get()); + + stopClient(); +} + +TEST_F(MqttEntitySinkTest, mqtt_entity_sink_should_include_optional_fields) +{ + ConfigOptions options; + + createServer({}); + startServer(); + + auto handler = make_unique(); + bool gotMessage = false; + json receivedJson; + + handler->m_receive = [&gotMessage, &receivedJson](std::shared_ptr client, + const std::string& topic, + const std::string& payload) { + json js = json::parse(payload); + if (js.contains("name")) + { + gotMessage = true; + } + receivedJson = js; + }; + + createClient(options, std::move(handler)); + ASSERT_TRUE(startClient()); + m_client->subscribe("MTConnect/Devices/#"); + + createAgent(); + + auto sink = m_agentTestHelper->getAgent()->findSink("MqttEntitySink"); + auto mqttSink = dynamic_pointer_cast(sink); + ASSERT_TRUE(waitFor(10s, [&mqttSink]() { return mqttSink->isConnected(); })); + + m_agentTestHelper->m_adapter->processData("2021-02-01T12:00:00Z|line|204"); + ASSERT_TRUE(waitFor(10s, [&gotMessage]() { return gotMessage; })); + + // Verify optional fields + if (receivedJson.contains("name")) + { + EXPECT_TRUE(receivedJson["name"].is_string()); + } + if (receivedJson.contains("subType")) + { + EXPECT_TRUE(receivedJson["subType"].is_string()); + } + + stopClient(); +} + +TEST_F(MqttEntitySinkTest, mqtt_entity_sink_should_publish_samples) +{ + ConfigOptions options; + + createServer({}); + startServer(); + + auto handler = make_unique(); + bool gotSample = false; + json receivedJson; + + handler->m_receive = [&gotSample, &receivedJson](std::shared_ptr client, + const std::string& topic, + const std::string& payload) { + try + { + receivedJson = json::parse(payload); + if (receivedJson.contains("dataItemId") && receivedJson["dataItemId"] == "z2" && + receivedJson.contains("category") && receivedJson["category"] == "SAMPLE" && + receivedJson.contains("result") && receivedJson["result"] == "204.000000") + { + gotSample = true; + } + } + catch (const std::exception& e) + { + LOG(error) << "Failed to parse JSON: " << e.what(); + } + }; + + createClient(options, std::move(handler)); + ASSERT_TRUE(startClient()); + m_client->subscribe("MTConnect/Devices/#"); + m_agentTestHelper->m_ioContext.run_for(200ms); + + createAgent(); + + auto sink = m_agentTestHelper->getAgent()->findSink("MqttEntitySink"); + auto mqttSink = dynamic_pointer_cast(sink); + + ASSERT_TRUE(waitFor(10s, [&mqttSink]() { return mqttSink->isConnected(); })); + + m_agentTestHelper->m_ioContext.run_for(200ms); + m_agentTestHelper->m_adapter->processData("2021-02-01T12:00:00Z|z2|204"); + ASSERT_TRUE(waitFor(10s, [&gotSample]() { return gotSample; })); + + EXPECT_EQ("SAMPLE", receivedJson["category"].get()); + EXPECT_EQ("204.000000", receivedJson["result"].get()); + + stopClient(); +} + +TEST_F(MqttEntitySinkTest, mqtt_entity_sink_should_publish_events) +{ + ConfigOptions options; + + createServer({}); + startServer(); + + auto handler = make_unique(); + bool gotEvent = false; + json receivedJson; + + handler->m_receive = [&gotEvent, &receivedJson](std::shared_ptr client, + const std::string& topic, + const std::string& payload) { + try + { + receivedJson = json::parse(payload); + if (receivedJson.contains("category") && receivedJson["category"] == "EVENT" && + receivedJson.contains("dataItemId") && receivedJson["dataItemId"] == "p4" && + receivedJson.contains("result") && receivedJson["result"] == "READY") + { + gotEvent = true; + } + } + catch (const std::exception& e) + { + LOG(error) << "Failed to parse JSON: " << e.what(); + } + }; + + createClient(options, std::move(handler)); + ASSERT_TRUE(startClient()); + m_client->subscribe("MTConnect/Devices/#"); + m_agentTestHelper->m_ioContext.run_for(200ms); + + createAgent(); + + auto sink = m_agentTestHelper->getAgent()->findSink("MqttEntitySink"); + auto mqttSink = dynamic_pointer_cast(sink); + ASSERT_TRUE(waitFor(10s, [&mqttSink]() { return mqttSink->isConnected(); })); + + m_agentTestHelper->m_ioContext.run_for(200ms); + m_agentTestHelper->m_adapter->processData("2021-02-01T12:00:00Z|p4|READY"); + ASSERT_TRUE(waitFor(10s, [&gotEvent]() { return gotEvent; })); + + EXPECT_EQ("EVENT", receivedJson["category"].get()); + EXPECT_EQ("READY", receivedJson["result"].get()); + + stopClient(); +} + +TEST_F(MqttEntitySinkTest, mqtt_entity_sink_should_publish_conditions) +{ + ConfigOptions options; + + createServer({}); + startServer(); + + auto handler = make_unique(); + bool gotCondition = false; + json receivedJson; + + handler->m_receive = [&gotCondition, &receivedJson](std::shared_ptr client, + const std::string& topic, + const std::string& payload) { + try + { + receivedJson = json::parse(payload); + if (receivedJson.contains("category") && receivedJson["category"] == "CONDITION" && + receivedJson.contains("dataItemId") && receivedJson["dataItemId"] == "zlc" && + receivedJson.contains("level") && receivedJson["level"] == "FAULT") + { + gotCondition = true; + } + } + catch (const std::exception& e) + { + LOG(error) << "Failed to parse JSON: " << e.what(); + } + }; + + createClient(options, std::move(handler)); + ASSERT_TRUE(startClient()); + m_client->subscribe("MTConnect/Devices/#"); + m_agentTestHelper->m_ioContext.run_for(200ms); + + createAgent(); + + auto sink = m_agentTestHelper->getAgent()->findSink("MqttEntitySink"); + auto mqttSink = dynamic_pointer_cast(sink); + ASSERT_TRUE(waitFor(10s, [&mqttSink]() { return mqttSink->isConnected(); })); + + m_agentTestHelper->m_ioContext.run_for(200ms); + m_agentTestHelper->m_adapter->processData( + "2021-02-01T12:00:00Z|zlc|FAULT|1234|LOW|Hydraulic pressure low"); + ASSERT_TRUE(waitFor(10s, [&gotCondition]() { return gotCondition; })); + + EXPECT_EQ("CONDITION", receivedJson["category"].get()); + EXPECT_EQ("FAULT", receivedJson["level"].get()); + if (receivedJson.contains("nativeCode")) + { + EXPECT_EQ("1234", receivedJson["nativeCode"].get()); + } + + stopClient(); +} + +TEST_F(MqttEntitySinkTest, mqtt_entity_sink_should_publish_availability) +{ + ConfigOptions options; + + createServer({}); + startServer(); + + auto handler = make_unique(); + bool gotAvailable = false; + std::string availabilityValue; + + handler->m_receive = [&gotAvailable, &availabilityValue](std::shared_ptr client, + const std::string& topic, + const std::string& payload) { + if (topic.find("Availability") != std::string::npos) + { + availabilityValue = payload; + gotAvailable = true; + } + }; + + createClient(options, std::move(handler)); + ASSERT_TRUE(startClient()); + m_client->subscribe("MTConnect/Probe/#"); + + createAgent(); + + auto sink = m_agentTestHelper->getAgent()->findSink("MqttEntitySink"); + auto mqttSink = dynamic_pointer_cast(sink); + ASSERT_TRUE(waitFor(10s, [&mqttSink]() { return mqttSink->isConnected(); })); + + ASSERT_TRUE(waitFor(5s, [&gotAvailable]() { return gotAvailable; })); + EXPECT_EQ("AVAILABLE", availabilityValue); + + stopClient(); +} + +TEST_F(MqttEntitySinkTest, mqtt_entity_sink_should_publish_initial_observations) +{ + ConfigOptions options; + + createServer({}); + startServer(); + + auto handler = make_unique(); + int messageCount = 0; + + handler->m_receive = [&messageCount](std::shared_ptr client, const std::string& topic, + const std::string& payload) { + if (topic.find("/Observations/") != std::string::npos) + { + messageCount++; + } + }; + + createClient(options, std::move(handler)); + ASSERT_TRUE(startClient()); + m_client->subscribe("MTConnect/Devices/#"); + + createAgent(); + + auto sink = m_agentTestHelper->getAgent()->findSink("MqttEntitySink"); + auto mqttSink = dynamic_pointer_cast(sink); + ASSERT_TRUE(waitFor(10s, [&mqttSink]() { return mqttSink->isConnected(); })); + ASSERT_TRUE(waitFor(10s, [&messageCount]() { return messageCount > 0; })); + EXPECT_GT(messageCount, 0); + + stopClient(); +} + +TEST_F(MqttEntitySinkTest, mqtt_entity_sink_should_handle_unavailable) +{ + ConfigOptions options; + + createServer({}); + startServer(); + + auto handler = make_unique(); + bool gotUnavailable = false; + json receivedJson; + + handler->m_receive = [&gotUnavailable, &receivedJson](std::shared_ptr client, + const std::string& topic, + const std::string& payload) { + + json j = json::parse(payload); + if (j["category"] != "CONDITION" && topic.starts_with("MTConnect/Devices/000/Observations/")) + { + if (j["result"] == "UNAVAILABLE") + { + gotUnavailable = true; + } + receivedJson = j; + } + }; + + createClient(options, std::move(handler)); + ASSERT_TRUE(startClient()); + m_client->subscribe("MTConnect/Devices/#"); + + createAgent(); + + auto sink = m_agentTestHelper->getAgent()->findSink("MqttEntitySink"); + auto mqttSink = dynamic_pointer_cast(sink); + ASSERT_TRUE(waitFor(10s, [&mqttSink]() { return mqttSink->isConnected(); })); + + // Initial observations should include UNAVAILABLE values + // The issue is the initial values will be received in a random order and the conditions have a + // level instead of a result. If in the interviening time a condition is received, the json may + // not be correct. + ASSERT_TRUE(waitFor(10s, [&gotUnavailable]() { return gotUnavailable; })); + EXPECT_EQ("UNAVAILABLE", receivedJson["result"].get()); + + stopClient(); +} + +TEST_F(MqttEntitySinkTest, mqtt_entity_sink_should_support_authentication) +{ + ConfigOptions options; + options["MqttUserName"] = "mtconnect"; + options["MqttPassword"] = "password123"; + options["MqttClientId"] = "auth-client"; + + createServer({}); + startServer(); + + bool connected = false; + auto handler = std::make_unique(); + handler->m_connected = [&connected](std::shared_ptr) { connected = true; }; + + createClient(options, std::move(handler)); + startClient(); + + auto start = std::chrono::steady_clock::now(); + while (!connected && std::chrono::steady_clock::now() - start < std::chrono::seconds(5)) + { + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + } + + ASSERT_TRUE(connected) << "MQTT client did not connect with authentication"; +} + +TEST_F(MqttEntitySinkTest, mqtt_entity_sink_should_support_qos_levels) +{ + ConfigOptions options; + options["MqttQOS"] = "exactly_once"; + options["MqttClientId"] = "qos-client"; + + createServer({}); + startServer(); + + auto handler = std::make_unique(); + bool received = false; + handler->m_receive = [&received](std::shared_ptr client, const std::string& topic, + const std::string& payload) { received = true; }; + + createClient(options, std::move(handler)); + ASSERT_TRUE(startClient()); + m_client->subscribe("MTConnect/Devices/#"); + + createAgent(); + auto sink = m_agentTestHelper->getAgent()->findSink("MqttEntitySink"); + auto mqttSink = std::dynamic_pointer_cast(sink); + ASSERT_TRUE(waitFor(10s, [&mqttSink]() { return mqttSink && mqttSink->isConnected(); })); + + ASSERT_TRUE(waitFor(5s, [&received]() { return received; })); +} + +TEST_F(MqttEntitySinkTest, mqtt_entity_sink_should_support_retained_messages) +{ + ConfigOptions options; + options["MqttRetain"] = "true"; + options["MqttClientId"] = "retain-client"; + + createServer({}); + startServer(); + + auto handler = std::make_unique(); + bool retainedReceived = false; + std::string retainedPayload; + handler->m_receive = [&retainedReceived, &retainedPayload](std::shared_ptr client, + const std::string& topic, + const std::string& payload) { + retainedReceived = true; + retainedPayload = payload; + }; + + createClient(options, std::move(handler)); + ASSERT_TRUE(startClient()); + m_client->subscribe("MTConnect/Devices/#"); + + createAgent(); + auto sink = m_agentTestHelper->getAgent()->findSink("MqttEntitySink"); + auto mqttSink = std::dynamic_pointer_cast(sink); + ASSERT_TRUE(waitFor(10s, [&mqttSink]() { return mqttSink && mqttSink->isConnected(); })); + + ASSERT_TRUE(waitFor(5s, [&retainedReceived]() { return retainedReceived; })); + ASSERT_FALSE(retainedPayload.empty()); + + stopClient(); +} + +TEST_F(MqttEntitySinkTest, mqtt_entity_sink_should_publish_last_will) +{ + ConfigOptions options; + options["MqttLastWillTopic"] = "MTConnect/Probe/J55-411045-cpp/Availability"; + options["MqttClientId"] = "lastwill-client"; + + createServer({}); + startServer(); + + auto handler = std::make_unique(); + bool lastWillReceived = false; + handler->m_receive = [&lastWillReceived](std::shared_ptr client, + const std::string& topic, const std::string& payload) { + if (topic.find("Availability") != std::string::npos) + { + lastWillReceived = true; + } + }; + + createClient(options, std::move(handler)); + ASSERT_TRUE(startClient()); + m_client->subscribe("MTConnect/Probe/#"); + + createAgent(); + auto sink = m_agentTestHelper->getAgent()->findSink("MqttEntitySink"); + auto mqttSink = std::dynamic_pointer_cast(sink); + ASSERT_TRUE(waitFor(10s, [&mqttSink]() { return mqttSink && mqttSink->isConnected(); })); + + m_client->stop(); + ASSERT_TRUE(waitFor(5s, [&lastWillReceived]() { return lastWillReceived; })); + + stopClient(); +}