Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
4e0d7ba
mqtt: new DataDescriptor approach (serialization/deserialization) for…
viacheslauK Oct 14, 2025
019a8a6
Disable openSLL dependency by default
Oct 14, 2025
f158bf3
Fix includes
Oct 14, 2025
874e706
Remove include and make connection attempt verbose
Oct 14, 2025
640d154
Stripped latest version of server module
Oct 14, 2025
c51831f
Fix boost dependencies
Oct 14, 2025
150a833
mqtt: new MQTT FB for raw data processing (MQTT msg -> signal binary …
viacheslauK Oct 15, 2025
563a857
mqtt: new approach for MQTT FB JSON parsing (JSON config)
viacheslauK Oct 16, 2025
1c15d51
mqtt: new example application for testing of customizable JSON
viacheslauK Oct 16, 2025
c653797
mqtt: autoparsing of timestamp
viacheslauK Oct 17, 2025
0f088f2
mqtt: support for signal without a domain
viacheslauK Oct 17, 2025
9444192
Merge branch 'client' into newApproach
viacheslauK Oct 21, 2025
ae413be
mqtt: json config example for client side
viacheslauK Oct 21, 2025
67e490b
mqtt: bugfix - moving creating of available FB types to device constr…
viacheslauK Oct 22, 2025
93328b2
mqtt: exeption if user tries to add more then one MQTT device
viacheslauK Oct 22, 2025
a563bf3
mqtt: correct name for MQTT device type
viacheslauK Oct 22, 2025
9097ab7
mqtt: topic validation for raw MQTT FB
viacheslauK Oct 23, 2025
eaa8258
mqtt: renaming and logs
viacheslauK Oct 23, 2025
004d6ee
mqtt: tests for Raw MQTT FB
viacheslauK Oct 24, 2025
af65ba3
mqtt: unnecessary include removing
viacheslauK Oct 24, 2025
1df968e
mqtt: new device property - DiscoveryTimeout; renaming InitDelay -> C…
viacheslauK Oct 24, 2025
5511ad5
mqtt: topic validation for JSON config
viacheslauK Oct 24, 2025
2ea3f11
mqtt: if JSON config for JSON FB contains a extraction field that fie…
viacheslauK Oct 28, 2025
c2b20a6
mqtt: check if a topic descriptor is present
viacheslauK Oct 28, 2025
87bb56e
mqtt: static methods for MQTT FBs for building signal names
viacheslauK Oct 28, 2025
9c29ae6
mqtt: const for method arguments
viacheslauK Oct 28, 2025
b046f7c
mqtt: JSON FB - check if subscriber is set, otherwise set a component…
viacheslauK Oct 28, 2025
1fd6587
mqtt: tests for the MQTT module
viacheslauK Oct 28, 2025
6e144e5
mqtt: tests for timestamp parsing
viacheslauK Oct 28, 2025
fe07c72
mqtt: tests for cheching a signal Unit in the JSON FB
viacheslauK Oct 28, 2025
b160797
mqtt: renaming
viacheslauK Oct 28, 2025
feebdc0
mqtt: helper for the module; filling in missing FBs properties with d…
viacheslauK Oct 28, 2025
40e5008
mqtt: implementing the use of a port for broker connection
viacheslauK Oct 29, 2025
cb42634
mqtt: ref-dev-mqtt-sub fixes
viacheslauK Oct 29, 2025
4b5ba32
mqtt: multidevice support; tests;
viacheslauK Oct 29, 2025
76a3bc5
mqtt: new returned value for MQTT commands
viacheslauK Oct 30, 2025
2398cff
mqtt: unsubscribing when a FB is removed or deleted
viacheslauK Oct 30, 2025
9799242
mqtt: mqtt class refactoring
viacheslauK Oct 31, 2025
774834c
mqtt: logs
viacheslauK Oct 31, 2025
1b8c012
mqtt: supporting for different value types: int64_t, double, string
viacheslauK Nov 4, 2025
a975fcd
mqtt: jsonFb - default value sample type SampleType::Float64 -> Sampl…
viacheslauK Nov 4, 2025
ca07804
mqtt: jsonFb signal list was aligned with json config
viacheslauK Nov 4, 2025
7f9d166
mqtt: tests for a string value in a JSON message
viacheslauK Nov 4, 2025
e41aa3a
mqtt: naming
viacheslauK Nov 5, 2025
16f0235
mqtt: tests
viacheslauK Nov 5, 2025
8bcaba9
mqtt: fixes for the ref-dev-mqtt-sub example
viacheslauK Nov 5, 2025
92f0e8c
mqtt: correct termination of the example applications; formatting
viacheslauK Nov 5, 2025
b3b0b36
mqtt: refactoring of example applications
viacheslauK Nov 5, 2025
80f997e
Merge remote-tracking branch 'origin/other/rpi-integration-fixes' int…
viacheslauK Nov 6, 2025
0b36e9f
mqtt: dependencies
viacheslauK Nov 7, 2025
9bdbf6f
mqtt: updating dependencies
viacheslauK Nov 13, 2025
76fa5c9
mqtt: removing unnecessary linkage from examples
viacheslauK Nov 13, 2025
802cc96
mqtt: refactoring
viacheslauK Nov 13, 2025
1a4e0d6
mqtt: sync disconnnection fix
viacheslauK Nov 13, 2025
e240597
mqtt: refactoring
viacheslauK Nov 13, 2025
ce2eaca
mqtt: {} for LOG_x
viacheslauK Nov 13, 2025
0d94e9e
mqtt: refactoring
viacheslauK Nov 13, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 11 additions & 6 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
set(CMAKE_POLICY_VERSION_MINIMUM 3.5)
cmake_minimum_required(VERSION 3.25)

set(SDK_TARGET_NAMESPACE daq)
set(REPO_NAME mqtt_module)
set(REPO_OPTION_PREFIX MQTT_MODULE)

Expand All @@ -30,6 +31,7 @@ set(CMAKE_MESSAGE_CONTEXT_SHOW ON CACHE BOOL "Show CMake message context")

list(APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/cmake")

option(OPENDAQ_MQTT_MODULE_ENABLE_SSL "Enable building with openSSL" OFF)
option(OPENDAQ_DEVICE_EXAMPLE_ENABLE_EXAMPLE_APPS "Enable building example applications" OFF)

if ((CMAKE_COMPILER_IS_GNUCXX OR CMAKE_COMPILER_IS_CLANGXX) AND NOT MSVC)
Expand All @@ -42,17 +44,20 @@ include(CommonUtils)
setup_repo(${REPO_OPTION_PREFIX})

if(OPENDAQ_DEVICE_EXAMPLE_ENABLE_EXAMPLE_APPS)
set(DAQMODULES_REF_DEVICE_MODULE ON CACHE BOOL "" FORCE)
set(DAQMODULES_REF_DEVICE_MODULE ON CACHE BOOL "" FORCE)
endif()

option(OPENDAQ_MQTT_ENABLE_TESTS "Enable module testing" OFF)
option(OPENDAQ_MQTT_ENABLE_EXAMPLE_APPS "Enable example applications building" OFF)

find_package(OpenSSL REQUIRED)
if (OPENSSL_FOUND)
message(STATUS "Found OpenSSL ${OPENSSL_VERSION}")
else()
message(STATUS "OpenSSL Not Found")
if(OPENDAQ_MQTT_MODULE_ENABLE_SSL)
find_package(OpenSSL REQUIRED)
if (OPENSSL_FOUND)
message(STATUS "Found OpenSSL ${OPENSSL_VERSION}")
else()
message(FATAL_ERROR "OpenSSL Not Found")
endif()
add_compile_definitions(OPENDAQ_MQTT_MODULE_ENABLE_SSL)
endif()

add_subdirectory(external)
Expand Down
2 changes: 2 additions & 0 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,6 @@ cmake_minimum_required(VERSION 3.16)
if (OPENDAQ_MQTT_ENABLE_EXAMPLE_APPS)
add_subdirectory(ref-dev-mqtt-pub)
add_subdirectory(ref-dev-mqtt-sub)
add_subdirectory(ref-dev-mqtt-raw-sub)
add_subdirectory(custom-mqtt-sub)
endif()
10 changes: 9 additions & 1 deletion examples/InputArgs.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,15 @@ class InputArgs
return false;
}

void setUsageHelp(const std::string& str)
{
usageString = str;
}

void printHelp() const
{
if (!usageString.empty())
std::cout << "Usage: " << usageString << std::endl;
std::cout << "Available arguments:" << std::endl;
for (const auto& [name, descStruct] : argDescriptions)
{
Expand All @@ -100,4 +107,5 @@ class InputArgs
std::vector<std::string> parsedArgs;
std::unordered_map<std::string, std::string> argValues;
std::vector<std::string> positionalArgs;
};
std::string usageString;
};
12 changes: 12 additions & 0 deletions examples/custom-mqtt-sub/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
cmake_minimum_required(VERSION 3.16)

set(EXAMPLE_PROJECT_NAME "custom-mqtt-sub")

project(${EXAMPLE_PROJECT_NAME} LANGUAGES CXX)

set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)

