Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ cmake_minimum_required(VERSION 3.25)
set(SDK_TARGET_NAMESPACE daq)
set(REPO_NAME mqtt_module)
set(REPO_OPTION_PREFIX MQTT_MODULE)
set(CMAKE_FOLDER "${CMAKE_FOLDER}/${REPO_NAME}")

project (${REPO_NAME} CXX)

Expand Down
18 changes: 9 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,17 @@ MQTT module for the [OpenDAQ SDK](https://github.com/openDAQ/openDAQ). The modul
- A set of examples and *gtests* for verifying functionality.

### Key components
1) **MQTT device**:
- **Where**: *mqtt_streaming_client_module/src/mqtt_streaming_device_impl.cpp, include/mqtt_streaming_client_module/...*
- **Purpose**: Represents the MQTT broker as an openDAQ device - the connection point through which function blocks are created.
1) **MQTT root Function Block (@rootMqttFb)**:
Comment thread
NikolaiShipilov marked this conversation as resolved.
- **Where**: *mqtt_streaming_module/src/mqtt_root_fb_impl.cpp, include/mqtt_streaming_module/...*
- **Purpose**: Represents the MQTT broker as an openDAQ function block - the connection point through which function blocks are created.
- **Main properties:**
- *MqttBrokerAddress* (string) - MQTT broker address. It can be an IP address or a hostname. By default, it is set to *"127.0.0.1"*.
- *MqttBrokerPort* (integer) - Port number for the MQTT broker connection. By default, it is set to *1883*.
- *MqttUsername* (string) — Username for MQTT broker authentication. By default, it is empty.
- *MqttPassword* (string) — Password for MQTT broker authentication. By default, it is empty.
- *ConnectTimeout* (integer) — Timeout in milliseconds for the initial connection to the MQTT broker. If the connection fails, an exception is thrown. By default, it is set to *3000 ms*.
2) **Publisher MQTT Function Block (@publisherMqttFb)**:
- **Where**: *include/mqtt_streaming_client_module/mqtt_publisher_fb_impl.h, src/mqtt_publisher_fb_impl.cpp*
- **Where**: *include/mqtt_streaming_module/mqtt_publisher_fb_impl.h, src/mqtt_publisher_fb_impl.cpp*
- **Purpose**: Publishes openDAQ signal data to MQTT topics. There are **four** general data publishing schemes:
1) One MQTT message per signal / one message per sample / one topic per signal / one timestamp for each sample. Example: *{"AI0": 1.1, "timestamp": 1763716736100000}*

Expand Down Expand Up @@ -52,13 +52,13 @@ MQTT module for the [OpenDAQ SDK](https://github.com/openDAQ/openDAQ). The modul

3) **Raw MQTT Function Block (@rawMqttFb)**:

- **Where**: *include/mqtt_streaming_client_module/mqtt_raw_receiver_fb_impl.h, src/mqtt_raw_receiver_fb_impl.cpp*
- **Where**: *include/mqtt_streaming_module/mqtt_raw_receiver_fb_impl.h, src/mqtt_raw_receiver_fb_impl.cpp*
- **Purpose**: Subscribes to raw MQTT messages and converts them into openDAQ signals (binary data) without any parsing — suitable for binary/unstructured messages or simple numeric values.
- **Main properties**:
- *SignalList* (list of strings) — List of MQTT topics to subscribe to for receiving raw binary data.

4) **JSON MQTT Function Block (@jsonMqttFb)**:
- **Where**: *include/mqtt_streaming_client_module/mqtt_json_receiver_fb_impl.h, src/mqtt_json_receiver_fb_impl.cpp*
- **Where**: *include/mqtt_streaming_module/mqtt_json_receiver_fb_impl.h, src/mqtt_json_receiver_fb_impl.cpp*
- **Purpose**: Subscribes to MQTT topics, extracts values and timestamps from MQTT JSON messages, and converts them into openDAQ signal data samples.
- **Main properties**:
- *SignalList* (string) — **JSON configuration string** that defines the list of MQTT topics and the corresponding signals to subscribe to. A typical string structure:
Expand Down Expand Up @@ -164,15 +164,15 @@ cmake --build .
## Examples

