Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
b2ff1a6
mqtt client implementation
viacheslauK Sep 22, 2025
4e6e14a
mqtt: changes to correctly create an MQTT FB with default config
viacheslauK Sep 22, 2025
6130110
mqtt: ref-dev-mqtt-subscriber updating
viacheslauK Sep 22, 2025
38b174d
mqtt: openDAQ dependence updating
viacheslauK Sep 22, 2025
e8dc4bd
mqtt: reworking of mutex locking in the MQTT wrapper
viacheslauK Sep 22, 2025
bf09af7
mqtt: MQTT wrapper refactoring
viacheslauK Sep 23, 2025
eec9798
module versions = openDAQ version
viacheslauK Sep 23, 2025
ff68775
mqtt client: mqtt clientId = device globalId
viacheslauK Sep 23, 2025
8146a27
mqtt client: tests
viacheslauK Sep 23, 2025
0b63a11
mqtt server: moving constants to a separate file; removins unused pro…
viacheslauK Sep 23, 2025
1df9bb7
mqtt server: tests
viacheslauK Sep 23, 2025
79b2316
mqtt: MQTT wrapper refactoring
viacheslauK Sep 24, 2025
4c837e8
mqtt: wrapper tests
viacheslauK Sep 24, 2025
c2322ad
mqtt: nullptr checks for wrappers
viacheslauK Sep 25, 2025
d89a8de
mqtt: publisher/subscriber wrapper tests
viacheslauK Sep 25, 2025
25c7997
mqtt: reworking of mqtt wrappers; MqttAsyncPublisher/MqttAsyncSubscri…
viacheslauK Sep 25, 2025
a7b6cc3
mqtt: deleta move and copy constructor/operator= of MqttAsyncClient
viacheslauK Sep 25, 2025
d38f2a5
mqtt: renaming
viacheslauK Sep 25, 2025
43d0d1f
mqtt: MqttAsyncClient disconnection improving; tests
viacheslauK Sep 25, 2025
93cc314
mqtt: only formatting
viacheslauK Sep 25, 2025
795a680
mqtt: MqttAsyncClient improving; tests
viacheslauK Sep 25, 2025
1c054eb
mqtt: MqttAsyncClient improving; tests
viacheslauK Sep 26, 2025
570fd6f
mqtt: improving client/server thread safety
viacheslauK Sep 29, 2025
878383b
mqtt: ref-dev-mqtt-pub example changes
viacheslauK Sep 30, 2025
2c919c6
mqtt: separate MqttDataWrapper class for serialize/deserialize MQTT p…
viacheslauK Sep 30, 2025
fe8d0de
mqtt: list of signals improving - adding Unit data
viacheslauK Sep 30, 2025
236e167
mqtt: improving serialization/deserialization of the "Unit" field of …
viacheslauK Oct 1, 2025
3220ca5
mqtt: --address arg for examples to set MQTT broker IP
viacheslauK Oct 6, 2025
c41ce8b
mqtt: README updating
viacheslauK Oct 6, 2025
39eadcc
mqtt: cleaning up
viacheslauK Oct 9, 2025
e8317c6
mqtt: OpenDAQ version changing
viacheslauK Oct 9, 2025
6c9bc91
mqtt: initing properties bug
viacheslauK Oct 14, 2025
8da4771
mqtt: logs
viacheslauK Oct 14, 2025
132b22a
mqtt: removed unused SLL options from MQTT wrapper
viacheslauK Oct 14, 2025
7aad38d
mqtt: boost dependencies
viacheslauK Oct 17, 2025
29323c0
mqtt: ssh -> https
viacheslauK Oct 17, 2025
7275434
mqtt: OPENDAQ_MQTT_ENABLE_TESTS cmake flag
viacheslauK Oct 17, 2025
3a54f56
mqtt: OPENDAQ_MQTT_ENABLE_EXAMPLE_APPS cmake flag
viacheslauK Oct 17, 2025
bd5adc7
mqtt: cmake flags definition
viacheslauK Oct 20, 2025
ca8816f
mqtt: useless mutex lock removing
viacheslauK Oct 20, 2025
1823586
mqtt: formatting only
viacheslauK Oct 20, 2025
9ce2b9b
mqtt: pthread issue fixing
viacheslauK Oct 20, 2025
024af35
mqtt: enabling PAHO tests according to OPENDAQ_MQTT_ENABLE_TESTS
viacheslauK Oct 20, 2025
033c406
mqtt: disconnecting on device removing
viacheslauK Oct 21, 2025
3889cfa
mqtt: removing unused boost asio
viacheslauK Oct 21, 2025
dd0e070
mqtt: PAHO_ENABLE_TESTING is always ON
viacheslauK Oct 21, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 10 additions & 23 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# CMakeList.txt : CMake project for OpenDAQHistorianFB, include source and define
# CMakeList.txt : CMake project for MQTTStreamingModule, include source and define
# project specific logic here.
#
set(CMAKE_POLICY_VERSION_MINIMUM 3.5)
Expand Down Expand Up @@ -32,13 +32,21 @@ list(APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/cmake")

option(OPENDAQ_DEVICE_EXAMPLE_ENABLE_EXAMPLE_APPS "Enable building example applications" OFF)

if ((CMAKE_COMPILER_IS_GNUCXX OR CMAKE_COMPILER_IS_CLANGXX) AND NOT MSVC)
if (NOT WIN32)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -pthread")
endif()
endif()

include(CommonUtils)
setup_repo(${REPO_OPTION_PREFIX})

if(OPENDAQ_DEVICE_EXAMPLE_ENABLE_EXAMPLE_APPS)
set(DAQMODULES_REF_DEVICE_MODULE ON CACHE BOOL "" FORCE)
endif()

option(OPENDAQ_MQTT_ENABLE_TESTS "Enable module testing" OFF)
option(OPENDAQ_MQTT_ENABLE_EXAMPLE_APPS "Enable example applications building" OFF)

find_package(OpenSSL REQUIRED)
if (OPENSSL_FOUND)
Expand All @@ -50,30 +58,9 @@ endif()
add_subdirectory(external)
add_subdirectory(mqtt_streaming_protocol)
add_subdirectory(mqtt_streaming_server_module)
add_subdirectory(mqtt_streaming_client_module)

if(OPENDAQ_DEVICE_EXAMPLE_ENABLE_EXAMPLE_APPS)
message(STATUS "Example applications have been enabled")
add_subdirectory(examples)
endif()



# Set CPack variables
set(CPACK_COMPONENTS_ALL RUNTIME)
set(CPACK_PROJECT_NAME ${PROJECT_NAME})
set(CPACK_PACKAGE_NAME ${PROJECT_NAME})
set(CPACK_PACKAGE_VERSION ${PROJECT_VERSION})
set(CPACK_OUTPUT_FILE_PREFIX "${CMAKE_BINARY_DIR}/package")

# Set the CPack generator based on the platform
if (WIN32)
set(CPACK_GENERATOR "ZIP")
elseif (UNIX AND NOT APPLE)
cmake_host_system_information(RESULT DISTRO_ID QUERY DISTRIB_ID)
cmake_host_system_information(RESULT DISTRO_VERSION_ID QUERY DISTRIB_VERSION_ID)
set(CPACK_SYSTEM_NAME "${DISTRO_ID}${DISTRO_VERSION_ID}")
set(CPACK_GENERATOR "TGZ")
endif()

# Include CPack for packaging
include(CPack)
4 changes: 3 additions & 1 deletion CMakePresets.json
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,9 @@
"name": "full",
"hidden": true,
"cacheVariables": {
"OPENDAQ_DEVICE_EXAMPLE_ENABLE_EXAMPLE_APPS": "true"
"OPENDAQ_DEVICE_EXAMPLE_ENABLE_EXAMPLE_APPS": "true",
"OPENDAQ_MQTT_ENABLE_TESTS": "true",
"OPENDAQ_MQTT_ENABLE_EXAMPLE_APPS": "true"
}
},
{
Expand Down
94 changes: 4 additions & 90 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,102 +49,16 @@ cmake --build .

## Testing

There are several example applications in the *"examples"* folder. These examples are based on OpenDAQ SDK and allow testing of *SimpleMQTTModule* functional blocks with each other and with third-party MQTT tools.
There are several example applications in the *"examples"* folder. These examples are based on OpenDAQ SDK and allow testing of *MQTTStreamingModule* client/server sides with each other and with third-party MQTT tools.

> ***Note:*** *Using the applications involves using a third-party broker. It must be started before example applications. See a **External MQTT tools** section for more details*

> ***Note:*** *The applications depend on **MQTTStreamingModule** and [**RefDeviceModule**](https://github.com/openDAQ/openDAQ/tree/main/examples/modules/ref_device_module).*

#### ref-dev-mqtt-pub

The *ref-dev-mqtt-pub* application is a console example which publishes *ref-device Signal* samples via the *MQTTStreamingModule* server. By default it uses the following *MQTTStreamingModule* server settings:
```
StreamingDataPollingPeriod: 20
MaxPacketReadCount: 1000
BrokerAddress: "127.0.0.1"
MqttUsername: ""
MqttPassword: ""
```

### External MQTT tools

It is suggested to use [***Eclipse Mosquitto***](https://github.com/eclipse-mosquitto/mosquitto) as a third-party MQTT tool set. It includes MQTT broker and MQTT publisher/subscriber clients.
Utilities could be installed on **Ubuntu**:

```shell
sudo apt install mosquitto mosquitto-clients
```
The *ref-dev-mqtt-pub* application is a console example which publishes *ref-device Signal* samples via the *MQTTStreamingModule* server.

The MQTT broker will be run automatically after installing. For simple testing run a subscriber with the following options:
#### ref-dev-mqtt-sub

```shell
mosquitto_sub -h 127.0.0.1 -t "openDAQ/#" -v
```
The subscriber will wait for incoming data and then print it. Then run a publisher with the following options:
```shell
mosquitto_pub -h 127.0.0.1 -t "openDAQ/publisher" -m '{"Input0":2, "Input1":1.2, "Input3":3.3}'
```
This command publishes a message and exits. From the subscriber's side you can see:

```shell
user@machine:$ mosquitto_sub -h 127.0.0.1 -t "openDAQ/publisher" -v
openDAQ/publisher {"Input0":2, "Input1":1.2, "Input3":3.3}
```
Now, you can test examples with 3rd-party tools. For example, run *ref-dev-mqtt-pub* and *mosquitto_sub* in different terminals with proper settings.
```shell
user@machine:$ ./ref-dev-mqtt-pub
[tid: 29784][2025-09-15 11:17:02.663] [ModuleManager] [info] Loaded module [v3.31.0 ReferenceDeviceModule] from "libref_device_module-64-3-debug.module.so".
[tid: 29784][2025-09-15 11:17:02.664] [ModuleManager] [info] DEV [daqref] Reference device: "Reference device"
[tid: 29784][2025-09-15 11:17:02.668] [ModuleManager] [info] Loaded module [v3.4.0 OpenDAQMqttStreamingServerModule] from "libmqtt_stream_srv_module-64-3-debug.module.so".
[tid: 29784][2025-09-15 11:17:02.670] [ModuleManager] [info] SRV [OpenDAQMQTT] openDAQ MQTT Streaming server: "Streams data over MQTT"
[tid: 29784][2025-09-15 11:17:03.196] [ReferenceDevice] [info] Properties: NumberOfChannels 2
[tid: 29784][2025-09-15 11:17:03.201] [/RefDev1/IO/AI/RefCh0] [info] Properties: Waveform Sine, Frequency 10, DC 0, Amplitude 5, NoiseAmplitude 0, ConstantValue 2
[tid: 29784][2025-09-15 11:17:03.201] [/RefDev1/IO/AI/RefCh0] [info] Properties: SampleRate 1000, ClientSideScaling false
[tid: 29784][2025-09-15 11:17:03.207] [/RefDev1/IO/AI/RefCh1] [info] Properties: Waveform Sine, Frequency 10, DC 0, Amplitude 5, NoiseAmplitude 0, ConstantValue 2
[tid: 29784][2025-09-15 11:17:03.207] [/RefDev1/IO/AI/RefCh1] [info] Properties: SampleRate 1000, ClientSideScaling false
[tid: 29784][2025-09-15 11:17:03.209] [ReferenceDevice] [info] Properties: AcquisitionLoopTime 20
[tid: 29784][2025-09-15 11:17:03.216] [Instance] [info] Root device set to daqref://device1
[tid: 29784][2025-09-15 11:17:03.231] [ReferenceDevice] [info] Properties: NumberOfChannels 2
[tid: 29784][2025-09-15 11:17:03.235] [/RefDev1/Dev/RefDev0/IO/AI/RefCh0] [info] Properties: Waveform Sine, Frequency 10, DC 0, Amplitude 5, NoiseAmplitude 0, ConstantValue 2
[tid: 29784][2025-09-15 11:17:03.235] [/RefDev1/Dev/RefDev0/IO/AI/RefCh0] [info] Properties: SampleRate 1000, ClientSideScaling false
[tid: 29784][2025-09-15 11:17:03.240] [/RefDev1/Dev/RefDev0/IO/AI/RefCh1] [info] Properties: Waveform Sine, Frequency 10, DC 0, Amplitude 5, NoiseAmplitude 0, ConstantValue 2
[tid: 29784][2025-09-15 11:17:03.240] [/RefDev1/Dev/RefDev0/IO/AI/RefCh1] [info] Properties: SampleRate 1000, ClientSideScaling false
[tid: 29784][2025-09-15 11:17:03.242] [ReferenceDevice] [info] Properties: AcquisitionLoopTime 20
[tid: 29784][2025-09-15 11:17:03.253] [/RefDev1/Dev/RefDev0/IO/AI/ProtectedChannel] [info] Properties: Waveform Sine, Frequency 10, DC 0, Amplitude 5, NoiseAmplitude 0, ConstantValue 2
[tid: 29784][2025-09-15 11:17:03.253] [/RefDev1/Dev/RefDev0/IO/AI/ProtectedChannel] [info] Properties: SampleRate 1000, ClientSideScaling false
[tid: 29784][2025-09-15 11:17:03.265] [OpenDAQMQTT] [info] MQTT: Trying to connect to MQTT broker (127.0.0.1)
[tid: 29784][2025-09-15 11:17:03.267] [OpenDAQMQTT] [info] Adding the Signal to reader: /RefDev1/IO/AI/RefCh0/Sig/AI0;
[tid: 29784][2025-09-15 11:17:03.268] [OpenDAQMQTT] [info] Signal /RefDev1/IO/AI/RefCh0/Sig/AI0Time doesn't has domain signal assigned, skipping
[tid: 29784][2025-09-15 11:17:03.268] [OpenDAQMQTT] [info] Adding the Signal to reader: /RefDev1/IO/AI/RefCh1/Sig/AI1;
[tid: 29784][2025-09-15 11:17:03.269] [OpenDAQMQTT] [info] Signal /RefDev1/IO/AI/RefCh1/Sig/AI1Time doesn't has domain signal assigned, skipping
[tid: 29784][2025-09-15 11:17:03.269] [OpenDAQMQTT] [info] Signal /RefDev1/Sig/Time doesn't has domain signal assigned, skipping
[tid: 29784][2025-09-15 11:17:03.269] [OpenDAQMQTT] [info] Adding the Signal to reader: /RefDev1/Dev/RefDev0/IO/AI/RefCh0/Sig/AI0;
[tid: 29784][2025-09-15 11:17:03.269] [OpenDAQMQTT] [info] Signal /RefDev1/Dev/RefDev0/IO/AI/RefCh0/Sig/AI0Time doesn't has domain signal assigned, skipping
[tid: 29784][2025-09-15 11:17:03.269] [OpenDAQMQTT] [info] Adding the Signal to reader: /RefDev1/Dev/RefDev0/IO/AI/RefCh1/Sig/AI1;
[tid: 29784][2025-09-15 11:17:03.270] [OpenDAQMQTT] [info] Signal /RefDev1/Dev/RefDev0/IO/AI/RefCh1/Sig/AI1Time doesn't has domain signal assigned, skipping
[tid: 29784][2025-09-15 11:17:03.270] [OpenDAQMQTT] [info] Adding the Signal to reader: /RefDev1/Dev/RefDev0/IO/AI/ProtectedChannel/Sig/AI2;
[tid: 29784][2025-09-15 11:17:03.270] [OpenDAQMQTT] [info] Signal /RefDev1/Dev/RefDev0/IO/AI/ProtectedChannel/Sig/AI2Time doesn't has domain signal assigned, skipping
[tid: 29784][2025-09-15 11:17:03.270] [OpenDAQMQTT] [info] Signal /RefDev1/Dev/RefDev0/Sig/Time doesn't has domain signal assigned, skipping
[tid: 29812][2025-09-15 11:17:03.271] [OpenDAQMQTT] [info] Streaming-to-device read thread started
[tid: 29784][2025-09-15 11:17:03.271] [OpenDAQMQTT] [info] Added Component: /RefDev1/Srv/OpenDAQMQTT;
Press "enter" to exit the application...
[tid: 29811][2025-09-15 11:17:03.276] [OpenDAQMQTT] [info] MQTT: Connection established

```
In this case you can see messages on the *mosquitto_sub* side:

```shell
user@machine:$ mosquitto_sub -h 127.0.0.1 -t "openDAQ/#" -v
openDAQ/RefDev1/$signals ["openDAQ/RefDev1/IO/AI/RefCh0/Sig/AI0","openDAQ/RefDev1/IO/AI/RefCh1/Sig/AI1","openDAQ/RefDev1/Dev/RefDev0/IO/AI/RefCh0/Sig/AI0","openDAQ/RefDev1/Dev/RefDev0/IO/AI/RefCh1/Sig/AI1","openDAQ/RefDev1/Dev/RefDev0/IO/AI/ProtectedChannel/Sig/AI2"]
openDAQ/RefDev1/IO/AI/RefCh0/Sig/AI0 {"value":1.243449435824274,"timestamp":1757928009227270}
openDAQ/RefDev1/IO/AI/RefCh0/Sig/AI0 {"value":0.9369065729286229,"timestamp":1757928009228270}
openDAQ/RefDev1/IO/AI/RefCh0/Sig/AI0 {"value":0.6266661678215204,"timestamp":1757928009229270}
openDAQ/RefDev1/IO/AI/RefCh0/Sig/AI0 {"value":0.3139525976465657,"timestamp":1757928009230270}
openDAQ/RefDev1/IO/AI/RefCh0/Sig/AI0 {"value":-1.6081226496766366e-15,"timestamp":1757928009231270}
openDAQ/RefDev1/IO/AI/RefCh0/Sig/AI0 {"value":-0.31395259764656677,"timestamp":1757928009232270}
openDAQ/RefDev1/IO/AI/RefCh0/Sig/AI0 {"value":-0.6266661678215214,"timestamp":1757928009233270}
openDAQ/RefDev1/IO/AI/RefCh0/Sig/AI0 {"value":-0.9369065729286239,"timestamp":1757928009234270}
openDAQ/RefDev1/IO/AI/RefCh0/Sig/AI0 {"value":-1.2434494358242752,"timestamp":1757928009235270}
openDAQ/RefDev1/IO/AI/RefCh0/Sig/AI0 {"value":-1.5450849718747386,"timestamp":1757928009236270}
<...>
```
The *ref-dev-mqtt-sub* application is a console example which subscribes to an available MQTT openDAQ device and prints signal samples.
5 changes: 4 additions & 1 deletion examples/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
cmake_minimum_required(VERSION 3.16)

add_subdirectory(ref-dev-mqtt-pub)
if (OPENDAQ_MQTT_ENABLE_EXAMPLE_APPS)
add_subdirectory(ref-dev-mqtt-pub)
add_subdirectory(ref-dev-mqtt-sub)
endif()
103 changes: 103 additions & 0 deletions examples/InputArgs.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
#pragma once
#include <string>
#include <vector>
#include <unordered_map>
#include <algorithm>
#include <iostream>

class InputArgs
{
public:
void addArg(const std::string& name, const std::string& description, bool hasValue = false)
{
argDescriptions[name] = {description, hasValue};
}

void parse(int argc, char* argv[])
{
parsedArgs.clear();
argValues.clear();
positionalArgs.clear();

for (int i = 1; i < argc; ++i)
{
std::string arg = argv[i];
if (arg.rfind("--", 0) == 0)
{
auto eqPos = arg.find('=');
if (eqPos != std::string::npos)
{
std::string key = arg.substr(0, eqPos);
std::string value = arg.substr(eqPos + 1);
argValues[key] = value;
parsedArgs.push_back(key);
}
else if (i + 1 < argc && argDescriptions[arg].hasValue)
{
argValues[arg] = argv[i + 1];
parsedArgs.push_back(arg);
++i;
}
else
{
parsedArgs.push_back(arg);
}
}
else
{
positionalArgs.push_back(arg);
}
}
}

bool hasArg(const std::string& name) const
{
return std::find(parsedArgs.begin(), parsedArgs.end(), name) != parsedArgs.end();
}

std::string getArgValue(const std::string& name, const std::string& defaultValue = "") const
{
auto it = argValues.find(name);
if (it != argValues.end())
return it->second;
return defaultValue;
}

const std::vector<std::string>& getPositionalArgs() const
{
return positionalArgs;
}

bool hasUnknownArgs() const
{
for (const auto& arg : parsedArgs)
{
if (argDescriptions.find(arg) == argDescriptions.end())
return true;
}
return false;
}

void printHelp() const
{
std::cout << "Available arguments:" << std::endl;
for (const auto& [name, descStruct] : argDescriptions)
{
std::cout << " " << name;
if (descStruct.hasValue)
std::cout << " <value>";
std::cout << " : " << descStruct.description << std::endl;
}
}

private:
struct ArgDesc
{
std::string description;
bool hasValue;
};
std::unordered_map<std::string, ArgDesc> argDescriptions;
std::vector<std::string> parsedArgs;
std::unordered_map<std::string, std::string> argValues;
std::vector<std::string> positionalArgs;
};
3 changes: 2 additions & 1 deletion examples/ref-dev-mqtt-pub/src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ add_compile_definitions(MODULE_PATH="${OPENDAQ_MODULES_DIR}")

add_executable(${EXAMPLE_PROJECT_NAME} ref-dev-mqtt-pub.cpp)
add_dependencies(${EXAMPLE_PROJECT_NAME} daq::ref_device_module)
target_link_libraries(${EXAMPLE_PROJECT_NAME} PRIVATE daq::opendaq)
target_link_libraries(${EXAMPLE_PROJECT_NAME} PRIVATE daq::opendaq)
target_include_directories(${EXAMPLE_PROJECT_NAME} PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/../../)
Loading