set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/bin)

add_subdirectory(src)
54 changes: 54 additions & 0 deletions examples/custom-mqtt-sub/pub-config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
{
"openDAQ/RefDev0/IO/AI/RefCh0/Sig/AI0": [
{
"AI0": {
"Value": "value",
"Timestamp": "timestamp",
"Unit": [
"V",
"volts",
"voltage"
]
}
}
],
"openDAQ/RefDev0/IO/AI/RefCh1/Sig/AI1": [
{
"AI1": {
"Value": "value",
"Timestamp": "timestamp",
"Unit": [
"V",
"volts",
"voltage"
]
}
}
],
"openDAQ/RefDev0/IO/AI/RefCh2/Sig/AI2": [
{
"AI2": {
"Value": "value",
"Timestamp": "timestamp",
"Unit": [
"V",
"volts",
"voltage"
]
}
}
],
"openDAQ/RefDev0/IO/AI/RefCh3/Sig/AI3": [
{
"AI3": {
"Value": "value",
"Timestamp": "timestamp",
"Unit": [
"V",
"volts",
"voltage"
]
}
}
]
}
8 changes: 8 additions & 0 deletions examples/custom-mqtt-sub/src/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
cmake_minimum_required(VERSION 3.16)

add_compile_definitions(MODULE_PATH="${OPENDAQ_MODULES_DIR}")
add_compile_definitions(APP_NAME="${EXAMPLE_PROJECT_NAME}")