There are 3 example C++ application:
- **custom-mqtt-sub** - demonstrates how to work with the *JSON MQTT FB*. The application creates an *MQTT device* and a *JSON MQTT FB* to receive JSON MQTT messages, parse them, and create openDAQ signals to send the parsed data. The application also creates *packet readers* for all FB signals and prints the samples to standard output. The *SignalList* property of the JSON MQTT FB is set to the value read from a file whose path is provided as a command-line argument when the application starts (see the **Key components** section). Usage:
- **custom-mqtt-sub** - demonstrates how to work with the *JSON MQTT FB*. The application creates an *MQTT root FB* and a *JSON MQTT FB* to receive JSON MQTT messages, parse them, and create openDAQ signals to send the parsed data. The application also creates *packet readers* for all FB signals and prints the samples to standard output. The *SignalList* property of the JSON MQTT FB is set to the value read from a file whose path is provided as a command-line argument when the application starts (see the **Key components** section). Usage:
```bash
./custom-mqtt-sub --address broker.emqx.io examples/custom-mqtt-sub/public-example0.json
```
- **raw-mqtt-sub** - demonstrates how to work with the raw MQTT FB. The application creates an MQTT device and a raw MQTT FB to receive MQTT messages and create openDAQ signals to send the data as binary packets. The application also creates packet readers for all FB signals and prints the binary packets as strings to standard output. The SignalList property of the raw MQTT FB is filled from the application arguments. Usage:
- **raw-mqtt-sub** - demonstrates how to work with the *raw MQTT FB*. The application creates an *MQTT root FB* and a *raw MQTT FB* to receive MQTT messages and create openDAQ signals to send the data as binary packets. The application also creates packet readers for all FB signals and prints the binary packets as strings to standard output. The *SignalList* property of the raw MQTT FB is filled from the application arguments. Usage:
```bash
./raw-mqtt-sub --address broker.emqx.io /agvstate /mirip/UNet3AC2/sensor/data
```
- **ref-dev-mqtt-pub** - demonstrates how to work with the *publisher MQTT FB*. The application creates an *openDAQ ref-device* with four channels, an *MQTT device*, and a *publisher MQTT FB* to publish JSON MQTT messages with the channels’ data. The properties of the *publisher MQTT FB* are set according to the selected mode, which can be specified via the *--mode* option. Posible values are:
- **ref-dev-mqtt-pub** - demonstrates how to work with the *publisher MQTT FB*. The application creates an *openDAQ ref-device* with four channels, an *MQTT root FB*, and a *publisher MQTT FB* to publish JSON MQTT messages with the channels’ data. The properties of the *publisher MQTT FB* are set according to the selected mode, which can be specified via the *--mode* option. Posible values are:
- 0 - One MQTT message per signal / one message per sample / one topic per signal / one timestamp for each sample;
- 1 - One MQTT message per signal / one message containing several samples / one topic per signal / one timestamp per sample (array of samples);
- 2 - One MQTT message for several signals (from 1 to N) / one message per sample for each signal / one topic for all signals / separate timestamps for each signal;
Expand Down
2 changes: 2 additions & 0 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
cmake_minimum_required(VERSION 3.16)

set(CMAKE_FOLDER "${CMAKE_FOLDER}/examples")

if (OPENDAQ_MQTT_ENABLE_EXAMPLE_APPS)
add_subdirectory(raw-mqtt-sub)
add_subdirectory(custom-mqtt-sub)
Expand Down
15 changes: 9 additions & 6 deletions examples/custom-mqtt-sub/src/custom-mqtt-sub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,21 +121,24 @@ int main(int argc, char* argv[])
return appConfig.error;
}

// Create OpenDAQ instance and add MQTT broker device
// Create OpenDAQ instance and add MQTT broker FB
const InstancePtr instance = InstanceBuilder().addModulePath(MODULE_PATH).build();
auto brokerDevice = instance.addDevice("daq.mqtt://" + appConfig.brokerAddress);
auto availableDeviceNodes = brokerDevice.getAvailableFunctionBlockTypes();
const std::string rootFbName = "@rootMqttFb";
auto rootFbConfig = instance.getAvailableFunctionBlockTypes().get(rootFbName).createDefaultConfig();
rootFbConfig.setPropertyValue("MqttBrokerAddress", appConfig.brokerAddress);
auto brokerFB = instance.addFunctionBlock(rootFbName, rootFbConfig);
auto availableFbs = brokerFB.getAvailableFunctionBlockTypes();

const std::string fbName = "@jsonMqttFb";
std::cout << "Try to add the " << fbName << std::endl;

