Skip to content
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -904,6 +904,10 @@ Sinks {

*Default*: All data items

## MQTT Entity Sink Documentation

For detailed configuration, usage, and message format for the MQTT Entity Sink, see: [docs: MTConnect MQTT Entity Sink](src/mtconnect/sink/mqtt_entity_sink/README.md)

### Adapter Configuration Items ###

* `Adapters` - Contains a list of device blocks. If there are no Adapters
Expand Down
9 changes: 9 additions & 0 deletions agent_lib/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,14 @@ set(AGENT_SOURCES

"${SOURCE_DIR}/sink/mqtt_sink/mqtt_service.cpp"

# src/sink/mqtt_entity_sink HEADER_FILE_ONLY

"${SOURCE_DIR}/sink/mqtt_entity_sink/mqtt_entity_sink.hpp"

#src/sink/mqtt_entity_sink SOURCE_FILES_ONLY

"${SOURCE_DIR}/sink/mqtt_entity_sink/mqtt_entity_sink.cpp"

# src/sink/rest_sink HEADER_FILE_ONLY

"${SOURCE_DIR}/sink/rest_sink/cached_file.hpp"
Expand Down Expand Up @@ -341,6 +349,7 @@ if(MSVC)
# The modules including Beast required the /bigobj option in Windows
set_property(SOURCE
"${SOURCE_DIR}/sink/mqtt_sink/mqtt_service.cpp"
"${SOURCE_DIR}/sink/mqtt_entity_sink/mqtt_entity_sink.cpp"
"${SOURCE_DIR}/sink/rest_sink/session_impl.cpp"
"${SOURCE_DIR}/source/adapter/mqtt/mqtt_adapter.cpp"
"${SOURCE_DIR}/source/adapter/agent_adapter/agent_adapter.cpp"
Expand Down
67 changes: 67 additions & 0 deletions schemas/cfg.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,73 @@
"enum":["at_least_once", "at_most_once", "exactly_once"]
}
}
},
"MqttEntitySink": {
"type": "object",
"properties": {
"MqttHost": {
"type": "string",
"description": "IP Address or name of the MQTT Broker",
"default": "127.0.0.1"
},
"MqttPort": {
"type": "integer",
"description": "Port number of MQTT Broker",
"default": 1883
},
"MqttTls": {
"type": "boolean",
"description": "TLS Certificate for secure connection to the MQTT Broker",
"default": false
},
"MqttUserName": {
"type": "string",
"description": "Username for MQTT authentication"
},
"MqttPassword": {
"type": "string",
"description": "Password for MQTT authentication"
},
"MqttClientId": {
"type": "string",
"description": "MQTT client identifier"
},
"MqttQOS": {
"type": "string",
"description": "The quality of service level for the MQTT connection. Options are: at_least_once, at_most_once, and exactly_once",
"default": "at_least_once",
"enum": [
"at_least_once",
"at_most_once",
"exactly_once"
]
},
"MqttRetain": {
"type": "boolean",
"description": "Retain the last message sent to the broker",
"default": false
},
"ObservationTopicPrefix": {
"type": "string",
"description": "Prefix for the Observations topic",
"default": "MTConnect/Devices/[device]/Observations"
},
"DeviceTopicPrefix": {
"type": "string",
"description": "Prefix for the Device topic",
"default": "MTConnect/Probe/[device]"
},
"AssetTopicPrefix": {
"type": "string",
"description": "Prefix for the Asset topic",
"default": "MTConnect/Asset/[device]"
},
"MqttLastWillTopic": {
"type": "string",
"description": "The topic used for the last will and testament for an agent",
"default": "MTConnect/Probe/[device]/Availability"
}
}
}
}
},
Expand Down
2 changes: 2 additions & 0 deletions src/mtconnect/configuration/agent_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
#include "mtconnect/configuration/config_options.hpp"
#include "mtconnect/device_model/device.hpp"
#include "mtconnect/printer/xml_printer.hpp"
#include "mtconnect/sink/mqtt_entity_sink/mqtt_entity_sink.hpp"
#include "mtconnect/sink/mqtt_sink/mqtt_service.hpp"
#include "mtconnect/sink/rest_sink/rest_service.hpp"
#include "mtconnect/source/adapter/agent_adapter/agent_adapter.hpp"
Expand Down Expand Up @@ -111,6 +112,7 @@ namespace mtconnect::configuration {

sink::mqtt_sink::MqttService::registerFactory(m_sinkFactory);
sink::rest_sink::RestService::registerFactory(m_sinkFactory);
sink::mqtt_entity_sink::MqttEntitySink::registerFactory(m_sinkFactory);
adapter::shdr::ShdrAdapter::registerFactory(m_sourceFactory);
adapter::mqtt_adapter::MqttAdapter::registerFactory(m_sourceFactory);
adapter::agent_adapter::AgentAdapter::registerFactory(m_sourceFactory);
Expand Down
3 changes: 3 additions & 0 deletions src/mtconnect/configuration/config_options.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ namespace mtconnect {
DECLARE_CONFIGURATION(MqttMaxTopicDepth);
DECLARE_CONFIGURATION(MqttLastWillTopic);
DECLARE_CONFIGURATION(MqttXPath);
DECLARE_CONFIGURATION(ObservationTopicPrefix);
DECLARE_CONFIGURATION(DeviceTopicPrefix);
DECLARE_CONFIGURATION(AssetTopicPrefix);
///@}

/// @name Adapter Configuration
Expand Down
11 changes: 6 additions & 5 deletions src/mtconnect/mqtt/mqtt_server_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

#include <inttypes.h>
#include <mqtt/async_client.hpp>
#include <mqtt/broker/topic_filter.hpp>
#include <mqtt/setup_log.hpp>
#include <mqtt_server_cpp.hpp>

Expand Down Expand Up @@ -217,12 +218,12 @@ namespace mtconnect {
LOG(debug) << "Server topic_name: " << topic_name;
LOG(debug) << "Server contents: " << contents;

auto const &idx = m_subs.get<tag_topic>();
auto r = idx.equal_range(topic_name);
for (; r.first != r.second; ++r.first)
for (const auto &sub : m_subs)
{
r.first->con->publish(topic_name, contents,
std::min(r.first->qos_value, pubopts.get_qos()));
if (mqtt::broker::compare_topic_filter(sub.topic, topic_name))
{
sub.con->publish(topic_name, contents, std::min(sub.qos_value, pubopts.get_qos()));
}
}

return true;
Expand Down
Loading