add_executable(${EXAMPLE_PROJECT_NAME} custom-mqtt-sub.cpp)
target_link_libraries(${EXAMPLE_PROJECT_NAME} PRIVATE daq::opendaq)
target_include_directories(${EXAMPLE_PROJECT_NAME} PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/../../)
183 changes: 183 additions & 0 deletions examples/custom-mqtt-sub/src/custom-mqtt-sub.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
#include "../../InputArgs.h"
#include <iomanip>
#include <opendaq/opendaq.h>

#include <fstream>
#include <iostream>
#include <sstream>

using namespace daq;

struct ConfigStruct {
std::string brokerAddress;
std::string configFilePath;
bool exit = true;
int error = 0;
};

std::string to_string(uint64_t ts)
{
using namespace std::chrono;

system_clock::time_point tp = system_clock::time_point(microseconds(ts));

auto tt = system_clock::to_time_t(tp);
std::tm tm = *std::localtime(&tt);

auto us = duration_cast<milliseconds>(tp.time_since_epoch()) % 1000;

std::ostringstream oss;
oss << std::put_time(&tm, "%Y-%m-%d %H:%M:%S") << '.' << std::setfill('0') << std::setw(3) << us.count();
return oss.str();
}

std::string to_string(daq::DataPacketPtr packet)
{
std::string result;
std::string data;
switch (packet.getDataDescriptor().getSampleType())
{
case SampleType::Float64:
data = std::to_string(*(static_cast<double*>(packet.getData())));
break;
case SampleType::UInt64:
data = std::to_string(*(static_cast<uint64_t*>(packet.getData())));
break;
case SampleType::Int64:
data = std::to_string(*(static_cast<int64_t*>(packet.getData())));
break;
case SampleType::Binary:
data = '\"' + std::string(static_cast<char*>(packet.getData()), packet.getDataSize()) + '\"';
break;
default:
break;
}
std::string unitStr;
if (auto unit = packet.getDataDescriptor().getUnit(); unit.assigned())
{
if (auto s = unit.getSymbol(); s.assigned())
unitStr = " " + s.toStdString();
}

result = fmt::format("SampleType : {}; Data: {}{};", convertSampleTypeToString(packet.getDataDescriptor().getSampleType()), data, unitStr);
if (auto domainPacket = packet.getDomainPacket(); domainPacket.assigned())
{
uint64_t ts = *(static_cast<uint64_t*>(domainPacket.getData()));
result += fmt::format(" Time : {};", to_string(ts));
}
return result;
}