// Read JSON function block configuration from file and fill out the function block config
const std::string jsonConfig = readFileToString(appConfig.configFilePath);
auto config = availableDeviceNodes.get(fbName).createDefaultConfig();
auto config = availableFbs.get(fbName).createDefaultConfig();
config.setPropertyValue("SignalList", jsonConfig);

// Add the JSON function block to the broker device
daq::FunctionBlockPtr jsonFb = brokerDevice.addFunctionBlock(fbName, config);
// Add the JSON function block to the broker FB
daq::FunctionBlockPtr jsonFb = brokerFB.addFunctionBlock(fbName, config);

// Create packet readers for all signals
const auto signals = jsonFb.getSignals();
Expand Down
15 changes: 9 additions & 6 deletions examples/raw-mqtt-sub/src/raw-mqtt-sub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,25 +53,28 @@ int main(int argc, char* argv[])
return appConfig.error;
}

// Create OpenDAQ instance and add MQTT broker device
// Create OpenDAQ instance and add MQTT broker FB
const InstancePtr instance = InstanceBuilder().addModulePath(MODULE_PATH).build();
auto brokerDevice = instance.addDevice("daq.mqtt://" + appConfig.brokerAddress);
auto availableDeviceNodes = brokerDevice.getAvailableFunctionBlockTypes();
const std::string rootFbName = "@rootMqttFb";
auto rootFbConfig = instance.getAvailableFunctionBlockTypes().get(rootFbName).createDefaultConfig();
rootFbConfig.setPropertyValue("MqttBrokerAddress", appConfig.brokerAddress);
auto brokerFB = instance.addFunctionBlock(rootFbName, rootFbConfig);
auto availableFbs = brokerFB.getAvailableFunctionBlockTypes();

const std::string fbName = "@rawMqttFb";
std::cout << "Try to add the " << fbName << std::endl;

// Create RAW function block configuration
auto config = availableDeviceNodes.get(fbName).createDefaultConfig();
auto config = availableFbs.get(fbName).createDefaultConfig();
auto topicList = List<IString>();
for (auto& topic : appConfig.topics)
{
addToList(topicList, std::move(topic));
}
config.setPropertyValue("SignalList", topicList);

// Add the RAW function block to the broker device
daq::FunctionBlockPtr rawFb = brokerDevice.addFunctionBlock(fbName, config);
// Add the RAW function block to the broker FB
daq::FunctionBlockPtr rawFb = brokerFB.addFunctionBlock(fbName, config);

// Create packet readers for all signals
const auto signals = rawFb.getSignals();
Expand Down
11 changes: 7 additions & 4 deletions examples/ref-dev-mqtt-pub/src/ref-dev-mqtt-pub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,15 @@ int main(int argc, char* argv[])
channels[3].setPropertyValue("Frequency", 20);

// Create and configure MQTT server
auto brokerDevice = instance.addDevice("daq.mqtt://" + appConfig.brokerAddress);
auto availableDeviceNodes = brokerDevice.getAvailableFunctionBlockTypes();
const std::string rootFbName = "@rootMqttFb";
auto rootFbConfig = instance.getAvailableFunctionBlockTypes().get(rootFbName).createDefaultConfig();
rootFbConfig.setPropertyValue("MqttBrokerAddress", appConfig.brokerAddress);
auto brokerFB = instance.addFunctionBlock(rootFbName, rootFbConfig);
auto availableFbs = brokerFB.getAvailableFunctionBlockTypes();
const std::string fbName = "@publisherMqttFb";
std::cout << "Try to add the " << fbName << std::endl;

auto config = availableDeviceNodes.get(fbName).createDefaultConfig();
auto config = availableFbs.get(fbName).createDefaultConfig();
config.setPropertyValue("MqttQoS", 1);
config.setPropertyValue("ReaderPeriod", 20);
config.setPropertyValue("UseSignalNames", True);
Expand Down Expand Up @@ -126,7 +129,7 @@ int main(int argc, char* argv[])


// Add the publisher function block to the broker device
daq::FunctionBlockPtr fb = brokerDevice.addFunctionBlock(fbName, config);
daq::FunctionBlockPtr fb = brokerFB.addFunctionBlock(fbName, config);
const auto signals = refDevice.getSignals(search::Recursive(search::Any()));
for (const auto& s : signals)
{
Expand Down
2 changes: 1 addition & 1 deletion external/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
set(CMAKE_FOLDER external)
set(CMAKE_FOLDER "${CMAKE_FOLDER}/external")
list(APPEND CMAKE_MESSAGE_CONTEXT external)

if (${CMAKE_SOURCE_DIR} STREQUAL ${CMAKE_BINARY_DIR})
Expand Down
3 changes: 2 additions & 1 deletion external/mqtt/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ set(PAHO_BUILD_SHARED OFF CACHE BOOL "Build dynamic paho library" FORCE)
set(CMAKE_POSITION_INDEPENDENT_CODE ${PAHO_BUILD_STATIC} CACHE BOOL "" FORCE)
set(PAHO_ENABLE_TESTING ON CACHE BOOL "" FORCE)

# Now add the project
set(CMAKE_FOLDER "${CMAKE_FOLDER}/mqtt")

add_subdirectory(${paho_mqtt_c_SOURCE_DIR} ${paho_mqtt_c_BINARY_DIR})

# Apply WIN32_LEAN_AND_MEAN to the Paho MQTT C library as well
Expand Down
26 changes: 8 additions & 18 deletions mqtt_streaming_module/include/mqtt_streaming_module/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,15 @@

BEGIN_NAMESPACE_OPENDAQ_MQTT_STREAMING_MODULE

static const char* DaqMqttDeviceTypeId = "OpenDAQMQTTStreaming";
static const char* DaqMqttProtocolId = "OpenDAQMQTTStreaming";
static const char* DaqMqttDevicePrefix = "daq.mqtt";
static const char* MqttScheme = "mqtt";

static const char* MODULE_NAME = "OpenDAQMQTTClientModule";
static const char* MODULE_ID = "OpenDAQMQTTClientModule";
static const char* SHORT_MODULE_NAME = "MQTTClient";
static const char* PROTOCOL_NAME = "OpenDAQMQTT";
static const char* CONNECTION_TYPE = "TCP/IP";
static const char* MODULE_NAME = "OpenDaqMqttModule";
static const char* MODULE_ID = "OpenDaqMqttModule";
static const char* SHORT_MODULE_NAME = "MqttModule";

static constexpr const char* DEFAULT_BROKER_ADDRESS = "127.0.0.1";
static constexpr uint16_t DEFAULT_PORT = 1883;
static constexpr const char* DEFAULT_USERNAME = "";
static constexpr const char* DEFAULT_PASSWORD = "";
static constexpr uint32_t DEFAULT_INIT_TIMEOUT = 3000; // ms
static constexpr uint32_t DEFAULT_DISCOVERY_TIMEOUT = 1000; // ms

static constexpr uint32_t DEFAULT_PUB_READ_PERIOD = 20; // ms
static constexpr uint32_t DEFAULT_PUB_QOS = 1;
Expand All @@ -31,7 +23,6 @@ static constexpr const char* PROPERTY_NAME_MQTT_BROKER_PORT = "MqttBrokerPort";
static constexpr const char* PROPERTY_NAME_MQTT_USERNAME = "MqttUsername";
static constexpr const char* PROPERTY_NAME_MQTT_PASSWORD = "MqttPassword";
static constexpr const char* PROPERTY_NAME_CONNECT_TIMEOUT = "ConnectTimeout";
static constexpr const char* PROPERTY_NAME_DISCOVERY_TIMEOUT = "DiscoveryTimeout";
static constexpr const char* PROPERTY_NAME_SIGNAL_LIST = "SignalList";

static constexpr const char* PROPERTY_NAME_PUB_TOPIC_MODE = "TopicMode";
Expand All @@ -42,16 +33,15 @@ static constexpr const char* PROPERTY_NAME_PUB_GROUP_VALUES_PACK_SIZE = "GroupVa
static constexpr const char* PROPERTY_NAME_PUB_QOS = "MqttQoS";
static constexpr const char* PROPERTY_NAME_PUB_READ_PERIOD = "ReaderPeriod";



static constexpr const char* RAW_FB_NAME = "@rawMqttFb";
static constexpr const char* JSON_FB_NAME = "@jsonMqttFb";
static constexpr const char* PUB_FB_NAME = "@publisherMqttFb";
static constexpr const char* ROOT_FB_NAME = "@rootMqttFb";

static const char* TOPIC_ALL_SIGNALS = "openDAQ/+/$signals";
static const char* MQTT_LOCAL_ROOT_FB_ID_PREFIX = "rootMqttFb";
static const char* MQTT_LOCAL_PUB_FB_ID_PREFIX = "publisherMqttFb";

static const char* MQTT_LOCAL_DEVICE_ID_PREFIX = "MqttDevice";
static const char* MQTT_DEVICE_NAME = "MqttStreamingClientPseudoDevice";

static const char* MQTT_LOCAL_PUB_FB_ID_PREFIX = "publisherMqttFb";
static const char* MQTT_ROOT_FB_CON_STATUS_TYPE = "BrokerConnectionStatusType";

END_NAMESPACE_OPENDAQ_MQTT_STREAMING_MODULE
Original file line number Diff line number Diff line change
Expand Up @@ -19,49 +19,54 @@
#include "MqttSettings.h"
#include <future>
#include <mqtt_streaming_module/common.h>
#include <opendaq/device_impl.h>
#include <opendaq/function_block_impl.h>
#include <opendaq/streaming_ptr.h>
#include "MqttDataWrapper.h"


BEGIN_NAMESPACE_OPENDAQ_MQTT_STREAMING_MODULE

class MqttStreamingDeviceImpl : public Device
class MqttRootFbImpl : public FunctionBlock
{
enum class ConnectionStatus : EnumType
{
Connected = 0,
Reconnecting,
Disconnected
};

public:
explicit MqttStreamingDeviceImpl(const ContextPtr& ctx,
explicit MqttRootFbImpl(const ContextPtr& ctx,
const ComponentPtr& parent,
const PropertyObjectPtr& config);

static FunctionBlockTypePtr CreateType();

protected:
static std::atomic<int> localIndex;
static std::string getLocalId();
Comment thread
NikolaiShipilov marked this conversation as resolved.
static std::vector<std::pair<MqttRootFbImpl::ConnectionStatus, std::string>> connectionStatusMap;

void removed() override;
DeviceInfoPtr onGetInfo() override;

bool allowAddFunctionBlocksFromModules() override
{
return true;
};
DictPtr<IString, IFunctionBlockType> onGetAvailableFunctionBlockTypes() override;
FunctionBlockPtr onAddFunctionBlock(const StringPtr& typeId, const PropertyObjectPtr& config) override;

void initBaseFunctionalBlocks();
void initMqttSubscriber();
void buildFunctionBlockTypes();
void initConnectionStatus();
void initProperties(const PropertyObjectPtr& config);
void readProperties();
bool waitForConnection(const int timeoutMs);
void receiveSignalTopics(const int timeoutMs);
void onSignalsMessage(const mqtt::MqttAsyncClient& subscriber, mqtt::MqttMessage& msg);
void setConnectionStatus(const ConnectionStatus status, std::string message = "");

DictObjectPtr<IDict, IString, IFunctionBlockType> fbTypes;
DictObjectPtr<IDict, IString, IFunctionBlockType> baseFbTypes;

StringPtr connectionString;
EnumerationPtr connectionStatus;

std::shared_ptr<mqtt::MqttAsyncClient> subscriber;
Mqtt::Utils::Settings::MqttConnectionSettings connectionSettings;
int connectTimeout;

std::promise<bool> connectedPromise;
std::future<bool> connectedFuture;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,14 @@ class MqttStreamingModule final : public Module
public:
MqttStreamingModule(ContextPtr context);

ListPtr<IDeviceInfo> onGetAvailableDevices() override;
DictPtr<IString, IDeviceType> onGetAvailableDeviceTypes() override;
DevicePtr onCreateDevice(const StringPtr& connectionString,
const ComponentPtr& parent,
const PropertyObjectPtr& config) override;
bool acceptsConnectionParameters(const StringPtr& connectionString, const PropertyObjectPtr& config);
Bool onCompleteServerCapability(const ServerCapabilityPtr& source, const ServerCapabilityConfigPtr& target) override; // ???
DictPtr<IString, IFunctionBlockType> onGetAvailableFunctionBlockTypes() override;
FunctionBlockPtr onCreateFunctionBlock(const StringPtr& id,
const ComponentPtr& parent,
const StringPtr& localId,
const PropertyObjectPtr& config) override;

private:
void extractConnectionParams(const StringPtr& connectionString, const PropertyObjectPtr& config, std::string& hostType);
static DeviceTypePtr createDeviceType();
static PropertyObjectPtr createDefaultConfig();
static PropertyObjectPtr populateDefaultConfig(const PropertyObjectPtr& config);

std::mutex sync;
static FunctionBlockTypePtr createFbType();
};

END_NAMESPACE_OPENDAQ_MQTT_STREAMING_MODULE
Loading