std::string readFileToString(const std::string& filePath)
{
std::ifstream file(filePath);
if (!file)
throw std::runtime_error("Failed to open file: " + filePath);

std::ostringstream buffer;
buffer << file.rdbuf(); // Read the entire file buffer
return buffer.str();
}

ConfigStruct StartUp(int argc, char* argv[])
{
ConfigStruct config;
InputArgs args;
args.addArg("--help", "Show help message");
args.addArg("--address", "MQTT broker address", true);
args.setUsageHelp(APP_NAME " [options] <config file>");
args.parse(argc, argv);

if (args.hasArg("--help") || args.hasUnknownArgs())
{
args.printHelp();
config.error = 0;
return config;
}

config.brokerAddress = args.getArgValue("--address", "127.0.0.1");
auto configFilePath = args.getPositionalArgs();
if (configFilePath.size() != 1)
{
std::cout << "Configuration file path is required." << std::endl;
config.error = -1;
return config;
}
if (configFilePath.size() > 1)
{
std::cout << "Only one configuration file path is allowed. The first one will be used - " << configFilePath[0] << std::endl;
}
config.configFilePath = std::move(configFilePath[0]);
config.exit = false;
return config;
}

int main(int argc, char* argv[])
{
// Parse input arguments
auto appConfig = StartUp(argc, argv);
if (appConfig.exit)
{
return appConfig.error;
}

// Create OpenDAQ instance and add MQTT broker device
const InstancePtr instance = InstanceBuilder().addModulePath(MODULE_PATH).build();
auto brokerDevice = instance.addDevice("daq.mqtt://" + appConfig.brokerAddress);
auto availableDeviceNodes = brokerDevice.getAvailableFunctionBlockTypes();

const std::string fbName = "@jsonMqttFb";
std::cout << "Try to add the " << fbName << std::endl;

// Read JSON function block configuration from file and fill out the function block config
const std::string jsonConfig = readFileToString(appConfig.configFilePath);
auto config = availableDeviceNodes.get(fbName).createDefaultConfig();
config.setPropertyValue("SignalList", jsonConfig);

// Add the JSON function block to the broker device
daq::FunctionBlockPtr jsonFb = brokerDevice.addFunctionBlock(fbName, config);

// Create packet readers for all signals
const auto signals = jsonFb.getSignals();
std::map<std::string, PacketReaderPtr> packetReaders;
for (const auto& s : signals)
{
packetReaders.emplace(std::pair<std::string, PacketReaderPtr>(s.getName().toStdString(), daq::PacketReader(s)));
}

// Start a thread to read packets from the readers
std::atomic<bool> running = true;
std::thread readerThread(
[&packetReaders, &running]()
{
while (running)
{
for (const auto& [signal, reader] : packetReaders)
{
while (!reader.getEmpty() && running)
{
auto packet = reader.read();
if (packet.getType() == PacketType::Event)
{
std::cout << "Event packet is skipped!" << std::endl;
}
else if (packet.getType() == PacketType::Data)
{
const auto dataPacket = packet.asPtr<IDataPacket>();
std::cout << signal << " - " << to_string(dataPacket) << std::endl;
}
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(20));
}
});

std::cout << "Press \"enter\" to exit the application..." << std::endl;
std::cin.get();

running = false;
readerThread.join();
std::cout << "Reader thread finished. Exiting.\n";

return 0;
}
Loading